bilibili-backup/app/service/video/stream-mng/dao/mysql.go
2019-04-22 02:59:20 +00:00

384 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dao
import (
"context"
"errors"
"fmt"
"go-common/app/service/video/stream-mng/common"
"go-common/app/service/video/stream-mng/model"
"go-common/library/log"
"go-common/library/net/metadata"
"go-common/library/sync/errgroup"
)
// RawStreamFullInfo 直接从数据库中查询流信息,可传入流名, 也可传入rid
func (d *Dao) RawStreamFullInfo(c context.Context, id int64, sname string) (res *model.StreamFullInfo, err error) {
var (
official []*model.OfficialStream
backup []*model.StreamBase
mainStream *model.MainStream
)
if sname != "" {
official, err = d.GetOfficialStreamByName(c, sname)
// 可以从原表中查询到
if err == nil && official != nil && len(official) > 0 {
id = official[0].RoomID
goto END
}
var backUpInfo *model.BackupStream
// 原表中查询不到
backUpInfo, err = d.GetBackupStreamByStreamName(c, sname)
if err != nil {
log.Errorv(c, log.KV("log", fmt.Sprintf("sql backup_stream err = %v", err)))
return
}
if backUpInfo == nil {
err = fmt.Errorf("can not find any info by %s", sname)
return
}
id = backUpInfo.RoomID
}
END:
// todo 这里用老的errgroup 新errgroup2 暂时未有人用,bug未知
group, errCtx := errgroup.WithContext(c)
// 如果还未查sv_ls_stream则需要查询
if id > 0 && len(official) == 0 {
group.Go(func() (err error) {
log.Warn("group offical")
if official, err = d.GetOfficialStreamByRoomID(errCtx, id); err != nil {
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group offical err=%v", err)))
}
return nil
})
}
if id > 0 {
group.Go(func() (err error) {
log.Warn("group main")
if mainStream, err = d.GetMainStreamFromDB(errCtx, id, ""); err != nil {
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group main err=%v", err)))
}
return nil
})
group.Go(func() (err error) {
log.Warn("group back")
back, err := d.GetBackupStreamByRoomID(errCtx, id)
if err != nil {
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group backup err=%v", err)))
} else {
backup = d.formatBackup2BaseInfo(c, back)
}
return nil
})
}
err = group.Wait()
if err != nil {
return
}
if len(official) == 0 {
err = fmt.Errorf("can not find any info by room_id=%d", id)
return
}
return d.formatStreamFullInfo(c, official, backup, mainStream)
}
// RawStreamRIDByName 查询rid
func (d *Dao) RawStreamRIDByName(c context.Context, sname string) (res *model.StreamFullInfo, err error) {
return d.RawStreamFullInfo(c, 0, sname)
}
// RawMultiStreamInfo 批量查询流信息
func (d *Dao) RawMultiStreamInfo(c context.Context, rids []int64) (res map[int64]*model.StreamFullInfo, err error) {
var (
official []*model.OfficialStream
backup []*model.BackupStream
mainStream []*model.MainStream
)
group, errCtx := errgroup.WithContext(c)
group.Go(func() (err error) {
if official, err = d.GetMultiOfficalStreamByRID(errCtx, rids); err != nil {
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group offical err=%v", err)))
}
return nil
})
group.Go(func() (err error) {
if backup, err = d.GetMultiBackupStreamByRID(errCtx, rids); err != nil {
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group back err=%v", err)))
}
return nil
})
group.Go(func() (err error) {
if mainStream, err = d.GetMultiMainStreamFromDB(errCtx, rids); err != nil {
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group back err=%v", err)))
}
return nil
})
err = group.Wait()
if err != nil {
return
}
// 把rid相同的放为一组
ridMapOfficial := map[int64][]*model.OfficialStream{}
for _, v := range official {
ridMapOfficial[v.RoomID] = append(ridMapOfficial[v.RoomID], v)
}
ridMapBackup := map[int64][]*model.BackupStream{}
for _, v := range backup {
ridMapBackup[v.RoomID] = append(ridMapBackup[v.RoomID], v)
}
ridMapBackupBase := map[int64][]*model.StreamBase{}
for id, v := range ridMapBackup {
ridMapBackupBase[id] = d.formatBackup2BaseInfo(c, v)
}
ridMapMain := map[int64]*model.MainStream{}
for _, v := range mainStream {
ridMapMain[v.RoomID] = v
}
infos := map[int64]*model.StreamFullInfo{}
flag := false
for id, v := range ridMapOfficial {
flag = true
infos[id], _ = d.formatStreamFullInfo(c, v, ridMapBackupBase[id], ridMapMain[id])
}
if flag {
return infos, nil
}
log.Errorv(c, log.KV("log", fmt.Errorf("can not find any info by room_ids=%d", rids)))
return nil, nil
}
// formatStreamFullInfo 格式化流信息
func (d *Dao) formatStreamFullInfo(c context.Context, official []*model.OfficialStream, backup []*model.StreamBase, main *model.MainStream) (*model.StreamFullInfo, error) {
resp := &model.StreamFullInfo{}
resp.List = []*model.StreamBase{}
var roomID int64
roomID = official[0].RoomID
resp.RoomID = official[0].RoomID
base := &model.StreamBase{}
base.StreamName = official[0].Name
base.Type = 1
base.Key = official[0].Key
if main != nil {
base.Options = main.Options
if 4&base.Options == 4 {
base.Wmask = true
}
if 8&base.Options == 8 {
base.Mmask = true
}
}
for _, item := range official {
if item.UpRank == 1 {
if val, ok := common.SrcMapBitwise[item.Src]; ok {
// todo origin为main-stream取
if main != nil {
base.Origin = main.OriginUpstream
} else {
// 做个兜底逻辑, main-stream中没有这个数据但是sv_ls_stream确实在播
base.Origin = val
}
base.DefaultUpStream = val
} else {
// 如果上行不在现在的任意一家, 则重新设置上行
if err := d.UpdateOfficialStreamStatus(c, roomID, common.BVCSrc); err == nil {
if main != nil {
base.Origin = main.OriginUpstream
} else {
base.Origin = common.BitWiseBVC
}
base.DefaultUpStream = common.BitWiseBVC
go func(c context.Context, rid int64, fromOrigin int8, toOrigin int64, sname string) {
d.UpdateStreamStatusCache(c, &model.StreamStatus{
RoomID: rid,
StreamName: sname,
DefaultChange: true,
DefaultUpStream: toOrigin,
})
// 插入日志
d.InsertChangeLog(c, &model.StreamChangeLog{
RoomID: rid,
FromOrigin: int64(fromOrigin),
ToOrigin: toOrigin,
Reason: fmt.Sprintf("上行不在五家CDN,old origin=%d", fromOrigin),
OperateName: "auto_change",
Source: "background",
})
}(metadata.WithContext(c), roomID, item.Src, common.BitWiseBVC, item.Name)
}
}
} else if item.UpRank == 2 {
if val, ok := common.SrcMapBitwise[item.Src]; ok {
base.Forward = append(base.Forward, val)
}
}
}
resp.List = append(resp.List, base)
if len(backup) > 0 {
for _, v := range backup {
resp.List = append(resp.List, v)
}
}
d.liveAside.Do(c, func(ctx context.Context) {
d.diffStreamInfo(ctx, resp, main)
})
return resp, nil
}
// formatBackup2Base backup 格式化为base
func (d *Dao) formatBackup2BaseInfo(c context.Context, back []*model.BackupStream) (resp []*model.StreamBase) {
if len(back) > 0 {
for _, b := range back {
bs := &model.StreamBase{}
bs.StreamName = b.StreamName
bs.Type = 2
bs.Key = b.Key
// 原始上行
bs.Origin = b.OriginUpstream
bs.DefaultUpStream = b.DefaultVendor
bs.Options = b.Options
// 位运算:可满足9家cdn
var n int64
for n = 256; n > 0; n /= 2 {
if (b.Streaming&n) == n && n != bs.Origin {
bs.Forward = append(bs.Forward, n)
}
}
resp = append(resp, bs)
}
}
return
}
// 比较新表和老表
func (d *Dao) diffStreamInfo(c context.Context, info *model.StreamFullInfo, mainStream *model.MainStream) {
if info != nil && info.RoomID != 0 && len(info.List) > 0 {
if mainStream == nil {
d.syncMainStream(c, info.RoomID, "")
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:can find any info, room_id=%d", info.RoomID)))
return
}
offical := info.List[0]
if mainStream.StreamName != offical.StreamName {
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:stream name is differentroom_id=%d", info.RoomID)))
return
}
if mainStream.Key != offical.Key {
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:key is differentroom_id=%d", info.RoomID)))
return
}
if mainStream.DefaultVendor != offical.DefaultUpStream {
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:DefaultVendor is differentroom_id=%d,main=%d,offical=%d", info.RoomID, mainStream.DefaultVendor, offical.DefaultUpStream)))
return
}
if mainStream.OriginUpstream != 0 && (mainStream.OriginUpstream != mainStream.DefaultVendor) {
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:OriginUpstream is differentroom_id=%d, main origin=%d, main default=%d", info.RoomID, mainStream.OriginUpstream, mainStream.DefaultVendor)))
return
}
streaming := offical.DefaultUpStream
for _, v := range offical.Forward {
streaming += v
}
if mainStream.Streaming != streaming {
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:Streaming is differentroom_id=%d, main=%d, offical=%d", info.RoomID, mainStream.Streaming, streaming)))
return
}
}
}
func (d *Dao) syncMainStream(c context.Context, roomID int64, streamName string) error {
if roomID <= 0 && streamName == "" {
return errors.New("invalid params")
}
var err error
exists, err := d.GetMainStreamFromDB(c, roomID, streamName)
if err != nil && err.Error() != "sql: no rows in result set" {
log.Errorv(c, log.KV("log", fmt.Sprintf("sync_stream_data_error = %v", err)))
return err
}
if exists != nil && (exists.RoomID == roomID || exists.StreamName == streamName) {
return nil
}
var full *model.StreamFullInfo
if roomID > 0 && streamName == "" {
full, err = d.StreamFullInfo(c, roomID, "")
} else if roomID <= 0 && streamName != "" {
full, err = d.StreamFullInfo(c, 0, streamName)
}
if err != nil {
return err
}
if full == nil {
return errors.New("unknow response")
}
for _, ss := range full.List {
if ss.Type == 1 {
ms := &model.MainStream{
RoomID: full.RoomID,
StreamName: ss.StreamName,
Key: ss.Key,
DefaultVendor: ss.DefaultUpStream,
Status: 1,
}
if ms.DefaultVendor == 0 {
ms.DefaultVendor = 1
}
_, err := d.CreateNewStream(c, ms)
if err != nil {
log.Errorv(c, log.KV("log", fmt.Sprintf("sync_stream_data_error = %v", err)))
}
break
}
}
return nil
}