package dao

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"unicode/utf8"

	"go-common/app/service/bbq/topic/api"
	"go-common/app/service/bbq/topic/internal/model"
	"go-common/library/cache/redis"
	"go-common/library/ecode"
	"go-common/library/log"
	"go-common/library/sync/errgroup.v2"
	"go-common/library/xstr"
)

// SQL templates. The %s / %d verbs are filled with fmt.Sprintf; only trusted
// values (integer lists, "?" placeholders, whitelisted column names) may be
// formatted in. User-supplied strings must always be passed as query args.
const (
	_selectTopic            = "select `id`, `name`, `desc`, `state` from topic where id in (%s)"
	_insertUpdateTopic      = "insert into topic (`name`,`score`,`state`,`video_num`) values %s on duplicate key update `video_num`=`video_num`+1"
	_selectTopicID          = "select id, name from topic where name in (%s)"
	_selectDiscoveryTopic   = "select id from topic where state=0 %s order by score desc, id desc limit %d, %d"
	_selectUnavailabelTopic = "select id from topic where state=1 limit %d,%d"
	_updateTopicField       = "update topic set `%s` = ? where `id` = ?"
)

// Redis key template for a cached topic, formatted with the topic id.
const (
	_topicKey = "topic:%d"
)

// RawTopicInfo loads topic info from MySQL for the given topic ids.
//
// The result is keyed by topic id and only contains ids that exist in the
// table. An empty topicIDs yields an empty map and a nil error. On a query,
// scan or iteration error the error is returned together with whatever rows
// were read so far.
func (d *Dao) RawTopicInfo(ctx context.Context, topicIDs []int64) (res map[int64]*api.TopicInfo, err error) {
	res = make(map[int64]*api.TopicInfo, len(topicIDs))
	if len(topicIDs) == 0 {
		return
	}

	// topicIDs are integers, so joining them into the statement is injection-safe.
	querySQL := fmt.Sprintf(_selectTopic, xstr.JoinInts(topicIDs))
	rows, err := d.db.Query(ctx, querySQL)
	if err != nil {
		log.Errorw(ctx, "log", "get topic error", "err", err, "sql", querySQL)
		return
	}
	defer rows.Close()

	for rows.Next() {
		topicInfo := new(api.TopicInfo)
		if err = rows.Scan(&topicInfo.TopicId, &topicInfo.Name, &topicInfo.Desc, &topicInfo.State); err != nil {
			log.Errorw(ctx, "log", "get topic from mysql fail", "err", err, "sql", querySQL)
			return
		}
		// Every topic currently gets the same hard-coded cover image.
		topicInfo.CoverUrl = "http://i0.hdslb.com/bfs/bbq/video-image/userface/155886860_1547729941"
		res[topicInfo.TopicId] = topicInfo
	}
	// rows.Next() returning false can also mean the iteration failed midway.
	if err = rows.Err(); err != nil {
		log.Errorw(ctx, "log", "iterate topic rows fail", "err", err, "sql", querySQL)
		return
	}

	log.V(1).Infow(ctx, "log", "get topic", "req", topicIDs, "rsp_size", len(res))
	return
}

