bilibili-backup/app/job/main/member-cache/service/block.go
2019-04-22 02:59:20 +00:00

49 lines
1.1 KiB
Go

package service
import (
"context"
"encoding/json"
"go-common/app/job/main/member-cache/model"
"go-common/library/log"
"go-common/library/queue/databus"
"github.com/pkg/errors"
)
func (s *Service) handleBlockBinLog(ctx context.Context, msg *databus.Message) error {
defer func() {
if err := msg.Commit(); err != nil {
log.Error("Failed to commit message: %s: %+v", BeautifyMessage(msg), err)
return
}
}()
mu := &model.Binlog{}
if err := json.Unmarshal(msg.Value, mu); err != nil {
return errors.WithStack(err)
}
mmid := &model.NeastMid{}
bs := mu.New
if len(bs) <= 0 {
bs = mu.Old
}
if err := json.Unmarshal(bs, mmid); err != nil {
return errors.WithStack(err)
}
defer s.NotifyPurgeCache(ctx, mmid.Mid, model.ActBlockUser)
return s.dao.DeleteUserBlockCache(ctx, mmid.Mid)
}
func (s *Service) blockBinLogproc(ctx context.Context) {
for msg := range s.blockBinLog.Messages() {
if err := s.handleBlockBinLog(ctx, msg); err != nil {
log.Error("Failed to handle block binlog: %s: %+v", BeautifyMessage(msg), err)
continue
}
log.Info("Succeed to handle block binlog: %s", BeautifyMessage(msg))
}
}