bilibili-backup/app/service/bbq/video/dao/video.go
2019-04-22 02:59:20 +00:00

737 lines
22 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dao
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"go-common/app/service/bbq/video/api/grpc/v1"
"go-common/app/service/bbq/video/model"
acc "go-common/app/service/main/account/api"
"go-common/library/cache/redis"
xsql "go-common/library/database/sql"
"go-common/library/ecode"
"go-common/library/log"
"go-common/library/net/metadata"
"go-common/library/xstr"
xhttp "net/http"
"regexp"
"strconv"
"strings"
"time"
)
const (
_BVCSubTableSize = 100
_queryVideo = "SELECT svid FROM video WHERE svid = ?"
_addVideo = "INSERT INTO video(`cover_url`,`cover_width`,`cover_height`,`svid`,`title`,`mid`,`avid`,`cid`,`pubtime`,`from`,`tid`,`sub_tid`,`home_img_url`,`home_img_width`,`home_img_height`,`state`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
_queryTagByName = "SELECT `id` FROM tag WHERE name = ? and type = ?"
_insertTag = "INSERT INTO tag (`name`,`type`,`status`) VALUES %s "
_insOrUpUserBase = "INSERT IGNORE user_base (mid, uname, face, user_type) VALUES (?, ?, ?, ?) "
_insOrUpUserSta = "INSERT IGNORE user_statistics_hive (mid, uname) VALUES (?, ?)"
_queryStatisticsList = "select `svid`, `play`, `subtitles`, `like`, `share`, `report` from video_statistics where svid in (%s)"
_addBVCData = "insert into %s (`svid`,`path`,`resolution_retio`,`code_rate`,`video_code`,`duration`,`file_size`) values (?,?,?,?,?,?,?)"
_updateBVCData = "update %s set path=?, resolution_retio=?, video_code=?, duration=?, file_size=? where svid = ? and code_rate = ?"
_updateSvPIC = "update video_repository set cover_url=?,cover_width=?,cover_height=? ,sync_status = sync_status|? where svid = ?"
_addVideoViews = "update `video_statistics` set `play` = `play` + ? where `svid` = ?"
_existedStatistics = "select `id` from `video_statistics` where `svid` = ?;"
_insertStatistics = "insert into `video_statistics`(`svid`, `play`, `subtitles`, `like`, `share`, `report`) values(?,?,?,?,?,?);"
_queryVideoList = "select `avid`, `cid`, `svid`, `title`, `mid`, `content`, `pubtime`,`duration`,`tid`,`sub_tid`,`cover_url`,`cover_width`,`cover_height`,`limits`, `state` from video where svid in (%s)"
_updateVideoState = "update `video` set `state` = ? where `svid`= ?;"
)
const (
videoBaseCacheExpire = 600
videoBaseCacheKey = "video_base:%d"
)
func keyVideoBase(svid int64) string {
return fmt.Sprintf(videoBaseCacheKey, svid)
}
// ModifyLimits .
func (d *Dao) ModifyLimits(c context.Context, svid int64, limitType uint64, limitOp uint64) (num int64, err error) {
// 根据操作选择合适的limits update语句
limitOpCond := fmt.Sprintf("|%d", 1<<limitType)
if limitOp == 0 {
limitOpCond = fmt.Sprintf("&~%d", 1<<limitType)
}
querySQL := fmt.Sprintf("update video set limits = limits%s where svid = %d", limitOpCond, svid)
res, err := d.db.Exec(c, querySQL)
if err != nil {
log.Warnw(c, "log", "modify video limits fail", "sql", querySQL)
return
}
num, _ = res.RowsAffected()
log.V(1).Infow(c, "sql", querySQL, "affected_num", num)
d.DelCacheVideoBase(c, svid)
return
}
// RawVideoBase mysql获取video_base
func (d *Dao) RawVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
res = make(map[int64]*v1.VideoBase)
if len(svids) == 0 {
return
}
querySQL := fmt.Sprintf(_queryVideoList, xstr.JoinInts(svids))
rows, err := d.db.Query(c, querySQL)
if err != nil {
log.Errorw(c, "log", "get video base from mysql fail", "sql", querySQL, "err", err)
return
}
defer rows.Close()
log.V(1).Infow(c, "log", "raw get video base from mysql", "sql", querySQL)
for rows.Next() {
sv := new(v1.VideoBase)
if err = rows.Scan(&sv.Avid, &sv.Cid, &sv.Svid, &sv.Title, &sv.Mid, &sv.Content, &sv.Pubtime, &sv.Duration, &sv.Tid, &sv.SubTid, &sv.CoverUrl, &sv.CoverWidth, &sv.CoverHeight, &sv.Limits, &sv.State); err != nil {
log.Error("row.Scan() error(%v)", err)
return
}
res[sv.Svid] = sv
}
if len(svids) > len(res) {
var rspID []int64
for k := range res {
rspID = append(rspID, k)
}
log.Warnw(c, "log", fmt.Sprintf("video req and rsp size not equal: req=%v, rsp=%v", svids, rspID))
}
log.V(1).Infow(c, "req_size", len(svids), "rsp_size", len(res))
return
}
// CacheVideoBase cache video base
func (d *Dao) CacheVideoBase(c context.Context, svids []int64) (res map[int64]*v1.VideoBase, err error) {
res = make(map[int64]*v1.VideoBase)
keys := make([]string, 0, len(svids))
keyMidMap := make(map[int64]bool, len(svids))
for _, svid := range svids {
key := keyVideoBase(svid)
if _, exist := keyMidMap[svid]; !exist {
// duplicate svid
keyMidMap[svid] = true
keys = append(keys, key)
}
}
conn := d.redis.Get(c)
defer conn.Close()
for _, key := range keys {
conn.Send("GET", key)
}
conn.Flush()
var data []byte
for i := 0; i < len(keys); i++ {
if data, err = redis.Bytes(conn.Receive()); err != nil {
if err == redis.ErrNil {
err = nil
} else {
log.Errorv(c, log.KV("event", "redis_get"), log.KV("key", keys[i]))
}
continue
}
baseItem := new(v1.VideoBase)
json.Unmarshal(data, baseItem)
res[baseItem.Svid] = baseItem
}
log.Infov(c, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
return
}
// AddCacheVideoBase 添加缓存
func (d *Dao) AddCacheVideoBase(c context.Context, videoBases map[int64]*v1.VideoBase) (err error) {
keyValueMap := make(map[string][]byte, len(videoBases))
for mid, videoBase := range videoBases {
key := keyVideoBase(mid)
if _, exist := keyValueMap[key]; !exist {
data, _ := json.Marshal(videoBase)
keyValueMap[key] = data
}
}
conn := d.redis.Get(c)
defer conn.Close()
for key, value := range keyValueMap {
conn.Send("SET", key, value, "EX", videoBaseCacheExpire)
}
conn.Flush()
for i := 0; i < len(keyValueMap); i++ {
conn.Receive()
}
log.Infov(c, log.KV("event", "redis_set"), log.KV("row_num", len(videoBases)))
return
}
// DelCacheVideoBase 删除缓存
func (d *Dao) DelCacheVideoBase(c context.Context, svid int64) {
var key = keyVideoBase(svid)
conn := d.redis.Get(c)
defer conn.Close()
conn.Do("DEL", key)
}
// AddOrUpdateVideo 添加或更新视频记录
func (d *Dao) AddOrUpdateVideo(c context.Context, vh *v1.ImportVideoInfo) (err error) {
var (
svid int64
)
tx, err := d.BeginTran(c)
if err != nil {
log.Error("begin transaction err :%v", err)
return
}
defer func() {
if err != nil {
if err = tx.Rollback(); err != nil {
log.Error("tx.Rollback() error(%v)", err)
}
} else {
if err = tx.Commit(); err != nil {
log.Error("tx.Commit() error(%v)", err)
}
}
}()
p := &model.VideoInfo{
CoverURL: vh.CoverUrl,
CoverWidth: vh.CoverWidth,
CoverHeight: vh.CoverHeight,
SVID: vh.Svid,
Title: vh.Title,
MID: vh.MID,
AVID: vh.AVID,
CID: vh.CID,
Pubtime: vh.Pubtime,
From: int16(vh.From),
State: int16(vh.State),
TID: vh.TID,
SubTID: vh.SubTID,
HomeImgURL: vh.HomeImgUrl,
HomeImgWidth: vh.HomeImgWidth,
HomeImgHeight: vh.HomeImgHeight,
}
if err = tx.QueryRow(_queryVideo, vh.Svid).Scan(&svid); err == sql.ErrNoRows {
if err = d.txInsertVideo(c, tx, p); err != nil {
log.Warn("insert video err:%v,svid:%v", err, vh.Svid)
return
}
} else if err != nil {
log.Error("video queryrow scan err:[%v], svid[%v]", err, vh.Svid)
return
}
//sync video_upload_process status
if err = d.txUpdateVideoUploadProcessStatus(c, tx, vh.Svid, model.VideoUploadProcessStatusSuccessed); err != nil {
log.Errorw(c, "event", "d.UpdateVideoUploadProcessStatus err", "err", err)
}
return
}
//UpdateVideoUploadProcessStatus ...
func (d *Dao) txUpdateVideoUploadProcessStatus(ctx context.Context, tx *xsql.Tx, SVID int64, st int64) (err error) {
if _, err = tx.Exec("update video_upload_process set upload_status = ? where svid = ?", st, SVID); err != nil {
log.Errorw(ctx, "errmsg", "UpdateVideoUploadProcessStatus update failed", "err", err)
}
return
}
//txInsertVideo insert video
func (d *Dao) txInsertVideo(c context.Context, tx *xsql.Tx, vh *model.VideoInfo) (err error) {
if _, err = tx.Exec(_addVideo,
vh.CoverURL,
vh.CoverWidth,
vh.CoverHeight,
vh.SVID,
vh.Title,
vh.MID,
vh.AVID,
vh.CID,
vh.Pubtime,
vh.From,
vh.TID,
vh.SubTID,
vh.HomeImgURL,
vh.HomeImgWidth,
vh.HomeImgHeight,
vh.State,
); err != nil {
log.Errorw(c, "event", "insert video err", "err", err, "param", vh)
return
}
return
}
// AddOrUpdateTag 更新或添加标签
func (d *Dao) AddOrUpdateTag(c context.Context, tmap []*v1.TagInfo) (tids []int64, err error) {
// 检查已存在的tag
for _, v := range tmap {
row := d.db.QueryRow(c, _queryTagByName, v.TagName, v.TagType)
t := &model.Tag{
Type: v.TagType,
Name: v.TagName,
}
err = row.Scan(&t.ID)
if err == sql.ErrNoRows {
var q string
var id int64
var res sql.Result
n := strings.Replace(t.Name, "'", "\\'", -1)
q = "('" + n + "'," + strconv.FormatInt(int64(t.Type), 10) + ",1)"
res, _ = d.db.Exec(c, fmt.Sprintf(_insertTag, q))
if res != nil {
id, err = res.LastInsertId()
}
if id != 0 {
tids = append(tids, id)
}
} else if t.ID != 0 {
tids = append(tids, t.ID)
} else {
log.Error("d.db.QueryRow[%v],err:%v", v.TagName, err)
return
}
}
return
}
//根据mids批量查询用户基本信息
func (d *Dao) getUserInfos(c context.Context, mids []int64) (userBases []*model.UserBase, err error) {
midsReq := &acc.MidsReq{
Mids: mids,
RealIp: metadata.String(c, metadata.RemoteIP)}
infosReply, err := d.AccountClient.Infos3(c, midsReq)
if infosReply == nil {
log.Error("query infos3 failed, err (%v)", err)
return
}
userBases = make([]*model.UserBase, 0, 50)
for _, info := range infosReply.Infos {
if info.Mid != 0 {
if len(info.Face) > 255 {
info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png, mid(%v)", info.Mid)
}
userBase := &model.UserBase{
Mid: info.Mid,
Name: info.Name,
Sex: info.Sex,
Face: info.Face,
Sign: info.Sign,
Rank: info.Rank,
}
userBases = append(userBases, userBase)
}
}
return
}
//根据mid查询用户基本信息
func (d *Dao) getUserInfo(c context.Context, mid int64) (userBase *model.UserBase, err error) {
midReq := &acc.MidReq{
Mid: mid,
RealIp: metadata.String(c, metadata.RemoteIP)}
info, err := d.AccountClient.Info3(c, midReq)
if err != nil {
log.Error("query info3 failed,mid(%v), err(%v)", mid, err)
return
}
if len(info.Info.Face) > 255 {
info.Info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
log.Info("the value of Face is too long, replace it as http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png,,mid(%v)", mid)
}
userBase = &model.UserBase{
Mid: info.Info.Mid,
Name: info.Info.Name,
Sex: info.Info.Sex,
Face: info.Info.Face,
Sign: info.Info.Sign,
Rank: info.Info.Rank,
}
log.Info("getUserInfo userbase (%v)", userBase)
return
}
//InOrUpUserBase 更新用户基本信息
func (d *Dao) InOrUpUserBase(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
userBase, _ := d.getUserInfo(c, mid)
response = &v1.SyncUserBaseResponse{Affc: -1}
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Warn("InOrUpUserBase try begin transaction failed ,err(%v)", err)
continue
}
if res, err = tx.Exec(
_insOrUpUserBase,
userBase.Mid,
userBase.Name,
userBase.Face,
); err != nil {
if err = tx.Rollback(); err != nil {
log.Warn("InOrUpUserBase try rollback failed ,error(%v)", err)
}
} else {
if err = tx.Commit(); err != nil {
log.Warn("InOrUpUserBase try commit failed , error(%v)", err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
log.Info("InOrUpUserBase success, affected %v rows", response.Affc)
break
}
}
}
if err != nil {
log.Error("InOrUpUserBase failed, mid(%v), err(%v)", mid, err)
}
return
}
//InOrUpUserBases 批量更新用户基本信息
func (d *Dao) InOrUpUserBases(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
userBases, _ := d.getUserInfos(c, mids)
response = &v1.SyncUserBaseResponse{Affc: -1}
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Warn("InOrUpUserBases try begin transaction failed failed ,error(%v)", err)
continue
}
sql := "INSERT INTO user_base (mid, uname, face, user_type) VALUES "
for _, userBase := range userBases {
if userBase.Mid != 0 {
sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "','" + userBase.Face + "', 1),"
}
}
if sql == "INSERT INTO user_base (mid, uname, face) VALUES " {
response.Affc = 0
log.Info("InOrUpUserBases param mids are not exist")
return
}
sql = sql[0:len(sql)-1] + " ON DUPLICATE KEY UPDATE uname=values(uname), face=values(face);"
if res, err = tx.Exec(sql); err != nil {
log.Info("InOrUpUserBases sql = (%s)", sql)
if err = tx.Rollback(); err != nil {
log.Warn("InOrUpUserBases try rollback failed ,error(%v)", err)
}
} else {
log.Info("InOrUpUserBases sql = (%s)", sql)
if err = tx.Commit(); err != nil {
log.Warn("InOrUpUserBases try commit failed , error(%v)", err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
log.Info("InOrUpUserBases commit success, affected %v rows", response.Affc)
break
}
}
}
if err != nil {
log.Error("InOrUpUserBases failed, err(%v)", err)
}
return
}
//InOrUpUserSta 更新用户up主主站画像
func (d *Dao) InOrUpUserSta(c context.Context, mid int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
log.Info("InOrUpUserSta start")
response = &v1.SyncUserBaseResponse{Affc: -1}
userBase, _ := d.getUserInfo(c, mid)
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Info("InOrUpUserSta on mid(%v) try begin transaction failed failed ,error(%v)", userBase.Mid, err)
continue
}
if res, err = tx.Exec(
_insOrUpUserSta,
userBase.Mid,
userBase.Name,
); err != nil {
fmt.Printf("sql exec error,err(%v)", err)
if err = tx.Rollback(); err != nil {
log.Info("InOrUpUserSta on mid(%v) rollback failed ,error(%v)", userBase.Mid, err)
} else {
fmt.Println("rollbacked")
}
} else {
if err = tx.Commit(); err != nil {
log.Info("InOrUpUserSta on mid(%v) commit failed , error(%v)", userBase.Mid, err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
break
}
}
}
if err != nil {
log.Error("InOrUpUserSta mid(%v) failed, err(%v)", mid, err)
}
return
}
//InOrUpUserStas 批量更新用户状态
func (d *Dao) InOrUpUserStas(c context.Context, mids []int64) (response *v1.SyncUserBaseResponse, err error) {
var (
retry = 3
try int
tx *xsql.Tx
res sql.Result
)
log.Info("InOrUpUserStas start")
response = &v1.SyncUserBaseResponse{Affc: -1}
userBases, _ := d.getUserInfos(c, mids)
for try = 0; try <= retry; try++ {
if tx, err = d.BeginTran(c); err != nil {
time.Sleep(time.Duration(try) * time.Second)
log.Warn("InOrUpUserStas try begin transaction failed failed ,error(%v)", err)
continue
}
sql := "INSERT INTO user_statistics_hive (mid, uname) VALUES"
for _, userBase := range userBases {
sql = sql + "(" + strconv.FormatInt(userBase.Mid, 10) + ",'" + userBase.Name + "'),"
}
sql = sql[0:len(sql)-1] + "ON DUPLICATE KEY UPDATE uname=values(uname)"
if res, err = tx.Exec(sql); err != nil {
if err = tx.Rollback(); err != nil {
log.Warn("InOrUpUserStas try rollback failed ,error(%v)", err)
} else {
log.Warn("InOrUpUserStas rollbacked")
}
} else {
if err = tx.Commit(); err != nil {
log.Warn("InOrUpUserStas try commit failed , error(%v)", err)
} else {
//提交成功,退出
response.Affc, _ = res.RowsAffected()
log.Info("InOrUpUserStas on commit success, affected %v rows", response.Affc)
break
}
}
}
if err != nil {
log.Error("InOrUpUserSta run failed, err(%v)", err)
}
return
}
// GetVideoBvcTable 获取bvc分表名
func (d *Dao) getVideoBvcTable(svid int64) string {
return fmt.Sprintf("video_bvc_%02d", svid%_BVCSubTableSize)
}
//RawVideoStatistic get video statistics
func (d *Dao) RawVideoStatistic(c context.Context, svids []int64) (res map[int64]*model.SvStInfo, err error) {
const maxIDNum = 20
var (
idStr string
)
res = make(map[int64]*model.SvStInfo)
if len(svids) > maxIDNum {
svids = svids[:maxIDNum]
}
l := len(svids)
for k, svid := range svids {
if k < l-1 {
idStr += strconv.FormatInt(svid, 10) + ","
} else {
idStr += strconv.FormatInt(svid, 10)
}
res[svid] = &model.SvStInfo{}
}
rows, err := d.db.Query(c, fmt.Sprintf(_queryStatisticsList, idStr))
if err != nil {
log.Error("query error(%s)", err.Error())
return
}
defer rows.Close()
for rows.Next() {
ssv := new(model.SvStInfo)
if err = rows.Scan(&ssv.SVID, &ssv.Play, &ssv.Subtitles, &ssv.Like, &ssv.Share, &ssv.Report); err != nil {
log.Error("RawVideoStatistic rows.Scan() error(%v)", err)
return
}
res[ssv.SVID] = ssv
}
cmtCount, _ := d.ReplyCounts(c, svids, DefaultCmType)
for id, cmt := range cmtCount {
if _, ok := res[id]; ok {
res[id].Reply = cmt.Count
}
}
return
}
// CommitTrans 提交转码
func (d *Dao) CommitTrans(c context.Context, arg *v1.BVideoTransRequset) error {
path, ok := d.c.URLs["bvc_push"]
if !ok {
log.Warnv(c, log.KV("log", "bvc_push url not set"))
return ecode.ReqParamErr
}
data, _ := json.Marshal(arg)
b := string(data)
req, err := xhttp.NewRequest("POST", path, bytes.NewBuffer(data))
req.Header.Set("Content-Type", "application/json")
if err != nil {
log.Error("bvc_push url(%s) req(%+v) body(%s) error(%v)", path, req, b, err)
return err
}
var res struct {
Code int `json:"code"`
Msg string `json:"message"`
}
if err = d.httpClient.Do(c, req, &res); err != nil {
log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret (%+v) err[%v]", path, req, b, res, err)))
return err
}
log.V(5).Infov(c, log.KV("log", fmt.Sprintf("bvc_push req(%+v) body(%s) ret (%+v)", req, b, res)))
if res.Code != 0 {
err = ecode.Int(res.Code)
log.Errorv(c, log.KV("log", fmt.Sprintf("bvc_push url(%s) req(%+v) body(%s) ret(%+v) error(%v)", path, req, b, res, err)))
return err
}
return nil
}
//AddOrUpdateBVCInfo 添加或更新BVC转码信息
func (d *Dao) AddOrUpdateBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
err = d.AddBVCInfo(c, arg)
if err != nil {
if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
err = d.UpdataBVCInfo(c, arg)
return
}
log.Errorv(c,
log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
)
}
return
}
//TxAddOrUpdateBVCInfo 事务添加或更新BVC转码信息
func (d *Dao) TxAddOrUpdateBVCInfo(c context.Context, tx *xsql.Tx, arg *model.VideoBVC) (err error) {
err = d.TxAddBVCInfo(tx, arg)
if err != nil {
if matched, _ := regexp.MatchString("Duplicate entry", err.Error()); matched {
err = d.TxUpdataBVCInfo(tx, arg)
return
}
log.Errorv(c,
log.KV("log", fmt.Sprintf("dao.db.Exec(AddOrUpdateBVCInfo[%+v]) err(%v)", arg, err)),
)
}
return
}
// AddBVCInfo 添加BVC转码信息
func (d *Dao) AddBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_addBVCData, t)
_, err = d.db.Exec(c, sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
return
}
// TxAddBVCInfo 事务添加BVC转码信息
func (d *Dao) TxAddBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_addBVCData, t)
_, err = tx.Exec(sql, arg.SVID, arg.Path, arg.ResolutionRetio, arg.CodeRate, arg.VideoCode, arg.Duration, arg.FileSize)
return
}
// TxUpdataBVCInfo 事务更新BVC转码信息
func (d *Dao) TxUpdataBVCInfo(tx *xsql.Tx, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_updateBVCData, t)
_, err = tx.Exec(sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
return
}
// UpdataBVCInfo 更新BVC转码信息
func (d *Dao) UpdataBVCInfo(c context.Context, arg *model.VideoBVC) (err error) {
t := d.getVideoBvcTable(arg.SVID)
sql := fmt.Sprintf(_updateBVCData, t)
_, err = d.db.Exec(c, sql, arg.Path, arg.ResolutionRetio, arg.VideoCode, arg.Duration, arg.FileSize, arg.SVID, arg.CodeRate)
return
}
// UpdateCmsSvPIC 更新封面图
func (d *Dao) UpdateCmsSvPIC(c context.Context, svid int64, pic *v1.SvPic, st int64) error {
_, err := d.cmsdb.Exec(c, _updateSvPIC, pic.PicURL, pic.PicWidth, pic.PicHeight, st, svid)
return err
}
// HostnameRegister .
func (d *Dao) HostnameRegister(hostnameIndex int64) (succ bool) {
conn := d.redis.Get(context.Background())
defer conn.Close()
redisKey := fmt.Sprintf("hostname:index:%d", hostnameIndex)
exists, err := redis.Int(conn.Do("EXISTS", redisKey))
if err != nil {
log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
// 即使redis失败了也给返回成功
return true
}
if exists == 1 {
return false
}
// 不去管返回结果,永远返回成功
if _, err = conn.Do("SETEX", redisKey, 1000, 1); err != nil {
log.Errorv(context.Background(), log.KV("event", "fatal"), log.KV("log", fmt.Sprintf("get hostname index from redis fail: key=%s", redisKey)))
}
return true
}
// AddVideoViews .
func (d *Dao) AddVideoViews(c context.Context, svid int64, views int) (affected int64, err error) {
row := d.db.QueryRow(c, _existedStatistics, svid)
tmp := 0
if err = row.Scan(&tmp); err != nil || tmp == 0 {
_, err = d.db.Exec(c, _insertStatistics, svid, 0, 0, 0, 0, 0)
if err != nil {
return
}
}
result, err := d.db.Exec(c, _addVideoViews, views, svid)
if err != nil {
return
}
return result.RowsAffected()
}
// VideoStateUpdate .
func (d *Dao) VideoStateUpdate(c context.Context, svid int64, newState int) (aff int64, err error) {
result, err := d.db.Exec(c, _updateVideoState, newState, svid)
if err != nil {
return
}
aff, err = result.RowsAffected()
return
}