// CacheTopicInfo loads topic info from redis for the given topic ids.
//
// Duplicate ids are requested only once. Keys missing from redis and values
// that fail to decode are omitted from the result (the latter are logged).
// If any pipelined command fails with something other than redis.ErrNil, the
// first such error is returned alongside the entries that were read
// successfully.
func (d *Dao) CacheTopicInfo(ctx context.Context, topicIDs []int64) (res map[int64]*api.TopicInfo, err error) {
	res = make(map[int64]*api.TopicInfo, len(topicIDs))
	if len(topicIDs) == 0 {
		return
	}

	// Build the de-duplicated key list, preserving request order.
	keys := make([]string, 0, len(topicIDs))
	seen := make(map[int64]struct{}, len(topicIDs))
	for _, topicID := range topicIDs {
		if _, dup := seen[topicID]; dup {
			continue
		}
		seen[topicID] = struct{}{}
		keys = append(keys, fmt.Sprintf(_topicKey, topicID))
	}

	conn := d.redis.Get(ctx)
	defer conn.Close()

	// Pipeline all GETs, then read the replies back in the same order.
	for _, key := range keys {
		if err = conn.Send("GET", key); err != nil {
			log.Errorv(ctx, log.KV("event", "redis_get"), log.KV("key", key), log.KV("err", err))
			return
		}
	}
	if err = conn.Flush(); err != nil {
		log.Errorv(ctx, log.KV("event", "redis_get"), log.KV("err", err))
		return
	}

	for _, key := range keys {
		data, recvErr := redis.Bytes(conn.Receive())
		if recvErr != nil {
			// ErrNil is a plain cache miss; anything else is a real failure.
			if recvErr != redis.ErrNil {
				log.Errorv(ctx, log.KV("event", "redis_get"), log.KV("key", key), log.KV("err", recvErr))
				if err == nil {
					err = recvErr
				}
			}
			continue
		}
		topicInfo := new(api.TopicInfo)
		if decodeErr := json.Unmarshal(data, topicInfo); decodeErr != nil {
			// Skip the entry so a corrupt value is not stored under topic id 0.
			log.Errorv(ctx, log.KV("event", "json_unmarshal"), log.KV("key", key), log.KV("err", decodeErr))
			continue
		}
		res[topicInfo.TopicId] = topicInfo
	}

	log.Infov(ctx, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
	return
}

// AddCacheTopicInfo writes the given topic infos into redis, one key per
// topic id, each with an expiry of d.topicExpire.
//
// Cache population is best-effort: encoding and redis failures are logged and
// the function still returns a nil error, matching its historical behaviour.
func (d *Dao) AddCacheTopicInfo(ctx context.Context, topicInfos map[int64]*api.TopicInfo) (err error) {
	if len(topicInfos) == 0 {
		return
	}

	conn := d.redis.Get(ctx)
	defer conn.Close()

	// Pipeline one SET per topic; count the commands actually queued so the
	// reply loop below reads exactly that many responses.
	sent := 0
	for topicID, topicInfo := range topicInfos {
		key := fmt.Sprintf(_topicKey, topicID)
		data, encodeErr := json.Marshal(topicInfo)
		if encodeErr != nil {
			log.Errorv(ctx, log.KV("event", "json_marshal"), log.KV("key", key), log.KV("err", encodeErr))
			continue
		}
		if sendErr := conn.Send("SET", key, data, "EX", d.topicExpire); sendErr != nil {
			log.Errorv(ctx, log.KV("event", "redis_set"), log.KV("key", key), log.KV("err", sendErr))
			continue
		}
		sent++
	}
	if flushErr := conn.Flush(); flushErr != nil {
		log.Errorv(ctx, log.KV("event", "redis_set"), log.KV("err", flushErr))
		return
	}
	for i := 0; i < sent; i++ {
		if _, recvErr := conn.Receive(); recvErr != nil {
			log.Errorv(ctx, log.KV("event", "redis_set"), log.KV("err", recvErr))
		}
	}

	log.Infov(ctx, log.KV("event", "redis_set"), log.KV("row_num", len(topicInfos)))
	return
}

// DelCacheTopicInfo removes the cached topic info for topicID from redis.
// A failure is logged; there is no error return.
func (d *Dao) DelCacheTopicInfo(ctx context.Context, topicID int64) {
	key := fmt.Sprintf(_topicKey, topicID)
	conn := d.redis.Get(ctx)
	defer conn.Close()
	if _, err := conn.Do("DEL", key); err != nil {
		log.Errorv(ctx, log.KV("event", "redis_del"), log.KV("key", key), log.KV("err", err))
	}
}

// InsertTopics inserts (or bumps the video count of) every topic in topics.
//
// It rejects batches larger than model.MaxBatchLen and names longer than
// model.MaxTopicNameLen runes. The inserts run concurrently, at most five at a
// time. On success each TopicInfo in topics has its TopicId filled in and the
// same map is returned as newTopics; on failure newTopics is an empty map.
func (d *Dao) InsertTopics(ctx context.Context, topics map[string]*api.TopicInfo) (newTopics map[string]*api.TopicInfo, err error) {
	newTopics = make(map[string]*api.TopicInfo)

	// 0. Validate the batch.
	if len(topics) == 0 {
		return
	}
	if len(topics) > model.MaxBatchLen {
		err = ecode.TopicNumTooManyErr
		return
	}
	// Name length is measured in runes, not bytes.
	for _, item := range topics {
		if utf8.RuneCountInString(item.Name) > model.MaxTopicNameLen {
			err = ecode.TopicNameLenErr
			log.Errorw(ctx, "log", "topic name len too long", "name", item.Name)
			return
		}
	}

	// 1. Insert or update each topic concurrently.
	group := errgroup.WithCancel(ctx)
	group.GOMAXPROCS(5)
	for _, topic := range topics {
		topicInfo := topic // per-iteration copy captured by the goroutine
		group.Go(func(ctx context.Context) error {
			topicID, insertErr := d.insertTopic(ctx, topicInfo)
			if insertErr != nil {
				log.Warnw(ctx, "log", "insert topic fail", "topic_name", topicInfo.Name, "err", insertErr)
				return insertErr
			}
			if topicID == 0 {
				log.Errorw(ctx, "log", "get error topic_id", "name", topicInfo.Name)
				return ecode.TopicInsertErr
			}
			topicInfo.TopicId = topicID
			return nil
		})
	}
	if err = group.Wait(); err != nil {
		log.Warnw(ctx, "log", "do group insert topic fail", "err", err)
		return
	}

	// The insert reports the id, so the input map now carries everything needed.
	newTopics = topics
	return
}

// insertTopic inserts a single topic, or increments video_num when a row with
// the same unique key already exists, and returns the id reported by the
// database for that statement.
//
// The name, score and state are sent as bound parameters so the topic name
// can never alter the SQL statement.
func (d *Dao) insertTopic(ctx context.Context, topicInfo *api.TopicInfo) (topicID int64, err error) {
	// 0. Validate the name length (in runes).
	if utf8.RuneCountInString(topicInfo.Name) > model.MaxTopicNameLen {
		err = ecode.TopicNameLenErr
		log.Errorw(ctx, "log", "topic name len too long", "name", topicInfo.Name)
		return
	}

	// 1. Insert or update; video_num starts at 1 for a new row.
	insertSQL := fmt.Sprintf(_insertUpdateTopic, "(?,?,?,1)")
	log.V(1).Infow(ctx, "sql", insertSQL, "name", topicInfo.Name, "score", topicInfo.Score, "state", topicInfo.State)
	res, err := d.db.Exec(ctx, insertSQL, topicInfo.Name, topicInfo.Score, topicInfo.State)
	if err != nil {
		log.Errorw(ctx, "log", "insert topic fail", "topic_name", topicInfo.Name, "err", err)
		return
	}
	if topicID, err = res.LastInsertId(); err != nil {
		log.Errorw(ctx, "log", "insert topic fail", "topic_name", topicInfo.Name, "err", err)
		return
	}
	return
}

// UpdateTopic updates one column of a topic; only "desc" and "state" are
// accepted, any other field yields ecode.ReqParamErr. After a successful
// update the topic's cache entry is deleted.
//
// This effectively hands control over what gets updated to the caller, which
// is not a great design, but it avoids duplicating near-identical code.
func (d *Dao) UpdateTopic(ctx context.Context, topicID int64, field string, value interface{}) (err error) {
	// The column name is formatted into the SQL, so it must be whitelisted.
	switch field {
	case "desc", "state":
	default:
		return ecode.ReqParamErr
	}

	querySQL := fmt.Sprintf(_updateTopicField, field)
	if _, err = d.db.Exec(ctx, querySQL, value, topicID); err != nil {
		log.Errorw(ctx, "log", "update topic field fail", "field", field, "value", value, "topic_id", topicID, "err", err)
		return
	}
	d.DelCacheTopicInfo(ctx, topicID)
	return
}

// TopicID looks up topic ids by topic name.
//
// The result maps each name found in the table to its id; names that do not
// exist are absent. Batches larger than model.MaxBatchLen are rejected with
// ecode.TopicNumTooManyErr. Names are sent as bound parameters.
func (d *Dao) TopicID(ctx context.Context, names []string) (topics map[string]int64, err error) {
	topics = make(map[string]int64, len(names))
	if len(names) == 0 {
		return
	}
	if len(names) > model.MaxBatchLen {
		err = ecode.TopicNumTooManyErr
		return
	}

	// One "?" placeholder per name: "?,?,?".
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(names)), ",")
	args := make([]interface{}, 0, len(names))
	for _, name := range names {
		args = append(args, name)
	}
	querySQL := fmt.Sprintf(_selectTopicID, placeholders)
	log.V(1).Infow(ctx, "log", "select topic id", "sql", querySQL, "names", names)

	rows, err := d.db.Query(ctx, querySQL, args...)
	if err != nil {
		log.Errorw(ctx, "log", "get topic id error", "err", err, "sql", querySQL)
		return
	}
	defer rows.Close()

	for rows.Next() {
		var (
			topicID int64
			name    string
		)
		if err = rows.Scan(&topicID, &name); err != nil {
			log.Errorw(ctx, "log", "scan topic id error", "err", err, "sql", querySQL)
			return
		}
		topics[name] = topicID
	}
	if err = rows.Err(); err != nil {
		log.Errorw(ctx, "log", "iterate topic id rows error", "err", err, "sql", querySQL)
		return
	}

	log.V(1).Infow(ctx, "log", "get topic id", "req", names, "rsp", topics)
	return
}

