163 lines
3.9 KiB
Go
163 lines
3.9 KiB
Go
|
package dao
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"go-common/app/service/video/stream-mng/model"
|
||
|
"go-common/library/log"
|
||
|
"net/http"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
_localLiveStreamList = "live_stream"
|
||
|
_localStreamInfo = "rid:%d"
|
||
|
_liveExpiredTime = 600
|
||
|
_localStreamExpiredTime = 1
|
||
|
)
|
||
|
|
||
|
type OnAirStream struct {
|
||
|
StreamName string `json:"stream_name, omitempty"`
|
||
|
}
|
||
|
|
||
|
type OnAirStreamList struct {
|
||
|
List []*OnAirStream `json:"list,omitempty"`
|
||
|
}
|
||
|
|
||
|
type AllOnAirStream struct {
|
||
|
M map[string]*OnAirStreamList `json:"m,omitempty"`
|
||
|
}
|
||
|
|
||
|
func (d *Dao) getLocalLiveStreamListKey() string {
|
||
|
return _localLiveStreamList
|
||
|
}
|
||
|
|
||
|
func (d *Dao) getLocalStreamInfoKey(rid int64) string {
|
||
|
return fmt.Sprintf(_localStreamInfo, rid)
|
||
|
}
|
||
|
|
||
|
// loadLiveStreamList 判断流是否在播
|
||
|
func (d *Dao) LoadLiveStreamList(c context.Context, rids []int64) map[int64]bool {
|
||
|
list, _ := d.localCache.Get(d.getLocalLiveStreamListKey())
|
||
|
|
||
|
isLive := map[int64]bool{}
|
||
|
if res, ok := list.(map[string]int); ok {
|
||
|
for _, v := range rids {
|
||
|
// rid => stream
|
||
|
info, err := d.streamFullInfo(c, v, "")
|
||
|
if err != nil || info == nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
var sname string
|
||
|
var origin int64
|
||
|
for _, v := range info.List {
|
||
|
if v.Type == 1 {
|
||
|
sname = v.StreamName
|
||
|
origin = v.Origin
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
// 不在在播列表&orgin 为0===》不在播
|
||
|
if _, exe := res[sname]; !exe {
|
||
|
if origin == 0 {
|
||
|
isLive[v] = false
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
isLive[v] = true
|
||
|
}
|
||
|
} else {
|
||
|
// 当从缓存中获取失败 or 项目刚刚启动
|
||
|
for _, v := range rids {
|
||
|
isLive[v] = true
|
||
|
}
|
||
|
}
|
||
|
return isLive
|
||
|
}
|
||
|
|
||
|
// StoreLiveStreamList 刷新在播列表缓存
|
||
|
func (d *Dao) StoreLiveStreamList() {
|
||
|
ctx := context.Background()
|
||
|
|
||
|
type liveStream struct {
|
||
|
Code int `json:"code,omitempty"`
|
||
|
Data *AllOnAirStream `json:"data,omitempty"`
|
||
|
}
|
||
|
|
||
|
resp := &liveStream{}
|
||
|
uri := d.getLiveStreamUrl("/api/live/vendor/onairstreamlist?cdn=bvc")
|
||
|
|
||
|
err := d.NewRequst(ctx, http.MethodGet, uri, nil, nil, nil, resp)
|
||
|
if err != nil {
|
||
|
log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", err)))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
res := map[string]int{}
|
||
|
if resp.Code == 0 && resp.Data != nil && resp.Data.M != nil && len(resp.Data.M["BVC"].List) > 0 {
|
||
|
for _, v := range resp.Data.M["BVC"].List {
|
||
|
res[v.StreamName] = 1
|
||
|
}
|
||
|
} else {
|
||
|
res, _ := json.Marshal(resp)
|
||
|
log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", string(res))))
|
||
|
}
|
||
|
|
||
|
// 存10分钟
|
||
|
d.localCache.SetWithExpire(d.getLocalLiveStreamListKey(), res, _liveExpiredTime*time.Second)
|
||
|
}
|
||
|
|
||
|
// storeStreamInfo 存储单个流信息
|
||
|
func (d *Dao) storeStreamInfo(c context.Context, info *model.StreamFullInfo) {
|
||
|
if info == nil || info.RoomID < 0 {
|
||
|
return
|
||
|
}
|
||
|
key := d.getLocalStreamInfoKey(info.RoomID)
|
||
|
|
||
|
d.localCache.SetWithExpire(key, info, _localStreamExpiredTime*time.Second)
|
||
|
}
|
||
|
|
||
|
// loadStreamInfo 读取单个信息
|
||
|
func (d *Dao) loadStreamInfo(c context.Context, rid int64) *model.StreamFullInfo {
|
||
|
key := d.getLocalStreamInfoKey(rid)
|
||
|
info, _ := d.localCache.Get(key)
|
||
|
if res, ok := info.(*model.StreamFullInfo); ok {
|
||
|
if res != nil && res.RoomID > 0 {
|
||
|
return res
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// storeMultiStreamInfo 存储多路流
|
||
|
func (d *Dao) storeMultiStreamInfo(c context.Context, infos map[int64]*model.StreamFullInfo) {
|
||
|
for _, v := range infos {
|
||
|
key := d.getLocalStreamInfoKey(v.RoomID)
|
||
|
|
||
|
//log.Warn("key=%v", key)
|
||
|
d.localCache.SetWithExpire(key, v, _localStreamExpiredTime*time.Second)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// loadMultiStreamInfo 读取批量信息
|
||
|
func (d *Dao) loadMultiStreamInfo(c context.Context, rids []int64) (map[int64]*model.StreamFullInfo, []int64) {
|
||
|
infos := map[int64]*model.StreamFullInfo{}
|
||
|
missRids := []int64{}
|
||
|
for _, v := range rids {
|
||
|
key := d.getLocalStreamInfoKey(v)
|
||
|
info, _ := d.localCache.Get(key)
|
||
|
if res, ok := info.(*model.StreamFullInfo); ok {
|
||
|
if res != nil && res.RoomID > 0 {
|
||
|
infos[v] = res
|
||
|
}
|
||
|
} else {
|
||
|
missRids = append(missRids, v)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return infos, missRids
|
||
|
}
|