bilibili-backup/app/service/bbq/video/dao/video.go

737 lines
22 KiB
Go
Raw Permalink Normal View History

2019-04-22 10:59:20 +08:00
package dao
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"go-common/app/service/bbq/video/api/grpc/v1"
"go-common/app/service/bbq/video/model"
acc "go-common/app/service/main/account/api"
"go-common/library/cache/redis"
xsql "go-common/library/database/sql"
"go-common/library/ecode"
"go-common/library/log"
"go-common/library/net/metadata"
"go-common/library/xstr"
xhttp "net/http"
"regexp"
"strconv"
"strings"
"time"
)
const (
_BVCSubTableSize = 100
_queryVideo = "SELECT svid FROM video WHERE svid = ?"
_addVideo = "INSERT INTO video(`cover_url`,`cover_width`,`cover_height`,`svid`,`title`,`mid`,`avid`,`cid`,`pubtime`,`from`,`tid`,`sub_tid`,`home_img_url`,`home_img_width`,`home_img_height`,`state`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
_queryTagByName = "SELECT `id` FROM tag WHERE name = ? and type = ?"
_insertTag = "INSERT INTO tag (`name`,`type`,`status`) VALUES %s "
_insOrUpUserBase = "INSERT IGNORE user_base (mid, uname, face, user_type) VALUES (?, ?, ?, ?) "
_insOrUpUserSta = "INSERT IGNORE user_statistics_hive (mid, uname) VALUES (?, ?)"
_queryStatisticsList = "select `svid`, `play`, `subtitles`, `like`, `share`, `report` from video_statistics where svid in (%s)"
_addBVCData = "insert into %s (`svid`,`path`,`resolution_retio`,`code_rate`,`video_code`,`duration`,`file_size`) values (?,?,?,?,?,?,?)"
_updateBVCData = "update %s set path=?, resolution_retio=?, video_code=?, duration=?, file_size=? where svid = ? and code_rate = ?"
_updateSvPIC = "update video_repository set cover_url=?,cover_width=?,cover_height=? ,sync_status = sync_status|? where svid = ?"
_addVideoViews = "update `video_statistics` set `play` = `play` + ? where `svid` = ?"
_existedStatistics = "select `id` from `video_statistics` where `svid` = ?;"
_insertStatistics = "insert into `video_statistics`(`svid`, `play`, `subtitles`, `like`, `share`, `report`) values(?,?,?,?,?,?);"
_queryVideoList = "select `avid`, `cid`, `svid`, `title`, `mid`, `content`, `pubtime`,`duration`,`tid`,`sub_tid`,`cover_url`,`cover_width`,`cover_height`,`limits`, `state` from video where svid in (%s)"
_updateVideoState = "update `video` set `state` = ? where `svid`= ?;"
)
const (
videoBaseCacheExpire = 600
videoBaseCacheKey = "video_base:%d"
)
func keyVideoBase(svid int64) string {
return fmt.Sprintf(videoBaseCacheKey, svid)
}
// ModifyLimits .
func (d *Dao) ModifyLimits(c context.Context, svid int64, limitType uint64, limitOp uint64) (num int64, err error) {
// 根据操作选择合适的limits update语句
limitOpCond := fmt.Sprintf("|%d", 1<<limitType)
if limitOp == 0 {
limitOpCond = fmt.Sprintf("&~%d", 1<<limitType)
}
querySQL := fmt.Sprintf("update video set limits = limits%s where svid = %d", limitOpCond, svid)
res, err := d.db.Exec(c, querySQL)
if err != nil {
log.Warnw(c, "log", "modify video limits fail", "sql", querySQL)
return
}
num, _ = res.RowsAffected()
log.V(1).Infow(c, "sql", querySQL, "affected_num", num)
d.DelCacheVideoBase(c, svid)
return
}
// RawVideoBase mysql获取video_base
func (d *Dao) RawVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
res = make(map[int64]*v1.VideoBase)
if len(svids) == 0 {
return
}
querySQL := fmt.Sprintf(_queryVideoList, xstr.JoinInts(svids))
rows, err := d.db.Query(c, querySQL)
if err != nil {
log.Errorw(c, "log", "get video base from mysql fail", "sql", querySQL, "err", err)
return
}
defer rows.Close()
log.V(1).Infow(c, "log", "raw get video base from mysql", "sql", querySQL)
for rows.Next() {
sv := new(v1.VideoBase)
if err = rows.Scan(&sv.Avid, &sv.Cid, &sv.Svid, &sv.Title, &sv.Mid, &sv.Content, &sv.Pubtime, &sv.Duration, &sv.Tid, &sv.SubTid, &sv.CoverUrl, &sv.CoverWidth, &sv.CoverHeight, &sv.Limits, &sv.State); err != nil {
log.Error("row.Scan() error(%v)", err)
return
}
res[sv.Svid] = sv
}
if len(svids) > len(res) {
var rspID []int64
for k := range res {
rspID = append(rspID, k)
}
log.Warnw(c, "log", fmt.Sprintf("video req and rsp size not equal: req=%v, rsp=%v", svids, rspID))
}
log.V(1).Infow(c, "req_size", len(svids), "rsp_size", len(res))
return
}
// CacheVideoBase cache video base
func (d *Dao) CacheVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
res = make(map[int64]*v1.VideoBase)
keys := make([]string, 0, len(svids))
keyMidMap := make(map[int64]bool, len(svids))
for _, svid := range svids {
key := keyVideoBase(svid)
if _, exist := keyMidMap[svid]; !exist {
// duplicate svid
keyMidMap[svid] = true
keys = append(keys, key)
}
}
conn := d.redis.Get(c)
defer conn.Close()
for _, key := range keys {
conn.Send("GET", key)
}
conn.Flush()
var data []byte
for i := 0; i < len(keys); i++ {
if data, err = redis.Bytes(conn.Receive()); err != nil {
if err == redis.ErrNil {
err = nil
} else {
log.Errorv(c, log.KV("event", "redis_get"), log.KV("key", keys[i]))
}
continue
}
baseItem := new(v1.VideoBase)
json.Unmarshal(data, baseItem)
res[baseItem.Svid] = baseItem
}
log.Infov(c, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
return
}
// AddCacheVideoBase 添加缓存
func (d *Dao) AddCacheVideoBase(c context.Context, videoBases map[int64]*v1.VideoBase) (err error) {
keyValueMap := make(map[string][]byte, len(videoBases))
for mid, videoBase := range videoBases {
key := keyVideoBase(mid)
if _, exist := keyValueMap[key]; !exist {
data, _ := json.Marshal(videoBase)
keyValueMap[key] = data
}
}
conn := d.redis.Get(c)
defer conn.Close()
for key, value := range keyValueMap {
conn.Send("SET", key, value, "EX", videoBaseCacheExpire)
}
conn.Flush()
for i := 0; i < len(keyValueMap); i++ {
conn.Receive()
}
log.Infov(c, log.KV("event", "redis_set"), log.KV("row_num", len(videoBases)))
return
}
// DelCacheVideoBase 删除缓存
func (d *Dao) DelCacheVideoBase(c context.Context, svid int64) {
var key = keyVideoBase(svid)
conn := d.redis.Get(c)
defer conn.Close()
conn.Do("DEL", key)
}
// AddOrUpdateVideo 添加或更新视频记录
func (d *Dao) AddOrUpdateVideo(c context.Context, vh *v1.ImportVideoInfo) (err error) {
var (
svid int64
)
tx, err := d.BeginTran(c)
if err != nil {
log.Error("begin transaction err :%v", err)
return
}
defer func() {
if err != nil {
if err = tx.Rollback(); err != nil {
log.Error("tx.Rollback() error(%v)", err)
}
} else {
if err = tx.Commit(); err != nil {
log.Error("tx.Commit() error(%v)", err)
}
}
}()
p := &model.VideoInfo{
CoverURL: vh.CoverUrl,
CoverWidth: vh.CoverWidth,
CoverHeight: vh.CoverHeight,
SVID: vh.Svid,
Title: vh.Title,
MID: vh.MID,
AVID: vh.AVID,
CID: vh.CID,
Pubtime: vh.Pubtime,
From: int16(vh.From),
State: int16(vh.State),
TID: vh.TID,
SubTID: vh.SubTID,
HomeImgURL: vh.HomeImgUrl,
HomeImgWidth: vh.HomeImgWidth,
HomeImgHeight: vh.HomeImgHeight,
}
if err = tx.QueryRow(_queryVideo, vh.Svid).Scan(&svid); err == sql.ErrNoRows {
if err = d.txInsertVideo(c, tx, p); err != nil {
log.Warn("insert video err:%v,svid:%v", err, vh.Svid)
return
}
} else if err != nil {
log.Error("video queryrow scan err:[%v], svid[%v]", err, vh.Svid)
return
}
//sync video_upload_process status
if err = d.txUpdateVideoUploadProcessStatus(c, tx, vh.Svid, model.VideoUploadProcessStatusSuccessed); err != nil {
log.Errorw(c, "event", "d.UpdateVideoUploadProcessStatus err", "err", err)
}
return
}
//UpdateVideoUploadProcessStatus ...
func (d *Dao) txUpdateVideoUploadProcessStatus(ctx context.Context, tx *xsql.Tx, SVID int64, st int64) (err error) {
if _, err = tx.Exec("update video_upload_process set upload_status = ? where svid = ?", st, SVID); err != nil {
log.Errorw(ctx, "errmsg", "UpdateVideoUploadProcessStatus update failed", "err", err)
}
return
}
//txInsertVideo insert video
func (d *Dao) txInsertVideo(c context.Context, tx *xsql.Tx, vh *model.VideoInfo) (err error) {
if _, err = tx.Exec(_addVideo,
vh.CoverURL,
vh.CoverWidth,
vh.CoverHeight,
vh.SVID,
vh.Title,
vh.MID,
vh.AVID,
vh.CID,
vh.Pubtime,
vh.From,
vh.TID,
vh.SubTID,
vh.HomeImgURL,
vh.HomeImgWidth,
vh.HomeImgHeight,
vh.State,
); err != nil {
log.Errorw(c, "event", "insert video err", "err", err, "param", vh)
return
}
return
}
// AddOrUpdateTag 更新或添加标签
func (d *Dao) AddOrUpdateTag(c context.Context, tmap []*v1.TagInfo) (tids []int64, err error) {
// 检查已存在的tag
for _, v := range tmap {
row := d.db.QueryRow(c, _queryTagByName, v.TagName, v.TagType)
t := &model.Tag{
Type: v.TagType,
Name: v.TagName,
}
err = row.Scan(&t.ID)
if err == sql.ErrNoRows {
var q string
var id int64
var res sql.Result
n := strings.Replace(t.Name, "'", "\\'", -1)
q = "('" + n + "'," + strconv.FormatInt(int64(t.Type), 10) + ",1)"
res, _ = d.db.Exec(c, fmt.Sprintf(_insertTag, q))
if res != nil {
id, err = res.LastInsertId()
}
if id != 0 {
tids = append(tids, id)
}
} else if t.ID != 0 {
tids = append(tids, t.ID)
} else {
log.Error("d.db.QueryRow[%v],err:%v", v.TagName, err)
return
}
}
return
}
//根据mids批量查询用户基本信息
func (d *Dao) getUserInfos(c context.Context, mids []int64) (userBases []*model.UserBase, err error) {
midsReq := &acc.MidsReq{
Mids: mids,
RealIp: metadata.String(c, metadata.RemoteIP)}
infosReply, err := d.AccountClient.Infos3(c, midsReq)
if infosReply == nil {
log.Error("query infos3 failed, err (%v)", err)
return
}
userBases = make([]*model.UserBase, 0, 50)
for _, info := range infosReply.Infos {
if info.Mid != 0 {
if len(info.Face) > 255 {
info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png, mid(%v)", info.Mid)
}
userBase := &model.UserBase{
Mid: info.Mid,
Name: info.Name,
Sex: info.Sex,
Face: info.Face,
Sign: info.Sign,
Rank: info.Rank,
}
userBases = append(userBases, userBase)
}
}
return
}
//根据mid查询用户基本信息
func (d *Dao) getUserInfo(c context.Context, mid int64) (userBase *model.UserBase, err error) {
midReq := &acc.MidReq{
Mid: mid,
RealIp: metadata.String(c, metadata.RemoteIP)}
info, err := d.AccountClient.Info3(c, midReq)
if err != nil {
log.Error("query info3 failed,mid(%v), err(%v)", mid, err)
return
}
if len(info.Info.Face) > 255 {
info.Info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png,,mid(%v)", mid)
}
userBase = &model.UserBase{
Mid: info.Info.Mid,
Name: info.Info.Name,
Sex: info.Info.Sex,
Face: info.Info.Face,
Sign: info.Info.Sign,
Rank: info.Info.Rank,
}
log.Info("getUserInfo userbase (%v)", userBase)
return
}
//InOrUpUserBase 更新用户基本信息
func (d *Dao) InOrUpUserBase(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
userBase, _ := d.getUserInfo(c, mid)
response = &v1.SyncUserBaseResponse{Affc: -1}
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Warn("InOrUpUserBase try begin transaction failed ,err(%v)", err)
continue
}
if res, err = tx.Exec(
_insOrUpUserBase,
userBase.Mid,
userBase.Name,
userBase.Face,
); err != nil {
if err = tx.Rollback(); err != nil {
log.Warn("InOrUpUserBase try rollback failed ,error(%v)", err)
}
} else {
if err = tx.Commit(); err != nil {
log.Warn("InOrUpUserBase try commit failed , error(%v)", err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
log.Info("InOrUpUserBase success, affected %v rows", response.Affc)
break
}
}
}
if err != nil {
log.Error("InOrUpUserBase failed, mid(%v), err(%v)", mid, err)
}
return
}
//InOrUpUserBases 批量更新用户基本信息
func (d *Dao) InOrUpUserBases(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
userBases, _ := d.getUserInfos(c, mids)
response = &v1.SyncUserBaseResponse{Affc: -1}
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Warn("InOrUpUserBases try begin transaction failed failed ,error(%v)", err)
continue
}
sql := "INSERT INTO user_base (mid, uname, face, user_type) VALUES "
for _, userBase := range userBases {
if userBase.Mid != 0 {
sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "','" + userBase.Face + "', 1),"
}
}
if sql == "INSERT INTO user_base (mid, uname, face) VALUES " {
response.Affc = 0
log.Info("InOrUpUserBases param mids are not exist")
return
}
sql = sql[0:len(sql)-1] + " ON DUPLICATE KEY UPDATE uname=values(uname), face=values(face);"
if res, err = tx.Exec(sql); err != nil {
log.Info("InOrUpUserBases sql = (%s)", sql)
if err = tx.Rollback(); err != nil {
log.Warn("InOrUpUserBases try rollback failed ,error(%v)", err)
}
} else {
log.Info("InOrUpUserBases sql = (%s)", sql)
if err = tx.Commit(); err != nil {
log.Warn("InOrUpUserBases try commit failed , error(%v)", err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
log.Info("InOrUpUserBases commit success, affected %v rows", response.Affc)
break
}
}
}
if err != nil {
log.Error("InOrUpUserBases failed, err(%v)", err)
}
return
}
//InOrUpUserSta 更新用户up主主站画像
func (d *Dao) InOrUpUserSta(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
log.Info("InOrUpUserSta start")
response = &v1.SyncUserBaseResponse{Affc: -1}
userBase, _ := d.getUserInfo(c, mid)
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Info("InOrUpUserSta on mid(%v) try begin transaction failed failed ,error(%v)", userBase.Mid, err)
continue
}
if res, err = tx.Exec(
_insOrUpUserSta,
userBase.Mid,
userBase.Name,
); err != nil {
fmt.Printf("sql exec error,err(%v)", err)
if err = tx.Rollback(); err != nil {
log.Info("InOrUpUserSta on mid(%v) rollback failed ,error(%v)", userBase.Mid, err)
} else {
fmt.Println("rollbacked")
}
} else {
if err = tx.Commit(); err != nil {
log.Info("InOrUpUserSta on mid(%v) commit failed , error(%v)", userBase.Mid, err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
break
}
}
}
if err != nil {
log.Error("InOrUpUserSta mid(%v) failed, err(%v)", mid, err)
}
return
}
//InOrUpUserStas 批量更新用户状态
func (d *Dao) InOrUpUserStas(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
log.Info("InOrUpUserStas start")
response = &v1.SyncUserBaseResponse{Affc: -1}
userBases, _ := d.getUserInfos(c, mids)
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Warn("InOrUpUserStas try begin transaction failed failed ,error(%v)", err)
continue
}
sql := "INSERT INTO user_statistics_hive (mid, uname) VALUES"
for _, userBase := range userBases {
sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "'),"
}
sql = sql[0:len(sql)-1] + "ON DUPLICATE KEY UPDATE uname=values(uname)"
if res, err = tx.Exec(sql); err != nil {
if err = tx.Rollback(); err != nil {
log.Warn("InOrUpUserStas try rollback failed ,error(%v)", err)
} else {
log.Warn("InOrUpUserStas rollbacked")
}
} else {
if err = tx.Commit(); err != nil {
log.Warn("InOrUpUserStas try commit failed , error(%v)", err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
log.Info("InOrUpUserStas on commit success, affected %v rows", response.Affc)
break
}
}
}
if err != nil {
log.Error("InOrUpUserSta run failed, err(%v)", err)
}
return
}
// GetVideoBvcTable 获取bvc分表名
func (d *Dao) getVideoBvcTable(svid int64) string {
return fmt.Sprintf("video_bvc_%02d", svid%_BVCSubTableSize)
}
//RawVideoStatistic get video statistics
func (d *Dao) RawVideoStatistic(c context.Context, svids []int64) (res map[int64]*model.SvStInfo, err error) {
const maxIDNum = 20
var (
idStr string
)
res = make(map[int64]*model.SvStInfo)
if len(svids) > maxIDNum {
svids = svids[:maxIDNum]
}
l := len(svids)
for k, svid := range svids {
if k < l-1 {
idStr += strconv.FormatInt(svid, 10) + ","
} else {
idStr += strconv.FormatInt(svid, 10)
}
res[svid] = &model.SvStInfo{}
}
rows, err := d.db.Query(c, fmt.Sprintf(_queryStatisticsList, idStr))
if err != nil {
log.Error("query error(%s)", err.Error())
return
}
defer rows.Close()
for rows.Next() {
ssv := new(model.SvStInfo)
if err = rows.Scan(&ssv.SVID, &ssv.Play, &ssv.Subtitles, &ssv.Like, &ssv.Share, &ssv.Report); err != nil {
log.Error("RawVideoStatistic rows.Scan() error(%v)", err)
return
}
res[ssv.SVID] = ssv
}
cmtCount, _ := d.ReplyCounts(c, svids, DefaultCmType)
for id, cmt := range cmtCount {
if _, ok := res[id]; ok {
res[id].Reply = cmt.Count
}
}
return
}
// CommitTrans 提交转码
func (d *Dao) CommitTrans(c context.Context, arg *v1.BVideoTransRequset) error {
path, ok := d.c.URLs["bvc_push"]
if !ok {
log.Warnv(c, log.KV("log", "bvc_push url not set"))
return ecode.ReqParamErr
}
data, _ := json.Marshal(arg)
b := string(data)
req, err := xhttp.NewRequest("POST", path, bytes.NewBuffer(data))
req.Header.Set("Content-Type", "application/json")
if err != nil {
log.Error("bvc_push url(%s) req(%+v) body(%s) error(%v)", path, req, b, err)
return err
}
var res struct {
Code int `json:"code"`
Msg string `json:"message"`
}
if err = d.httpClient.Do(c, req, &res); err != nil {
log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret (%+v) err[%v]", path, req, b, res, err)))
return err
}
log.V(5).Infov(c, log.KV("log", fmt.Sprintf("bvc_push req(%+v) body(%s) ret (%+v)", req, b, res)))
if res.Code != 0 {
err = ecode.Int(res.Code)
log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret(%+v) error(%v)", path, req, b, res, err)))
return err
}
return nil
}
//AddOrUpdateBVCInfo 添加或更新BVC转码信息
func (d *Dao) AddOrUpdateBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
err = d.AddBVCInfo(c, arg)
if err != nil {
if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
err = d.UpdataBVCInfo(c, arg)
return
}
log.Errorv(c,
log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
)
}
return
}
//TxAddOrUpdateBVCInfo 事务添加或更新BVC转码信息
func (d *Dao) TxAddOrUpdateBVCInfo(c context.Context, tx *xsql.Tx, arg *model.VideoBVC) (err error) {
err = d.TxAddBVCInfo(tx, arg)
if err != nil {
if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
err = d.TxUpdataBVCInfo(tx, arg)
return
}
log.Errorv(c,
log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
)
}
return
}
// AddBVCInfo 添加BVC转码信息
func (d *Dao) AddBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_addBVCData, t)
_, err = d.db.Exec(c, sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
return
}
// TxAddBVCInfo 事务添加BVC转码信息
func (d *Dao) TxAddBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_addBVCData, t)
_, err = tx.Exec(sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
return
}
// TxUpdataBVCInfo 事务更新BVC转码信息
func (d *Dao) TxUpdataBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_updateBVCData, t)
_, err = tx.Exec(sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
return
}
// UpdataBVCInfo 更新BVC转码信息
func (d *Dao) UpdataBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_updateBVCData, t)
_, err = d.db.Exec(c, sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
return
}
// UpdateCmsSvPIC 更新封面图
func (d *Dao) UpdateCmsSvPIC(c context.Context, svid int64, pic *v1.SvPic, st int64) error {
_, err := d.cmsdb.Exec(c, _updateSvPIC, pic.PicURL, pic.PicWidth, pic.PicHeight, st, svid)
return err
}
// HostnameRegister .
func (d *Dao) HostnameRegister(hostnameIndex int64) (succ bool) {
conn := d.redis.Get(context.Background())
defer conn.Close()
redisKey := fmt.Sprintf("hostname:index:%d", hostnameIndex)
exists, err := redis.Int(conn.Do("EXISTS", redisKey))
if err != nil {
log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
// 即使redis失败了也给返回成功
return true
}
if exists == 1 {
return false
}
// 不去管返回结果,永远返回成功
if _, err = conn.Do("SETEX", redisKey, 1000, 1); err != nil {
log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
}
return true
}
// AddVideoViews .
func (d *Dao) AddVideoViews(c context.Context, svid int64, views int) (affected int64, err error) {
row := d.db.QueryRow(c, _existedStatistics, svid)
tmp := 0
if err = row.Scan(&tmp); err != nil || tmp == 0 {
_, err = d.db.Exec(c, _insertStatistics, svid, 0, 0, 0, 0, 0)
if err != nil {
return
}
}
result, err := d.db.Exec(c, _addVideoViews, views, svid)
if err != nil {
return
}
return result.RowsAffected()
}
// VideoStateUpdate .
func (d *Dao) VideoStateUpdate(c context.Context, svid int64, newState int) (aff int64, err error) {
result, err := d.db.Exec(c, _updateVideoState, newState, svid)
if err != nil {
return
}
aff, err = result.RowsAffected()
return
}