// ListUnAvailableTopics returns one page of ids of topics whose state is 1.
//
// page is 1-based; page < 1 yields ecode.TopicReqParamErr. Pages beyond
// model.MaxDiscoveryTopicPage return an empty list with hasMore=false.
// hasMore is false when the page came back with fewer than size rows.
func (d *Dao) ListUnAvailableTopics(ctx context.Context, page int32, size int32) (list []int64, hasMore bool, err error) {
	hasMore = true

	// 0. Validate paging parameters.
	if page < 1 {
		err = ecode.TopicReqParamErr
		return
	}
	if page > model.MaxDiscoveryTopicPage {
		hasMore = false
		return
	}

	// 1. Fetch the page.
	offset := (page - 1) * size
	querySQL := fmt.Sprintf(_selectUnavailabelTopic, offset, size)
	rows, err := d.db.Query(ctx, querySQL)
	if err != nil {
		log.Errorw(ctx, "log", "get topic video error", "err", err, "sql", querySQL)
		return
	}
	defer rows.Close()

	for rows.Next() {
		var topicID int64
		if err = rows.Scan(&topicID); err != nil {
			log.Errorw(ctx, "log", "get topic from mysql fail", "err", err, "sql", querySQL)
			return
		}
		list = append(list, topicID)
	}
	if err = rows.Err(); err != nil {
		log.Errorw(ctx, "log", "iterate topic rows fail", "err", err, "sql", querySQL)
		return
	}

	// 2. A short page means there is nothing after it.
	if len(list) < int(size) {
		hasMore = false
	}
	return
}

