bilibili-backup/app/job/main/activity/service/service.go
2019-04-22 02:59:20 +00:00

730 lines
20 KiB
Go

package service
import (
"context"
"encoding/json"
"runtime"
"strconv"
"sync"
"time"
actrpc "go-common/app/interface/main/activity/rpc/client"
artmdl "go-common/app/interface/openplatform/article/model"
artrpc "go-common/app/interface/openplatform/article/rpc/client"
"go-common/app/job/main/activity/conf"
"go-common/app/job/main/activity/dao/bnj"
"go-common/app/job/main/activity/dao/dm"
"go-common/app/job/main/activity/dao/kfc"
"go-common/app/job/main/activity/dao/like"
kfcmdl "go-common/app/job/main/activity/model/kfc"
l "go-common/app/job/main/activity/model/like"
"go-common/app/job/main/activity/model/match"
"go-common/app/service/main/account/api"
arcapi "go-common/app/service/main/archive/api"
arcrpc "go-common/app/service/main/archive/api/gorpc"
comarcmdl "go-common/app/service/main/archive/model/archive"
"go-common/app/service/main/coin/api/gorpc"
"go-common/library/log"
"go-common/library/queue/databus"
"github.com/robfig/cron"
)
const (
//_startVoteP = 2
//_startVote = 3
//_endingVote = 4
//_endVote = 5
//_goOn = 6
//_next = 7
_matchObjTable = "act_matchs_object"
_subjectTable = "act_subject"
_likesTable = "likes"
_likeContentTable = "like_content"
_likeActionTable = "like_action"
//_vipActOrderTable = "vip_order_activity_record"
_objectPieceSize = 100
_retryTimes = 3
_typeArc = "archive"
_typeArt = "article"
_sharding = 10
)
// Service service
type Service struct {
c *conf.Config
dao *like.Dao
bnj *bnj.Dao
dm *dm.Dao
kfcDao *kfc.Dao
// waiter
waiter sync.WaitGroup
closed bool
// cache: type, upper
// arc rpc
arcRPC *arcrpc.Service2
coinRPC *coin.Service
actRPC *actrpc.Service
articleRPC *artrpc.Service
//grpc
accClient api.AccountClient
// databus
actSub *databus.Databus
bnjSub *databus.Databus
// vip binlog databus
//vipSub *databus.Databus
kfcSub *databus.Databus
kfcActionCh []chan *kfcmdl.CouponMsg
kfcShare int
subActionCh []chan *l.Action
actionSM []map[int64]*l.LastTmStat
// bnj
bnjMaxSecond int64
bnjLessSecond int64
bnjTimeFinish int64
bnjMsgFlagMap map[int]int64
bnjMsgFlagMu sync.Mutex
bnjWxMsgFlagMap map[int]int64
bnjWxMsgFlagMu sync.Mutex
// cron
cron *cron.Cron
}
// New is archive service implementation.
func New(c *conf.Config) (s *Service) {
s = &Service{
c: c,
dao: like.New(c),
dm: dm.New(c),
bnj: bnj.New(c),
kfcDao: kfc.New(c),
arcRPC: arcrpc.New2(c.ArchiveRPC),
articleRPC: artrpc.New(c.ArticleRPC),
coinRPC: coin.New(c.CoinRPC),
actRPC: actrpc.New(c.ActRPC),
actSub: databus.New(c.ActSub),
bnjSub: databus.New(c.BnjSub),
//vipSub: databus.New(c.VipSub),
kfcSub: databus.New(c.KfcSub),
cron: cron.New(),
}
var err error
if s.accClient, err = api.NewClient(c.AccClient); err != nil {
panic(err)
}
if s.c.Bnj2019.MsgSpec != "" {
if err = s.cron.AddFunc(s.c.Bnj2019.MsgSpec, s.cronInformationMessage); err != nil {
panic(err)
}
log.Info("cronInformationMessage init")
s.cron.Start()
}
//time.Sleep(2 * time.Second)
//subject, err := s.sub(context.Background(), c.Rule.BroadcastSid)
//if err != nil {
// log.Error("error(%v)", err)
// return
//}
//log.Info("start-subject")
//log.Info("subject(%v)", subject)
//log.Info("end-subject")
//if subject != nil {
// go s.genesis(subject)
//}
s.bnjMsgFlagMap = make(map[int]int64, len(bnjSteps))
s.bnjWxMsgFlagMap = make(map[int]int64, len(bnjSteps))
for _, step := range bnjSteps {
s.bnjMsgFlagMap[step] = 0
s.bnjWxMsgFlagMap[step] = 0
}
for i := 0; i < _sharding; i++ {
s.subActionCh = append(s.subActionCh, make(chan *l.Action, 10240))
s.actionSM = append(s.actionSM, map[int64]*l.LastTmStat{})
s.waiter.Add(1)
go s.actionDealProc(i)
}
//s.waiter.Add(1)
//go s.vipCanal()
s.waiter.Add(1)
go s.consumeCanal()
go s.subjectStat(s.c.Rule.ArcObjStatSid, _typeArc)
go s.subjectStat(s.c.Rule.ArtObjStatSid, _typeArt)
go s.kingStoryTotalStat(s.c.Rule.KingStorySid)
go s.subsRankproc()
go s.initBnjSecond()
if runtime.NumCPU() <= 4 {
s.kfcShare = 4
} else if runtime.NumCPU() > 32 {
s.kfcShare = 32
} else {
s.kfcShare = runtime.NumCPU()
}
for j := 0; j < s.kfcShare; j++ {
s.kfcActionCh = append(s.kfcActionCh, make(chan *kfcmdl.CouponMsg, 10240))
s.waiter.Add(1)
go s.kfcActionDeal(j)
}
s.waiter.Add(1)
go s.kfcCanal()
return s
}
func (s *Service) likeArc(c context.Context, sub *l.Subject) (res *l.Subject, err error) {
if sub != nil {
if sub.ID == 0 {
res = nil
} else {
res = sub
var (
ok bool
arcs map[int64]*arcapi.Arc
aids []int64
)
for _, l := range res.List {
aids = append(aids, l.Wid)
}
argAids := &comarcmdl.ArgAids2{
Aids: aids,
}
if arcs, err = s.arcRPC.Archives3(c, argAids); err != nil {
log.Error("s.arcRPC.Archives(arcAids:(%v), arcs), err(%v)", aids, err)
return
}
for _, l := range res.List {
if l.Archive, ok = arcs[l.Wid]; !ok {
log.Info("s.arcs.wid:(%d), (%v)", l.Wid, ok)
continue
}
}
}
}
return
}
//func (s *Service) genesis(l *l.Subject) {
// var (
// index int
// nowTime, stage, yes, no, next int64
// err error
// c = context.Background()
// )
// log.Info("st")
// for {
// if time.Now().Unix() >= l.Stime.Time().Unix() {
// break
// }
// time.Sleep(time.Second)
// }
// lstime := map[string]interface{}{
// "aid": 0,
// "time": 0,
// "index": 0,
// "stage": 0,
// }
// go s.inLtime(lstime, l.ID)
// for i, a := range l.List {
// arg1 := &comarcmdl.ArgAid2{Aid: a.Archive.Aid}
// arc, errRPC := s.arcRPC.Archive3(c, arg1)
// if errRPC != nil {
// log.Error("act-job s.arcRPC.Archive3(%v) error(%v)", arg1, errRPC)
// errRPC = nil
// continue
// }
// if arc.State < 0 && arc.State != -6 {
// log.Error("act-job s.arcRPC.Archive3(%v) stat err", arg1)
// }
// index = i
// nowTime = 0
// stage = 0
// next = 0
// time.Sleep(time.Duration(l.Ltime) * time.Second)
// log.Info("aid sleep")
// aidTime := time.Now().Unix()
// ltime := map[string]interface{}{
// "aid": a.Archive.Aid,
// "time": aidTime,
// "index": i,
// "stage": stage,
// "title": arc.Title,
// "author": arc.Author.Name,
// "tname": arc.TypeName,
// }
// go s.inLtime(ltime, l.ID)
// for {
// fmt.Println(stage)
// log.Info("s.stage{%d}", stage)
// //演出开始
// ltime := map[string]interface{}{
// "aid": a.Archive.Aid,
// "time": aidTime,
// "index": i,
// "stage": stage,
// "title": arc.Title,
// "author": arc.Author.Name,
// "tname": arc.TypeName,
// }
// go s.inLtime(ltime, l.ID)
// tp := l.Interval - l.Ltime
// nowTime += tp
// time.Sleep(time.Duration(tp) * time.Second)
// //预备投票
// sdm := &dmm.ActDM{Act: _startVoteP, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
// go s.brodcast(sdm)
// log.Info("act:1")
// go s.dao.CreateSelection(c, a.Archive.Aid, stage)
// nowTime += l.Ltime
// time.Sleep(time.Duration(l.Ltime) * time.Second)
// //投票开始
// interval := &dmm.ActDM{Act: _startVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
// go s.brodcast(interval)
// log.Info("act:2")
// tl := l.Tlimit - l.Ltime
// nowTime += tl
// time.Sleep(time.Duration(tl) * time.Second)
// //投票预结束
// intervalP := &dmm.ActDM{Act: _endingVote, Aid: a.Archive.Aid, Next: next, Yes: 0, No: 0, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
// log.Info("act:3")
// go s.brodcast(intervalP)
// nowTime += l.Ltime
// time.Sleep(time.Duration(l.Ltime) * time.Second)
// //投票结果
// if yes, no, err = s.dao.Selection(c, a.Archive.Aid, stage); err != nil {
// log.Error("s.dao.Selection() error(%v)", err)
// return
// }
// goNext := true
// if yes != 0 || no != 0 {
// goNext = (float64(no)/float64(yes+no)*100 > 40)
// }
// if goNext {
// next = 1
// } else {
// next = 0
// }
// intervalEnd := &dmm.ActDM{Act: _endVote, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
// go s.brodcast(intervalEnd)
// log.Info("act:4")
// nowTime += l.Ltime
// time.Sleep(time.Duration(l.Ltime) * time.Second)
// go s.inOnlinelog(c, l.ID, a.Archive.Aid, stage, yes, no)
// if goNext {
// tlimit := &dmm.ActDM{Act: _next, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
// go s.brodcast(tlimit)
// log.Info("act:6")
// nowTime += l.Ltime
// ldtime := map[string]interface{}{
// "aid": 0,
// "time": time.Now().Unix() + l.Ltime,
// "index": i + 1,
// "stage": 0,
// "title": arc.Title,
// "author": arc.Author.Name,
// "tname": arc.TypeName,
// }
// go s.inLtime(ldtime, l.ID)
// time.Sleep(time.Duration(l.Ltime) * time.Second)
// break
// }
// tlimit := &dmm.ActDM{Act: _goOn, Aid: a.Archive.Aid, Next: next, Yes: yes, No: no, Stage: stage, Title: arc.Title, Author: arc.Author.Name, Tname: arc.TypeName}
// log.Info("bro:%v", tlimit)
// go s.brodcast(tlimit)
// log.Info("act:5")
// //投票结果判断
// stage++
// if a.Archive.Duration-nowTime < 60 {
// go s.inOnlinelog(c, l.ID, a.Archive.Aid, 100, 0, 0)
// time.Sleep(time.Duration(a.Archive.Duration-nowTime) * time.Second)
// break
// }
// }
// }
// ltime := map[string]interface{}{
// "aid": 0,
// "time": time.Now().Unix(),
// "index": index + 1,
// "stage": 0,
// "title": "",
// "author": "",
// "tname": "",
// }
// go s.inLtime(ltime, l.ID)
// log.Info("end")
//}
//
//func (s *Service) inOnlinelog(c context.Context, sid, aid, stage, yes, no int64) {
// if row, err := s.dao.InOnlinelog(c, sid, aid, stage, yes, no); err != nil {
// log.Error("s.dao.inOnlinelog, err(%v) row(%v)", err, row)
// }
//}
//
//func (s *Service) inLtime(lt map[string]interface{}, sid int64) {
// var v, err = json.Marshal(lt)
// if err != nil {
// log.Error("s.genesis.inLtime.json.Marshal(dm:(%v)), err(%v)", v, err)
// return
// }
// s.dao.RbSet(context.Background(), "ltime:"+strconv.FormatInt(sid, 10), v)
//}
//
//func (s *Service) brodcast(d *dmm.ActDM) {
// var ds, err = json.Marshal(d)
// if err != nil {
// log.Error("s.genesis.json.Marshal(dm:(%v)), err(%v)", d, err)
// return
// }
// var m = &dmm.Broadcast{
// RoomID: s.c.Rule.BroadcastCid,
// CMD: dmm.BroadcastCMDACT,
// Info: ds,
// }
// s.dm.Broadcast(context.Background(), m)
//}
//
//func (s *Service) sub(c context.Context, sid int64) (res *l.Subject, err error) {
// var (
// eg errgroup.Group
// ls []*l.Like
// )
// eg.Go(func() (err error) {
// res, err = s.dao.Subject(c, sid)
// return
// })
// eg.Go(func() (err error) {
// ls, err = s.dao.Like(c, sid)
// return
// })
// if err = eg.Wait(); err != nil {
// log.Error("eg.Wait error(%v)", err)
// return
// }
// if res != nil {
// res.List = ls
// }
// if res, err = s.likeArc(c, res); err != nil {
// return
// }
// return
//}
//func (s *Service) vipCanal() {
// defer s.waiter.Done()
// var c = context.Background()
// for {
// msg, ok := <-s.vipSub.Messages()
// if !ok {
// log.Info("databus:activity-job vip binlog consumer exit!")
// return
// }
// msg.Commit()
// m := &match.Message{}
// if err := json.Unmarshal(msg.Value, m); err != nil {
// log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
// continue
// }
// switch m.Table {
// case _vipActOrderTable:
// if m.Action == match.ActInsert {
// s.addElemeLottery(c, m.New)
// }
// }
// log.Info("vipCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
// }
//}
func (s *Service) consumeCanal() {
defer s.waiter.Done()
var c = context.Background()
for {
msg, ok := <-s.actSub.Messages()
if !ok {
log.Info("databus: activity-job binlog consumer exit!")
return
}
msg.Commit()
m := &match.Message{}
if err := json.Unmarshal(msg.Value, m); err != nil {
log.Error("json.Unmarshal(%s) error(%+v)", msg.Value, err)
continue
}
switch m.Table {
case _matchObjTable:
if m.Action == match.ActUpdate {
s.upMatchUser(c, m.New, m.Old)
}
case _subjectTable:
if m.Action == match.ActInsert || m.Action == match.ActUpdate {
s.upSubject(c, m.New)
} else if m.Action == match.ActDelete {
s.upSubject(c, m.Old)
}
case _likesTable:
if m.Action == match.ActInsert {
s.AddLike(c, m.New)
} else if m.Action == match.ActUpdate {
s.UpLike(c, m.New, m.Old)
} else if m.Action == match.ActDelete {
s.DelLike(c, m.Old)
}
case _likeContentTable:
if m.Action == match.ActDelete {
s.upLikeContent(c, m.Old)
} else {
s.upLikeContent(c, m.New)
}
case _likeActionTable:
if m.Action == match.ActInsert {
s.actionProc(c, m.New)
}
}
log.Info("consumeCanal key:%s partition:%d offset:%d table:%s", msg.Key, msg.Partition, msg.Offset, m.Table)
}
}
func (s *Service) kfcCanal() {
defer s.waiter.Done()
for {
if s.closed {
return
}
msg, ok := <-s.kfcSub.Messages()
if !ok {
log.Info("databus: activity-job binlog consumer exit!")
return
}
msg.Commit()
m := &kfcmdl.CouponMsg{}
if err := json.Unmarshal(msg.Value, m); err != nil {
log.Error("kfcCanal:json.Unmarshal(%s) error(%+v)", msg.Value, err)
continue
}
j := m.CouponID % int64(s.kfcShare)
select {
case s.kfcActionCh[j] <- m:
default:
log.Info("kfcCanal cache full (%d)", j)
}
log.Info("kfcCanal key:%s partition:%d offset:%d value %s goroutine(%d) all(%d)", msg.Key, msg.Partition, msg.Offset, msg.Value, j, s.kfcShare)
}
}
func (s *Service) subjectStat(sid int64, typ string) {
var err error
for {
if s.closed {
return
}
var (
statLike int64
likeCnt int
likes []*l.Like
)
if sid <= 0 {
log.Warn("conf sid == 0 typ(%s)", typ)
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
continue
}
if likeCnt, err = s.dao.LikeCnt(context.Background(), sid); err != nil {
log.Error("s.dao.LikeCnt(sid:%d) error(%v)", sid, err)
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
continue
}
if likeCnt == 0 {
log.Warn("s.dao.LikeCnt(sid:%d) likeCnt == 0", sid)
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
continue
}
for i := 0; i < likeCnt; i += _objectPieceSize {
if likes, err = s.likeList(context.Background(), sid, i, _objectPieceSize, _retryTimes); err != nil {
log.Error("objectStatproc s.likeList(%d,%d,%d) error(%+v)", sid, i, _objectPieceSize, err)
time.Sleep(100 * time.Millisecond)
continue
} else {
var aids []int64
for _, v := range likes {
if v.Wid > 0 {
aids = append(aids, v.Wid)
}
}
switch typ {
case _typeArc:
var arcs map[int64]*arcapi.Arc
if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
time.Sleep(100 * time.Millisecond)
continue
} else {
for _, aid := range aids {
if arc, ok := arcs[aid]; ok && arc.IsNormal() {
statLike += int64(arc.Stat.Like)
} else {
likeCnt--
}
}
}
case _typeArt:
var arts map[int64]*artmdl.Meta
if arts, err = s.arts(context.Background(), aids, _retryTimes); err != nil {
log.Error("objectStatproc s.arcs(%v) error(%v)", aids, err)
time.Sleep(100 * time.Microsecond)
continue
} else {
for _, aid := range aids {
if art, ok := arts[aid]; ok && art.IsNormal() {
statLike += art.Stats.Like
} else {
likeCnt--
}
}
}
}
}
}
if err = s.setSubjectStat(context.Background(), sid, &l.SubjectTotalStat{SumLike: statLike}, likeCnt, _retryTimes); err != nil {
log.Error("objectStatproc s.setObjectStat(%d,%d) error(%+v)", sid, statLike, err)
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
continue
}
time.Sleep(time.Duration(s.c.Interval.ObjStatInterval))
}
}
func (s *Service) likeList(c context.Context, sid int64, offset, limit, retryCnt int) (list []*l.Like, err error) {
for i := 0; i < retryCnt; i++ {
if list, err = s.dao.LikeList(c, sid, offset, limit); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return
}
func (s *Service) webDataList(c context.Context, vid int64, offset, limit, retryCnt int) (list []*l.WebData, err error) {
for i := 0; i < retryCnt; i++ {
if list, err = s.dao.WebDataList(c, vid, offset, limit); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return
}
func (s *Service) arts(c context.Context, aids []int64, retryCnt int) (arcs map[int64]*artmdl.Meta, err error) {
for i := 0; i < retryCnt; i++ {
if arcs, err = s.articleRPC.ArticleMetas(c, &artmdl.ArgAids{Aids: aids}); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return
}
func (s *Service) kingStoryTotalStat(vid int64) {
var err error
for {
if s.closed {
return
}
var (
statView, statLike, statFav, StatCoin int64
likeCnt int
likes []*l.WebData
)
if vid <= 0 {
log.Warn("conf vid == 0")
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
continue
}
if likeCnt, err = s.dao.WebDataCnt(context.Background(), vid); err != nil {
log.Error("kingStoryTotalStat s.dao.WebDataCnt(sid:%d) error(%v)", vid, err)
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
continue
}
if likeCnt == 0 {
log.Warn("kingStoryTotalStat s.dao.LikeCnt(sid:%d) likeCnt == 0", vid)
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
continue
}
for i := 0; i < likeCnt; i += _objectPieceSize {
if likes, err = s.webDataList(context.Background(), vid, i, _objectPieceSize, _retryTimes); err != nil {
log.Error("kingStoryTotalStat s.webDataList(%d,%d,%d) error(%+v)", vid, i, _objectPieceSize, err)
time.Sleep(100 * time.Millisecond)
continue
} else {
var (
aids []int64
aidStruct = new(struct {
Aid string `json:"AID"`
})
)
for _, v := range likes {
if v.Data != "" {
if e := json.Unmarshal([]byte(v.Data), &aidStruct); e != nil {
log.Warn("kingStoryTotalStat json.Unmarshal(%s) error(%v)", v.Data, e)
continue
}
if aid, e := strconv.ParseInt(aidStruct.Aid, 10, 64); e != nil {
log.Warn("kingStoryTotalStat strconv.ParseInt(%s) error(%v)", aidStruct, e)
continue
} else {
aids = append(aids, aid)
}
}
}
var arcs map[int64]*arcapi.Arc
if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
log.Error("kingStoryTotalStat s.arcs(%v) error(%v)", aids, err)
time.Sleep(100 * time.Millisecond)
continue
} else {
for _, aid := range aids {
if arc, ok := arcs[aid]; ok && arc.IsNormal() {
statView += int64(arc.Stat.View)
statLike += int64(arc.Stat.Like)
statFav += int64(arc.Stat.Fav)
StatCoin += int64(arc.Stat.Coin)
}
}
}
}
}
if err = s.setSubjectStat(context.Background(), vid, &l.SubjectTotalStat{SumView: statView, SumLike: statLike, SumFav: statFav, SumCoin: StatCoin}, likeCnt, _retryTimes); err != nil {
log.Error("kingStoryTotalStat s.setObjectStat(%d,%d) error(%+v)", vid, statLike, err)
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
continue
}
time.Sleep(time.Duration(s.c.Interval.KingStoryInterval))
}
}
func (s *Service) setSubjectStat(c context.Context, sid int64, stat *l.SubjectTotalStat, count, retryCnt int) (err error) {
for i := 0; i < retryCnt; i++ {
if err = s.dao.SetObjectStat(c, sid, stat, count); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return
}
// Ping reports the heath of services.
func (s *Service) Ping(c context.Context) (err error) {
return s.dao.Ping(c)
}
// Close kafaka consumer close.
func (s *Service) Close() (err error) {
defer s.waiter.Wait()
s.closed = true
if s.bnjTimeFinish == 0 {
s.bnj.AddCacheLessTime(context.Background(), s.bnjLessSecond)
}
s.cron.Stop()
s.dao.Close()
s.actSub.Close()
s.bnjSub.Close()
//s.vipSub.Close()
s.kfcSub.Close()
s.closed = true
return
}