bilibili-backup/app/job/main/member/service/cache_delay.go
2019-04-22 02:59:20 +00:00

97 lines
1.6 KiB
Go

package service
import (
"context"
"time"
"go-common/app/job/main/member/model/queue"
"go-common/library/log"
)
// Item is
type Item struct {
Mid int64
Time time.Time
Action string
}
// Compare is
func (i *Item) Compare(other queue.Item) int {
o := asItem(other)
if o == nil {
return -1
}
if i.Time.Equal(o.Time) {
return 0
}
if i.Time.After(o.Time) {
return 1
}
return -1
}
// HashCode is
func (i *Item) HashCode() int64 {
return i.Mid
}
func asItem(in queue.Item) *Item {
o, ok := in.(*Item)
if !ok {
return nil
}
return o
}
func asItems(in []queue.Item) []*Item {
out := make([]*Item, 0, len(in))
for _, i := range in {
item := asItem(i)
if item == nil {
continue
}
out = append(out, item)
}
return out
}
func (s *Service) cachedelayproc(ctx context.Context) {
fiveSeconds := time.Second * 5
t := time.NewTicker(fiveSeconds)
delayed := func(t time.Time) bool {
top := asItem(s.cachepq.Peek())
if top == nil {
log.Info("Empty cache queue top at: %v", t)
return false
}
if t.Sub(top.Time) < fiveSeconds {
log.Info("Top item is in five seconds, skip and waiting for next tick")
return false
}
return true
}
for ti := range t.C {
if !delayed(ti) {
continue
}
for {
qitems, err := s.cachepq.Get(1)
if err != nil {
log.Error("Failed to get queue items from cache queue: %+v", err)
return
}
items := asItems(qitems)
for _, it := range items {
log.Info("Notify purge cache in delay queue with mid: %d", it.Mid)
s.dao.NotifyPurgeCache(ctx, it.Mid, it.Action)
}
if s.cachepq.Empty() || !delayed(time.Now()) {
break
}
}
}
}