bilibili-backup/app/service/video/stream-mng/dao/localcache.go
2019-04-22 02:59:20 +00:00

163 lines
3.9 KiB
Go

package dao
import (
"context"
"encoding/json"
"fmt"
"go-common/app/service/video/stream-mng/model"
"go-common/library/log"
"net/http"
"time"
)
// Local-cache keys and lifetimes used by this file.
const (
	// _localLiveStreamList is the cache key holding the set of on-air stream names.
	_localLiveStreamList = "live_stream"

	// _localStreamInfo is the fmt pattern for a per-room stream-info cache key.
	_localStreamInfo = "rid:%d"

	// _liveExpiredTime is the lifetime, in seconds, of the on-air list entry.
	_liveExpiredTime = 600

	// _localStreamExpiredTime is the lifetime, in seconds, of a per-room entry.
	_localStreamExpiredTime = 1
)
// OnAirStream is one entry of the on-air stream list returned by the
// live-stream service.
type OnAirStream struct {
	// StreamName is decoded from / encoded to the "stream_name" JSON field.
	// The tag must not contain a space after the comma: encoding/json matches
	// options literally, so " omitempty" would be silently ignored.
	StreamName string `json:"stream_name,omitempty"`
}
// OnAirStreamList wraps the slice of on-air streams carried in the "list"
// JSON field.
type OnAirStreamList struct {
	// List holds the on-air stream entries; it is omitted from JSON when empty.
	List []*OnAirStream `json:"list,omitempty"`
}
// AllOnAirStream maps a string key to an on-air stream list, carried in the
// "m" JSON field. StoreLiveStreamList reads the entry stored under "BVC".
type AllOnAirStream struct {
	// M holds one stream list per key; it is omitted from JSON when empty.
	M map[string]*OnAirStreamList `json:"m,omitempty"`
}
// getLocalLiveStreamListKey returns the local-cache key under which the
// on-air stream list is stored.
func (d *Dao) getLocalLiveStreamListKey() string {
	const key = _localLiveStreamList
	return key
}
// getLocalStreamInfoKey builds the local-cache key for the stream info of the
// given room ID by formatting rid into the _localStreamInfo pattern.
func (d *Dao) getLocalStreamInfoKey(rid int64) string {
	key := fmt.Sprintf(_localStreamInfo, rid)
	return key
}
// LoadLiveStreamList reports, for each requested room ID, whether its stream
// is considered on air.
//
// The decision uses the on-air stream-name set held in the local cache under
// the live-stream list key:
//   - If that entry is absent or is not a map[string]int (cache miss, or the
//     service has just started), every requested room is reported as live.
//   - Otherwise each room is resolved through streamFullInfo. Rooms whose
//     lookup fails or yields nil are left out of the result. For the others,
//     the first element of info.List with Type == 1 supplies the stream name
//     and origin; the room is reported as not live only when that name is
//     absent from the cached set and the origin is 0.
func (d *Dao) LoadLiveStreamList(c context.Context, rids []int64) map[int64]bool {
	isLive := map[int64]bool{}

	cached, _ := d.localCache.Get(d.getLocalLiveStreamListKey())
	onAir, ok := cached.(map[string]int)
	if !ok {
		// No usable on-air list: treat every room as live.
		for _, rid := range rids {
			isLive[rid] = true
		}
		return isLive
	}

	for _, rid := range rids {
		// Resolve room ID -> stream info.
		info, err := d.streamFullInfo(c, rid, "")
		if err != nil || info == nil {
			continue
		}

		var (
			name   string
			origin int64
		)
		for _, s := range info.List {
			if s.Type == 1 {
				name, origin = s.StreamName, s.Origin
				break
			}
		}

		// Not in the on-air list and origin == 0 means not live; anything
		// else counts as live.
		_, listed := onAir[name]
		isLive[rid] = listed || origin != 0
	}
	return isLive
}
// StoreLiveStreamList refreshes the locally cached set of on-air stream names.
//
// It issues a GET for /api/live/vendor/onairstreamlist?cdn=bvc on the
// live-stream service, collects the stream names listed under the "BVC" key of
// the response, and stores them as a map[string]int in the local cache for
// _liveExpiredTime seconds (10 minutes).
//
// If the request itself fails, the error is logged and the cache is left
// untouched. If the response is unusable (non-zero code, no data, no "BVC"
// entry, or an empty list), the response is logged and an empty set is stored.
// NOTE(review): storing an empty set on a non-zero code makes
// LoadLiveStreamList treat origin-0 streams as offline until the next
// successful refresh — confirm this is intended.
func (d *Dao) StoreLiveStreamList() {
	ctx := context.Background()

	type liveStream struct {
		Code int             `json:"code,omitempty"`
		Data *AllOnAirStream `json:"data,omitempty"`
	}

	resp := &liveStream{}
	uri := d.getLiveStreamUrl("/api/live/vendor/onairstreamlist?cdn=bvc")
	if err := d.NewRequst(ctx, http.MethodGet, uri, nil, nil, nil, resp); err != nil {
		log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", err)))
		return
	}

	// Indexing a map with a missing key (or indexing a nil map) yields the
	// zero value, here a nil *OnAirStreamList, so the entry must be
	// nil-checked before its List field is read.
	var bvc *OnAirStreamList
	if resp.Code == 0 && resp.Data != nil {
		bvc = resp.Data.M["BVC"]
	}

	// The cached value must stay a map[string]int: LoadLiveStreamList
	// type-asserts exactly that type.
	onAir := map[string]int{}
	if bvc != nil && len(bvc.List) > 0 {
		for _, s := range bvc.List {
			if s == nil {
				// A JSON null inside the list decodes to a nil element.
				continue
			}
			onAir[s.StreamName] = 1
		}
	} else {
		// The response only holds ints, strings, maps and slices, all of
		// which encoding/json supports, so the error is deliberately ignored
		// for this best-effort log line.
		body, _ := json.Marshal(resp)
		log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", string(body))))
	}

	// Keep the list for 10 minutes.
	d.localCache.SetWithExpire(d.getLocalLiveStreamListKey(), onAir, _liveExpiredTime*time.Second)
}
// storeStreamInfo caches the info of a single stream in the local cache,
// keyed by its room ID, for _localStreamExpiredTime seconds.
//
// A nil info, or one whose RoomID is not positive, is ignored: loadStreamInfo
// and loadMultiStreamInfo only return cached entries whose RoomID is greater
// than zero, so such an entry could never be read back.
//
// The context parameter is currently unused.
func (d *Dao) storeStreamInfo(c context.Context, info *model.StreamFullInfo) {
	if info == nil || info.RoomID <= 0 {
		return
	}
	key := d.getLocalStreamInfoKey(info.RoomID)
	d.localCache.SetWithExpire(key, info, _localStreamExpiredTime*time.Second)
}
// loadStreamInfo returns the cached stream info for the given room ID.
//
// It returns nil when nothing is cached under the room's key, when the cached
// value is not a *model.StreamFullInfo, when the cached pointer is nil, or
// when its RoomID is not positive.
//
// The context parameter is currently unused.
func (d *Dao) loadStreamInfo(c context.Context, rid int64) *model.StreamFullInfo {
	cached, _ := d.localCache.Get(d.getLocalStreamInfoKey(rid))

	info, ok := cached.(*model.StreamFullInfo)
	if !ok || info == nil || info.RoomID <= 0 {
		return nil
	}
	return info
}
// storeMultiStreamInfo caches the info of several streams in the local cache.
//
// Every value of infos is stored through storeStreamInfo, i.e. under the key
// derived from the value's own RoomID (the map key is not used). Delegating
// keeps both store paths under the same validity guard, so a nil value is
// skipped rather than dereferenced.
func (d *Dao) storeMultiStreamInfo(c context.Context, infos map[int64]*model.StreamFullInfo) {
	for _, info := range infos {
		d.storeStreamInfo(c, info)
	}
}
// loadMultiStreamInfo looks up the cached stream info for each room ID.
//
// It returns the infos found, keyed by the requested room ID, together with
// the room IDs that must be fetched elsewhere. Every requested ID ends up in
// exactly one of the two results: an ID counts as a miss whenever
// loadStreamInfo yields nil, which covers an absent entry as well as a cached
// value that is unusable (wrong type, nil pointer, or non-positive RoomID).
// Duplicate IDs in rids are not de-duplicated in the miss list.
func (d *Dao) loadMultiStreamInfo(c context.Context, rids []int64) (map[int64]*model.StreamFullInfo, []int64) {
	infos := make(map[int64]*model.StreamFullInfo, len(rids))
	// Kept as a non-nil empty slice, matching what callers have always received.
	missRids := []int64{}

	for _, rid := range rids {
		if info := d.loadStreamInfo(c, rid); info != nil {
			infos[rid] = info
			continue
		}
		missRids = append(missRids, rid)
	}
	return infos, missRids
}