// ListRankTopics returns one page of recommended topic ids (state 0), ordered
// by score then id, both descending.
//
// Sticky topics are excluded from the ranked query and, on page 1 only,
// prepended to the result, so page 1 may contain more than size ids. If the
// sticky list cannot be loaded the ranked query runs without the exclusion.
// hasMore is derived from the number of rows the ranked query returned.
//
// TODO: move the sticky logic up a layer.
func (d *Dao) ListRankTopics(ctx context.Context, page int32, size int32) (list []int64, hasMore bool, err error) {
	hasMore = true

	// 0. Validate paging parameters.
	if page < 1 {
		err = ecode.TopicReqParamErr
		return
	}
	if page > model.MaxDiscoveryTopicPage {
		hasMore = false
		return
	}

	// 1. Load the sticky topics so they can be excluded from the ranked query.
	additionalConditionSQL := ""
	stickList, err := d.GetStickTopic(ctx)
	if err != nil {
		// Best effort: fall back to the plain ranked list.
		log.Warnw(ctx, "log", "get stick topic fail", "err", err)
	} else if len(stickList) > 0 {
		additionalConditionSQL = fmt.Sprintf("and id not in (%s)", xstr.JoinInts(stickList))
	}

	// 2. The first page starts with the sticky topics.
	if page == 1 {
		list = stickList
	}

	// 3. Fetch the ranked page.
	offset := (page - 1) * size
	querySQL := fmt.Sprintf(_selectDiscoveryTopic, additionalConditionSQL, offset, size)
	log.Infow(ctx, "sql", querySQL, "page", page, "size", size)
	rows, err := d.db.Query(ctx, querySQL)
	if err != nil {
		log.Errorw(ctx, "log", "get topic video error", "err", err, "sql", querySQL)
		return
	}
	defer rows.Close()

	fetched := 0
	for rows.Next() {
		var topicID int64
		if err = rows.Scan(&topicID); err != nil {
			log.Errorw(ctx, "log", "get topic from mysql fail", "err", err, "sql", querySQL)
			return
		}
		list = append(list, topicID)
		fetched++
	}
	if err = rows.Err(); err != nil {
		log.Errorw(ctx, "log", "iterate topic rows fail", "err", err, "sql", querySQL)
		return
	}

	// 4. Count only the ranked rows: the sticky ids prepended on page 1 must
	// not make a short database page look like a full one.
	if fetched < int(size) {
		hasMore = false
	}
	return
}

