bilibili-backup/app/interface/main/reply/service/reply.go
2019-04-22 02:59:20 +00:00

1478 lines
42 KiB
Go

package service
import (
"context"
"regexp"
"strconv"
"strings"
"sync"
"time"
"go-common/app/interface/main/reply/conf"
"go-common/app/interface/main/reply/model/adminlog"
"go-common/app/interface/main/reply/model/drawyoo"
"go-common/app/interface/main/reply/model/reply"
accmdl "go-common/app/service/main/account/api"
"go-common/app/service/main/archive/api"
arcmdl "go-common/app/service/main/archive/model/archive"
assmdl "go-common/app/service/main/assist/model/assist"
filgrpc "go-common/app/service/main/filter/api/grpc/v1"
locmdl "go-common/app/service/main/location/model"
relmdl "go-common/app/service/main/relation/model"
thumdl "go-common/app/service/main/thumbup/model"
ugcpay "go-common/app/service/main/ugcpay/api/grpc/v1"
"go-common/library/ecode"
"go-common/library/log"
xip "go-common/library/net/ip"
"go-common/library/net/metadata"
"go-common/library/queue/databus/report"
"go-common/library/sync/errgroup"
xtime "go-common/library/time"
"github.com/mvdan/xurls"
)
// Shared empty values and the emoji-code pattern used by this file.
// NOTE(review): several of these maps are returned directly to callers
// (see actions, getAccInfo, GetBlacklist, getAttentions), so they must be
// treated as read-only — confirm that no caller writes into them.
var (
_emptyReplies = make([]*reply.Reply, 0)
_emptyAction = map[int64]int8{}
_emptyCards = make(map[int64]*accmdl.Card)
_emptyBlackList = make(map[int64]bool)
_emptyRelations = make(map[int64]*accmdl.RelationReply)
// _emojiCode matches a bracketed code: "[", one character that is neither
// "[" nor "+", one or more characters other than "]", and a closing "]".
_emojiCode = regexp.MustCompile(`\[[^\[+][^]]+]`)
)
// User block status codes returned by UserBlockStatus.
const (
// StatusNormal is returned when the user is not blocked and PassTest is 0.
StatusNormal = 1
// StatusNeedContest is returned when the user is not blocked and PassTest is non-zero.
StatusNeedContest = 2
// StatusForbidden is returned when the user is blocked forever or until a future time.
StatusForbidden = 3
)
// IsWhiteAid reports whether aid appears in the service's aid white list.
// Only subject type 1 is eligible; any other type yields false.
func (s *Service) IsWhiteAid(aid int64, tp int8) bool {
	if tp == 1 {
		for _, allowed := range s.aidWhiteList {
			if allowed == aid {
				return true
			}
		}
	}
	return false
}
// UserBlockStatus looks up the block record of mid and maps it onto one of
// the Status* constants. A lookup failure is returned with status 0.
func (s *Service) UserBlockStatus(c context.Context, mid int64) (int, error) {
	info, err := s.dao.BlockStatus.BlockInfo(c, mid)
	if err != nil {
		return 0, err
	}
	switch {
	case info.ForeverBlock, time.Now().Unix() < info.BlockUntil:
		// Blocked forever, or the block has not expired yet.
		return StatusForbidden, nil
	case info.PassTest == 0:
		return StatusNormal, nil
	default:
		return StatusNeedContest, nil
	}
}
// ValidUserStatus checks whether the account described by profile may post a
// reply. The first failing rule decides the returned error:
// silenced, neither phone nor email bound, moral below 60, and — unless the
// user is the subject's upper — rank 5000 or level below 1.
// It returns nil when every rule passes.
func (s *Service) ValidUserStatus(c context.Context, profile *accmdl.Profile, isUpper bool) (err error) {
	switch {
	case profile.Silence == 1:
		return ecode.UserDisabled
	case profile.TelStatus == 0 && profile.EmailStatus == 0:
		return ecode.UserInactive
	case profile.Moral < 60:
		return ecode.LackOfScores
	case !isUpper && profile.Rank == 5000:
		return ecode.UserNoMember
	case !isUpper && profile.Level < 1:
		return ecode.UserLevelLow
	}
	return nil
}
// ValidUserAction checks whether the account described by profile may like or
// hate a reply: it must not be silenced, must have a phone or an email bound,
// and must have a moral score of at least 60.
func (s *Service) ValidUserAction(c context.Context, profile *accmdl.Profile) (err error) {
	switch {
	case profile.Silence == 1:
		return ecode.UserDisabled
	case profile.TelStatus == 0 && profile.EmailStatus == 0:
		return ecode.UserInactive
	case profile.Moral < 60:
		return ecode.LackOfScores
	}
	return nil
}
// checkSpam determines whether the user may reply right now.
//
// For subject types other than BBQ and Huoniao, a user at or below
// reply.UserLevelFirst who does not own the subject must always pass a
// captcha. After that the spam counters are consulted: a CD verdict rejects
// the reply, a captcha verdict requires a captcha and, once verified, resets
// the counters. When a captcha is required but none was supplied, the
// captcha uri is returned together with ecode.ReplyDeniedAsCaptcha.
func (s *Service) checkSpam(c context.Context, sub *reply.Subject, mid int64, captcha string, level int) (uri string, err error) {
	// passCaptcha issues a new captcha when none was supplied, otherwise it
	// verifies the supplied one.
	passCaptcha := func() (string, error) {
		if captcha != "" {
			if verr := s.VerifyCaptcha(c, captcha, mid); verr != nil {
				return "", verr
			}
			return "", nil
		}
		link, cerr := s.Captcha(c, mid)
		if cerr != nil {
			return "", cerr
		}
		return link, ecode.ReplyDeniedAsCaptcha
	}

	ownSubject := sub.Mid == mid
	levelChecked := sub.Type != reply.SubTypeBBQ && sub.Type != reply.SubTypeHuoniao
	if levelChecked && level <= reply.UserLevelFirst && !ownSubject {
		if uri, err = passCaptcha(); err != nil {
			return uri, err
		}
	}

	recent, daily, err := s.dao.Redis.SpamReply(c, mid)
	if err != nil {
		log.Error("replyCacheDao.SpamReply(%d), err (%v)", mid, err)
		return "", err
	}
	if cd := ecode.ReplyDeniedAsCD.Code(); recent == cd || daily == cd {
		return "", ecode.ReplyDeniedAsCD
	}
	if need := ecode.ReplyDeniedAsCaptcha.Code(); recent == need || daily == need {
		if uri, err = passCaptcha(); err != nil {
			return uri, err
		}
		// The captcha was verified: clear the counters that triggered it.
		s.dao.Redis.DelReplyIncr(c, mid, ownSubject)
		s.dao.Redis.DelReplySpam(c, mid)
	}
	s.dao.Databus.AddSpam(c, sub.Oid, mid, ownSubject, sub.Type)
	return "", nil
}
// isNormalVip reports whether profile carries a VIP record with a non-zero
// type and status 1.
func (s *Service) isNormalVip(c context.Context, profile *accmdl.Profile) bool {
	if profile.Vip.Type == 0 {
		return false
	}
	return profile.Vip.Status == 1
}
// ContainUrls reports whether msg contains at least one match of the
// xurls.Strict pattern.
func (s *Service) ContainUrls(msg string) bool {
	if found := xurls.Strict.FindAllString(msg, -1); found != nil {
		return true
	}
	return false
}
// bigDataFilter passes msg to the big-data filter and returns its verdict.
// Any non-nil result is logged before being handed back to the caller.
func (s *Service) bigDataFilter(c context.Context, msg string) (err error) {
	err = s.bigdata.Filter(c, msg)
	if err == nil {
		return nil
	}
	log.Error("s.bigdata.Filter(%s) error(%v)", msg, err)
	return err
}
// isUpper reports whether mid owns the subject identified by (oid, tp).
// A subject that cannot be loaded counts as "not the upper".
func (s *Service) isUpper(c context.Context, mid, oid int64, tp int8) bool {
	if sub, err := s.getSubject(c, oid, tp); err == nil {
		return sub.Mid == mid
	}
	return false
}
// CheckAssist checks whether upper mid has granted assistant rights to uid.
//
// assisted is true when the assist service reports Assist == 1; operation is
// additionally true when it also reports Allow == 1. When no response is
// available both results are false and the failure is logged — with the
// actual RPC error when there is one, otherwise with a generic message.
func (s *Service) CheckAssist(c context.Context, mid, uid int64) (assisted bool, operation bool) {
	arg := &assmdl.ArgAssist{
		Mid:       mid,
		AssistMid: uid,
		Type:      1,
		RealIP:    "",
	}
	respro, err := s.assist.Assist(c, arg)
	if respro == nil {
		if err != nil {
			log.Error("s.assist.Assist(%d, %d) error(%v)", mid, uid, err)
		} else {
			log.Error("s.assist.Assist(%d, %d) error(%v)", mid, uid, "获取up协管关系错误")
		}
		return false, false
	}
	if respro.Assist == 1 {
		assisted = true
		operation = respro.Allow == 1
	}
	return assisted, operation
}
// getAssistList fetches the assistant ids of user mid and returns them as a
// set (every present key maps to 1). On an RPC failure the error is logged
// and an empty, non-nil map is returned.
func (s *Service) getAssistList(c context.Context, mid int64) (assistMap map[int64]int) {
	assistMap = make(map[int64]int)
	req := &assmdl.ArgAssists{
		Mid:    mid,
		RealIP: metadata.String(c, metadata.RemoteIP),
	}
	ids, err := s.assist.AssistIDs(c, req)
	if err != nil {
		log.Error("s.assist.Assists(%d) error(%v)", mid, err)
		return assistMap
	}
	for _, id := range ids {
		assistMap[id] = 1
	}
	return assistMap
}
// checkContentFilter2 runs msg through the filter service (area "reply") and
// maps the verdict onto reply error codes.
//
// The original msg is returned together with the error when the filter call
// fails, or when the limit code is FilterHitLimitBlack, FilterHitRubLimit, or
// FilterHitStrictLimit with level 0. When res.Level > 0 the hit is handed to
// s.cache.Do for recording through AddFilteredReply, and then:
//   - level 10 or 20: err is ReplyMosaicByFilter and correct is res.Result;
//   - level 30: err is ReplyDeniedByFilter and correct is left empty;
//   - level 40: the user's moral is changed by -1 through AddMoral3 (the
//     reason carries at most the first 80 runes of msg); err is the AddMoral3
//     error if that call fails, otherwise ReplyDeniedByFilter; correct is
//     left empty.
//
// In every remaining case correct is res.Result and err is nil.
func (s *Service) checkContentFilter2(c context.Context, oid, mid, rpid int64, ip, msg string, tp int8) (correct string, err error) {
arg := &filgrpc.FilterReq{
Message: msg,
Area: "reply",
Id: rpid,
Oid: oid,
Mid: mid,
}
var res *filgrpc.FilterReply
res, err = s.filcli.Filter(c, arg)
if err != nil {
log.Error("s.filter.Filter(%s) error(%v)", msg, err)
return msg, err
}
// First translate the rate/black-list limit code, if any.
switch int(res.Limit) {
case ecode.FilterHitLimitBlack.Code():
log.Info("Reply id %d, content %q contains sensitive msg, not allowed to send out", rpid, msg)
err = ecode.ReplyHitBlacklist
case ecode.FilterHitRubLimit.Code():
log.Info("Reply id %d, content %q was sent too many times, exceed allowed counts", rpid, msg)
err = ecode.ReplyOverRateLimit
case ecode.FilterHitStrictLimit.Code():
// A strict-limit hit only becomes a captcha demand when no level was assigned.
if res.Level == 0 {
log.Info("Reply id %d, content %q was limit strictly", rpid, msg)
err = ecode.ReplyDeniedAsCaptcha
}
}
if err != nil {
return msg, err
}
// Then translate the filter level.
if res.Level > 0 {
s.cache.Do(c, func(ctx context.Context) {
s.AddFilteredReply(ctx, rpid, oid, mid, tp, int8(res.Level), msg, time.Now())
})
switch res.Level {
case 10, 20:
// Falls out of the switch so that correct is still set to res.Result below.
err = ecode.ReplyMosaicByFilter
case 30:
err = ecode.ReplyDeniedByFilter
return
case 40:
// Keep the moral-change reason short: at most 80 runes of the message.
tmp := []rune(msg)
if len(tmp) > 80 {
tmp = tmp[:80]
}
arg := &accmdl.MoralReq{
Mid: mid,
Moral: -1,
Oper: "",
Reason: "发布恶意评论: " + string(tmp),
Remark: "云屏蔽",
RealIp: ip,
}
if _, err = s.acc.AddMoral3(c, arg); err != nil {
log.Error("s.acc.AddMoral3(%d) error(%v)", mid, err)
return
}
err = ecode.ReplyDeniedByFilter
return
}
}
correct = res.Result
return
}
// AddFilteredReply records a filtered reply by delegating to the reply DAO
// and returns whatever error the DAO reports.
func (s *Service) AddFilteredReply(c context.Context, rpID, oid, mid int64, tp, level int8, message string, now time.Time) (err error) {
	err = s.dao.Reply.AddFilteredReply(c, rpID, oid, mid, tp, level, message, now)
	return err
}
// UseBigdata switches the big-data filter on or off and returns the value
// now stored on the service. The per argument is not used.
func (s *Service) UseBigdata(c context.Context, b bool, per int64) bool {
	enabled := b
	s.useBigData = enabled
	return s.useBigData
}
// AddReply adds a root-level reply to the subject (oid, tp) on behalf of mid.
//
// The request is validated first, emoji codes are then normalized (normal
// VIPs may use up to conf.Conf.Reply.MaxEmoji known codes, unknown codes are
// repressed; for everyone else every code is repressed), the supervision
// window is checked, and finally the reply is persisted. When persisting
// demands a captcha, a captcha uri is returned together with
// ecode.ReplyDeniedAsCaptcha.
func (s *Service) AddReply(c context.Context, mid, oid int64, tp, plat int8, ats []int64, accessKey, cookie, captcha, msg, dev, ver, platform string, build int64, buvid string) (r *reply.Reply, uri string, err error) {
	// A root-level reply has no root, no parent and no dialog.
	const none int64 = 0
	var (
		profile *accmdl.Profile
		subject *reply.Subject
	)
	if profile, subject, uri, err = s.validateReply(c, mid, oid, tp, captcha, msg, accessKey, cookie); err != nil {
		return
	}
	if codes := _emojiCode.FindAllString(msg, -1); len(codes) > 0 {
		// By default every code is repressed; normal VIPs keep the known ones.
		repress := codes
		if s.isNormalVip(c, profile) {
			if len(codes) > conf.Conf.Reply.MaxEmoji {
				err = ecode.ReplyEmojiOverMax
				return
			}
			repress = make([]string, 0)
			for _, code := range codes {
				if _, known := s.emojisM[code]; !known {
					repress = append(repress, code)
				}
			}
		}
		if len(repress) > 0 {
			msg = RepressEmotions(msg, repress)
		}
	}
	if err = s.SuperviseReply(c, mid, accessKey, cookie, tp); err != nil {
		return
	}
	r, err = s.persistReply(c, mid, none, none, plat, tp, ats, msg, dev, ver, captcha, platform, build, buvid, subject, none)
	if err != ecode.ReplyDeniedAsCaptcha {
		return
	}
	captchaURI, cerr := s.Captcha(c, mid)
	if cerr != nil {
		return r, "", cerr
	}
	return r, captchaURI, ecode.ReplyDeniedAsCaptcha
}
// RepressEmotions rewrites every listed emoji code found in msg from the
// "[code]" form to the "【code】" form and returns the resulting text.
func RepressEmotions(msg string, emoCodes []string) string {
	out := msg
	for i := range emoCodes {
		out = repressEmotion(out, emoCodes[i])
	}
	return out
}
// repressEmotion replaces every occurrence of "[name]" in msg with "【name】",
// where name is emoCode without its first and last byte (the brackets).
// A code shorter than two bytes has no brackets to strip, so msg is returned
// unchanged instead of panicking on the slice expression.
func repressEmotion(msg, emoCode string) string {
	if len(emoCode) < 2 {
		return msg
	}
	name := emoCode[1 : len(emoCode)-1]
	return strings.Replace(msg, "["+name+"]", "【"+name+"】", -1)
}
// AddReplyReply adds a child reply under the root reply rootID of subject
// (oid, tp) on behalf of mid. parentID is the reply being answered; it equals
// rootID when the root itself is answered.
//
// The request is validated, emoji codes are normalized (normal VIPs may use
// up to conf.Conf.Reply.MaxEmoji known codes, unknown codes are repressed;
// for everyone else every code is repressed), then the root — and, when it
// differs from the root, the parent — is checked before the reply is
// persisted. When persisting demands a captcha, a captcha uri is returned
// together with ecode.ReplyDeniedAsCaptcha.
func (s *Service) AddReplyReply(c context.Context, mid, oid, rootID, parentID int64, tp, plat int8, ats []int64, accessKey, cookie, captcha, msg, dev, ver, platform string, build int64, buvid string) (r *reply.Reply, uri string, err error) {
var dialog int64
var profile *accmdl.Profile
var subject *reply.Subject
profile, subject, uri, err = s.validateReply(c, mid, oid, tp, captcha, msg, accessKey, cookie)
if err != nil {
return
}
// Normalize emoji codes found in the message.
if emoCodes := _emojiCode.FindAllString(msg, -1); len(emoCodes) > 0 {
if s.isNormalVip(c, profile) {
if len(emoCodes) > conf.Conf.Reply.MaxEmoji {
err = ecode.ReplyEmojiOverMax
return
}
// Only codes missing from s.emojisM are repressed for VIP users.
needRepressEmoCodes := make([]string, 0)
for _, emoCode := range emoCodes {
if _, ok := s.emojisM[emoCode]; !ok {
needRepressEmoCodes = append(needRepressEmoCodes, emoCode)
}
}
if len(needRepressEmoCodes) > 0 {
msg = RepressEmotions(msg, needRepressEmoCodes)
}
} else {
msg = RepressEmotions(msg, emoCodes)
}
}
// The root must exist, must itself be a root reply, and must not be deleted.
rootReply, err := s.GetRootReply(c, oid, rootID, tp)
if err != nil {
return
}
// NOTE if the pending reply, the state is not normal
if rootReply.IsDeleted() {
err = ecode.ReplyDeleted
return
}
// The author of the root reply has black-listed the replier.
if s.RelationBlocked(c, rootReply.Mid, mid) {
err = ecode.ReplyBlacklistFilter
return
}
if err = s.SuperviseReply(c, mid, accessKey, cookie, int8(tp)); err != nil {
return
}
// Answering a child reply: validate the parent as well.
if rootID != parentID {
var parentReply *reply.Reply
if parentReply, err = s.GetReply(c, oid, parentID, tp); err != nil {
return
}
// if parentReply.Dialog == 0 {
// s.dao.Databus.RecoverFixDialogIdx(c, oid, tp, rootID)
// }
dialog = parentReply.Dialog
// The parent must belong to the given root.
if parentReply.Root != rootID {
err = ecode.ReplyIllegalRoot
return
}
// A parent that is not in the normal state can only be answered by its own author.
if mid != parentReply.Mid && !parentReply.IsNormal() {
err = ecode.ReplyNotExist
return
}
if s.RelationBlocked(c, parentReply.Mid, mid) {
err = ecode.ReplyBlacklistFilter
return
}
}
r, err = s.persistReply(c, mid, rootID, parentID, plat, tp, ats, msg, dev, ver, captcha, platform, build, buvid, subject, dialog)
// The content filter asked for a captcha: hand a fresh captcha uri to the caller.
if err == ecode.ReplyDeniedAsCaptcha {
uri, err := s.Captcha(c, mid)
if err != nil {
return r, "", err
}
return r, uri, ecode.ReplyDeniedAsCaptcha
}
return
}
// validateReply runs every precondition for posting a reply and returns the
// poster's profile and the target subject when all of them pass.
//
// Checks, in order: the profile can be loaded; phone identification (skipped
// for BBQ subjects and when the identification switch is off); the subject
// exists and is not forbidden; the account status and the minimum level
// (both skipped for BBQ and Huoniao subjects); the subject owner has not
// black-listed the poster; the spam check (skipped for two hard-coded mids);
// and, for pay-to-watch archives owned by someone else, that the poster has
// paid. uri is only non-empty when the spam check demands a captcha.
func (s *Service) validateReply(c context.Context, mid, oid int64, tp int8, captcha, msg, accessKey, cookie string) (profile *accmdl.Profile, subject *reply.Subject, uri string, err error) {
	profile, err = s.userInfo(c, mid)
	if err != nil {
		log.Error("myinfo(%d) error(%v)", mid, err)
		return nil, nil, "", err
	}
	if tp != reply.SubTypeBBQ {
		if conf.Conf.Identification.SwitchOn && profile.Identification == 0 {
			if profile.TelStatus == 0 {
				err = ecode.UserCheckNoPhone
				return
			}
			// Identification == 0 is already established by the enclosing
			// condition, so only the phone status needs testing here.
			if profile.TelStatus == 2 {
				err = ecode.UserCheckInvalidPhone
				return
			}
		}
	}
	subject, err = s.Subject(c, oid, tp)
	if err != nil {
		return
	}
	if tp != reply.SubTypeBBQ && tp != reply.SubTypeHuoniao {
		if err = s.ValidUserStatus(c, profile, subject.Mid == mid); err != nil {
			log.Warn("s.ValidUserStatus(%d,%+v) error(%v)", mid, profile.Level, err)
			return
		}
	}
	if s.RelationBlocked(c, subject.Mid, mid) {
		err = ecode.ReplyBlacklistFilter
		return
	}
	if tp != reply.SubTypeBBQ && tp != reply.SubTypeHuoniao {
		if profile.Level < reply.UserLevelSnd && subject.Mid != mid {
			err = ecode.UserLevelLow
			return
		}
	}
	// These two mids bypass the spam check entirely.
	if mid != 165252 && mid != 10287644 {
		if uri, err = s.checkSpam(c, subject, mid, captcha, int(profile.Level)); err != nil {
			// The verb was "%V", which fmt does not know and renders as "%!V(...)".
			log.Error("s.checkSpam failed(%d) err is %v", mid, err)
			return
		}
	}
	// Pay-to-watch archives only accept replies from the owner or from payers.
	if tp == reply.SubTypeArchive && subject.Mid != mid {
		var arc *api.Arc
		arc, err = s.arcSrv.Archive3(c, &arcmdl.ArgAid2{Aid: oid})
		if err != nil {
			log.Error("s.arcSrc.Archive3(%d) failed!err:=%v", oid, err)
			return
		}
		if arc.Rights.UGCPay == 1 {
			var relation *ugcpay.AssetRelationResp
			relation, err = s.ugcpay.AssetRelation(c, &ugcpay.AssetRelationReq{Mid: mid, Oid: oid, Otype: "archive"})
			if err != nil {
				log.Error("s.ugcpay.AssetRelation(%d,%d) failed!err:=%v", mid, oid, err)
				return
			}
			if relation.State != "paid" {
				err = ecode.ReplyForbidReplyNotPay
				return
			}
		}
	}
	return
}
// persistReply builds the reply record, runs the content checks, publishes
// the reply through s.dao.Databus.AddReply and emits a user report event.
//
// dialog is recomputed here: a direct child of a root (root == parent != 0)
// starts a dialog named after its own id, a deeper child (root != parent)
// inherits the parent's dialog, and otherwise the dialog passed in is kept.
//
// When the content filter returns ReplyMosaicByFilter the reply is still
// published (with the filtered state, unless overridden by the subject's
// monitor/audit attribute), but err is not cleared on that path, so the
// function returns the reply together with ReplyMosaicByFilter.
// NOTE(review): confirm that callers rely on this.
func (s *Service) persistReply(c context.Context, mid, root, parent int64, plat, tp int8, ats []int64, msg, dev, ver, captcha, platform string, build int64, buvid string, subject *reply.Subject, dialog int64) (r *reply.Reply, err error) {
rpID, err := s.nextID(c)
if err != nil {
return
}
// First-level child reply: it starts its own dialog.
if root == parent && root != 0 {
dialog = rpID
} else if root != parent {
parentRp, err := s.reply(c, mid, subject.Oid, parent, tp)
if err != nil {
return nil, err
}
dialog = parentRp.Dialog
}
cTime := xtime.Time(time.Now().Unix())
ip := metadata.String(c, metadata.RemoteIP)
port := metadata.String(c, metadata.RemotePort)
r = &reply.Reply{
RpID: rpID,
Oid: subject.Oid,
Type: tp,
Mid: mid,
Root: root,
State: reply.ReplyStateNormal,
Parent: parent,
CTime: cTime,
Dialog: dialog,
Content: &reply.Content{
RpID: rpID,
Message: msg,
Ats: ats,
IP: xip.InetAtoN(ip),
Plat: plat,
Device: dev,
Version: ver,
CTime: cTime,
},
}
// Optional big-data check: only a garbage verdict changes the reply; the
// error itself is overwritten by the content filter call below.
if s.useBigData {
if err = s.bigDataFilter(c, msg); err != nil {
if err == ecode.ReplyDeniedAsGarbage {
// TODO: do not use garbage as state
r.State = reply.ReplyStateGarbage
r.AttrSet(reply.AttrYes, reply.ReplyAttrGarbage)
}
}
}
// if not rpID passed, then no data will be recorded
msg, err = s.checkContentFilter2(c, r.Oid, mid, rpID, ip, msg, r.Type)
if err != nil {
// Anything other than a captcha demand or a mosaic verdict aborts here.
if err != ecode.ReplyDeniedAsCaptcha && err != ecode.ReplyMosaicByFilter {
log.Error("s.checkContentFilter2(%d, %d, msg: %s) error(%v)", mid, subject.Oid, msg, err)
return
}
// NOTE(review): this branch cannot be reached — the check above has
// already returned for every error that is neither captcha nor mosaic.
if err == ecode.ReplyHitBlacklist || err == ecode.ReplyOverRateLimit {
return
}
if err == ecode.ReplyDeniedAsCaptcha {
// Without a captcha the demand is passed back to the caller.
if captcha == "" {
return
}
if err = s.VerifyCaptcha(c, captcha, mid); err != nil {
return
}
} else {
// Mosaic verdict: store the filtered text and mark the reply as filtered.
r.Content.Message = msg
r.AttrSet(reply.AttrYes, reply.ReplyAttrFilter)
r.State = reply.ReplyStateFiltered
}
}
// NOTE audit pending most priority
if subject.AttrVal(reply.SubAttrMonitor) == reply.AttrYes {
r.State = reply.ReplyStateMonitor
}
if subject.AttrVal(reply.SubAttrAudit) == reply.AttrYes {
r.State = reply.ReplyStateAudit
}
s.dao.Databus.AddReply(c, subject.Oid, r)
report.User(&report.UserInfo{
Mid: r.Mid,
Platform: platform,
Build: build,
Buvid: buvid,
Business: 41,
Type: int(r.Type),
Oid: r.Oid,
Action: reply.ReportReplyAdd,
Ctime: time.Now(),
IP: ip + ":" + port,
// NOTE(review): r.State is listed twice — confirm both index slots are meant to carry it.
Index: []interface{}{
r.RpID,
r.State,
r.State,
strconv.FormatInt(r.Root, 10),
},
})
return
}
// checkActionSpam determines whether user mid may perform a like/hate action.
// A cache failure is logged and returned; any spam result other than
// ecode.OK yields ecode.ReplyForbidAction.
func (s *Service) checkActionSpam(c context.Context, mid int64) (err error) {
	ret, err := s.dao.Redis.SpamAction(c, mid)
	if err != nil {
		log.Error("replyCacheDao.SpamAction(%d), err (%v)", mid, err)
		return err
	}
	if ret != ecode.OK.Code() {
		return ecode.ReplyForbidAction
	}
	return nil
}
// AddAction adds or cancels a like/hate of user mid on reply rpID.
//
// op selects the kind of action: "like" is handled as a like, every other
// value as a hate. action selects add (reply.OpAdd) or cancel
// (reply.OpCancel). The call is rejected with ecode.ReplyActioned when the
// requested change is already in effect according to the thumbup service.
// Duplicate/cancel errors reported by the thumbup service itself are treated
// as success; in that case no infoc record is written.
func (s *Service) AddAction(c context.Context, mid, oid, rpID int64, tp, action int8, ak, ck, op, platform, buvid string, build int64) (err error) {
if err = reply.CheckAction(action); err != nil {
return
}
user, err := s.userInfo(c, mid)
if err != nil {
return
}
if err = s.ValidUserAction(c, user); err != nil {
return
}
if err = s.checkActionSpam(c, mid); err != nil {
log.Error("s.checkActionSpam(%d) err (%v)", mid, err)
return
}
r, err := s.reply(c, mid, oid, rpID, tp)
if err != nil {
return
}
// NOTE if the pending reply, the state is not normal
if mid != r.Mid && !r.IsNormal() {
err = ecode.ReplyForbidAction
return
}
// The reply author has black-listed the acting user.
if s.RelationBlocked(c, r.Mid, mid) {
err = ecode.ReplyBlacklistFilter
return
}
var (
userLikes map[int64]int8
act int8
)
// Fetch the user's current like/hate state for this reply.
if userLikes, err = s.thumbup.HasLike(c, &thumdl.ArgHasLike{Business: "reply", MessageIDs: []int64{rpID}, Mid: mid}); err != nil {
log.Error("s.thumbup.HasLike(%d,%d,%d) error(%v)", mid, rpID, oid, err)
return
}
act = userLikes[rpID]
now := time.Now()
remoteIP := metadata.String(c, metadata.RemoteIP)
// ac is the report action name written to infoc at the end.
var ac string
if op == "like" {
// Reject adding an existing like and cancelling a like that does not exist.
if (int8(act) == reply.ActionLike && action == reply.OpAdd) || (int8(act) != reply.ActionLike && action == reply.OpCancel) {
err = ecode.ReplyActioned
return
}
if action == reply.OpAdd {
ac = reply.ReportReplyLike
err = s.thumbup.Like(c, &thumdl.ArgLike{UpMid: r.Mid, Business: "reply", Mid: mid, MessageID: rpID, Type: thumdl.TypeLike, RealIP: remoteIP, OriginID: oid})
} else {
ac = reply.ReportReplyCancelLike
err = s.thumbup.Like(c, &thumdl.ArgLike{UpMid: r.Mid, Business: "reply", Mid: mid, MessageID: rpID, Type: thumdl.TypeCancelLike, RealIP: remoteIP, OriginID: oid})
}
if err == nil {
s.dao.Databus.Like(c, oid, rpID, mid, action, now.Unix())
}
} else {
// Reject adding an existing hate and cancelling a hate that does not exist.
if (int8(act) == reply.ActionHate && action == reply.OpAdd) || (int8(act) != reply.ActionHate && action == reply.OpCancel) {
err = ecode.ReplyActioned
return
}
if action == reply.OpAdd {
ac = reply.ReportReplyHate
err = s.thumbup.Like(c, &thumdl.ArgLike{UpMid: r.Mid, Business: "reply", Mid: mid, MessageID: rpID, Type: thumdl.TypeDislike, RealIP: remoteIP, OriginID: oid})
} else {
ac = reply.ReportReplyCancelHate
err = s.thumbup.Like(c, &thumdl.ArgLike{UpMid: r.Mid, Business: "reply", Mid: mid, MessageID: rpID, Type: thumdl.TypeCancelDislike, RealIP: remoteIP, OriginID: oid})
}
if err == nil {
s.dao.Databus.Hate(c, oid, rpID, mid, action, now.Unix())
}
}
if err != nil {
// Duplicate or already-cancelled actions are reported as success.
if ecode.ThumbupCancelDislikeErr.Equal(err) || ecode.ThumbupCancelLikeErr.Equal(err) || ecode.ThumbupDupLikeErr.Equal(err) || ecode.ThumbupDupDislikeErr.Equal(err) {
err = nil
return
}
log.Error("thumbup (%d,%d,%d,%s,%d) failed!err:=%v", mid, oid, rpID, op, action, err)
return
}
// The infoc result is logged on failure and also becomes the return value.
err = s.infoc.Info(mid, platform, build, buvid, 41, int(r.Type), r.Oid, ac, remoteIP, time.Now().Format("2006-01-02 15:04:05"), r.RpID, r.Mid, "", "", "", "", "")
if err != nil {
log.Error("infoc error (%v)", err)
}
return
}
// getIdsByRoots returns one page (pn is 1-based, ps entries) of child reply
// ids for every root in roots: sidsmap groups the ids by root and ids holds
// all of them in one slice.
//
// Roots missing from the Redis index are looked up in the database and an
// index recovery is requested for each one that has children. A database
// error is logged and the loop continues, so the returned err reflects only
// the last missed root that was looked up.
func (s *Service) getIdsByRoots(c context.Context, oid int64, roots []int64, tp int8, pn, ps int) (sidsmap map[int64][]int64, ids []int64, err error) {
	offset := (pn - 1) * ps
	var missed []int64
	sidsmap, ids, missed, err = s.dao.Redis.RangeByRoots(c, roots, offset, offset+ps-1)
	if err != nil {
		log.Error("s.dao.Redis.RangeByRoots() err(%v)", err)
		return
	}
	for _, root := range missed {
		var found []int64
		found, err = s.dao.Reply.GetIdsByRoot(c, oid, root, tp, offset, ps)
		if err != nil {
			log.Error("s.dao.Reply.GetIdsByRoot(oid %d,tp %d,root %d) err(%v)", oid, tp, root, err)
		}
		if len(found) == 0 {
			continue
		}
		sidsmap[root] = found
		ids = append(ids, found...)
		s.dao.Databus.RecoverIndexByRoot(c, oid, root, tp)
	}
	return
}
// actions returns the like/hate state of user mid for each reply in rpIDs.
// An anonymous user (mid == 0) gets the shared empty map. When the user has
// acted on none of the replies, a map holding only the marker key -1 is
// returned instead of an empty one.
func (s *Service) actions(c context.Context, mid, oid int64, rpIDs []int64) (amap map[int64]int8, err error) {
	if mid == 0 {
		return _emptyAction, nil
	}
	req := &thumdl.ArgHasLike{Business: "reply", MessageIDs: rpIDs, Mid: mid}
	if amap, err = s.thumbup.HasLike(c, req); err != nil {
		log.Error("s.thumbup.HasLike(%d, %d) error(%v)", mid, rpIDs, err)
		return
	}
	// NOTE: may have many keys
	if len(amap) == 0 {
		// No action at all: add -1 as a mark.
		amap = map[int64]int8{-1: 0}
	}
	return
}
// getAccInfo fetches the account cards of mids, keyed by mid.
// An empty mids list yields the shared empty map without any remote call.
func (s *Service) getAccInfo(c context.Context, mids []int64) (cards map[int64]*accmdl.Card, err error) {
	if len(mids) == 0 {
		return _emptyCards, nil
	}
	resp, err := s.acc.Cards3(c, &accmdl.MidsReq{Mids: mids})
	if err != nil {
		log.Error("s.acc.MultiInfo2(%v) error(%v)", mids, err)
		return nil, err
	}
	return resp.Cards, nil
}
// GetBlacklist fetches the black list of user mid.
// An anonymous user (mid == 0) gets the shared empty map without any remote
// call.
func (s *Service) GetBlacklist(c context.Context, mid int64) (blacklistMap map[int64]bool, err error) {
	if mid == 0 {
		return _emptyBlackList, nil
	}
	resp, err := s.acc.Blacks3(c, &accmdl.MidReq{Mid: mid})
	if err != nil {
		log.Error("s.acc.Blacks(%v) error(%v)", mid, err)
		return nil, err
	}
	return resp.BlackList, nil
}
// getAttentions fetches the relations between user mid and each of
// targetMids, keyed by target mid. An empty target list yields the shared
// empty map without any remote call.
func (s *Service) getAttentions(c context.Context, mid int64, targetMids []int64) (relations map[int64]*accmdl.RelationReply, err error) {
	if len(targetMids) == 0 {
		return _emptyRelations, nil
	}
	req := &accmdl.RelationsReq{
		Mid:    mid,
		Owners: targetMids,
		RealIp: metadata.String(c, metadata.RemoteIP),
	}
	resp, err := s.acc.Relations3(c, req)
	if err != nil {
		log.Error("s.acc.Relations2(%v, %v) error(%v)", mid, targetMids, err)
		return nil, err
	}
	return resp.Relations, nil
}
// Subject loads the reply subject (oid, tp) and rejects it with
// ecode.ReplyForbidReply when its state is reply.SubStateForbid.
func (s *Service) Subject(c context.Context, oid int64, tp int8) (*reply.Subject, error) {
	sub, err := s.getSubject(c, oid, tp)
	switch {
	case err != nil:
		return nil, err
	case sub.State == reply.SubStateForbid:
		return nil, ecode.ReplyForbidReply
	}
	return sub, nil
}
// getSubject loads the subject (oid, tp) without checking its state.
//
// Lookup order: memcache, then the database (a database hit is written back
// to the cache). When neither has the subject, only drawyoo subjects are
// created on the fly from the drawyoo service; every other type is rejected
// with ecode.ReplyForbidReply.
// NOTE(review): a database error is only logged, so for non-drawyoo types it
// surfaces to the caller as ReplyForbidReply — confirm this is intended.
func (s *Service) getSubject(c context.Context, oid int64, tp int8) (*reply.Subject, error) {
if !reply.LegalSubjectType(tp) {
log.Error("illegal subject type: %v", tp)
return nil, ecode.ReplyIllegalSubType
}
// Cache errors are logged and otherwise ignored.
sub, err := s.dao.Mc.GetSubject(c, oid, tp)
if err != nil {
log.Error("replyCacheDao.GetSubject(%d, %d) error(%v)", oid, tp, err)
}
if sub != nil {
return sub, nil
}
sub, err = s.dao.Subject.Get(c, oid, tp)
if err != nil {
log.Error("s.subject.Get(%d, %d) error(%v)", oid, tp, err)
}
if err == nil && sub != nil {
s.dao.Mc.AddSubject(c, sub)
return sub, nil
}
// fetch from remote call
if tp != reply.SubTypeDrawyoo {
log.Error("subject type is nether topic nor drawyoo: %v", tp)
return nil, ecode.ReplyForbidReply
}
var mid int64
// This condition always holds here: every other type returned just above.
if tp == reply.SubTypeDrawyoo {
var yoo *drawyoo.Drawyoo
if yoo, err = s.drawyoo.Info(c, oid); err != nil || yoo == nil {
log.Warn("drawtyoo.DrawInfo(%d) not exist", oid)
err = ecode.ReplyForbidReply
return nil, err
}
mid = yoo.Mid
}
sub, err = s.upsertSubject(c, oid, tp, reply.SubStateNormal, mid)
// The subject already exists: read the stored row instead.
if err == ecode.ReplySubjectExist {
sub, err = s.dao.Subject.Get(c, oid, tp)
if err != nil {
log.Error("s.subject.Get(%d, %d) error(%v)", oid, tp, err)
return nil, err
}
return sub, nil
}
return sub, err
}
// upsertSubject builds a subject for (oid, tp) with the given state and
// owner mid, stamps it with the current time and stores it through
// s.dao.Subject.Set.
//
// A storage error is logged and returned. When the store reports id 0 the
// subject is taken to exist already and ecode.ReplySubjectExist is returned;
// in both cases sub still holds the locally built value.
func (s *Service) upsertSubject(c context.Context, oid int64, tp, state int8, mid int64) (sub *reply.Subject, err error) {
	ts := xtime.Time(time.Now().Unix())
	sub = &reply.Subject{
		Oid:   oid,
		Type:  tp,
		Mid:   mid,
		State: state,
		CTime: ts,
		MTime: ts,
	}
	sub.ID, err = s.dao.Subject.Set(c, sub)
	if err != nil {
		// %+v prints the struct fields; the previous %s verb does not fit a
		// struct pointer.
		log.Error("s.subject.Insert(%+v) error(%v)", sub, err)
		return
	}
	if sub.ID == 0 {
		log.Warn("already have subject oid(%d) type(%d)", oid, tp)
		err = ecode.ReplySubjectExist
	}
	return
}
// setSubject stores a subject for (oid, tp) unless the existing one is
// frozen.
//
// The current subject is read first; a read error is returned as is and a
// frozen subject yields ecode.ReplySubjectFrozen. Otherwise a new value with
// the given state and owner mid is stored through s.dao.Subject.Set. When
// the store reports id 0 the subject is taken to exist already and
// ecode.ReplySubjectExist is returned.
//
// Deprecated: kept for existing callers only.
func (s *Service) setSubject(c context.Context, oid int64, tp, state int8, mid int64) (sub *reply.Subject, err error) {
	now := time.Now()
	sub, err = s.dao.Subject.Get(c, oid, tp)
	if err != nil {
		return
	}
	if sub != nil && sub.AttrVal(reply.SubAttrFrozen) == reply.AttrYes {
		err = ecode.ReplySubjectFrozen
		return
	}
	ts := xtime.Time(now.Unix())
	sub = &reply.Subject{
		Oid:   oid,
		Type:  tp,
		Mid:   mid,
		State: state,
		CTime: ts,
		MTime: ts,
	}
	sub.ID, err = s.dao.Subject.Set(c, sub)
	if err != nil {
		// %+v prints the struct fields; the previous %s verb does not fit a
		// struct pointer.
		log.Error("s.subject.Insert(%+v) error(%v)", sub, err)
		return
	}
	if sub.ID == 0 {
		log.Warn("already have subject oid(%d) type(%d)", oid, tp)
		err = ecode.ReplySubjectExist
	}
	return
}
// Reply loads reply rpID of subject (oid, tp) together with its content and
// the author's member info.
// NOTE old php api call
// TODO mobile jump
//
// The content lookup is best-effort: a failure is logged and the reply is
// still returned. A failure of the account card lookup is returned as err
// alongside the partially filled reply.
func (s *Service) Reply(c context.Context, oid int64, tp int8, rpID int64) (r *reply.Reply, err error) {
	if r, err = s.GetReply(c, oid, rpID, tp); err != nil {
		log.Error("s.reply(oid %d,rpid %d) err(%v)", oid, rpID, err)
		return
	}
	var cerr error
	if r.Content, cerr = s.dao.Content.Get(c, oid, rpID); cerr != nil {
		// Previously this error was dropped without any trace.
		log.Error("s.dao.Content.Get(oid %d,rpid %d) err(%v)", oid, rpID, cerr)
	}
	r.Member = new(reply.Member)
	var card *accmdl.CardReply
	if card, err = s.acc.Card3(c, &accmdl.MidReq{Mid: r.Mid}); err != nil {
		log.Error("s.acc.Info2(%d) error(%v)", r.Mid, err)
		return
	}
	r.Member.Info = &reply.Info{}
	if card != nil {
		r.Member.Info.FromCard(card.Card)
	}
	return
}
// reply loads reply rpID from the cache, falling back to the database, and
// validates it against the request: the reply must belong to (oid, tp), and
// a reply that is not in the normal state is only visible to its own author.
//
// Deprecated: use GetReply instead
func (s *Service) reply(c context.Context, mid, oid, rpID int64, tp int8) (r *reply.Reply, err error) {
	if r, err = s.dao.Mc.GetReply(c, rpID); err != nil {
		log.Error("replyCacheDao.GetReply(%d, %d, %d) error(%v)", oid, rpID, tp, err)
		// NOTE: a cache error is deliberately ignored.
		err = nil
	}
	if r == nil {
		if r, err = s.dao.Reply.Get(c, oid, rpID); err != nil {
			log.Error("s.reply.GetReply(%d, %d) error(%v)", oid, rpID, err)
			return
		}
	}
	if r == nil {
		err = ecode.ReplyNotExist
		return
	}
	if r.Oid != oid || r.Type != tp {
		log.Warn("reply dismatches with parameter, oid: %d, rpID: %d, tp: %d, actual: %d, %d, %d", oid, rpID, tp, r.Oid, r.RpID, r.Type)
		err = ecode.RequestErr
		return
	}
	// NOTE: a pending reply is in the audit state and hidden from other users.
	if mid != r.Mid && !r.IsNormal() {
		err = ecode.ReplyNotExist
	}
	return
}
// GetRootReply loads reply rpID and requires it to be a root reply
// (Root == 0); otherwise ecode.ReplyIllegalRoot is returned.
func (s *Service) GetRootReply(c context.Context, oid, rpID int64, tp int8) (*reply.Reply, error) {
	root, err := s.GetReply(c, oid, rpID, tp)
	switch {
	case err != nil:
		return nil, err
	case root.Root != 0:
		return nil, ecode.ReplyIllegalRoot
	}
	return root, nil
}
// GetReply loads reply rpID from the cache, falling back to the database,
// and requires it to belong to subject oid.
//
// It returns ecode.ReplyNotExist when the reply cannot be found and
// ecode.RequestErr when the reply belongs to a different oid. The reply's
// state is not checked here.
func (s *Service) GetReply(c context.Context, oid, rpID int64, tp int8) (*reply.Reply, error) {
	r, err := s.dao.Mc.GetReply(c, rpID)
	if err != nil {
		// NOTE: a cache error is only logged; the lookup carries on.
		log.Error("replyCacheDao.GetReply(%d, %d, %d) error(%v)", oid, rpID, tp, err)
	}
	if r == nil {
		if r, err = s.dao.Reply.Get(c, oid, rpID); err != nil {
			log.Error("s.reply.GetReply(%d, %d) error(%v)", oid, rpID, err)
			return nil, err
		}
		if r == nil {
			return nil, ecode.ReplyNotExist
		}
	}
	// The cache lookup takes rpID alone, so the ownership check has to cover
	// cache hits as well; it used to run on the database path only.
	if r.Oid != oid {
		log.Warn("reply dismatches with parameter, oid: %d, rpID: %d, tp: %d, actual: %d, %d, %d", oid, rpID, tp, r.Oid, r.RpID, r.Type)
		return nil, ecode.RequestErr
	}
	return r, nil
}
// getReplyPos returns the 1-based position of root reply rp within subject
// sub. The floor-sorted Redis index is used when it is available; otherwise
// the position is derived from the subject's count and the reply's floor.
func (s *Service) getReplyPos(c context.Context, sub *reply.Subject, rp *reply.Reply) (pos int) {
	alive, _ := s.dao.Redis.ExpireIndex(c, sub.Oid, sub.Type, reply.SortByFloor)
	if alive {
		rank, err := s.dao.Redis.RankIndex(c, sub.Oid, sub.Type, rp.RpID, reply.SortByFloor)
		if err == nil && rank >= 0 {
			return rank + 1
		}
	}
	// Redis could not answer: calculate from the subject instead.
	return sub.Count - rp.Floor + 1
}
// getReplyPosByRoot returns the 1-based position of child reply rp under
// rootRp. The Redis index of the root is used when it is available;
// otherwise the position is derived from the root's count and the reply's
// floor.
func (s *Service) getReplyPosByRoot(c context.Context, rootRp *reply.Reply, rp *reply.Reply) (pos int) {
	alive, _ := s.dao.Redis.ExpireIndexByRoot(c, rootRp.RpID)
	if alive {
		rank, err := s.dao.Redis.RankIndexByRoot(c, rootRp.RpID, rp.RpID)
		if err == nil && rank >= 0 {
			return rank + 1
		}
	}
	// Redis could not answer: calculate from the root reply instead.
	return rootRp.Count - rp.Floor + 1
}
// Hide hides reply rpID on behalf of the subject's upper.
// The subject type must be legal, mid must own the subject, and the reply
// must be loadable for mid; the hide request is then published to databus.
func (s *Service) Hide(c context.Context, oid, mid, rpID int64, tp int8, ak, ck string) (err error) {
	ts := time.Now().Unix()
	switch {
	case !reply.LegalSubjectType(tp):
		return ecode.ReplyIllegalSubType
	case !s.isUpper(c, mid, oid, tp):
		return ecode.AccessDenied
	}
	if _, err = s.reply(c, mid, oid, rpID, tp); err != nil {
		return err
	}
	s.dao.Databus.Hide(c, oid, rpID, tp, ts)
	return nil
}
// Show makes reply rpID visible again on behalf of the subject's upper.
// The subject type must be legal, mid must own the subject, and the reply
// must be loadable for mid; the show request is then published to databus.
func (s *Service) Show(c context.Context, oid, mid, rpID int64, tp int8, ak, ck string) (err error) {
	ts := time.Now().Unix()
	switch {
	case !reply.LegalSubjectType(tp):
		return ecode.ReplyIllegalSubType
	case !s.isUpper(c, mid, oid, tp):
		return ecode.AccessDenied
	}
	if _, err = s.reply(c, mid, oid, rpID, tp); err != nil {
		return err
	}
	s.dao.Databus.Show(c, oid, rpID, tp, ts)
	return nil
}
// Emojis returns the emoji packages currently held by the service.
func (s *Service) Emojis(c context.Context) (emo []*reply.EmojiPackage) {
	return s.emojis
}
// UpperAddTop pins (act == 1) or unpins (act == 0) reply rpID as the upper's
// top reply of subject (oid, tp).
//
// mid must own the subject. Pinning fails with ecode.ReplyHaveTop when an
// upper top already exists or when the reply is already an admin top;
// unpinning fails with ecode.ReplyNotExist when there is no upper top or
// when the current one is a different reply. Only root replies can be
// pinned. On success the change is published to databus and a user report
// event is emitted.
func (s *Service) UpperAddTop(c context.Context, mid, oid, rpID int64, tp, act int8, platform string, build int64, buvid string) (err error) {
var (
ts = time.Now().Unix()
r *reply.Reply
)
if !reply.LegalSubjectType(tp) {
err = ecode.ReplyIllegalSubType
return
}
if !s.isUpper(c, mid, oid, tp) {
err = ecode.AccessDenied
return
}
sub, err := s.Subject(c, oid, tp)
if err != nil {
log.Error("s.Subject(oid %v) err(%v)", oid, err)
return
}
// r is the current upper top reply, or nil when there is none.
if r, err = s.GetTop(c, sub, oid, tp, reply.ReplyAttrUpperTop); err != nil {
log.Error("s.GetTop(%d,%d) err(%v)", oid, tp, err)
return
}
if r != nil && act == 1 {
log.Warn("oid(%d) type(%d) already have top ", oid, tp)
err = ecode.ReplyHaveTop
return
}
if r == nil && act == 0 {
log.Warn("oid(%d) type(%d) do not have top ", oid, tp)
err = ecode.ReplyNotExist
return
}
// Unpinning must name the reply that is currently pinned.
if r != nil && r.RpID != rpID {
log.Error("reply not exist top(%v) rpID(%v)", r.RpID, rpID)
err = ecode.ReplyNotExist
return
}
// TODO: only need reply,no not need content and user info
if r, err = s.reply(c, mid, oid, rpID, tp); err != nil {
log.Error("s.GetReply err (%v)", err)
return
}
// NOTE(review): s.reply already returns ReplyNotExist for a missing reply,
// so this check looks redundant — confirm before removing.
if r == nil {
log.Warn("oid(%d) type(%d) rpID(%d) do not exist ", oid, tp, rpID)
err = ecode.ReplyNotExist
return
}
// A reply that is already an admin top cannot be changed by the upper.
if r.AttrVal(reply.ReplyAttrAdminTop) == 1 {
err = ecode.ReplyHaveTop
return
}
if r.Root != 0 {
log.Warn("oir(%d) type(%d) rpID(%d) not root reply", oid, tp, rpID)
err = ecode.ReplyNotRootReply
return
}
s.dao.Databus.UpperAddTop(c, mid, oid, rpID, ts, act, tp)
var action = reply.ReportReplyTop
if act == 0 {
action = reply.ReportReplyUntop
}
ip := metadata.String(c, metadata.RemoteIP)
// NOTE(review): the report carries r.Mid (the reply's author), not the
// acting upper's mid — confirm this is the intended subject of the report.
report.User(&report.UserInfo{
Mid: r.Mid,
Platform: platform,
Build: build,
Buvid: buvid,
Business: 41,
Type: int(r.Type),
Oid: r.Oid,
Action: action,
Ctime: time.Now(),
IP: ip,
Index: []interface{}{
r.RpID,
},
})
return
}
// GetTop returns the top reply of kind top (upper top or admin top) for
// subject (oid, tp), read from the cache.
//
// When the subject's matching attribute says there is no such top reply,
// (nil, nil) is returned without touching the cache. A cache error is
// reported as ecode.ServerErr. On a cache miss nil is returned and a load is
// requested through databus; the database is not queried here (the original
// note: loaded by the job, to guard against cache penetration).
func (s *Service) GetTop(c context.Context, sub *reply.Subject, oid int64, tp int8, top uint32) (r *reply.Reply, err error) {
	switch {
	case top == reply.ReplyAttrUpperTop && sub.AttrVal(reply.SubAttrUpperTop) == 0:
		return
	case top == reply.ReplyAttrAdminTop && sub.AttrVal(reply.SubAttrAdminTop) == 0:
		return
	}
	r, err = s.dao.Mc.GetTop(c, oid, tp, top)
	if err != nil {
		log.Error("s.dao.Mc.GetAdminTop(%d, %d) error(%v)", oid, tp, err)
		err = ecode.ServerErr
		return
	}
	if r == nil {
		// Cache miss: ask for the top reply to be loaded.
		s.dao.Databus.AddTop(c, oid, tp, top)
	}
	return
}
// getRelation looks up the relation attribute that srcID holds towards
// targetID via the account service.
//
// It is best-effort: it returns 0 with a nil error when targetID is 0, when
// the account call fails or yields a nil result (the failure is logged), or
// when the result has no entry for targetID. The returned error is always nil.
func (s *Service) getRelation(c context.Context, srcID, targetID int64, ip string) (uint32, error) {
	if targetID == 0 {
		return 0, nil
	}
	req := &accmdl.RichRelationReq{
		Owner:  srcID,
		Mids:   []int64{targetID},
		RealIp: ip,
	}
	resp, err := s.acc.RichRelations3(c, req)
	if resp == nil || err != nil {
		log.Error("s.acc.RichRelations2 sourceId(%v) targetId(%v)error(%v)", srcID, targetID, err)
		// Degrade to "no relation" when the remote service is unavailable.
		return 0, nil
	}
	if raw, found := resp.RichRelations[targetID]; found {
		return relmdl.Attr(uint32(raw)), nil
	}
	// No entry for the target: treat as "no relation".
	return 0, nil
}
// RelationBlocked reports whether the relation attribute of srcMid towards
// targetMid, as resolved by getRelation with an empty IP, equals
// relmdl.AttrBlack.
func (s *Service) RelationBlocked(c context.Context, srcMid, targetMid int64) bool {
	// The error result is discarded: getRelation always returns a nil error.
	attr, _ := s.getRelation(c, srcMid, targetMid, "")
	blocked := relmdl.AttrBlack == attr
	return blocked
}
// SuperviseReply decides whether replying is currently refused by the
// supervision configuration (conf.Conf.Supervision).
//
// It returns ecode.ReplyUpgrading when:
//   - Supervision.Completed is set, or
//   - the current time lies inside [StartTime, EndTime] (interpreted in
//     Asia/Shanghai) and either tp is reply.SubTypeDrawyoo or
//     checkOverseasUser reports the request as overseas.
//
// Otherwise it returns nil. mid, ak and ck are not used by this method.
func (s *Service) SuperviseReply(c context.Context, mid int64, ak, ck string, tp int8) (err error) {
	const layout = "2006-01-02 15:04:05"

	if conf.Conf.Supervision.Completed {
		return ecode.ReplyUpgrading
	}

	loc, locErr := time.LoadLocation("Asia/Shanghai")
	if locErr != nil {
		// Without this fallback loc would be nil and ParseInLocation would
		// panic as soon as a timestamp parses successfully. Asia/Shanghai is
		// a fixed UTC+8 offset for current dates, so a fixed zone is an
		// equivalent substitute when the tz database is unavailable.
		log.Warn("time.LoadLocation(Asia/Shanghai) error(%v), fallback to fixed UTC+8", locErr)
		loc = time.FixedZone("CST", 8*60*60)
	}

	// Parse errors are intentionally ignored: an unparsable bound is the zero
	// time, so an unset/invalid EndTime places "now" after the window and
	// supervision is skipped.
	startTime, _ := time.ParseInLocation(layout, conf.Conf.Supervision.StartTime, loc)
	endTime, _ := time.ParseInLocation(layout, conf.Conf.Supervision.EndTime, loc)

	now := time.Now()
	if now.Before(startTime) || now.After(endTime) {
		return nil
	}
	if tp == reply.SubTypeDrawyoo {
		return ecode.ReplyUpgrading
	}
	// A failed location lookup yields overseas == false, i.e. the request is
	// allowed through; the lookup error itself is deliberately not surfaced.
	if overseas, _ := s.checkOverseasUser(c); overseas {
		return ecode.ReplyUpgrading
	}
	return nil
}
// checkOverseasUser resolves the request's remote IP (taken from the context
// metadata) through the location service and reports whether its country
// differs, case-insensitively, from conf.Conf.Supervision.Location.
//
// When the lookup fails or returns a nil result, overseas is false and the
// lookup error (possibly nil for a nil result) is returned to the caller.
// Previously the error was shadowed inside the if statement and never
// propagated.
func (s *Service) checkOverseasUser(c context.Context) (overseas bool, err error) {
	ip := metadata.String(c, metadata.RemoteIP)
	respro, err := s.location.Info(c, &locmdl.ArgIP{IP: ip})
	if err != nil || respro == nil {
		log.Error("s.location.Info(%s) error(%v) or respro is nil", ip, err)
		return false, err
	}
	overseas = !strings.EqualFold(respro.Country, conf.Conf.Supervision.Location)
	return overseas, nil
}
// FetchFans returns the fans details between the upper (mid) and each user in
// uids, as produced by s.fans.Fetch evaluated at the current time.
// A fetch error is logged and returned together with whatever map Fetch
// produced.
func (s *Service) FetchFans(c context.Context, uids []int64, mid int64) (fans map[int64]*reply.FansDetail, err error) {
	fans, err = s.fans.Fetch(c, uids, mid, time.Now())
	if err != nil {
		log.Error("s.fans.fetch(%d, %d) error(%v)", mid, uids, err)
	}
	return fans, err
}
// PaginateUpperDeletedLogs returns one page (curPage, pageSize) of admin logs
// for the subject (oid, tp) together with the counters reported by the search
// backend: replyCount, reportCount, pageCount and total.
//
// The query is restricted to log states 16 and 18 (NOTE(review): meaning of
// these state codes is not visible here - confirm against the state
// definitions) and to the period from conf.Conf.AssistConfig.StartTime until
// now. A search failure is logged and returned with all other results zeroed.
//
// Each log is then decorated with account data: the replier's name is masked
// (first rune + "***" + last rune for 3+ runes, first rune + "***" for 2
// runes, unchanged otherwise), the replier's face picture is copied, and the
// operator name is filled from AdminID. Account lookup failures are logged and
// ignored, leaving those fields untouched.
func (s *Service) PaginateUpperDeletedLogs(c context.Context, oid int64, tp int, curPage, pageSize int) (logs []*adminlog.AdminLog, replyCount, reportCount, pageCount, total int64, err error) {
	states := []int64{16, 18}
	logs, replyCount, reportCount, pageCount, total, err = s.search.LogPaginate(c, oid, tp, states, curPage, pageSize, conf.Conf.AssistConfig.StartTime, time.Now())
	if err != nil {
		log.Error("s.adminlog.Paginate(%d, %d) error(%v)", oid, tp, err)
		return nil, 0, 0, 0, 0, err
	}

	// Collect both the operator and the reply author of every entry.
	mids := make([]int64, 0, 2*len(logs))
	for _, entry := range logs {
		mids = append(mids, entry.AdminID, entry.ReplyMid)
	}

	minfos, err := s.getAccInfo(c, mids)
	if err != nil {
		log.Error("s.getAccInfo(mids %v) err(%v)", mids, err)
		// NOTE degrade account: continue without account decoration.
		err = nil
	}

	for _, entry := range logs {
		if author, ok := minfos[entry.ReplyMid]; ok {
			runes := []rune(author.Name)
			switch n := len(runes); {
			case n >= 3:
				entry.ReplyUser = string(runes[0]) + "***" + string(runes[n-1])
			case n == 2:
				entry.ReplyUser = string(runes[0]) + "***"
			default:
				entry.ReplyUser = author.Name
			}
			entry.ReplyFacePic = author.Face
		}
		if operator, ok := minfos[entry.AdminID]; ok {
			entry.Operator = operator.Name
		}
	}
	return logs, replyCount, reportCount, pageCount, total, err
}
// GetReplyLogConfig returns the reply configuration of the given category for
// the subject sub.
//
// It returns (nil, nil) when the subject's reply.SubAttrConfig attribute is 0
// or when no configuration exists. The value is read from memcache first; a
// cache error is logged and ignored. On a miss the configuration is loaded via
// s.dao.Config.LoadConfig (a load error is logged and returned) and then
// written back to memcache; a write-back failure is only logged.
func (s *Service) GetReplyLogConfig(c context.Context, sub *reply.Subject, category int8) (config *reply.Config, err error) {
	if sub.AttrVal(reply.SubAttrConfig) == 0 {
		return nil, nil
	}
	if config, err = s.dao.Mc.GetReplyConfig(c, sub.Oid, sub.Type, category); err != nil {
		// category was previously missing from the argument list, which
		// shifted err under a %d verb and left the %v verb without a value.
		log.Error("replyConfigCacheDao.GetReplyConfig(%d, %d, %d) error(%v)", sub.Oid, sub.Type, category, err)
		err = nil // NOTE ignore error
	}
	if config != nil {
		return config, nil
	}

	if config, err = s.dao.Config.LoadConfig(c, sub.Oid, sub.Type, category); err != nil {
		log.Error("s.dao.Config.LoadConfig(%d, %d, %d) error(%v)", sub.Oid, sub.Type, category, err)
		return nil, err
	}
	if config == nil {
		return nil, nil
	}
	// Best-effort cache fill: a failure must not fail the request.
	if cacheErr := s.dao.Mc.AddReplyConfigCache(c, config); cacheErr != nil {
		log.Error("replyConfigCacheDao.AddReplyConfig(%v) error(%v)", config, cacheErr)
	}
	return config, nil
}
// VerifyCaptcha checks the captcha answer submitted by mid.
// It reads the captcha token stored for mid from memcache and passes the
// token and the answer to the captcha dao's Verify. An error from either step
// is returned unchanged.
func (s *Service) VerifyCaptcha(c context.Context, captcha string, mid int64) error {
	token, tokenErr := s.dao.Mc.CaptchaToken(c, mid)
	if tokenErr != nil {
		return tokenErr
	}
	verifyErr := s.dao.Captcha.Verify(c, token, captcha)
	return verifyErr
}
// Captcha requests a new captcha from the captcha dao, stores its token in
// memcache under mid, and returns the captcha URI.
// When the request fails, an empty string and the error are returned and
// nothing is stored.
func (s *Service) Captcha(c context.Context, mid int64) (string, error) {
	token, uri, err := s.dao.Captcha.Captcha(c)
	if err == nil {
		// Remember the token so the answer can be verified later for this mid.
		s.dao.Mc.SetCaptchaToken(c, mid, token)
		return uri, nil
	}
	return "", err
}
// Topics returns topic suggestions for the message msg posted by mid on the
// subject (oid, typ).
//
// When s.IsBnj(oid, typ) is true the single fixed topic "拜年祭" is returned.
// Otherwise candidates come from s.bigdata.Topics and are passed through the
// filter service (area "reply"); candidates missing from the filter result or
// whose filter level is above 15 are dropped, and the remaining ones are
// returned as the filter's Result text.
//
// The surviving topics keep the order in which s.bigdata.Topics returned
// them. Previously they were collected by ranging over the filter's result
// map, which made the order random from call to call.
//
// A bigdata error is returned as is. A filter error is logged and returned
// together with an empty topic list.
func (s *Service) Topics(c context.Context, mid int64, oid int64, typ int8, msg string) (topics []string, err error) {
	if s.IsBnj(oid, typ) {
		topics = []string{"拜年祭"}
		return
	}
	topics, err = s.bigdata.Topics(c, mid, oid, typ, msg)
	if err != nil || len(topics) == 0 {
		return
	}

	// Key every candidate by its position so the original order can be
	// restored after filtering.
	candidates := len(topics)
	messages := make(map[string]string, candidates)
	for i, topic := range topics {
		messages[strconv.Itoa(i)] = topic
	}
	// Reuse the backing array; the candidates now live in messages.
	topics = topics[:0]

	res, err := s.filcli.MFilter(c, &filgrpc.MFilterReq{
		Area:   "reply",
		MsgMap: messages,
	})
	if err != nil {
		log.Error("s.fil.MFilter(%v) failed!err:=%v", messages, err)
		return
	}
	for i := 0; i < candidates; i++ {
		data, ok := res.RMap[strconv.Itoa(i)]
		if !ok || data.Level > 15 {
			continue
		}
		topics = append(topics, data.Result)
	}
	return
}
// IsHotReply reports whether the reply rpID counts as a hot reply of the
// subject (oid, tp).
//
// It reads the like-sorted index range [0, 5] of the subject from redis,
// loads those replies, and returns true when one of them has the ID rpID and
// at least 3 likes. Errors from the index read or the reply load are logged
// and returned with isHot == false.
func (s *Service) IsHotReply(c context.Context, tp int8, oid, rpID int64) (isHot bool, err error) {
	topIDs, _, err := s.dao.Redis.Range(c, oid, tp, reply.SortByLike, 0, 5)
	if err != nil {
		log.Error("s.dao.Redis.Range() error(%v)", err)
		return false, err
	}
	candidates, err := s.GetReplyByIDs(c, oid, tp, topIDs)
	if err != nil {
		log.Error("s.GetReplyByIDs() error(%v)", err)
		return false, err
	}
	for _, cand := range candidates {
		if cand.RpID != rpID {
			continue
		}
		if cand.Like < 3 {
			continue
		}
		return true, nil
	}
	return false, nil
}
// HotsBatch returns, for each subject oid of type tp, its replies ordered by
// the like-sorted index, keyed by oid.
//
// Steps:
//  1. Subjects are read from memcache; misses are loaded through
//     s.dao.Subject.Gets and queued for asynchronous cache back-fill.
//  2. Like-sorted reply IDs are read from redis with range [0, size]. For
//     every oid redis reports as missed, an index recovery is queued and the
//     IDs are loaded concurrently via s.dao.Reply.GetIdsSortLike instead.
//  3. For every oid the replies are fetched by ID, kept in index order
//     (IDs without a reply are skipped) and, when the subject is known,
//     passed through s.buildReply for mid.
//
// The first error encountered aborts the call; res may then be partially
// filled.
func (s *Service) HotsBatch(c context.Context, tp, size int8, oids []int64, mid int64) (res map[int64][]*reply.Reply, err error) {
	var (
		subjects    map[int64]*reply.Subject
		subMisses   []int64
		idsByOid    map[int64][]int64
		indexMisses []int64
	)
	res = make(map[int64][]*reply.Reply, len(oids))

	// Step 1: subjects, cache first and then storage for the misses.
	subjects, subMisses, err = s.dao.Mc.GetMultiSubject(c, oids, tp)
	if err != nil {
		log.Error("s.dao.Mc.GetMultiSubject() error(%v)", err)
		return
	}
	if len(subMisses) > 0 {
		var loaded map[int64]*reply.Subject
		loaded, err = s.dao.Subject.Gets(c, subMisses, tp)
		if err != nil {
			log.Error("s.dao.Subject.Gets() error(%v)", err)
			return
		}
		var backfill []*reply.Subject
		for id, sub := range loaded {
			subjects[id] = sub
			backfill = append(backfill, sub)
		}
		s.cache.Do(c, func(ctx context.Context) { s.dao.Mc.AddSubject(ctx, backfill...) })
	}

	// Step 2: like-sorted reply IDs per subject.
	idsByOid, indexMisses, err = s.dao.Redis.RangeByOids(c, oids, tp, reply.SortByLike, 0, size)
	if err != nil {
		log.Error("s.dao.Redis.RangeByOids() error(%v)", err)
		return
	}
	if len(indexMisses) > 0 {
		var mu sync.Mutex // guards idsByOid while the loaders run concurrently
		g, ctx := errgroup.WithContext(c)
		for _, missedOid := range indexMisses {
			missedOid := missedOid
			g.Go(func() error {
				// Queue an index recovery, then serve this request from the reply dao.
				s.cache.Do(ctx, func(ctx context.Context) { s.dao.Databus.RecoverIndex(ctx, missedOid, tp, reply.SortByLike) })
				ids, loadErr := s.dao.Reply.GetIdsSortLike(ctx, missedOid, tp, 0, int(size))
				if loadErr != nil {
					return loadErr
				}
				mu.Lock()
				idsByOid[missedOid] = ids
				mu.Unlock()
				return nil
			})
		}
		if err = g.Wait(); err != nil {
			return
		}
	}

	// Step 3: materialize the replies of every subject in index order.
	for oid, ids := range idsByOid {
		var found map[int64]*reply.Reply
		// Fetch reply content and other meta information by reply ID.
		if found, err = s.repliesMap(c, oid, tp, ids); err != nil {
			return
		}
		ordered := make([]*reply.Reply, 0, len(ids))
		for _, id := range ids {
			if rp, ok := found[id]; ok {
				ordered = append(ordered, rp)
			}
		}
		if sub, ok := subjects[oid]; ok {
			// Build like, follow and other relation data for the viewer.
			if err = s.buildReply(c, sub, ordered, mid, false); err != nil {
				return
			}
		}
		res[oid] = ordered
	}
	return
}