bilibili-backup/app/service/main/relation/dao/redis.go
2019-04-22 02:59:20 +00:00

331 lines
8.6 KiB
Go

package dao
import (
"context"
"fmt"
"strconv"
gtime "time"
"go-common/app/service/main/relation/model"
"go-common/library/cache/redis"
"go-common/library/log"
"go-common/library/time"
)
const (
_prefixFollowings = "at_" // key of public following with tags datas.
_prefixMonitor = "rs_mo_list" // key of monitor
_prefixRecentFollower = "rf_" // recent follower sorted set
_prefixRecentFollowerTime = "rft_" // recent follower sorted set
_prefixDailyNotifyCount = "dnc_%d_%s" // daily new-follower notificaiton count
_notifyCountExpire = 24 * 3600 // notify count scope is daily
)
func followingsKey(mid int64) string {
return _prefixFollowings + strconv.FormatInt(mid, 10)
}
func monitorKey() string {
return _prefixMonitor
}
func recentFollower(mid int64) string {
return _prefixRecentFollower + strconv.FormatInt(mid, 10)
}
func recentFollowerNotify(mid int64) string {
return _prefixRecentFollowerTime + strconv.FormatInt(mid%10000, 10)
}
func dailyNotifyCount(mid int64, date gtime.Time) string {
// _cacheShard 作为sharding
return fmt.Sprintf(_prefixDailyNotifyCount, mid%_cacheShard, date.Format("2006-01-02"))
}
// pingRedis ping redis.
func (d *Dao) pingRedis(c context.Context) (err error) {
conn := d.redis.Get(c)
if _, err = conn.Do("SET", "PING", "PONG"); err != nil {
log.Error("conn.Do(SET,PING,PONG) error(%v)", err)
}
conn.Close()
return
}
// SetFollowingsCache set followings cache.
func (d *Dao) SetFollowingsCache(c context.Context, mid int64, followings []*model.Following) (err error) {
key := followingsKey(mid)
args := redis.Args{}.Add(key)
expire := d.redisExpire
if len(followings) == 0 {
expire = 7200
}
ef, _ := d.encode(0, 0, nil, 0)
args = args.Add(0, ef)
for i := 0; i < len(followings); i++ {
var ef []byte
if ef, err = d.encode(followings[i].Attribute, followings[i].MTime, followings[i].Tag, followings[i].Special); err != nil {
return
}
args = args.Add(followings[i].Mid, ef)
}
conn := d.redis.Get(c)
defer conn.Close()
if err = conn.Send("DEL", key); err != nil {
log.Error("conn.Send(DEL, %s) error(%v)", key, err)
return
}
if err = conn.Send("HMSET", args...); err != nil {
log.Error("conn.Send(HMSET, %s) error(%v)", key, err)
return
}
if err = conn.Send("EXPIRE", key, expire); err != nil {
log.Error("conn.Send(EXPIRE, %s) error(%v)", key, err)
return
}
if err = conn.Flush(); err != nil {
log.Error("conn.Flush() error(%v)", err)
return
}
for i := 0; i < 3; i++ {
if _, err = conn.Receive(); err != nil {
log.Error("conn.Receive() %d error(%v)", i+1, err)
break
}
}
return
}
// AddFollowingCache add following cache.
func (d *Dao) AddFollowingCache(c context.Context, mid int64, following *model.Following) (err error) {
var (
ok bool
key = followingsKey(mid)
)
conn := d.redis.Get(c)
if ok, err = redis.Bool(conn.Do("EXPIRE", key, d.redisExpire)); err != nil {
log.Error("redis.Bool(conn.Do(EXPIRE, %s)) error(%v)", key, err)
} else if ok {
var ef []byte
if ef, err = d.encode(following.Attribute, following.MTime, following.Tag, following.Special); err != nil {
return
}
if _, err = conn.Do("HSET", key, following.Mid, ef); err != nil {
log.Error("conn.Do(HSET, %s, %d) error(%v)", key, following.Mid, err)
}
}
conn.Close()
return
}
// DelFollowing del following cache.
func (d *Dao) DelFollowing(c context.Context, mid int64, following *model.Following) (err error) {
var (
ok bool
key = followingsKey(mid)
)
conn := d.redis.Get(c)
if ok, err = redis.Bool(conn.Do("EXPIRE", key, d.redisExpire)); err != nil {
log.Error("redis.Bool(conn.Do(EXPIRE, %s)) error(%v)", key, err)
} else if ok {
if _, err = conn.Do("HDEL", key, following.Mid); err != nil {
log.Error("conn.Do(HDEL, %s, %d) error(%v)", key, following.Mid, err)
}
}
conn.Close()
return
}
// FollowingsCache get followings cache.
func (d *Dao) FollowingsCache(c context.Context, mid int64) (followings []*model.Following, err error) {
key := followingsKey(mid)
conn := d.redis.Get(c)
defer conn.Close()
tmp, err := redis.StringMap(conn.Do("HGETALL", key))
if err != nil {
return
}
if err == nil && len(tmp) > 0 {
for k, v := range tmp {
if mid, err = strconv.ParseInt(k, 10, 64); err != nil {
return
}
if mid <= 0 {
continue
}
vf := &model.FollowingTags{}
if err = d.decode([]byte(v), vf); err != nil {
//todo
return
}
followings = append(followings, &model.Following{
Mid: mid,
Attribute: vf.Attr,
Tag: vf.TagIds,
MTime: vf.Ts,
Special: vf.Special,
})
}
}
return
}
// DelFollowingsCache delete followings cache.
func (d *Dao) DelFollowingsCache(c context.Context, mid int64) (err error) {
key := followingsKey(mid)
conn := d.redis.Get(c)
if _, err = conn.Do("DEL", key); err != nil {
log.Error("conn.Do(DEL, %s) error(%v)", key, err)
}
conn.Close()
return
}
// RelationsCache relations cache.
func (d *Dao) RelationsCache(c context.Context, mid int64, fids []int64) (resMap map[int64]*model.Following, err error) {
var retRedis [][]byte
key := followingsKey(mid)
args := redis.Args{}.Add(key)
for _, fid := range fids {
args = args.Add(fid)
}
args.Add(0)
conn := d.redis.Get(c)
defer conn.Close()
if retRedis, err = redis.ByteSlices(conn.Do("HMGET", args...)); err != nil {
log.Error("redis.Int64s(conn.DO(HMGET, %v)) error(%v)", args, err)
return
}
resMap = make(map[int64]*model.Following)
for index, fid := range fids {
if retRedis[index] == nil {
continue
}
v := &model.FollowingTags{}
if err = d.decode(retRedis[index], v); err != nil {
return
}
resMap[fid] = &model.Following{
Mid: fid,
Attribute: v.Attr,
Tag: v.TagIds,
MTime: v.Ts,
Special: v.Special,
}
}
return
}
// encode
func (d *Dao) encode(attribute uint32, mtime time.Time, tagids []int64, special int32) (res []byte, err error) {
ft := &model.FollowingTags{Attr: attribute, Ts: mtime, TagIds: tagids, Special: special}
return ft.Marshal()
}
// decode
func (d *Dao) decode(src []byte, v *model.FollowingTags) (err error) {
return v.Unmarshal(src)
}
// MonitorCache monitor cache
func (d *Dao) MonitorCache(c context.Context, mid int64) (exist bool, err error) {
key := monitorKey()
conn := d.redis.Get(c)
if exist, err = redis.Bool(conn.Do("SISMEMBER", key, mid)); err != nil {
log.Error("redis.Bool(conn.Do(SISMEMBER, %s, %d)) error(%v)", key, mid, err)
}
conn.Close()
return
}
// SetMonitorCache set monitor cache
func (d *Dao) SetMonitorCache(c context.Context, mid int64) (err error) {
var (
key = monitorKey()
conn = d.redis.Get(c)
)
defer conn.Close()
if _, err = conn.Do("SADD", key, mid); err != nil {
log.Error("SADD conn.Do error(%v)", err)
return
}
return
}
// DelMonitorCache del monitor cache
func (d *Dao) DelMonitorCache(c context.Context, mid int64) (err error) {
var (
key = monitorKey()
conn = d.redis.Get(c)
)
defer conn.Close()
if _, err = redis.Int64(conn.Do("SREM", key, mid)); err != nil {
log.Error("SREM conn.Do(%s,%d) err(%v)", key, mid, err)
}
return
}
// LoadMonitorCache load monitor cache
func (d *Dao) LoadMonitorCache(c context.Context, mids []int64) (err error) {
var (
key = monitorKey()
conn = d.redis.Get(c)
)
defer conn.Close()
for _, v := range mids {
if err = conn.Send("SADD", key, v); err != nil {
log.Error("SADD conn.Do error(%v)", err)
return
}
}
if err = conn.Flush(); err != nil {
log.Error("conn.Flush error(%v)", err)
return
}
return
}
// TodayNotifyCountCache get notify count in the current day
func (d *Dao) TodayNotifyCountCache(c context.Context, mid int64) (notifyCount int64, err error) {
var (
key = dailyNotifyCount(mid, gtime.Now())
conn = d.redis.Get(c)
)
defer conn.Close()
if notifyCount, err = redis.Int64(conn.Do("HGET", key, mid)); err != nil {
if err == redis.ErrNil {
err = nil
return
}
log.Error("HGET conn.Do error(%v)", err)
return
}
if err = conn.Flush(); err != nil {
log.Error("conn.Flush error(%v)", err)
return
}
return
}
// IncrTodayNotifyCount increment the today notify count in the current day
func (d *Dao) IncrTodayNotifyCount(c context.Context, mid int64) (err error) {
var (
key = dailyNotifyCount(mid, gtime.Now())
conn = d.redis.Get(c)
)
defer conn.Close()
if err = conn.Send("HINCRBY", key, mid, 1); err != nil {
log.Error("HINCRBY conn.Do error(%v)", err)
return
}
if err = conn.Send("EXPIRE", key, _notifyCountExpire); err != nil {
log.Error("EXPIRE conn.Do error(%v)", err)
return
}
if err = conn.Flush(); err != nil {
log.Error("conn.Flush error(%v)", err)
return
}
return
}