730 lines
20 KiB
Go
730 lines
20 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"runtime"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
actrpc "go-common/app/interface/main/activity/rpc/client"
|
|
artmdl "go-common/app/interface/openplatform/article/model"
|
|
artrpc "go-common/app/interface/openplatform/article/rpc/client"
|
|
"go-common/app/job/main/activity/conf"
|
|
"go-common/app/job/main/activity/dao/bnj"
|
|
"go-common/app/job/main/activity/dao/dm"
|
|
"go-common/app/job/main/activity/dao/kfc"
|
|
"go-common/app/job/main/activity/dao/like"
|
|
kfcmdl "go-common/app/job/main/activity/model/kfc"
|
|
l "go-common/app/job/main/activity/model/like"
|
|
"go-common/app/job/main/activity/model/match"
|
|
"go-common/app/service/main/account/api"
|
|
arcapi "go-common/app/service/main/archive/api"
|
|
arcrpc "go-common/app/service/main/archive/api/gorpc"
|
|
comarcmdl "go-common/app/service/main/archive/model/archive"
|
|
"go-common/app/service/main/coin/api/gorpc"
|
|
"go-common/library/log"
|
|
"go-common/library/queue/databus"
|
|
|
|
"github.com/robfig/cron"
|
|
)
|
|
|
|
const (
|
|
//_startVoteP = 2
|
|
//_startVote = 3
|
|
//_endingVote = 4
|
|
//_endVote = 5
|
|
//_goOn = 6
|
|
//_next = 7
|
|
_matchObjTable = "act_matchs_object"
|
|
_subjectTable = "act_subject"
|
|
_likesTable = "likes"
|
|
_likeContentTable = "like_content"
|
|
_likeActionTable = "like_action"
|
|
//_vipActOrderTable = "vip_order_activity_record"
|
|
_objectPieceSize = 100
|
|
_retryTimes = 3
|
|
_typeArc = "archive"
|
|
_typeArt = "article"
|
|
_sharding = 10
|
|
)
|
|
|
|
// Service service
|
|
type Service struct {
|
|
c *conf.Config
|
|
dao *like.Dao
|
|
bnj *bnj.Dao
|
|
dm *dm.Dao
|
|
kfcDao *kfc.Dao
|
|
// waiter
|
|
waiter sync.WaitGroup
|
|
closed bool
|
|
// cache: type, upper
|
|
// arc rpc
|
|
arcRPC *arcrpc.Service2
|
|
coinRPC *coin.Service
|
|
actRPC *actrpc.Service
|
|
articleRPC *artrpc.Service
|
|
//grpc
|
|
accClient api.AccountClient
|
|
// databus
|
|
actSub *databus.Databus
|
|
bnjSub *databus.Databus
|
|
// vip binlog databus
|
|
//vipSub *databus.Databus
|
|
kfcSub *databus.Databus
|
|
kfcActionCh []chan *kfcmdl.CouponMsg
|
|
kfcShare int
|
|
subActionCh []chan *l.Action
|
|
actionSM []map[int64]*l.LastTmStat
|
|
// bnj
|
|
bnjMaxSecond int64
|
|
bnjLessSecond int64
|
|
bnjTimeFinish int64
|
|
bnjMsgFlagMap map[int]int64
|
|
bnjMsgFlagMu sync.Mutex
|
|
bnjWxMsgFlagMap map[int]int64
|
|
bnjWxMsgFlagMu sync.Mutex
|
|
// cron
|
|
cron *cron.Cron
|
|
}
|
|
|
|
// New is archive service implementation.
|
|
func New(c *conf.Config) (s *Service) {
|
|
s = &Service{
|
|
c: c,
|
|
dao: like.New(c),
|
|
dm: dm.New(c),
|
|
bnj: bnj.New(c),
|
|
kfcDao: kfc.New(c),
|
|
arcRPC: arcrpc.New2(c.ArchiveRPC),
|
|
articleRPC: artrpc.New(c.ArticleRPC),
|
|
coinRPC: coin.New(c.CoinRPC),
|
|
actRPC: actrpc.New(c.ActRPC),
|
|
actSub: databus.New(c.ActSub),
|
|
bnjSub: databus.New(c.BnjSub),
|
|
//vipSub: databus.New(c.VipSub),
|
|
kfcSub: databus.New(c.KfcSub),
|
|
cron: cron.New(),
|
|
}
|
|
var err error
|
|
if s.accClient, err = api.NewClient(c.AccClient); err != nil {
|
|
panic(err)
|
|
}
|
|
if s.c.Bnj2019.MsgSpec != "" {
|
|
if err = s.cron.AddFunc(s.c.Bnj2019.MsgSpec, s.cronInformationMessage); err != nil {
|
|
panic(err)
|
|
}
|
|
log.Info("cronInformationMessage init")
|
|
s.cron.Start()
|
|
}
|
|
//time.Sleep(2 * time.Second)
|
|
//subject, err := s.sub(context.Background(), c.Rule.BroadcastSid)
|
|
//if err != nil {
|
|
// log.Error("error(%v)", err)
|
|
// return
|
|
//}
|
|
//log.Info("start-subject")
|
|
//log.Info("subject(%v)", subject)
|
|
//log.Info("end-subject")
|
|
//if subject != nil {
|
|
// go s.genesis(subject)
|
|
//}
|
|
s.bnjMsgFlagMap = make(map[int]int64, len(bnjSteps))
|
|
s.bnjWxMsgFlagMap = make(map[int]int64, len(bnjSteps))
|
|
for _, step := range bnjSteps {
|
|
s.bnjMsgFlagMap[step] = 0
|
|
s.bnjWxMsgFlagMap[step] = 0
|
|
}
|
|
for i := 0; i < _sharding; i++ {
|
|
s.subActionCh = append(s.subActionCh, make(chan *l.Action, 10240))
|
|
s.actionSM = append(s.actionSM, map[int64]*l.LastTmStat{})
|
|
s.waiter.Add(1)
|
|
go s.actionDealProc(i)
|
|
}
|
|
//s.waiter.Add(1)
|
|
//go s.vipCanal()
|
|
s.waiter.Add(1)
|
|
go s.consumeCanal()
|
|
go s.subjectStat(s.c.Rule.ArcObjStatSid, _typeArc)
|
|
go s.subjectStat(s.c.Rule.ArtObjStatSid, _typeArt)
|
|
go s.kingStoryTotalStat(s.c.Rule.KingStorySid)
|
|
go s.subsRankproc()
|
|
go s.initBnjSecond()
|
|
if runtime.NumCPU() <= 4 {
|
|
s.kfcShare = 4
|
|
} else if runtime.NumCPU() > 32 {
|
|
s.kfcShare = 32
|
|
} else {
|
|
s.kfcShare = runtime.NumCPU()
|
|
}
|
|
for j := 0; j < s.kfcShare; j++ {
|
|
s.kfcActionCh = append(s.kfcActionCh, make(chan *kfcmdl.CouponMsg, 10240))
|
|
s.waiter.Add(1)
|
|
go s.kfcActionDeal(j)
|
|
}
|
|
s.waiter.Add(1)
|
|
go s.kfcCanal()
|
|
return s
|
|
}
|
|
|
|
func (s *Service) likeArc(c context.Context, sub *l.Subject) (res *l.Subject, err error) {
|
|
if sub != nil {
|
|
if sub.ID == 0 {
|
|
res = nil
|
|
} else {
|
|
res = sub
|
|
var (
|
|
ok bool
|
|
arcs map[int64]*arcapi.Arc
|
|
aids []int64
|
|
)
|
|
for _, l := range res.List {
|
|
aids = append(aids, l.Wid)
|
|
}
|
|
argAids := &comarcmdl.ArgAids2{
|
|
Aids: aids,
|
|
}
|
|
if arcs, err = s.arcRPC.Archives3(c, argAids); err != nil {
|
|
log.Error("s.arcRPC.Archives(arcAids:(%v), arcs), err(%v)", aids, err)
|
|
return
|
|
}
|
|
for _, l := range res.List {
|
|
if l.Archive, ok = arcs[l.Wid]; !ok {
|
|
log.Info("s.arcs.wid:(%d), (%v)", l.Wid, ok)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
//func (s *Service) genesis(l *l.Subject) {
|
|
// var (
|
|
// index int
|
|
// nowTime, stage, yes, no, next int64
|
|
// err error
|
|
// c = context.Background()
|
|
// )
|
|
// log.Info("st")
|
|
// for {
|
|
// if time.Now().Unix() >= l.Stime.Time().Unix() {
|
|
// break
|
|
// }
|
|
// time.Sleep(time.Second)
|
|
// }
|
|
// lstime := map[string]interface{}{
|
|
// "aid": 0,
|
|
// "time": 0,
|
|
// "index": 0,
|
|
// "stage": 0,
|
|
// }
|
|
// go s.inLtime(lstime, l.ID)
|
|
// for i, a := range l.List {
|
|
// arg1 := &comarcmdl.ArgAid2{Aid: a.Archive.Aid}
|
|
// arc, errRPC := s.arcRPC.Archive3(c, arg1)
|
|
// if errRPC != nil {
|
|
// log.Error("act-job s.arcRPC.Archive3(%v) error(%v)", arg1, errRPC)
|
|
// errRPC = nil
|
|
// continue
|
|
// }
|
|
// if arc.State < 0 && arc.State != -6 {
|
|
// log.Error("act-job s.arcRPC.Archive3(%v) stat err", arg1)
|
|
// }
|
|
// index = i
|
|
// nowTime = 0
|
|
// stage = 0
|
|
// next = 0
|
|
// time.Sleep(time.Duration(l.Ltime) * time.Second)
|
|
// log.Info("aid sleep")
|
|
// aidTime := time.Now().Unix()
|
|
// ltime := map[string]interface{}{
|
|
// "aid": a.Archive.Aid,
|
|
// "time": aidTime,
|
|
// "index": i,
|
|
// "stage": stage,
|
|
// "title": arc.Title,
|
|
// "author": arc.Author.Name,
|
|
// "tname": arc.TypeName,
|
|
// }
|
|
// go s.inLtime(ltime, l.ID)
|
|
// for {
|
|
// fmt.Println(stage)
|
|
// log.Info("s.stage{%d}", stage)
|
|
// //演出开始
|
|
// ltime := map[string]interface{}{
|
|
// "aid": a.Archive.Aid,
|
|
// "time": aidTime,
|
|
// "index": i,
|
|
// "stage": stage,
|
|
// "title": arc.Title,
|
|
// "author": arc.Author.Name,
|
|
// "tname": arc.TypeName,
|
|
// }
|
|
// go s.inLtime(ltime, l.ID)
|
|
// tp := l.Interval - l.Ltime
|
|
// nowTime += tp
|
|
// time.Sleep(time.Duration(tp) * time.Second)
|
|
// //预备投票
|
|
// sdm := &dmm.ActDM{Act: _startVoteP, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
|
|
// go s.brodcast(sdm)
|
|
// log.Info("act:1")
|
|
// go s.dao.CreateSelection(c, a.Archive.Aid, stage)
|
|
// nowTime += l.Ltime
|
|
// time.Sleep(time.Duration(l.Ltime) * time.Second)
|
|
// //投票开始
|
|
// interval := &dmm.ActDM{Act: _startVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
|
|
// go s.brodcast(interval)
|
|
// log.Info("act:2")
|
|
// tl := l.Tlimit - l.Ltime
|
|
// nowTime += tl
|
|
// time.Sleep(time.Duration(tl) * time.Second)
|
|
// //投票预结束
|
|
// intervalP := &dmm.ActDM{Act: _endingVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
|
|
// log.Info("act:3")
|
|
// go s.brodcast(intervalP)
|
|
// nowTime += l.Ltime
|
|
// time.Sleep(time.Duration(l.Ltime) * time.Second)
|
|
// //投票结果
|
|
// if yes, no, err = s.dao.Selection(c, a.Archive.Aid, stage); err != nil {
|
|
// log.Error("s.dao.Selection() error(%v)", err)
|
|
// return
|
|
// }
|
|
// goNext := true
|
|
// if yes != 0 || no != 0 {
|
|
// goNext = (float64(no)/float64(yes+no)*100 > 40)
|
|
// }
|
|
// if goNext {
|
|
// next = 1
|
|
// } else {
|
|
// next = 0
|
|
// }
|
|
// intervalEnd := &dmm.ActDM{Act: _endVote, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
|
|
// go s.brodcast(intervalEnd)
|
|
// log.Info("act:4")
|
|
// nowTime += l.Ltime
|
|
// time.Sleep(time.Duration(l.Ltime) * time.Second)
|
|
// go s.inOnlinelog(c, l.ID, a.Archive.Aid, stage, yes, no)
|
|
// if goNext {
|
|
// tlimit := &dmm.ActDM{Act: _next, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
|
|
// go s.brodcast(tlimit)
|
|
// log.Info("act:6")
|
|
// nowTime += l.Ltime
|
|
// ldtime := map[string]interface{}{
|
|
// "aid": 0,
|
|
// "time": time.Now().Unix() + l.Ltime,
|
|
// "index": i + 1,
|
|
// "stage": 0,
|
|
// "title": arc.Title,
|
|
// "author": arc.Author.Name,
|
|
// "tname": arc.TypeName,
|
|
// }
|
|
// go s.inLtime(ldtime, l.ID)
|
|
// time.Sleep(time.Duration(l.Ltime) * time.Second)
|
|
// break
|
|
// }
|
|
// tlimit := &dmm.ActDM{Act: _goOn, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
|
|
// log.Info("bro:%v", tlimit)
|
|
// go s.brodcast(tlimit)
|
|
// log.Info("act:5")
|
|
// //投票结果判断
|
|
// stage++
|
|
// if a.Archive.Duration-nowTime < 60 {
|
|
// go s.inOnlinelog(c, l.ID, a.Archive.Aid, 100, 0, 0)
|
|
// time.Sleep(time.Duration(a.Archive.Duration-nowTime) * time.Second)
|
|
// break
|
|
// }
|
|
// }
|
|
// }
|
|
// ltime := map[string]interface{}{
|
|
// "aid": 0,
|
|
// "time": time.Now().Unix(),
|
|
// "index": index + 1,
|
|
// "stage": 0,
|
|
// "title": "",
|
|
// "author": "",
|
|
// "tname": "",
|
|
// }
|
|
// go s.inLtime(ltime, l.ID)
|
|
// log.Info("end")
|
|
//}
|
|
//
|
|
//func (s *Service) inOnlinelog(c context.Context, sid, aid, stage, yes, no int64) {
|
|
// if row, err := s.dao.InOnlinelog(c, sid, aid, stage, yes, no); err != nil {
|
|
// log.Error("s.dao.inOnlinelog, err(%v) row(%v)", err, row)
|
|
// }
|
|
//}
|
|
//
|
|
//func (s *Service) inLtime(lt map[string]interface{}, sid int64) {
|
|
// var v, err = json.Marshal(lt)
|
|
// if err != nil {
|
|
// log.Error("s.genesis.inLtime.json.Marshal(dm:(%v)), err(%v)", v, err)
|
|
// return
|
|
// }
|
|
// s.dao.RbSet(context.Background(), "ltime:"+strconv.FormatInt(sid, 10), v)
|
|
//}
|
|
//
|
|
//func (s *Service) brodcast(d *dmm.ActDM) {
|
|
// var ds, err = json.Marshal(d)
|
|
// if err != nil {
|
|
// log.Error("s.genesis.json.Marshal(dm:(%v)), err(%v)", d, err)
|
|
// return
|
|
// }
|
|
// var m = &dmm.Broadcast{
|
|
// RoomID: s.c.Rule.BroadcastCid,
|
|
// CMD: dmm.BroadcastCMDACT,
|
|
// Info: ds,
|
|
// }
|
|
// s.dm.Broadcast(context.Background(), m)
|
|
//}
|
|
//
|
|
//func (s *Service) sub(c context.Context, sid int64) (res *l.Subject, err error) {
|
|
// var (
|
|
// eg errgroup.Group
|
|
// ls []*l.Like
|
|
// )
|
|
// eg.Go(func() (err error) {
|
|
// res, err = s.dao.Subject(c, sid)
|
|
// return
|
|
// })
|
|
// eg.Go(func() (err error) {
|
|
// ls, err = s.dao.Like(c, sid)
|
|
// return
|
|
// })
|
|
// if err = eg.Wait(); err != nil {
|
|
// log.Error("eg.Wait error(%v)", err)
|
|
// return
|
|
// }
|
|
// if res != nil {
|
|
// res.List = ls
|
|
// }
|
|
// if res, err = s.likeArc(c, res); err != nil {
|
|
// return
|
|
// }
|
|
// return
|
|
//}
|
|
|
|
//func (s *Service) vipCanal() {
|
|
// defer s.waiter.Done()
|
|
// var c = context.Background()
|
|
// for {
|
|
// msg, ok := <-s.vipSub.Messages()
|
|
// if !ok {
|
|
// log.Info("databus:activity-job vip binlog consumer exit!")
|
|
// return
|
|
// }
|
|
// msg.Commit()
|
|
// m := &match.Message{}
|
|
// if err := json.Unmarshal(msg.Value, m); err != nil {
|
|
// log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
|
|
// continue
|
|
// }
|
|
// switch m.Table {
|
|
// case _vipActOrderTable:
|
|
// if m.Action == match.ActInsert {
|
|
// s.addElemeLottery(c, m.New)
|
|
// }
|
|
// }
|
|
// log.Info("vipCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
|
|
// }
|
|
//}
|
|
|
|
func (s *Service) consumeCanal() {
|
|
defer s.waiter.Done()
|
|
var c = context.Background()
|
|
for {
|
|
msg, ok := <-s.actSub.Messages()
|
|
if !ok {
|
|
log.Info("databus: activity-job binlog consumer exit!")
|
|
return
|
|
}
|
|
msg.Commit()
|
|
m := &match.Message{}
|
|
if err := json.Unmarshal(msg.Value, m); err != nil {
|
|
log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
|
|
continue
|
|
}
|
|
switch m.Table {
|
|
case _matchObjTable:
|
|
if m.Action == match.ActUpdate {
|
|
s.upMatchUser(c, m.New, m.Old)
|
|
}
|
|
case _subjectTable:
|
|
if m.Action == match.ActInsert || m.Action == match.ActUpdate {
|
|
s.upSubject(c, m.New)
|
|
} else if m.Action == match.ActDelete {
|
|
s.upSubject(c, m.Old)
|
|
}
|
|
case _likesTable:
|
|
if m.Action == match.ActInsert {
|
|
s.AddLike(c, m.New)
|
|
} else if m.Action == match.ActUpdate {
|
|
s.UpLike(c, m.New, m.Old)
|
|
} else if m.Action == match.ActDelete {
|
|
s.DelLike(c, m.Old)
|
|
}
|
|
case _likeContentTable:
|
|
if m.Action == match.ActDelete {
|
|
s.upLikeContent(c, m.Old)
|
|
} else {
|
|
s.upLikeContent(c, m.New)
|
|
}
|
|
case _likeActionTable:
|
|
if m.Action == match.ActInsert {
|
|
s.actionProc(c, m.New)
|
|
}
|
|
}
|
|
log.Info("consumeCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
|
|
}
|
|
}
|
|
|
|
func (s *Service) kfcCanal() {
|
|
defer s.waiter.Done()
|
|
for {
|
|
if s.closed {
|
|
return
|
|
}
|
|
msg, ok := <-s.kfcSub.Messages()
|
|
if !ok {
|
|
log.Info("databus: activity-job binlog consumer exit!")
|
|
return
|
|
}
|
|
msg.Commit()
|
|
m := &kfcmdl.CouponMsg{}
|
|
if err := json.Unmarshal(msg.Value, m); err != nil {
|
|
log.Error("kfcCanal:json.Unmarshal(%s) error(%+v)", msg.Value, err)
|
|
continue
|
|
}
|
|
j := m.CouponID % int64(s.kfcShare)
|
|
select {
|
|
case s.kfcActionCh[j] <- m:
|
|
default:
|
|
log.Info("kfcCanal cache full (%d)", j)
|
|
}
|
|
log.Info("kfcCanal key:%s partition:%d offset:%d value %s goroutine(%d) all(%d)", msg.Key, msg.Partition, msg.Offset, msg.Value, j, s.kfcShare)
|
|
}
|
|
}
|
|
|
|
func (s *Service) subjectStat(sid int64, typ string) {
|
|
var err error
|
|
for {
|
|
if s.closed {
|
|
return
|
|
}
|
|
var (
|
|
statLike int64
|
|
likeCnt int
|
|
likes []*l.Like
|
|
)
|
|
if sid <= 0 {
|
|
log.Warn("conf sid == 0 typ(%s)", typ)
|
|
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
|
|
continue
|
|
}
|
|
if likeCnt, err = s.dao.LikeCnt(context.Background(), sid); err != nil {
|
|
log.Error("s.dao.LikeCnt(sid:%d) error(%v)", sid, err)
|
|
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
|
|
continue
|
|
}
|
|
if likeCnt == 0 {
|
|
log.Warn("s.dao.LikeCnt(sid:%d) likeCnt == 0", sid)
|
|
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
|
|
continue
|
|
}
|
|
for i := 0; i < likeCnt; i += _objectPieceSize {
|
|
if likes, err = s.likeList(context.Background(), sid, i, _objectPieceSize, _retryTimes); err != nil {
|
|
log.Error("objectStatproc s.likeList(%d,%d,%d) error(%+v)", sid, i, _objectPieceSize, err)
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
var aids []int64
|
|
for _, v := range likes {
|
|
if v.Wid > 0 {
|
|
aids = append(aids, v.Wid)
|
|
}
|
|
}
|
|
switch typ {
|
|
case _typeArc:
|
|
var arcs map[int64]*arcapi.Arc
|
|
if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
|
|
log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
for _, aid := range aids {
|
|
if arc, ok := arcs[aid]; ok && arc.IsNormal() {
|
|
statLike += int64(arc.Stat.Like)
|
|
} else {
|
|
likeCnt--
|
|
}
|
|
}
|
|
}
|
|
case _typeArt:
|
|
var arts map[int64]*artmdl.Meta
|
|
if arts, err = s.arts(context.Background(), aids, _retryTimes); err != nil {
|
|
log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
|
|
time.Sleep(100 * time.Microsecond)
|
|
continue
|
|
} else {
|
|
for _, aid := range aids {
|
|
if art, ok := arts[aid]; ok && art.IsNormal() {
|
|
statLike += art.Stats.Like
|
|
} else {
|
|
likeCnt--
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if err = s.setSubjectStat(context.Background(), sid, &l.SubjectTotalStat{SumLike: statLike}, likeCnt, _retryTimes); err != nil {
|
|
log.Error("objectStatproc s.setObjectStat(%d,%d) error(%+v)", sid, statLike, err)
|
|
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
|
|
continue
|
|
}
|
|
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
|
|
}
|
|
}
|
|
|
|
func (s *Service) likeList(c context.Context, sid int64, offset, limit, retryCnt int) (list []*l.Like, err error) {
|
|
for i := 0; i < retryCnt; i++ {
|
|
if list, err = s.dao.LikeList(c, sid, offset, limit); err == nil {
|
|
break
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *Service) webDataList(c context.Context, vid int64, offset, limit, retryCnt int) (list []*l.WebData, err error) {
|
|
for i := 0; i < retryCnt; i++ {
|
|
if list, err = s.dao.WebDataList(c, vid, offset, limit); err == nil {
|
|
break
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *Service) arts(c context.Context, aids []int64, retryCnt int) (arcs map[int64]*artmdl.Meta, err error) {
|
|
for i := 0; i < retryCnt; i++ {
|
|
if arcs, err = s.articleRPC.ArticleMetas(c, &artmdl.ArgAids{Aids: aids}); err == nil {
|
|
break
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *Service) kingStoryTotalStat(vid int64) {
|
|
var err error
|
|
for {
|
|
if s.closed {
|
|
return
|
|
}
|
|
var (
|
|
statView, statLike, statFav, StatCoin int64
|
|
likeCnt int
|
|
likes []*l.WebData
|
|
)
|
|
if vid <= 0 {
|
|
log.Warn("conf vid == 0")
|
|
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
|
|
continue
|
|
}
|
|
if likeCnt, err = s.dao.WebDataCnt(context.Background(), vid); err != nil {
|
|
log.Error("kingStoryTotalStat s.dao.WebDataCnt(sid:%d) error(%v)", vid, err)
|
|
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
|
|
continue
|
|
}
|
|
if likeCnt == 0 {
|
|
log.Warn("kingStoryTotalStat s.dao.LikeCnt(sid:%d) likeCnt == 0", vid)
|
|
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
|
|
continue
|
|
}
|
|
for i := 0; i < likeCnt; i += _objectPieceSize {
|
|
if likes, err = s.webDataList(context.Background(), vid, i, _objectPieceSize, _retryTimes); err != nil {
|
|
log.Error("kingStoryTotalStat s.webDataList(%d,%d,%d) error(%+v)", vid, i, _objectPieceSize, err)
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
var (
|
|
aids []int64
|
|
aidStruct = new(struct {
|
|
Aid string `json:"AID"`
|
|
})
|
|
)
|
|
for _, v := range likes {
|
|
if v.Data != "" {
|
|
if e := json.Unmarshal([]byte(v.Data), &aidStruct); e != nil {
|
|
log.Warn("kingStoryTotalStat json.Unmarshal(%s) error(%v)", v.Data, e)
|
|
continue
|
|
}
|
|
if aid, e := strconv.ParseInt(aidStruct.Aid, 10, 64); e != nil {
|
|
log.Warn("kingStoryTotalStat strconv.ParseInt(%s) error(%v)", aidStruct, e)
|
|
continue
|
|
} else {
|
|
aids = append(aids, aid)
|
|
}
|
|
}
|
|
}
|
|
var arcs map[int64]*arcapi.Arc
|
|
if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
|
|
log.Error("kingStoryTotalStat s.arcs(%v) error(%v)", aids, err)
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
for _, aid := range aids {
|
|
if arc, ok := arcs[aid]; ok && arc.IsNormal() {
|
|
statView += int64(arc.Stat.View)
|
|
statLike += int64(arc.Stat.Like)
|
|
statFav += int64(arc.Stat.Fav)
|
|
StatCoin += int64(arc.Stat.Coin)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if err = s.setSubjectStat(context.Background(), vid, &l.SubjectTotalStat{SumView: statView, SumLike: statLike, SumFav: statFav, SumCoin: StatCoin}, likeCnt, _retryTimes); err != nil {
|
|
log.Error("kingStoryTotalStat s.setObjectStat(%d,%d) error(%+v)", vid, statLike, err)
|
|
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
|
|
continue
|
|
}
|
|
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
|
|
}
|
|
}
|
|
|
|
func (s *Service) setSubjectStat(c context.Context, sid int64, stat *l.SubjectTotalStat, count, retryCnt int) (err error) {
|
|
for i := 0; i < retryCnt; i++ {
|
|
if err = s.dao.SetObjectStat(c, sid, stat, count); err == nil {
|
|
break
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Ping reports the heath of services.
|
|
func (s *Service) Ping(c context.Context) (err error) {
|
|
return s.dao.Ping(c)
|
|
}
|
|
|
|
// Close kafaka consumer close.
|
|
func (s *Service) Close() (err error) {
|
|
defer s.waiter.Wait()
|
|
s.closed = true
|
|
if s.bnjTimeFinish == 0 {
|
|
s.bnj.AddCacheLessTime(context.Background(), s.bnjLessSecond)
|
|
}
|
|
s.cron.Stop()
|
|
s.dao.Close()
|
|
s.actSub.Close()
|
|
s.bnjSub.Close()
|
|
//s.vipSub.Close()
|
|
s.kfcSub.Close()
|
|
s.closed = true
|
|
return
|
|
}
|