384 lines
10 KiB
Go
384 lines
10 KiB
Go
|
package dao
|
|||
|
|
|||
|
import (
|
|||
|
"context"
|
|||
|
"errors"
|
|||
|
"fmt"
|
|||
|
"go-common/app/service/video/stream-mng/common"
|
|||
|
"go-common/app/service/video/stream-mng/model"
|
|||
|
"go-common/library/log"
|
|||
|
"go-common/library/net/metadata"
|
|||
|
"go-common/library/sync/errgroup"
|
|||
|
)
|
|||
|
|
|||
|
// RawStreamFullInfo 直接从数据库中查询流信息,可传入流名, 也可传入rid
|
|||
|
func (d *Dao) RawStreamFullInfo(c context.Context, id int64, sname string) (res *model.StreamFullInfo, err error) {
|
|||
|
var (
|
|||
|
official []*model.OfficialStream
|
|||
|
backup []*model.StreamBase
|
|||
|
mainStream *model.MainStream
|
|||
|
)
|
|||
|
|
|||
|
if sname != "" {
|
|||
|
official, err = d.GetOfficialStreamByName(c, sname)
|
|||
|
|
|||
|
// 可以从原表中查询到
|
|||
|
if err == nil && official != nil && len(official) > 0 {
|
|||
|
id = official[0].RoomID
|
|||
|
goto END
|
|||
|
}
|
|||
|
|
|||
|
var backUpInfo *model.BackupStream
|
|||
|
// 原表中查询不到
|
|||
|
backUpInfo, err = d.GetBackupStreamByStreamName(c, sname)
|
|||
|
if err != nil {
|
|||
|
log.Errorv(c, log.KV("log", fmt.Sprintf("sql backup_stream err = %v", err)))
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
if backUpInfo == nil {
|
|||
|
err = fmt.Errorf("can not find any info by %s", sname)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
id = backUpInfo.RoomID
|
|||
|
}
|
|||
|
|
|||
|
END:
|
|||
|
// todo 这里用老的errgroup, 新errgroup2 暂时未有人用,bug未知
|
|||
|
group, errCtx := errgroup.WithContext(c)
|
|||
|
// 如果还未查sv_ls_stream则需要查询
|
|||
|
if id > 0 && len(official) == 0 {
|
|||
|
group.Go(func() (err error) {
|
|||
|
log.Warn("group offical")
|
|||
|
if official, err = d.GetOfficialStreamByRoomID(errCtx, id); err != nil {
|
|||
|
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group offical err=%v", err)))
|
|||
|
}
|
|||
|
return nil
|
|||
|
})
|
|||
|
}
|
|||
|
|
|||
|
if id > 0 {
|
|||
|
group.Go(func() (err error) {
|
|||
|
log.Warn("group main")
|
|||
|
if mainStream, err = d.GetMainStreamFromDB(errCtx, id, ""); err != nil {
|
|||
|
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group main err=%v", err)))
|
|||
|
}
|
|||
|
return nil
|
|||
|
})
|
|||
|
|
|||
|
group.Go(func() (err error) {
|
|||
|
log.Warn("group back")
|
|||
|
back, err := d.GetBackupStreamByRoomID(errCtx, id)
|
|||
|
if err != nil {
|
|||
|
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group backup err=%v", err)))
|
|||
|
} else {
|
|||
|
backup = d.formatBackup2BaseInfo(c, back)
|
|||
|
}
|
|||
|
return nil
|
|||
|
})
|
|||
|
}
|
|||
|
|
|||
|
err = group.Wait()
|
|||
|
|
|||
|
if err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
if len(official) == 0 {
|
|||
|
err = fmt.Errorf("can not find any info by room_id=%d", id)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
return d.formatStreamFullInfo(c, official, backup, mainStream)
|
|||
|
}
|
|||
|
|
|||
|
// RawStreamRIDByName 查询rid
|
|||
|
func (d *Dao) RawStreamRIDByName(c context.Context, sname string) (res *model.StreamFullInfo, err error) {
|
|||
|
return d.RawStreamFullInfo(c, 0, sname)
|
|||
|
}
|
|||
|
|
|||
|
// RawMultiStreamInfo 批量查询流信息
|
|||
|
func (d *Dao) RawMultiStreamInfo(c context.Context, rids []int64) (res map[int64]*model.StreamFullInfo, err error) {
|
|||
|
var (
|
|||
|
official []*model.OfficialStream
|
|||
|
backup []*model.BackupStream
|
|||
|
mainStream []*model.MainStream
|
|||
|
)
|
|||
|
|
|||
|
group, errCtx := errgroup.WithContext(c)
|
|||
|
group.Go(func() (err error) {
|
|||
|
if official, err = d.GetMultiOfficalStreamByRID(errCtx, rids); err != nil {
|
|||
|
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group offical err=%v", err)))
|
|||
|
}
|
|||
|
return nil
|
|||
|
})
|
|||
|
|
|||
|
group.Go(func() (err error) {
|
|||
|
if backup, err = d.GetMultiBackupStreamByRID(errCtx, rids); err != nil {
|
|||
|
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group back err=%v", err)))
|
|||
|
}
|
|||
|
return nil
|
|||
|
})
|
|||
|
|
|||
|
group.Go(func() (err error) {
|
|||
|
if mainStream, err = d.GetMultiMainStreamFromDB(errCtx, rids); err != nil {
|
|||
|
log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group back err=%v", err)))
|
|||
|
}
|
|||
|
return nil
|
|||
|
})
|
|||
|
|
|||
|
err = group.Wait()
|
|||
|
|
|||
|
if err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// 把rid相同的放为一组
|
|||
|
ridMapOfficial := map[int64][]*model.OfficialStream{}
|
|||
|
for _, v := range official {
|
|||
|
ridMapOfficial[v.RoomID] = append(ridMapOfficial[v.RoomID], v)
|
|||
|
}
|
|||
|
|
|||
|
ridMapBackup := map[int64][]*model.BackupStream{}
|
|||
|
for _, v := range backup {
|
|||
|
ridMapBackup[v.RoomID] = append(ridMapBackup[v.RoomID], v)
|
|||
|
}
|
|||
|
|
|||
|
ridMapBackupBase := map[int64][]*model.StreamBase{}
|
|||
|
for id, v := range ridMapBackup {
|
|||
|
ridMapBackupBase[id] = d.formatBackup2BaseInfo(c, v)
|
|||
|
}
|
|||
|
|
|||
|
ridMapMain := map[int64]*model.MainStream{}
|
|||
|
for _, v := range mainStream {
|
|||
|
ridMapMain[v.RoomID] = v
|
|||
|
}
|
|||
|
|
|||
|
infos := map[int64]*model.StreamFullInfo{}
|
|||
|
|
|||
|
flag := false
|
|||
|
for id, v := range ridMapOfficial {
|
|||
|
flag = true
|
|||
|
infos[id], _ = d.formatStreamFullInfo(c, v, ridMapBackupBase[id], ridMapMain[id])
|
|||
|
}
|
|||
|
|
|||
|
if flag {
|
|||
|
return infos, nil
|
|||
|
}
|
|||
|
|
|||
|
log.Errorv(c, log.KV("log", fmt.Errorf("can not find any info by room_ids=%d", rids)))
|
|||
|
return nil, nil
|
|||
|
}
|
|||
|
|
|||
|
// formatStreamFullInfo 格式化流信息
|
|||
|
func (d *Dao) formatStreamFullInfo(c context.Context, official []*model.OfficialStream, backup []*model.StreamBase, main *model.MainStream) (*model.StreamFullInfo, error) {
|
|||
|
resp := &model.StreamFullInfo{}
|
|||
|
resp.List = []*model.StreamBase{}
|
|||
|
|
|||
|
var roomID int64
|
|||
|
|
|||
|
roomID = official[0].RoomID
|
|||
|
resp.RoomID = official[0].RoomID
|
|||
|
|
|||
|
base := &model.StreamBase{}
|
|||
|
base.StreamName = official[0].Name
|
|||
|
base.Type = 1
|
|||
|
base.Key = official[0].Key
|
|||
|
|
|||
|
if main != nil {
|
|||
|
base.Options = main.Options
|
|||
|
if 4&base.Options == 4 {
|
|||
|
base.Wmask = true
|
|||
|
}
|
|||
|
if 8&base.Options == 8 {
|
|||
|
base.Mmask = true
|
|||
|
}
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
for _, item := range official {
|
|||
|
if item.UpRank == 1 {
|
|||
|
if val, ok := common.SrcMapBitwise[item.Src]; ok {
|
|||
|
// todo origin为main-stream取
|
|||
|
if main != nil {
|
|||
|
base.Origin = main.OriginUpstream
|
|||
|
} else {
|
|||
|
// 做个兜底逻辑, main-stream中没有这个数据,但是sv_ls_stream确实在播
|
|||
|
base.Origin = val
|
|||
|
}
|
|||
|
base.DefaultUpStream = val
|
|||
|
} else {
|
|||
|
// 如果上行不在现在的任意一家, 则重新设置上行
|
|||
|
if err := d.UpdateOfficialStreamStatus(c, roomID, common.BVCSrc); err == nil {
|
|||
|
if main != nil {
|
|||
|
base.Origin = main.OriginUpstream
|
|||
|
} else {
|
|||
|
base.Origin = common.BitWiseBVC
|
|||
|
}
|
|||
|
base.DefaultUpStream = common.BitWiseBVC
|
|||
|
|
|||
|
go func(c context.Context, rid int64, fromOrigin int8, toOrigin int64, sname string) {
|
|||
|
d.UpdateStreamStatusCache(c, &model.StreamStatus{
|
|||
|
RoomID: rid,
|
|||
|
StreamName: sname,
|
|||
|
DefaultChange: true,
|
|||
|
DefaultUpStream: toOrigin,
|
|||
|
})
|
|||
|
// 插入日志
|
|||
|
d.InsertChangeLog(c, &model.StreamChangeLog{
|
|||
|
RoomID: rid,
|
|||
|
FromOrigin: int64(fromOrigin),
|
|||
|
ToOrigin: toOrigin,
|
|||
|
Reason: fmt.Sprintf("上行不在五家CDN,old origin=%d", fromOrigin),
|
|||
|
OperateName: "auto_change",
|
|||
|
Source: "background",
|
|||
|
})
|
|||
|
}(metadata.WithContext(c), roomID, item.Src, common.BitWiseBVC, item.Name)
|
|||
|
}
|
|||
|
}
|
|||
|
} else if item.UpRank == 2 {
|
|||
|
if val, ok := common.SrcMapBitwise[item.Src]; ok {
|
|||
|
base.Forward = append(base.Forward, val)
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
resp.List = append(resp.List, base)
|
|||
|
|
|||
|
if len(backup) > 0 {
|
|||
|
for _, v := range backup {
|
|||
|
resp.List = append(resp.List, v)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
d.liveAside.Do(c, func(ctx context.Context) {
|
|||
|
d.diffStreamInfo(ctx, resp, main)
|
|||
|
})
|
|||
|
|
|||
|
return resp, nil
|
|||
|
}
|
|||
|
|
|||
|
// formatBackup2Base backup 格式化为base
|
|||
|
func (d *Dao) formatBackup2BaseInfo(c context.Context, back []*model.BackupStream) (resp []*model.StreamBase) {
|
|||
|
if len(back) > 0 {
|
|||
|
for _, b := range back {
|
|||
|
bs := &model.StreamBase{}
|
|||
|
bs.StreamName = b.StreamName
|
|||
|
bs.Type = 2
|
|||
|
bs.Key = b.Key
|
|||
|
|
|||
|
// 原始上行
|
|||
|
bs.Origin = b.OriginUpstream
|
|||
|
bs.DefaultUpStream = b.DefaultVendor
|
|||
|
bs.Options = b.Options
|
|||
|
|
|||
|
// 位运算:可满足9家cdn
|
|||
|
var n int64
|
|||
|
for n = 256; n > 0; n /= 2 {
|
|||
|
if (b.Streaming&n) == n && n != bs.Origin {
|
|||
|
bs.Forward = append(bs.Forward, n)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
resp = append(resp, bs)
|
|||
|
}
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// 比较新表和老表
|
|||
|
func (d *Dao) diffStreamInfo(c context.Context, info *model.StreamFullInfo, mainStream *model.MainStream) {
|
|||
|
if info != nil && info.RoomID != 0 && len(info.List) > 0 {
|
|||
|
if mainStream == nil {
|
|||
|
d.syncMainStream(c, info.RoomID, "")
|
|||
|
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:can find any info, room_id=%d", info.RoomID)))
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
offical := info.List[0]
|
|||
|
if mainStream.StreamName != offical.StreamName {
|
|||
|
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:stream name is different,room_id=%d", info.RoomID)))
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
if mainStream.Key != offical.Key {
|
|||
|
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:key is different,room_id=%d", info.RoomID)))
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
if mainStream.DefaultVendor != offical.DefaultUpStream {
|
|||
|
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:DefaultVendor is different,room_id=%d,main=%d,offical=%d", info.RoomID, mainStream.DefaultVendor, offical.DefaultUpStream)))
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
if mainStream.OriginUpstream != 0 && (mainStream.OriginUpstream != mainStream.DefaultVendor) {
|
|||
|
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:OriginUpstream is different,room_id=%d, main origin=%d, main default=%d", info.RoomID, mainStream.OriginUpstream, mainStream.DefaultVendor)))
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
streaming := offical.DefaultUpStream
|
|||
|
for _, v := range offical.Forward {
|
|||
|
streaming += v
|
|||
|
}
|
|||
|
if mainStream.Streaming != streaming {
|
|||
|
log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:Streaming is different,room_id=%d, main=%d, offical=%d", info.RoomID, mainStream.Streaming, streaming)))
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (d *Dao) syncMainStream(c context.Context, roomID int64, streamName string) error {
|
|||
|
if roomID <= 0 && streamName == "" {
|
|||
|
return errors.New("invalid params")
|
|||
|
}
|
|||
|
|
|||
|
var err error
|
|||
|
exists, err := d.GetMainStreamFromDB(c, roomID, streamName)
|
|||
|
if err != nil && err.Error() != "sql: no rows in result set" {
|
|||
|
log.Errorv(c, log.KV("log", fmt.Sprintf("sync_stream_data_error = %v", err)))
|
|||
|
return err
|
|||
|
}
|
|||
|
if exists != nil && (exists.RoomID == roomID || exists.StreamName == streamName) {
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
var full *model.StreamFullInfo
|
|||
|
if roomID > 0 && streamName == "" {
|
|||
|
full, err = d.StreamFullInfo(c, roomID, "")
|
|||
|
} else if roomID <= 0 && streamName != "" {
|
|||
|
full, err = d.StreamFullInfo(c, 0, streamName)
|
|||
|
}
|
|||
|
|
|||
|
if err != nil {
|
|||
|
return err
|
|||
|
}
|
|||
|
if full == nil {
|
|||
|
return errors.New("unknow response")
|
|||
|
}
|
|||
|
|
|||
|
for _, ss := range full.List {
|
|||
|
if ss.Type == 1 {
|
|||
|
ms := &model.MainStream{
|
|||
|
RoomID: full.RoomID,
|
|||
|
StreamName: ss.StreamName,
|
|||
|
Key: ss.Key,
|
|||
|
DefaultVendor: ss.DefaultUpStream,
|
|||
|
Status: 1,
|
|||
|
}
|
|||
|
|
|||
|
if ms.DefaultVendor == 0 {
|
|||
|
ms.DefaultVendor = 1
|
|||
|
}
|
|||
|
|
|||
|
_, err := d.CreateNewStream(c, ms)
|
|||
|
if err != nil {
|
|||
|
log.Errorv(c, log.KV("log", fmt.Sprintf("sync_stream_data_error = %v", err)))
|
|||
|
}
|
|||
|
break
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
return nil
|
|||
|
}
|