497 lines
13 KiB
Go
497 lines
13 KiB
Go
package dao
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"unsafe"
|
|
|
|
recsys "go-common/app/service/bbq/recsys/api/grpc/v1"
|
|
"go-common/app/service/bbq/recsys/dao/parallel"
|
|
"go-common/app/service/bbq/recsys/model"
|
|
"go-common/library/cache/redis"
|
|
"go-common/library/log"
|
|
|
|
"github.com/Dai0522/workpool"
|
|
"github.com/json-iterator/go"
|
|
)
|
|
|
|
// Names of the redis tasks scheduled by LoadUserProfile. Each name keys both
// the task map and the dispatch switch that consumes the task's reply.
const (
	TaskLastPage            = "TaskLastPage"
	TaskLastUpsPage         = "TaskLastUpsPage"
	TaskBiliUserProfile     = "TaskBiliUserProfile"
	TaskBBQUserProfile      = "TaskBBQUserProfile"
	TaskBBQDeviceProfile    = "TaskBBQDeviceProfile"
	TaskUserLike            = "TaskUserLike"
	TaskUserLikeYesterday   = "TaskUserLikeYesterday"
	TaskUserPlay            = "TaskUserPlay"
	TaskUserPlayYesterday   = "TaskUserPlayYesterday"
	TaskDevicePlay          = "TaskDevicePlay"
	TaskDevicePlayYesterday = "TaskDevicePlayYesterday"
	TaskUserFollow          = "TaskUserFollow"
	TaskUserFollowYesterday = "TaskUserFollowYesterday"
)

// Redis key templates for profiles and recently recommended pages.
// Templates with %d take a mid, templates with %s take a buvid.
const (
	// The device profile key embeds a literal "{buvid}" segment; it replaced
	// the earlier template "bbq:device:profile:%s".
	// NOTE(review): the braces look like a redis cluster hash tag - confirm.
	_BBQDeviceProfileKey    = "bbq:device:profile:{buvid}:%s"
	_BBQUserProfileKey      = "bbq:user:profile:%d"
	_BiliUserProfileKey     = "bbq:user:basic:%d"
	_LastFewPageRecords1    = "bbq:last:v1:mid:%d"
	_LastFewPageRecords2    = "bbq:last:v1:buvid:%s"
	_LastFewUpsPageRecords1 = "bbq:last:v1:ups:mid:%d"
	_LastFewUpsPageRecords2 = "bbq:last:v1:ups:buvid:%s"
)

// Redis key templates for real-time behaviour hashes. The first verb is the
// mid (%d) or buvid (%s); the remaining %s is a day formatted as "20060102".
const (
	_RealTimeUserLike      = "storm:v2:u:%d:like:%s"
	_RealTimeUserPlayMID   = "storm:v2:u:%d:%s:view:100"
	_RealTimeUserPlayBuvID = "storm:v2:u:%s:%s:view:100"
	_RealTimeUserFollow    = "storm:v2:u:%d:%s:follow:100"
)

// _ModelTest is the redis key InitModel reads the serialized weights from.
const _ModelTest = "bbq:model:init"