// GetStickTopic returns the list stored in redis under
// model.RedisStickTopicKey, i.e. the sticky topic ids.
//
// TODO: this is a stopgap; as QPS grows this single key becomes a hot spot.
func (d *Dao) GetStickTopic(ctx context.Context) (list []int64, err error) {
	return d.getRedisList(ctx, model.RedisStickTopicKey)
}

// setStickTopic stores list in redis under model.RedisStickTopicKey.
func (d *Dao) setStickTopic(ctx context.Context, list []int64) (err error) {
	return d.setRedisList(ctx, model.RedisStickTopicKey, list)
}

// StickTopic sticks or unsticks a topic.
//
// The topic must exist and be in state api.TopicStateAvailable. When op is
// non-zero the topic is moved to the front of the sticky list; when op is
// zero it is removed from the list. The list is truncated to
// model.MaxStickTopicNum entries before being saved.
func (d *Dao) StickTopic(ctx context.Context, opTopicID, op int64) (err error) {
	// 0. Validate the topic.
	info, err := d.TopicInfo(ctx, []int64{opTopicID})
	if err != nil {
		log.Warnw(ctx, "log", "get topic info fail", "topic_id", opTopicID, "err", err)
		return
	}
	topicInfo, exists := info[opTopicID]
	if !exists {
		log.Errorw(ctx, "log", "stick topic fail due to error topic_id", "topic_id", opTopicID)
		err = ecode.TopicIDNotFound
		return
	}
	if topicInfo.State != api.TopicStateAvailable {
		log.Errorw(ctx, "log", "topic state unavailable to do sticking", "state", topicInfo.State, "topic_id", opTopicID)
		err = ecode.TopicStateErr
		return
	}

	// 1. Load the current sticky list.
	stickList, err := d.GetStickTopic(ctx)
	if err != nil {
		log.Warnw(ctx, "log", "get stick topic fail", "err", err)
		return
	}

	// 2. Rebuild the list: the operated topic goes first when sticking, and
	// its previous occurrence is dropped either way.
	var newStickList []int64
	if op != 0 {
		newStickList = append(newStickList, opTopicID)
	}
	for _, stickTopicID := range stickList {
		if stickTopicID != opTopicID {
			newStickList = append(newStickList, stickTopicID)
		}
	}
	if len(newStickList) > model.MaxStickTopicNum {
		newStickList = newStickList[:model.MaxStickTopicNum]
	}

	// 3. Persist the new sticky list.
	if err = d.setStickTopic(ctx, newStickList); err != nil {
		log.Warnw(ctx, "log", "update stick topic fail", "err", err)
		return
	}
	return
}