bilibili-backup/app/service/bbq/recsys/dao/user.go
2019-04-22 02:59:20 +00:00

497 lines
13 KiB
Go

package dao
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"unsafe"
recsys "go-common/app/service/bbq/recsys/api/grpc/v1"
"go-common/app/service/bbq/recsys/dao/parallel"
"go-common/app/service/bbq/recsys/model"
"go-common/library/cache/redis"
"go-common/library/log"
"github.com/Dai0522/workpool"
"github.com/json-iterator/go"
)
// Task names used as keys of the parallel redis tasks built by
// LoadUserProfile and dispatched on when their replies come back.
const (
	TaskLastPage            = "TaskLastPage"
	TaskLastUpsPage         = "TaskLastUpsPage"
	TaskBiliUserProfile     = "TaskBiliUserProfile"
	TaskBBQUserProfile      = "TaskBBQUserProfile"
	TaskBBQDeviceProfile    = "TaskBBQDeviceProfile"
	TaskUserLike            = "TaskUserLike"
	TaskUserLikeYesterday   = "TaskUserLikeYesterday"
	TaskUserPlay            = "TaskUserPlay"
	TaskUserPlayYesterday   = "TaskUserPlayYesterday"
	TaskDevicePlay          = "TaskDevicePlay"
	TaskDevicePlayYesterday = "TaskDevicePlayYesterday"
	TaskUserFollow          = "TaskUserFollow"
	TaskUserFollowYesterday = "TaskUserFollowYesterday"
)

// Redis key templates. %d is filled with a mid; %s with a buvid or, for the
// real-time storm keys, a yyyymmdd date.
const (
	_BBQDeviceProfileKey    = "bbq:device:profile:{buvid}:%s"
	_BBQUserProfileKey      = "bbq:user:profile:%d"
	_BiliUserProfileKey     = "bbq:user:basic:%d"
	_LastFewPageRecords1    = "bbq:last:v1:mid:%d"
	_LastFewPageRecords2    = "bbq:last:v1:buvid:%s"
	_LastFewUpsPageRecords1 = "bbq:last:v1:ups:mid:%d"
	_LastFewUpsPageRecords2 = "bbq:last:v1:ups:buvid:%s"
	_RealTimeUserLike       = "storm:v2:u:%d:like:%s"
	_RealTimeUserPlayMID    = "storm:v2:u:%d:%s:view:100"
	_RealTimeUserPlayBuvID  = "storm:v2:u:%s:%s:view:100"
	_RealTimeUserFollow     = "storm:v2:u:%d:%s:follow:100"
	_ModelTest              = "bbq:model:init"
)

