97 lines
1.6 KiB
Go
97 lines
1.6 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"go-common/app/job/main/member/model/queue"
|
|
"go-common/library/log"
|
|
)
|
|
|
|
// Item is
|
|
type Item struct {
|
|
Mid int64
|
|
Time time.Time
|
|
Action string
|
|
}
|
|
|
|
// Compare is
|
|
func (i *Item) Compare(other queue.Item) int {
|
|
o := asItem(other)
|
|
if o == nil {
|
|
return -1
|
|
}
|
|
if i.Time.Equal(o.Time) {
|
|
return 0
|
|
}
|
|
if i.Time.After(o.Time) {
|
|
return 1
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// HashCode is
|
|
func (i *Item) HashCode() int64 {
|
|
return i.Mid
|
|
}
|
|
|
|
func asItem(in queue.Item) *Item {
|
|
o, ok := in.(*Item)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return o
|
|
}
|
|
|
|
func asItems(in []queue.Item) []*Item {
|
|
out := make([]*Item, 0, len(in))
|
|
for _, i := range in {
|
|
item := asItem(i)
|
|
if item == nil {
|
|
continue
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (s *Service) cachedelayproc(ctx context.Context) {
|
|
fiveSeconds := time.Second * 5
|
|
t := time.NewTicker(fiveSeconds)
|
|
|
|
delayed := func(t time.Time) bool {
|
|
top := asItem(s.cachepq.Peek())
|
|
if top == nil {
|
|
log.Info("Empty cache queue top at: %v", t)
|
|
return false
|
|
}
|
|
if t.Sub(top.Time) < fiveSeconds {
|
|
log.Info("Top item is in five seconds, skip and waiting for next tick")
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
for ti := range t.C {
|
|
if !delayed(ti) {
|
|
continue
|
|
}
|
|
|
|
for {
|
|
qitems, err := s.cachepq.Get(1)
|
|
if err != nil {
|
|
log.Error("Failed to get queue items from cache queue: %+v", err)
|
|
return
|
|
}
|
|
items := asItems(qitems)
|
|
for _, it := range items {
|
|
log.Info("Notify purge cache in delay queue with mid: %d", it.Mid)
|
|
s.dao.NotifyPurgeCache(ctx, it.Mid, it.Action)
|
|
}
|
|
if s.cachepq.Empty() || !delayed(time.Now()) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|