// Field names looked up inside the profile hashes.
const (
	_Zone = "zone"
	_Tag  = "tag"
	_Up   = "up"
)
|
|
|
|
//LastPageRedisKey for main rec process
|
|
func (d *Dao) LastPageRedisKey(mid int64, buvid string) (key string) {
|
|
if mid > 0 {
|
|
key = fmt.Sprintf(_LastFewPageRecords1, mid)
|
|
} else {
|
|
key = fmt.Sprintf(_LastFewPageRecords2, buvid)
|
|
}
|
|
return
|
|
}
|
|
|
|
//LastUpsPageRedisKey for ups rec process
|
|
func (d *Dao) LastUpsPageRedisKey(mid int64, buvid string) (key string) {
|
|
if mid > 0 {
|
|
key = fmt.Sprintf(_LastFewUpsPageRecords1, mid)
|
|
} else {
|
|
key = fmt.Sprintf(_LastFewUpsPageRecords2, buvid)
|
|
}
|
|
return
|
|
}
|
|
|
|
//InitModel ...
|
|
func (d *Dao) InitModel(c context.Context, weights map[string]float64) (err error) {
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
key := _ModelTest
|
|
if result, err := redis.String(conn.Do("GET", key)); err == nil {
|
|
for _, field := range strings.Split(result, ",") {
|
|
featureWeightPair := strings.Split(field, ":")
|
|
if len(featureWeightPair) >= 2 {
|
|
feature := featureWeightPair[0]
|
|
weight, _ := strconv.ParseFloat(featureWeightPair[1], 64)
|
|
weights[feature] = weight
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
//StoreRecResults store rec or upsRec history according to getKeyFunc
|
|
func (d *Dao) StoreRecResults(c context.Context, u *model.UserProfile, mid int64, buvid string, response *recsys.RecsysResponse, getKeyFunc func(int64, string) string, lastRecords []model.Record4Dup) (err error) {
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
key := getKeyFunc(mid, buvid)
|
|
maxPageNum := 10
|
|
size := len(response.List)
|
|
if len(lastRecords) > maxPageNum*size {
|
|
lastRecords = lastRecords[size:]
|
|
}
|
|
|
|
for _, record := range response.List {
|
|
svid := record.Svid
|
|
mid, ok1 := record.Map[model.UperMid]
|
|
tag, ok2 := record.Map[model.ScatterTag]
|
|
if ok1 && ok2 {
|
|
lastRecords = append(lastRecords, model.Record4Dup{
|
|
SVID: svid,
|
|
MID: mid,
|
|
Tag: tag,
|
|
})
|
|
}
|
|
}
|
|
|
|
bytes, _ := jsoniter.Marshal(lastRecords)
|
|
_, err = conn.Do("SETEX", key, 86400, bytes)
|
|
if err != nil {
|
|
log.Error("store last few records error: ", err)
|
|
}
|
|
|
|
////for test
|
|
//if mid == 28272030 || mid == 390642849 {
|
|
// return
|
|
//}
|
|
// write bloomfilter for es
|
|
svids := make([]uint64, len(response.List))
|
|
for i, v := range response.List {
|
|
svids[i] = uint64(v.Svid)
|
|
}
|
|
if _, bfErr := d.WriteBF(c, mid, buvid, svids); bfErr != nil {
|
|
log.Errorv(c, log.KV("Write BF error: ", bfErr))
|
|
}
|
|
return
|
|
}
|
|
|
|
//InitUserProfile ...
|
|
func (d *Dao) InitUserProfile(c context.Context, mid int64, buvid string) (u *model.UserProfile) {
|
|
|
|
u = &model.UserProfile{
|
|
Mid: mid,
|
|
Buvid: buvid,
|
|
Name: "",
|
|
Gender: -1,
|
|
ViewVideos: []int64{},
|
|
Zones1: map[string]float64{},
|
|
BiliTags: map[string]float64{}, //bili
|
|
Zones2: map[string]float64{}, //bili
|
|
FollowUps: map[int64]int64{}, //bili
|
|
|
|
BBQTags: map[string]float64{}, //bbq
|
|
BBQZones: map[string]float64{}, //bbq
|
|
BBQPrefUps: map[int64]int64{}, //bbq
|
|
|
|
BBQFollowAction: map[int64]int64{}, //bbq
|
|
BBQFollow: map[int64]int64{}, //bbq
|
|
BBQBlack: map[int64]int64{}, //bbq
|
|
|
|
PosVideos: map[int64]int64{},
|
|
NegVideos: map[int64]int64{},
|
|
LikeVideos: map[int64]int64{},
|
|
LikeTags: map[string]float64{},
|
|
LikeTagIDs: map[int64]int64{},
|
|
LikeUPs: map[int64]int64{},
|
|
PosTagIDs: map[int64]int64{},
|
|
NegTagIDs: map[int64]int64{},
|
|
PosTags: map[string]float64{},
|
|
NegTags: map[string]float64{},
|
|
LastRecords: []model.Record4Dup{},
|
|
}
|
|
return
|
|
}
|
|
|
|
//LoadUserProfile load user info from redis parallel
|
|
func (d *Dao) LoadUserProfile(c context.Context, mid int64, buvid string) (userProfile *model.UserProfile, err error) {
|
|
tasks := make(map[string]workpool.Task)
|
|
|
|
userProfile = d.InitUserProfile(c, mid, buvid)
|
|
|
|
// lastPage
|
|
if mid != 0 || buvid != "" {
|
|
taskName := TaskLastPage
|
|
key := fmt.Sprintf(_LastFewPageRecords2, buvid)
|
|
if mid != 0 {
|
|
key = fmt.Sprintf(_LastFewPageRecords1, mid)
|
|
}
|
|
task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "GET", key)
|
|
tasks[taskName] = task
|
|
}
|
|
|
|
if mid != 0 || buvid != "" {
|
|
taskName := TaskLastUpsPage
|
|
key := fmt.Sprintf(_LastFewUpsPageRecords2, buvid)
|
|
if mid != 0 {
|
|
key = fmt.Sprintf(_LastFewUpsPageRecords1, mid)
|
|
}
|
|
task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "GET", key)
|
|
tasks[taskName] = task
|
|
}
|
|
|
|
// user profile bili
|
|
if mid != 0 {
|
|
taskName := TaskBiliUserProfile
|
|
key := fmt.Sprintf(_BiliUserProfileKey, mid)
|
|
task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
|
|
// user profile bbq: mid
|
|
if mid != 0 {
|
|
taskName := TaskBBQUserProfile
|
|
key := fmt.Sprintf(_BBQUserProfileKey, mid)
|
|
task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
// user profile bbq: buvid
|
|
if mid == 0 && buvid != "" {
|
|
taskName := TaskBBQDeviceProfile
|
|
key := fmt.Sprintf(_BBQDeviceProfileKey, buvid)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
// user real time like
|
|
today := time.Now().Format("20060102")
|
|
yesterday := time.Now().AddDate(0, 0, -1).Format("20060102")
|
|
|
|
if mid != 0 {
|
|
taskName := TaskUserLike
|
|
key := fmt.Sprintf(_RealTimeUserLike, mid, today)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
if mid != 0 {
|
|
taskName := TaskUserLikeYesterday
|
|
key := fmt.Sprintf(_RealTimeUserLike, mid, yesterday)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
if mid != 0 {
|
|
taskName := TaskUserFollow
|
|
key := fmt.Sprintf(_RealTimeUserFollow, mid, today)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
if mid != 0 {
|
|
taskName := TaskUserFollowYesterday
|
|
key := fmt.Sprintf(_RealTimeUserFollow, mid, yesterday)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
if mid != 0 {
|
|
taskName := TaskUserPlay
|
|
key := fmt.Sprintf(_RealTimeUserPlayMID, mid, today)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
if mid != 0 {
|
|
taskName := TaskUserPlayYesterday
|
|
key := fmt.Sprintf(_RealTimeUserPlayMID, mid, yesterday)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
if mid == 0 && buvid != "" {
|
|
taskName := TaskDevicePlay
|
|
key := fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, today)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
if mid == 0 && buvid != "" {
|
|
taskName := TaskDevicePlayYesterday
|
|
key := fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, yesterday)
|
|
task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
|
|
tasks[taskName] = task
|
|
}
|
|
|
|
ftTasks := d.parallelTask2(tasks)
|
|
|
|
for name, task := range ftTasks {
|
|
var raw *[]byte
|
|
raw, err = task.Wait(100 * time.Millisecond)
|
|
if err != nil && err != redis.ErrNil {
|
|
log.Errorv(c, log.KV("REDIS_GET_ERROR", err))
|
|
continue
|
|
}
|
|
if raw == nil {
|
|
continue
|
|
}
|
|
|
|
switch name {
|
|
case TaskLastPage:
|
|
setLastPage(raw, userProfile, "lastRecords")
|
|
case TaskLastUpsPage:
|
|
setLastPage(raw, userProfile, "lastUpsRecords")
|
|
|
|
case TaskBiliUserProfile:
|
|
setUserProfileBili(raw, err, userProfile)
|
|
case TaskBBQDeviceProfile:
|
|
setUserProfileBBQ(raw, err, userProfile)
|
|
case TaskBBQUserProfile:
|
|
setUserProfileBBQ(raw, err, userProfile)
|
|
|
|
case TaskUserLikeYesterday:
|
|
setUserLikeInfo(raw, err, userProfile)
|
|
case TaskUserLike:
|
|
setUserLikeInfo(raw, err, userProfile)
|
|
|
|
case TaskUserFollowYesterday:
|
|
setUserFollowInfo(raw, err, userProfile)
|
|
case TaskUserFollow:
|
|
setUserFollowInfo(raw, err, userProfile)
|
|
|
|
case TaskUserPlayYesterday:
|
|
setUserPlayInfo(raw, err, userProfile)
|
|
case TaskDevicePlayYesterday:
|
|
setUserPlayInfo(raw, err, userProfile)
|
|
case TaskUserPlay:
|
|
setUserPlayInfo(raw, err, userProfile)
|
|
case TaskDevicePlay:
|
|
setUserPlayInfo(raw, err, userProfile)
|
|
}
|
|
}
|
|
|
|
if err == redis.ErrNil {
|
|
err = nil
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func setUserProfileBBQ(bytes *[]byte, inErr error, u *model.UserProfile) (err error) {
|
|
var res map[string]string
|
|
if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
|
|
if err == redis.ErrNil {
|
|
err = nil
|
|
} else {
|
|
log.Error("redis HGETALL failed error(%v)", err)
|
|
}
|
|
}
|
|
|
|
for key, value := range res {
|
|
if key == _Zone {
|
|
zone2s := strings.Split(value, ",")
|
|
for _, zone2 := range zone2s {
|
|
u.BBQZones[zone2] = 1.0
|
|
}
|
|
} else if key == _Tag {
|
|
tags := strings.Split(value, ",")
|
|
for _, tag := range tags {
|
|
u.BBQTags[tag] = 1.0
|
|
}
|
|
} else if key == _Up {
|
|
ups := strings.Split(value, ",")
|
|
for _, upStr := range ups {
|
|
upMID, _ := strconv.ParseInt(upStr, 10, 64)
|
|
u.BBQPrefUps[upMID] = 1
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func setUserProfileBili(bytes *[]byte, inErr error, u *model.UserProfile) {
|
|
var res map[string]string
|
|
var err error
|
|
if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
|
|
if err == redis.ErrNil {
|
|
err = nil
|
|
} else {
|
|
log.Error("redis HGETALL failed error(%v)", err)
|
|
}
|
|
}
|
|
|
|
for key, value := range res {
|
|
if key == _Zone {
|
|
zone2s := strings.Split(value, ",")
|
|
for _, zone2 := range zone2s {
|
|
u.Zones2[zone2] = 1.0
|
|
}
|
|
} else if key == _Tag {
|
|
tags := strings.Split(value, ",")
|
|
for _, tag := range tags {
|
|
u.BiliTags[tag] = 1.0
|
|
}
|
|
} else if key == _Up {
|
|
ups := strings.Split(value, ",")
|
|
for _, upStr := range ups {
|
|
upMID, _ := strconv.ParseInt(upStr, 10, 64)
|
|
u.FollowUps[upMID] = 1
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func setUserLikeInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
|
|
|
|
var object struct {
|
|
SVID int64 `json:"svid"`
|
|
CTime int64 `json:"ctime"`
|
|
BuvID string `json:"buvid"`
|
|
}
|
|
var res map[string]string
|
|
var err error
|
|
if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
|
|
if err != redis.ErrNil {
|
|
log.Error("redis HGETALL failed error(%v)", err)
|
|
}
|
|
}
|
|
for _, value := range res {
|
|
err = jsoniter.UnmarshalFromString(value, &object)
|
|
if err != nil {
|
|
log.Error("json parse error: %v", err)
|
|
}
|
|
u.LikeVideos[object.SVID] = object.CTime
|
|
}
|
|
}
|
|
|
|
func setUserFollowInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
|
|
|
|
var object struct {
|
|
UpID int64 `json:"upid"`
|
|
CTime int64 `json:"ctime"`
|
|
MID int64 `json:"mid"`
|
|
}
|
|
var res map[string]string
|
|
var err error
|
|
if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
|
|
if err != redis.ErrNil {
|
|
log.Error("user real time follow redis HGETALL failed error(%v)", err)
|
|
}
|
|
}
|
|
for _, value := range res {
|
|
err = jsoniter.UnmarshalFromString(value, &object)
|
|
if err != nil {
|
|
log.Error("json parse error: %v", err)
|
|
}
|
|
u.BBQFollowAction[object.UpID] = object.CTime
|
|
}
|
|
}
|
|
|
|
func setUserPlayInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
|
|
|
|
var object struct {
|
|
Svid int64 `json:"svid"`
|
|
CTime int64 `json:"ctime"`
|
|
Duration int64 `json:"duration"`
|
|
ViewDuration int64 `json:"viewDuration"`
|
|
}
|
|
var res map[string]string
|
|
var err error
|
|
if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
|
|
if err != redis.ErrNil {
|
|
log.Error("redis HGETALL failed error(%v)", err)
|
|
} else {
|
|
err = nil
|
|
}
|
|
}
|
|
|
|
for _, value := range res {
|
|
err = jsoniter.UnmarshalFromString(value, &object)
|
|
if err != nil {
|
|
log.Error("json parse error: %v", err)
|
|
continue
|
|
}
|
|
u.ViewVideos = append(u.ViewVideos, object.Svid)
|
|
if object.ViewDuration >= 15000 || (object.Duration >= 5000 && float64(object.ViewDuration) >= 0.95*float64(object.Duration)) {
|
|
u.PosVideos[object.Svid] = object.CTime
|
|
}
|
|
if object.ViewDuration <= 500 {
|
|
u.NegVideos[object.Svid] = object.CTime
|
|
}
|
|
}
|
|
}
|
|
|
|
func setLastPage(bytes *[]byte, u *model.UserProfile, lastRecordType string) {
|
|
var results []model.Record4Dup
|
|
if len(*bytes) == 0 {
|
|
return
|
|
}
|
|
err := jsoniter.Unmarshal(*bytes, &results)
|
|
if err != nil {
|
|
log.Error("UnmarshalFromString value(%v) error(%v)", bytes, err)
|
|
} else {
|
|
if lastRecordType == "lastRecords" {
|
|
u.LastRecords = results
|
|
} else {
|
|
u.LastUpsRecords = results
|
|
}
|
|
|
|
}
|
|
}
|