// Field names read from the user/device profile hashes.
const (
	_Zone = "zone"
	_Tag  = "tag"
	_Up   = "up"
)
// LastPageRedisKey returns the redis key holding the recently recommended
// records of the main rec process: the mid-based key when mid > 0, and the
// buvid-based key otherwise.
func (d *Dao) LastPageRedisKey(mid int64, buvid string) (key string) {
	if mid <= 0 {
		return fmt.Sprintf(_LastFewPageRecords2, buvid)
	}
	return fmt.Sprintf(_LastFewPageRecords1, mid)
}
// LastUpsPageRedisKey returns the redis key holding the recently recommended
// records of the ups rec process: the mid-based key when mid > 0, and the
// buvid-based key otherwise.
func (d *Dao) LastUpsPageRedisKey(mid int64, buvid string) (key string) {
	if mid <= 0 {
		return fmt.Sprintf(_LastFewUpsPageRecords2, buvid)
	}
	return fmt.Sprintf(_LastFewUpsPageRecords1, mid)
}
// InitModel reads the feature weights stored under the _ModelTest redis key
// and writes them into weights, overriding any existing entry per feature.
//
// The stored value is a comma-separated list of "feature:weight" pairs, e.g.
// "a:0.5,b:1.2". Items without a ':' are ignored; items whose weight does not
// parse as a float are logged and skipped rather than stored as 0.
//
// A missing key (redis.ErrNil) is not an error: weights is left untouched and
// nil is returned. Any other redis error is logged and returned. weights must
// be non-nil because it is written in place.
func (d *Dao) InitModel(c context.Context, weights map[string]float64) (err error) {
	conn := d.redis.Get(c)
	defer conn.Close()

	var raw string
	// Assign (not :=) so a redis failure reaches the named return value.
	if raw, err = redis.String(conn.Do("GET", _ModelTest)); err != nil {
		if err == redis.ErrNil {
			return nil
		}
		log.Error("InitModel redis GET key(%s) error(%v)", _ModelTest, err)
		return err
	}

	for _, field := range strings.Split(raw, ",") {
		pair := strings.Split(field, ":")
		if len(pair) < 2 {
			continue
		}
		weight, parseErr := strconv.ParseFloat(pair[1], 64)
		if parseErr != nil {
			log.Error("InitModel bad weight feature(%s) value(%s) error(%v)", pair[0], pair[1], parseErr)
			continue
		}
		weights[pair[0]] = weight
	}
	return nil
}
// StoreRecResults appends the records of response to the recommendation
// history lastRecords, persists the history, and writes the returned svids to
// the bloom filter via WriteBF.
//
// The history is stored as JSON under getKeyFunc(mid, buvid) with SETEX and a
// TTL of 86400 seconds. Only records whose Map carries both model.UperMid and
// model.ScatterTag are appended. If the incoming history already holds more
// than maxPageNum pages' worth of records (one page being len(response.List)
// records), its oldest page is dropped before appending.
//
// u is not used. Errors encoding or storing the history are logged and
// returned; the bloom-filter write still runs, and its failure is only logged.
func (d *Dao) StoreRecResults(c context.Context, u *model.UserProfile, mid int64, buvid string, response *recsys.RecsysResponse, getKeyFunc func(int64, string) string, lastRecords []model.Record4Dup) (err error) {
	const maxPageNum = 10

	conn := d.redis.Get(c)
	defer conn.Close()
	key := getKeyFunc(mid, buvid)

	size := len(response.List)
	if len(lastRecords) > maxPageNum*size {
		lastRecords = lastRecords[size:]
	}
	for _, record := range response.List {
		// upMID is the up's mid taken from the record; it is deliberately not
		// named mid so it cannot be confused with the user's mid parameter.
		upMID, hasUp := record.Map[model.UperMid]
		tag, hasTag := record.Map[model.ScatterTag]
		if !hasUp || !hasTag {
			continue
		}
		lastRecords = append(lastRecords, model.Record4Dup{
			SVID: record.Svid,
			MID:  upMID,
			Tag:  tag,
		})
	}

	// Never SETEX a payload we failed to encode: it would wipe the history.
	payload, marshalErr := jsoniter.Marshal(lastRecords)
	if marshalErr != nil {
		err = marshalErr
		log.Error("marshal last few records error(%v)", err)
	} else if _, err = conn.Do("SETEX", key, 86400, payload); err != nil {
		log.Error("store last few records error(%v)", err)
	}

	// write bloomfilter for es
	svids := make([]uint64, len(response.List))
	for i, v := range response.List {
		svids[i] = uint64(v.Svid)
	}
	if _, bfErr := d.WriteBF(c, mid, buvid, svids); bfErr != nil {
		log.Errorv(c, log.KV("Write BF error: ", bfErr))
	}
	return err
}
// InitUserProfile returns an empty profile for (mid, buvid) with every map and
// slice field allocated, so callers can write into it without nil checks.
// Name starts as "" and Gender as -1. c is not used.
func (d *Dao) InitUserProfile(c context.Context, mid int64, buvid string) (u *model.UserProfile) {
	u = &model.UserProfile{
		Mid:        mid,
		Buvid:      buvid,
		Name:       "",
		Gender:     -1,
		ViewVideos: []int64{},
		Zones1:     map[string]float64{},
		// bili profile
		BiliTags:  map[string]float64{},
		Zones2:    map[string]float64{},
		FollowUps: map[int64]int64{},
		// bbq profile
		BBQTags:         map[string]float64{},
		BBQZones:        map[string]float64{},
		BBQPrefUps:      map[int64]int64{},
		BBQFollowAction: map[int64]int64{},
		BBQFollow:       map[int64]int64{},
		BBQBlack:        map[int64]int64{},
		// behaviour derived from real-time data
		PosVideos:  map[int64]int64{},
		NegVideos:  map[int64]int64{},
		LikeVideos: map[int64]int64{},
		LikeTags:   map[string]float64{},
		LikeTagIDs: map[int64]int64{},
		LikeUPs:    map[int64]int64{},
		PosTagIDs:  map[int64]int64{},
		NegTagIDs:  map[int64]int64{},
		PosTags:    map[string]float64{},
		NegTags:    map[string]float64{},
		// Both de-dup histories start as empty, non-nil slices.
		LastRecords:    []model.Record4Dup{},
		LastUpsRecords: []model.Record4Dup{},
	}
	return
}
// LoadUserProfile builds the profile of a logged-in user (mid != 0) or an
// anonymous device (mid == 0 && buvid != "") by fetching every redis source
// in parallel and merging the replies into a profile from InitUserProfile.
//
// Sources:
//   - the recent rec and ups-rec pages (GET);
//   - the bili and bbq profile hashes (HGETALL);
//   - today's and yesterday's real-time like, follow and play hashes (HGETALL).
//
// Each task is waited on for at most 100ms. Loading is best-effort: a failed
// task is logged and skipped, so userProfile is always non-nil and holds
// whatever could be loaded. Missing keys (redis.ErrNil) are not errors. err is
// nil when no task failed; otherwise it is the first failure seen.
func (d *Dao) LoadUserProfile(c context.Context, mid int64, buvid string) (userProfile *model.UserProfile, err error) {
	userProfile = d.InitUserProfile(c, mid, buvid)
	tasks := make(map[string]workpool.Task)

	// Recent pages used for de-duplication. Use the shared key helpers so the
	// mid-vs-buvid key choice matches LastPageRedisKey / LastUpsPageRedisKey.
	if mid != 0 || buvid != "" {
		tasks[TaskLastPage] = parallel.NewRedisTaskWithName(&c, TaskLastPage, d.redis, "GET", d.LastPageRedisKey(mid, buvid))
		tasks[TaskLastUpsPage] = parallel.NewRedisTaskWithName(&c, TaskLastUpsPage, d.redis, "GET", d.LastUpsPageRedisKey(mid, buvid))
	}

	// One timestamp, so "today" and "yesterday" can never resolve to the same
	// date when the call straddles midnight.
	now := time.Now()
	today := now.Format("20060102")
	yesterday := now.AddDate(0, 0, -1).Format("20060102")

	switch {
	case mid != 0:
		tasks[TaskBiliUserProfile] = parallel.NewRedisTaskWithName(&c, TaskBiliUserProfile, d.redis, "HGETALL", fmt.Sprintf(_BiliUserProfileKey, mid))
		tasks[TaskBBQUserProfile] = parallel.NewRedisTaskWithName(&c, TaskBBQUserProfile, d.redis, "HGETALL", fmt.Sprintf(_BBQUserProfileKey, mid))
		tasks[TaskUserLike] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserLike, mid, today))
		tasks[TaskUserLikeYesterday] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserLike, mid, yesterday))
		tasks[TaskUserFollow] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserFollow, mid, today))
		tasks[TaskUserFollowYesterday] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserFollow, mid, yesterday))
		tasks[TaskUserPlay] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserPlayMID, mid, today))
		tasks[TaskUserPlayYesterday] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserPlayMID, mid, yesterday))
	case buvid != "":
		tasks[TaskBBQDeviceProfile] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_BBQDeviceProfileKey, buvid))
		tasks[TaskDevicePlay] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, today))
		tasks[TaskDevicePlayYesterday] = parallel.NewRedisTask(&c, d.redis, "HGETALL", fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, yesterday))
	}

	for name, task := range d.parallelTask2(tasks) {
		// taskErr is per task; it must not leak into the returned err, or the
		// result would depend on map iteration order.
		raw, taskErr := task.Wait(100 * time.Millisecond)
		if taskErr != nil && taskErr != redis.ErrNil {
			log.Errorv(c, log.KV("REDIS_GET_ERROR", taskErr))
			if err == nil {
				err = taskErr
			}
			continue
		}
		if raw == nil {
			continue
		}
		switch name {
		case TaskLastPage:
			setLastPage(raw, userProfile, "lastRecords")
		case TaskLastUpsPage:
			setLastPage(raw, userProfile, "lastUpsRecords")
		case TaskBiliUserProfile:
			setUserProfileBili(raw, taskErr, userProfile)
		case TaskBBQUserProfile, TaskBBQDeviceProfile:
			setUserProfileBBQ(raw, taskErr, userProfile)
		case TaskUserLike, TaskUserLikeYesterday:
			setUserLikeInfo(raw, taskErr, userProfile)
		case TaskUserFollow, TaskUserFollowYesterday:
			setUserFollowInfo(raw, taskErr, userProfile)
		case TaskUserPlay, TaskUserPlayYesterday, TaskDevicePlay, TaskDevicePlayYesterday:
			setUserPlayInfo(raw, taskErr, userProfile)
		}
	}
	return userProfile, err
}
// setUserProfileBBQ merges a BBQ user/device profile hash (the HGETALL reply
// carried by bytes and inErr) into u.
//
// The comma-separated "zone" and "tag" fields are added to u.BBQZones and
// u.BBQTags with weight 1.0, and the comma-separated "up" mids are added to
// u.BBQPrefUps. Empty list items and unparsable mids are skipped rather than
// stored as "" / 0.
//
// A missing key (redis.ErrNil) returns nil. Any other conversion error is
// logged and returned, and u is left unchanged.
func setUserProfileBBQ(bytes *[]byte, inErr error, u *model.UserProfile) (err error) {
	var res map[string]string
	// NOTE(review): bytes is reinterpreted as *interface{} to recover the raw
	// redis reply; this depends on how dao/parallel hands results back —
	// confirm there before touching this conversion.
	if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
		if err == redis.ErrNil {
			return nil
		}
		log.Error("redis HGETALL failed error(%v)", err)
		return err
	}
	for field, value := range res {
		switch field {
		case _Zone:
			for _, zone := range strings.Split(value, ",") {
				if zone != "" {
					u.BBQZones[zone] = 1.0
				}
			}
		case _Tag:
			for _, tag := range strings.Split(value, ",") {
				if tag != "" {
					u.BBQTags[tag] = 1.0
				}
			}
		case _Up:
			for _, upStr := range strings.Split(value, ",") {
				upMID, parseErr := strconv.ParseInt(upStr, 10, 64)
				if parseErr != nil {
					continue
				}
				u.BBQPrefUps[upMID] = 1
			}
		}
	}
	return nil
}
// setUserProfileBili merges the bili user profile hash (the HGETALL reply
// carried by bytes and inErr) into u.
//
// The comma-separated "zone" and "tag" fields are added to u.Zones2 and
// u.BiliTags with weight 1.0, and the comma-separated "up" mids are added to
// u.FollowUps. Empty list items and unparsable mids are skipped rather than
// stored as "" / 0.
//
// Conversion errors other than redis.ErrNil are logged. On any error u is left
// unchanged.
func setUserProfileBili(bytes *[]byte, inErr error, u *model.UserProfile) {
	// NOTE(review): bytes is reinterpreted as *interface{} to recover the raw
	// redis reply; this depends on how dao/parallel hands results back.
	res, err := redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr)
	if err != nil {
		if err != redis.ErrNil {
			log.Error("redis HGETALL failed error(%v)", err)
		}
		return
	}
	for field, value := range res {
		switch field {
		case _Zone:
			for _, zone := range strings.Split(value, ",") {
				if zone != "" {
					u.Zones2[zone] = 1.0
				}
			}
		case _Tag:
			for _, tag := range strings.Split(value, ",") {
				if tag != "" {
					u.BiliTags[tag] = 1.0
				}
			}
		case _Up:
			for _, upStr := range strings.Split(value, ",") {
				upMID, parseErr := strconv.ParseInt(upStr, 10, 64)
				if parseErr != nil {
					continue
				}
				u.FollowUps[upMID] = 1
			}
		}
	}
}
// setUserLikeInfo merges a real-time like hash (the HGETALL reply carried by
// bytes and inErr) into u.LikeVideos. Each hash value is a JSON record whose
// svid is mapped to its ctime. Records that fail to decode are logged and
// skipped. Conversion errors other than redis.ErrNil are logged.
func setUserLikeInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
	// NOTE(review): bytes is reinterpreted as *interface{} to recover the raw
	// redis reply; this depends on how dao/parallel hands results back.
	res, err := redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr)
	if err != nil {
		if err != redis.ErrNil {
			log.Error("redis HGETALL failed error(%v)", err)
		}
		return
	}
	for _, value := range res {
		// Fresh value per record: decoding into a shared struct would let
		// fields missing from one record inherit the previous record's values.
		var like struct {
			SVID  int64  `json:"svid"`
			CTime int64  `json:"ctime"`
			BuvID string `json:"buvid"`
		}
		if jsonErr := jsoniter.UnmarshalFromString(value, &like); jsonErr != nil {
			log.Error("json parse error: %v", jsonErr)
			continue
		}
		u.LikeVideos[like.SVID] = like.CTime
	}
}
// setUserFollowInfo merges a real-time follow hash (the HGETALL reply carried
// by bytes and inErr) into u.BBQFollowAction. Each hash value is a JSON record
// whose upid is mapped to its ctime. Records that fail to decode are logged
// and skipped. Conversion errors other than redis.ErrNil are logged.
func setUserFollowInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
	// NOTE(review): bytes is reinterpreted as *interface{} to recover the raw
	// redis reply; this depends on how dao/parallel hands results back.
	res, err := redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr)
	if err != nil {
		if err != redis.ErrNil {
			log.Error("user real time follow redis HGETALL failed error(%v)", err)
		}
		return
	}
	for _, value := range res {
		// Fresh value per record so no field carries over between records.
		var follow struct {
			UpID  int64 `json:"upid"`
			CTime int64 `json:"ctime"`
			MID   int64 `json:"mid"`
		}
		if jsonErr := jsoniter.UnmarshalFromString(value, &follow); jsonErr != nil {
			log.Error("json parse error: %v", jsonErr)
			continue
		}
		u.BBQFollowAction[follow.UpID] = follow.CTime
	}
}
// setUserPlayInfo merges a real-time play hash (the HGETALL reply carried by
// bytes and inErr) into u.
//
// Every decoded record's svid is appended to u.ViewVideos. The video is also
// recorded (svid -> ctime) in:
//   - u.PosVideos when viewDuration >= 15000, or when duration >= 5000 and at
//     least 95% of it was watched;
//   - u.NegVideos when viewDuration <= 500.
//
// Records that fail to decode are logged and skipped. Conversion errors other
// than redis.ErrNil are logged.
func setUserPlayInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
	// NOTE(review): the duration fields are presumably milliseconds — confirm
	// against the producer of these hashes.
	const (
		posMinView     = 15000 // a view at least this long is positive
		posMinDuration = 5000  // minimum video length for the ratio rule
		posViewRatio   = 0.95  // fraction of the video watched to be positive
		negMaxView     = 500   // a view at most this long is negative
	)
	// NOTE(review): bytes is reinterpreted as *interface{} to recover the raw
	// redis reply; this depends on how dao/parallel hands results back.
	res, err := redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr)
	if err != nil {
		if err != redis.ErrNil {
			log.Error("redis HGETALL failed error(%v)", err)
		}
		return
	}
	for _, value := range res {
		// Fresh value per record: a record missing duration/viewDuration must
		// not be classified using the previous record's numbers.
		var play struct {
			Svid         int64 `json:"svid"`
			CTime        int64 `json:"ctime"`
			Duration     int64 `json:"duration"`
			ViewDuration int64 `json:"viewDuration"`
		}
		if jsonErr := jsoniter.UnmarshalFromString(value, &play); jsonErr != nil {
			log.Error("json parse error: %v", jsonErr)
			continue
		}
		u.ViewVideos = append(u.ViewVideos, play.Svid)
		if play.ViewDuration >= posMinView ||
			(play.Duration >= posMinDuration && float64(play.ViewDuration) >= posViewRatio*float64(play.Duration)) {
			u.PosVideos[play.Svid] = play.CTime
		}
		if play.ViewDuration <= negMaxView {
			u.NegVideos[play.Svid] = play.CTime
		}
	}
}
// setLastPage decodes a JSON array of model.Record4Dup from bytes.
//
// When lastRecordType is "lastRecords" the result goes to u.LastRecords; any
// other value stores it in u.LastUpsRecords. A nil or empty payload is
// ignored. A decode failure is logged (with the raw payload) and leaves u
// unchanged.
func setLastPage(bytes *[]byte, u *model.UserProfile, lastRecordType string) {
	if bytes == nil || len(*bytes) == 0 {
		return
	}
	var results []model.Record4Dup
	if err := jsoniter.Unmarshal(*bytes, &results); err != nil {
		// Log the payload itself, not the pointer to it.
		log.Error("Unmarshal value(%q) error(%v)", *bytes, err)
		return
	}
	if lastRecordType == "lastRecords" {
		u.LastRecords = results
	} else {
		u.LastUpsRecords = results
	}
}