package dao import ( "bytes" "context" "database/sql" "encoding/json" "fmt" "go-common/app/service/bbq/video/api/grpc/v1" "go-common/app/service/bbq/video/model" acc "go-common/app/service/main/account/api" "go-common/library/cache/redis" xsql "go-common/library/database/sql" "go-common/library/ecode" "go-common/library/log" "go-common/library/net/metadata" "go-common/library/xstr" xhttp "net/http" "regexp" "strconv" "strings" "time" ) const ( _BVCSubTableSize = 100 _queryVideo = "SELECT svid FROM video WHERE svid = ?" _addVideo = "INSERT INTO video(`cover_url`,`cover_width`,`cover_height`,`svid`,`title`,`mid`,`avid`,`cid`,`pubtime`,`from`,`tid`,`sub_tid`,`home_img_url`,`home_img_width`,`home_img_height`,`state`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" _queryTagByName = "SELECT `id` FROM tag WHERE name = ? and type = ?" _insertTag = "INSERT INTO tag (`name`,`type`,`status`) VALUES %s " _insOrUpUserBase = "INSERT IGNORE user_base (mid, uname, face, user_type) VALUES (?, ?, ?, ?) " _insOrUpUserSta = "INSERT IGNORE user_statistics_hive (mid, uname) VALUES (?, ?)" _queryStatisticsList = "select `svid`, `play`, `subtitles`, `like`, `share`, `report` from video_statistics where svid in (%s)" _addBVCData = "insert into %s (`svid`,`path`,`resolution_retio`,`code_rate`,`video_code`,`duration`,`file_size`) values (?,?,?,?,?,?,?)" _updateBVCData = "update %s set path=?, resolution_retio=?, video_code=?, duration=?, file_size=? where svid = ? and code_rate = ?" _updateSvPIC = "update video_repository set cover_url=?,cover_width=?,cover_height=? ,sync_status = sync_status|? where svid = ?" _addVideoViews = "update `video_statistics` set `play` = `play` + ? where `svid` = ?" _existedStatistics = "select `id` from `video_statistics` where `svid` = ?;" _insertStatistics = "insert into `video_statistics`(`svid`, `play`, `subtitles`, `like`, `share`, `report`) values(?,?,?,?,?,?);" _queryVideoList = "select `avid`, `cid`, `svid`, `title`, `mid`, `content`, `pubtime`,`duration`,`tid`,`sub_tid`,`cover_url`,`cover_width`,`cover_height`,`limits`, `state` from video where svid in (%s)" _updateVideoState = "update `video` set `state` = ? where `svid`= ?;" ) const ( videoBaseCacheExpire = 600 videoBaseCacheKey = "video_base:%d" ) func keyVideoBase(svid int64) string { return fmt.Sprintf(videoBaseCacheKey, svid) } // ModifyLimits . func (d *Dao) ModifyLimits(c context.Context, svid int64, limitType uint64, limitOp uint64) (num int64, err error) { // 根据操作选择合适的limits update语句 limitOpCond := fmt.Sprintf("|%d", 1< len(res) { var rspID []int64 for k := range res { rspID = append(rspID, k) } log.Warnw(c, "log", fmt.Sprintf("video req and rsp size not equal: req=%v, rsp=%v", svids, rspID)) } log.V(1).Infow(c, "req_size", len(svids), "rsp_size", len(res)) return } // CacheVideoBase cache video base func (d *Dao) CacheVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) { res = make(map[int64]*v1.VideoBase) keys := make([]string, 0, len(svids)) keyMidMap := make(map[int64]bool, len(svids)) for _, svid := range svids { key := keyVideoBase(svid) if _, exist := keyMidMap[svid]; !exist { // duplicate svid keyMidMap[svid] = true keys = append(keys, key) } } conn := d.redis.Get(c) defer conn.Close() for _, key := range keys { conn.Send("GET", key) } conn.Flush() var data []byte for i := 0; i < len(keys); i++ { if data, err = redis.Bytes(conn.Receive()); err != nil { if err == redis.ErrNil { err = nil } else { log.Errorv(c, log.KV("event", "redis_get"), log.KV("key", keys[i])) } continue } baseItem := new(v1.VideoBase) json.Unmarshal(data, baseItem) res[baseItem.Svid] = baseItem } log.Infov(c, log.KV("event", "redis_get"), log.KV("row_num", len(res))) return } // AddCacheVideoBase 添加缓存 func (d *Dao) AddCacheVideoBase(c context.Context, videoBases map[int64]*v1.VideoBase) (err error) { keyValueMap := make(map[string][]byte, len(videoBases)) for mid, videoBase := range videoBases { key := keyVideoBase(mid) if _, exist := keyValueMap[key]; !exist { data, _ := json.Marshal(videoBase) keyValueMap[key] = data } } conn := d.redis.Get(c) defer conn.Close() for key, value := range keyValueMap { conn.Send("SET", key, value, "EX", videoBaseCacheExpire) } conn.Flush() for i := 0; i < len(keyValueMap); i++ { conn.Receive() } log.Infov(c, log.KV("event", "redis_set"), log.KV("row_num", len(videoBases))) return } // DelCacheVideoBase 删除缓存 func (d *Dao) DelCacheVideoBase(c context.Context, svid int64) { var key = keyVideoBase(svid) conn := d.redis.Get(c) defer conn.Close() conn.Do("DEL", key) } // AddOrUpdateVideo 添加或更新视频记录 func (d *Dao) AddOrUpdateVideo(c context.Context, vh *v1.ImportVideoInfo) (err error) { var ( svid int64 ) tx, err := d.BeginTran(c) if err != nil { log.Error("begin transaction err :%v", err) return } defer func() { if err != nil { if err = tx.Rollback(); err != nil { log.Error("tx.Rollback() error(%v)", err) } } else { if err = tx.Commit(); err != nil { log.Error("tx.Commit() error(%v)", err) } } }() p := &model.VideoInfo{ CoverURL: vh.CoverUrl, CoverWidth: vh.CoverWidth, CoverHeight: vh.CoverHeight, SVID: vh.Svid, Title: vh.Title, MID: vh.MID, AVID: vh.AVID, CID: vh.CID, Pubtime: vh.Pubtime, From: int16(vh.From), State: int16(vh.State), TID: vh.TID, SubTID: vh.SubTID, HomeImgURL: vh.HomeImgUrl, HomeImgWidth: vh.HomeImgWidth, HomeImgHeight: vh.HomeImgHeight, } if err = tx.QueryRow(_queryVideo, vh.Svid).Scan(&svid); err == sql.ErrNoRows { if err = d.txInsertVideo(c, tx, p); err != nil { log.Warn("insert video err:%v,svid:%v", err, vh.Svid) return } } else if err != nil { log.Error("video queryrow scan err:[%v], svid[%v]", err, vh.Svid) return } //sync video_upload_process status if err = d.txUpdateVideoUploadProcessStatus(c, tx, vh.Svid, model.VideoUploadProcessStatusSuccessed); err != nil { log.Errorw(c, "event", "d.UpdateVideoUploadProcessStatus err", "err", err) } return } //UpdateVideoUploadProcessStatus ... func (d *Dao) txUpdateVideoUploadProcessStatus(ctx context.Context, tx *xsql.Tx, SVID int64, st int64) (err error) { if _, err = tx.Exec("update video_upload_process set upload_status = ? where svid = ?", st, SVID); err != nil { log.Errorw(ctx, "errmsg", "UpdateVideoUploadProcessStatus update failed", "err", err) } return } //txInsertVideo insert video func (d *Dao) txInsertVideo(c context.Context, tx *xsql.Tx, vh *model.VideoInfo) (err error) { if _, err = tx.Exec(_addVideo, vh.CoverURL, vh.CoverWidth, vh.CoverHeight, vh.SVID, vh.Title, vh.MID, vh.AVID, vh.CID, vh.Pubtime, vh.From, vh.TID, vh.SubTID, vh.HomeImgURL, vh.HomeImgWidth, vh.HomeImgHeight, vh.State, ); err != nil { log.Errorw(c, "event", "insert video err", "err", err, "param", vh) return } return } // AddOrUpdateTag 更新或添加标签 func (d *Dao) AddOrUpdateTag(c context.Context, tmap []*v1.TagInfo) (tids []int64, err error) { // 检查已存在的tag for _, v := range tmap { row := d.db.QueryRow(c, _queryTagByName, v.TagName, v.TagType) t := &model.Tag{ Type: v.TagType, Name: v.TagName, } err = row.Scan(&t.ID) if err == sql.ErrNoRows { var q string var id int64 var res sql.Result n := strings.Replace(t.Name, "'", "\\'", -1) q = "('" + n + "'," + strconv.FormatInt(int64(t.Type), 10) + ",1)" res, _ = d.db.Exec(c, fmt.Sprintf(_insertTag, q)) if res != nil { id, err = res.LastInsertId() } if id != 0 { tids = append(tids, id) } } else if t.ID != 0 { tids = append(tids, t.ID) } else { log.Error("d.db.QueryRow[%v],err:%v", v.TagName, err) return } } return } //根据mids批量查询用户基本信息 func (d *Dao) getUserInfos(c context.Context, mids []int64) (userBases []*model.UserBase, err error) { midsReq := &acc.MidsReq{ Mids: mids, RealIp: metadata.String(c, metadata.RemoteIP)} infosReply, err := d.AccountClient.Infos3(c, midsReq) if infosReply == nil { log.Error("query infos3 failed, err (%v)", err) return } userBases = make([]*model.UserBase, 0, 50) for _, info := range infosReply.Infos { if info.Mid != 0 { if len(info.Face) > 255 { info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png" log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png, mid(%v)", info.Mid) } userBase := &model.UserBase{ Mid: info.Mid, Name: info.Name, Sex: info.Sex, Face: info.Face, Sign: info.Sign, Rank: info.Rank, } userBases = append(userBases, userBase) } } return } //根据mid查询用户基本信息 func (d *Dao) getUserInfo(c context.Context, mid int64) (userBase *model.UserBase, err error) { midReq := &acc.MidReq{ Mid: mid, RealIp: metadata.String(c, metadata.RemoteIP)} info, err := d.AccountClient.Info3(c, midReq) if err != nil { log.Error("query info3 failed,mid(%v), err(%v)", mid, err) return } if len(info.Info.Face) > 255 { info.Info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png" log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png,,mid(%v)", mid) } userBase = &model.UserBase{ Mid: info.Info.Mid, Name: info.Info.Name, Sex: info.Info.Sex, Face: info.Info.Face, Sign: info.Info.Sign, Rank: info.Info.Rank, } log.Info("getUserInfo userbase (%v)", userBase) return } //InOrUpUserBase 更新用户基本信息 func (d *Dao) InOrUpUserBase(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) { var ( retry = 3 try int tx *xsql.Tx res sql.Result ) userBase, _ := d.getUserInfo(c, mid) response = &v1.SyncUserBaseResponse{Affc: -1} for try = 0; try <= retry; try++ { if tx, err = d.BeginTran(c); err != nil { time.Sleep(time.Duration(try) * time.Second) log.Warn("InOrUpUserBase try begin transaction failed ,err(%v)", err) continue } if res, err = tx.Exec( _insOrUpUserBase, userBase.Mid, userBase.Name, userBase.Face, ); err != nil { if err = tx.Rollback(); err != nil { log.Warn("InOrUpUserBase try rollback failed ,error(%v)", err) } } else { if err = tx.Commit(); err != nil { log.Warn("InOrUpUserBase try commit failed , error(%v)", err) } else { //提交成功,退出 response.Affc, _ = res.RowsAffected() log.Info("InOrUpUserBase success, affected %v rows", response.Affc) break } } } if err != nil { log.Error("InOrUpUserBase failed, mid(%v), err(%v)", mid, err) } return } //InOrUpUserBases 批量更新用户基本信息 func (d *Dao) InOrUpUserBases(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) { var ( retry = 3 try int tx *xsql.Tx res sql.Result ) userBases, _ := d.getUserInfos(c, mids) response = &v1.SyncUserBaseResponse{Affc: -1} for try = 0; try <= retry; try++ { if tx, err = d.BeginTran(c); err != nil { time.Sleep(time.Duration(try) * time.Second) log.Warn("InOrUpUserBases try begin transaction failed failed ,error(%v)", err) continue } sql := "INSERT INTO user_base (mid, uname, face, user_type) VALUES " for _, userBase := range userBases { if userBase.Mid != 0 { sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "','" + userBase.Face + "', 1)," } } if sql == "INSERT INTO user_base (mid, uname, face) VALUES " { response.Affc = 0 log.Info("InOrUpUserBases param mids are not exist") return } sql = sql[0:len(sql)-1] + " ON DUPLICATE KEY UPDATE uname=values(uname), face=values(face);" if res, err = tx.Exec(sql); err != nil { log.Info("InOrUpUserBases sql = (%s)", sql) if err = tx.Rollback(); err != nil { log.Warn("InOrUpUserBases try rollback failed ,error(%v)", err) } } else { log.Info("InOrUpUserBases sql = (%s)", sql) if err = tx.Commit(); err != nil { log.Warn("InOrUpUserBases try commit failed , error(%v)", err) } else { //提交成功,退出 response.Affc, _ = res.RowsAffected() log.Info("InOrUpUserBases commit success, affected %v rows", response.Affc) break } } } if err != nil { log.Error("InOrUpUserBases failed, err(%v)", err) } return } //InOrUpUserSta 更新用户up主主站画像 func (d *Dao) InOrUpUserSta(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) { var ( retry = 3 try int tx *xsql.Tx res sql.Result ) log.Info("InOrUpUserSta start") response = &v1.SyncUserBaseResponse{Affc: -1} userBase, _ := d.getUserInfo(c, mid) for try = 0; try <= retry; try++ { if tx, err = d.BeginTran(c); err != nil { time.Sleep(time.Duration(try) * time.Second) log.Info("InOrUpUserSta on mid(%v) try begin transaction failed failed ,error(%v)", userBase.Mid, err) continue } if res, err = tx.Exec( _insOrUpUserSta, userBase.Mid, userBase.Name, ); err != nil { fmt.Printf("sql exec error,err(%v)", err) if err = tx.Rollback(); err != nil { log.Info("InOrUpUserSta on mid(%v) rollback failed ,error(%v)", userBase.Mid, err) } else { fmt.Println("rollbacked") } } else { if err = tx.Commit(); err != nil { log.Info("InOrUpUserSta on mid(%v) commit failed , error(%v)", userBase.Mid, err) } else { //提交成功,退出 response.Affc, _ = res.RowsAffected() break } } } if err != nil { log.Error("InOrUpUserSta mid(%v) failed, err(%v)", mid, err) } return } //InOrUpUserStas 批量更新用户状态 func (d *Dao) InOrUpUserStas(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) { var ( retry = 3 try int tx *xsql.Tx res sql.Result ) log.Info("InOrUpUserStas start") response = &v1.SyncUserBaseResponse{Affc: -1} userBases, _ := d.getUserInfos(c, mids) for try = 0; try <= retry; try++ { if tx, err = d.BeginTran(c); err != nil { time.Sleep(time.Duration(try) * time.Second) log.Warn("InOrUpUserStas try begin transaction failed failed ,error(%v)", err) continue } sql := "INSERT INTO user_statistics_hive (mid, uname) VALUES" for _, userBase := range userBases { sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "')," } sql = sql[0:len(sql)-1] + "ON DUPLICATE KEY UPDATE uname=values(uname)" if res, err = tx.Exec(sql); err != nil { if err = tx.Rollback(); err != nil { log.Warn("InOrUpUserStas try rollback failed ,error(%v)", err) } else { log.Warn("InOrUpUserStas rollbacked") } } else { if err = tx.Commit(); err != nil { log.Warn("InOrUpUserStas try commit failed , error(%v)", err) } else { //提交成功,退出 response.Affc, _ = res.RowsAffected() log.Info("InOrUpUserStas on commit success, affected %v rows", response.Affc) break } } } if err != nil { log.Error("InOrUpUserSta run failed, err(%v)", err) } return } // GetVideoBvcTable 获取bvc分表名 func (d *Dao) getVideoBvcTable(svid int64) string { return fmt.Sprintf("video_bvc_%02d", svid%_BVCSubTableSize) } //RawVideoStatistic get video statistics func (d *Dao) RawVideoStatistic(c context.Context, svids []int64) (res map[int64]*model.SvStInfo, err error) { const maxIDNum = 20 var ( idStr string ) res = make(map[int64]*model.SvStInfo) if len(svids) > maxIDNum { svids = svids[:maxIDNum] } l := len(svids) for k, svid := range svids { if k < l-1 { idStr += strconv.FormatInt(svid, 10) + "," } else { idStr += strconv.FormatInt(svid, 10) } res[svid] = &model.SvStInfo{} } rows, err := d.db.Query(c, fmt.Sprintf(_queryStatisticsList, idStr)) if err != nil { log.Error("query error(%s)", err.Error()) return } defer rows.Close() for rows.Next() { ssv := new(model.SvStInfo) if err = rows.Scan(&ssv.SVID, &ssv.Play, &ssv.Subtitles, &ssv.Like, &ssv.Share, &ssv.Report); err != nil { log.Error("RawVideoStatistic rows.Scan() error(%v)", err) return } res[ssv.SVID] = ssv } cmtCount, _ := d.ReplyCounts(c, svids, DefaultCmType) for id, cmt := range cmtCount { if _, ok := res[id]; ok { res[id].Reply = cmt.Count } } return } // CommitTrans 提交转码 func (d *Dao) CommitTrans(c context.Context, arg *v1.BVideoTransRequset) error { path, ok := d.c.URLs["bvc_push"] if !ok { log.Warnv(c, log.KV("log", "bvc_push url not set")) return ecode.ReqParamErr } data, _ := json.Marshal(arg) b := string(data) req, err := xhttp.NewRequest("POST", path, bytes.NewBuffer(data)) req.Header.Set("Content-Type", "application/json") if err != nil { log.Error("bvc_push url(%s) req(%+v) body(%s) error(%v)", path, req, b, err) return err } var res struct { Code int `json:"code"` Msg string `json:"message"` } if err = d.httpClient.Do(c, req, &res); err != nil { log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret (%+v) err[%v]", path, req, b, res, err))) return err } log.V(5).Infov(c, log.KV("log", fmt.Sprintf("bvc_push req(%+v) body(%s) ret (%+v)", req, b, res))) if res.Code != 0 { err = ecode.Int(res.Code) log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret(%+v) error(%v)", path, req, b, res, err))) return err } return nil } //AddOrUpdateBVCInfo 添加或更新BVC转码信息 func (d *Dao) AddOrUpdateBVCInfo(c context.Context, arg *model.VideoBVC) (err error) { err = d.AddBVCInfo(c, arg) if err != nil { if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched { err = d.UpdataBVCInfo(c, arg) return } log.Errorv(c, log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)), ) } return } //TxAddOrUpdateBVCInfo 事务添加或更新BVC转码信息 func (d *Dao) TxAddOrUpdateBVCInfo(c context.Context, tx *xsql.Tx, arg *model.VideoBVC) (err error) { err = d.TxAddBVCInfo(tx, arg) if err != nil { if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched { err = d.TxUpdataBVCInfo(tx, arg) return } log.Errorv(c, log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)), ) } return } // AddBVCInfo 添加BVC转码信息 func (d *Dao) AddBVCInfo(c context.Context, arg *model.VideoBVC) (err error) { t := d.getVideoBvcTable(arg.SVID) sql := fmt.Sprintf(_addBVCData, t) _, err = d.db.Exec(c, sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize) return } // TxAddBVCInfo 事务添加BVC转码信息 func (d *Dao) TxAddBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) { t := d.getVideoBvcTable(arg.SVID) sql := fmt.Sprintf(_addBVCData, t) _, err = tx.Exec(sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize) return } // TxUpdataBVCInfo 事务更新BVC转码信息 func (d *Dao) TxUpdataBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) { t := d.getVideoBvcTable(arg.SVID) sql := fmt.Sprintf(_updateBVCData, t) _, err = tx.Exec(sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate) return } // UpdataBVCInfo 更新BVC转码信息 func (d *Dao) UpdataBVCInfo(c context.Context, arg *model.VideoBVC) (err error) { t := d.getVideoBvcTable(arg.SVID) sql := fmt.Sprintf(_updateBVCData, t) _, err = d.db.Exec(c, sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate) return } // UpdateCmsSvPIC 更新封面图 func (d *Dao) UpdateCmsSvPIC(c context.Context, svid int64, pic *v1.SvPic, st int64) error { _, err := d.cmsdb.Exec(c, _updateSvPIC, pic.PicURL, pic.PicWidth, pic.PicHeight, st, svid) return err } // HostnameRegister . func (d *Dao) HostnameRegister(hostnameIndex int64) (succ bool) { conn := d.redis.Get(context.Background()) defer conn.Close() redisKey := fmt.Sprintf("hostname:index:%d", hostnameIndex) exists, err := redis.Int(conn.Do("EXISTS", redisKey)) if err != nil { log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey))) // 即使redis失败了,也给返回成功 return true } if exists == 1 { return false } // 不去管返回结果,永远返回成功 if _, err = conn.Do("SETEX", redisKey, 1000, 1); err != nil { log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey))) } return true } // AddVideoViews . func (d *Dao) AddVideoViews(c context.Context, svid int64, views int) (affected int64, err error) { row := d.db.QueryRow(c, _existedStatistics, svid) tmp := 0 if err = row.Scan(&tmp); err != nil || tmp == 0 { _, err = d.db.Exec(c, _insertStatistics, svid, 0, 0, 0, 0, 0) if err != nil { return } } result, err := d.db.Exec(c, _addVideoViews, views, svid) if err != nil { return } return result.RowsAffected() } // VideoStateUpdate . func (d *Dao) VideoStateUpdate(c context.Context, svid int64, newState int) (aff int64, err error) { result, err := d.db.Exec(c, _updateVideoState, newState, svid) if err != nil { return } aff, err = result.RowsAffected() return }