534 lines
13 KiB
Go
534 lines
13 KiB
Go
package service
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"crypto/md5"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go-common/app/job/main/push/dao"
|
|
pb "go-common/app/service/main/push/api/grpc/v1"
|
|
pushmdl "go-common/app/service/main/push/model"
|
|
xsql "go-common/library/database/sql"
|
|
"go-common/library/log"
|
|
"go-common/library/sync/errgroup"
|
|
)
|
|
|
|
const (
|
|
_delTaskLimit = 5000
|
|
)
|
|
|
|
var (
|
|
errEmptyLine = errors.New("empty line")
|
|
errInvalidMid = errors.New("invalid mid format")
|
|
errInvalidToken = errors.New("invalid token format")
|
|
)
|
|
|
|
func (s *Service) addTaskproc() {
|
|
defer s.addTaskWg.Done()
|
|
var err error
|
|
for {
|
|
task, ok := <-s.addTaskCh
|
|
if !ok {
|
|
log.Info("add task channel exit")
|
|
return
|
|
}
|
|
if task == nil {
|
|
continue
|
|
}
|
|
task.Status = pushmdl.TaskStatusPrepared
|
|
for i := 0; i < _retry; i++ {
|
|
if err = s.dao.AddTask(context.Background(), task); err == nil {
|
|
break
|
|
}
|
|
}
|
|
if err != nil {
|
|
log.Error("add task(%+v) error(%v)", task, err)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("add task(%d)", task.Job))
|
|
})
|
|
continue
|
|
}
|
|
dao.PromInfo("add task")
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (s *Service) delTasksproc() {
|
|
for {
|
|
now := time.Now()
|
|
// 每天2点时删除一个月前的task数据
|
|
if now.Hour() != 2 {
|
|
time.Sleep(time.Minute)
|
|
continue
|
|
}
|
|
var (
|
|
err error
|
|
deleted int64
|
|
b = now.Add(time.Duration(-s.c.Job.DelTaskInterval*24) * time.Hour)
|
|
loc, _ = time.LoadLocation("Local")
|
|
t = time.Date(b.Year(), b.Month(), b.Day(), 23, 59, 59, 0, loc)
|
|
)
|
|
for {
|
|
if deleted, err = s.dao.DelTasks(context.TODO(), t, _delTaskLimit); err != nil {
|
|
log.Error("s.delTasks(%v) error(%v)", t, err)
|
|
s.dao.SendWechat("DB操作失败:push-job删除task数据错误")
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
if deleted < _delTaskLimit {
|
|
break
|
|
}
|
|
time.Sleep(time.Second)
|
|
}
|
|
time.Sleep(time.Hour)
|
|
}
|
|
}
|
|
|
|
func (s *Service) pretreatTaskproc() {
|
|
defer s.waiter.Done()
|
|
for {
|
|
if s.closed {
|
|
return
|
|
}
|
|
task, err := s.pickPretreatmentTask()
|
|
if err != nil {
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
if task != nil {
|
|
log.Info("pretreat task job(%d) id(%s)", task.Job, task.ID)
|
|
if err = s.pretreatTask(task); err != nil {
|
|
log.Error("pretreat task(%+v) error(%v)", task, err)
|
|
s.cache.Save(func() { s.dao.SendWechat(fmt.Sprintf("pretreat task(%s) error", task.ID)) })
|
|
}
|
|
}
|
|
time.Sleep(time.Duration(s.c.Job.LoadTaskInteval))
|
|
}
|
|
}
|
|
|
|
func (s *Service) pickPretreatmentTask() (t *pushmdl.Task, err error) {
|
|
c := context.Background()
|
|
var tx *xsql.Tx
|
|
if tx, err = s.dao.BeginTx(c); err != nil {
|
|
log.Error("tx.BeginTx() error(%v)", err)
|
|
return
|
|
}
|
|
if t, err = s.dao.TxTaskByStatus(tx, pushmdl.TaskStatusPretreatmentPrepared); err != nil {
|
|
if e := tx.Rollback(); e != nil {
|
|
dao.PromError("task:获取新任务")
|
|
log.Error("tx.Rollback() error(%v)", e)
|
|
}
|
|
return
|
|
}
|
|
if t == nil {
|
|
if e := tx.Rollback(); e != nil {
|
|
dao.PromError("task:获取新任务")
|
|
log.Error("tx.Rollback() error(%v)", e)
|
|
}
|
|
return
|
|
}
|
|
if err = s.dao.TxUpdateTaskStatus(tx, t.ID, pushmdl.TaskStatusPretreatmentDoing); err != nil {
|
|
if e := tx.Rollback(); e != nil {
|
|
dao.PromError("task:更新任务状态")
|
|
log.Error("tx.Rollback() error(%v)", e)
|
|
}
|
|
return
|
|
}
|
|
if err = tx.Commit(); err != nil {
|
|
dao.PromError("task:获取新任务commit")
|
|
log.Error("tx.Commit() error(%v)", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *Service) pretreatTask(t *pushmdl.Task) (err error) {
|
|
id, _ := strconv.ParseInt(t.ID, 10, 64)
|
|
switch t.Type {
|
|
case pushmdl.TaskTypeAll:
|
|
err = s.pretreatTaskAll(t)
|
|
case pushmdl.TaskTypeMngToken, pushmdl.TaskTypeDataPlatformToken, pushmdl.TaskTypeDataPlatformMid:
|
|
err = s.pretreatTaskToken(t)
|
|
case pushmdl.TaskTypeStrategyMid, pushmdl.TaskTypeMngMid:
|
|
err = s.pretreatTaskMid(t)
|
|
default:
|
|
log.Error("invalid task type, (%+v)", t)
|
|
}
|
|
if err != nil {
|
|
err = s.dao.UpdateTaskStatus(context.Background(), id, pushmdl.TaskStatusPretreatmentFailed)
|
|
return
|
|
}
|
|
err = s.dao.UpdateTaskStatus(context.Background(), id, pushmdl.TaskStatusPretreatmentDone)
|
|
return
|
|
}
|
|
|
|
func (s *Service) pretreatTaskAll(t *pushmdl.Task) (err error) {
|
|
log.Info("AddTaskAll start, task(%+v)", t)
|
|
var (
|
|
maxID int64
|
|
group = errgroup.Group{}
|
|
)
|
|
maxID, err = s.dao.ReportLastID(context.Background())
|
|
if err != nil || maxID <= 0 {
|
|
log.Error("s.pretreatTaskAll() error(%v)", err)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportLastID(%d) error", t.ID, maxID))
|
|
})
|
|
return
|
|
}
|
|
log.Info("AddTaskAll get last report ID(%d)", maxID)
|
|
buildCount := len(t.Build)
|
|
batch := maxID / int64(s.c.Job.TaskGoroutines)
|
|
for j := 0; j < s.c.Job.TaskGoroutines; j++ {
|
|
begin := int64(j) * batch
|
|
end := begin + batch
|
|
group.Go(func() (e error) {
|
|
var (
|
|
path string
|
|
rows *xsql.Rows
|
|
tokens = make(map[int][]string)
|
|
)
|
|
for {
|
|
if begin >= end {
|
|
break
|
|
}
|
|
l := begin + int64(_dbBatch)
|
|
if l >= end {
|
|
l = end
|
|
}
|
|
log.Info("AddTaskAll load reports start(%d) end(%d)", begin, l)
|
|
if rows, e = s.dao.ReportsTaskAll(context.Background(), begin, l, t.APPID); e != nil {
|
|
log.Error("s.dao.ReportsTaskAll(%d,%d,%d) error(%v)", begin, l, t.APPID)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportsTaskAll(%d,%d,%d) error", t.ID, begin, l, t.APPID))
|
|
})
|
|
return
|
|
}
|
|
for rows.Next() {
|
|
var (
|
|
platformID int
|
|
build int
|
|
token string
|
|
)
|
|
if e = rows.Scan(&platformID, &token, &build); e != nil {
|
|
log.Error("AddTaskAll rows.Scan() error(%v)", e)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportsTaskAll(%d,%d,%d) error", t.ID, begin, l, t.APPID))
|
|
})
|
|
return
|
|
}
|
|
if buildCount > 0 && !pushmdl.ValidateBuild(platformID, build, t.Build) {
|
|
continue
|
|
}
|
|
tokens[platformID] = append(tokens[platformID], token)
|
|
if len(tokens[platformID]) >= s.c.Job.LimitPerTask {
|
|
if path, e = s.saveFile(tokens[platformID]); e != nil {
|
|
log.Error("AddTaskAll s.saveTokens error(%v)", e)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) saveTokens error(%v)", t.ID, e))
|
|
})
|
|
return
|
|
}
|
|
tokens[platformID] = []string{}
|
|
task := *t
|
|
task.MidFile = path
|
|
task.PlatformID = platformID
|
|
s.addTaskCh <- &task
|
|
}
|
|
}
|
|
begin = l
|
|
}
|
|
for p, v := range tokens {
|
|
if len(v) == 0 {
|
|
continue
|
|
}
|
|
if path, e = s.saveFile(v); e == nil {
|
|
task := *t
|
|
task.MidFile = path
|
|
task.PlatformID = p
|
|
s.addTaskCh <- &task
|
|
}
|
|
}
|
|
return
|
|
})
|
|
}
|
|
if err = group.Wait(); err != nil {
|
|
log.Error("add task all, task(%+v) error(%v)", t, err)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) error(%v)", t.ID, err))
|
|
})
|
|
return
|
|
}
|
|
log.Info("AddTaskAll end, task(%+v)", t)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("add task all success, job(%d)", t.Job))
|
|
})
|
|
return
|
|
}
|
|
|
|
func (s *Service) pretreatTaskMid(t *pushmdl.Task) (err error) {
|
|
f, err := os.Open(t.MidFile)
|
|
if err != nil {
|
|
log.Error("pretreatTaskMid(%+v) open file error(%v)", t, err)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
var (
|
|
exit bool
|
|
line string
|
|
path string
|
|
mid int64
|
|
counter int
|
|
midTotal int64
|
|
midValid int64
|
|
mu sync.Mutex
|
|
mids []int64
|
|
tokens = make(map[int][]string)
|
|
group = errgroup.Group{}
|
|
reader = bufio.NewReader(f)
|
|
)
|
|
for {
|
|
if exit {
|
|
break
|
|
}
|
|
if line, err = reader.ReadString('\n'); err != nil {
|
|
if err == io.EOF {
|
|
exit = true
|
|
} else {
|
|
log.Error("read file error(%v)", err)
|
|
continue
|
|
}
|
|
}
|
|
if mid, err = parseMidLine(line); err != nil {
|
|
log.Error("parse mid line(%s) error(%v)", line, err)
|
|
continue
|
|
}
|
|
midTotal++
|
|
mids = append(mids, mid)
|
|
if len(mids) >= s.c.Job.PushPartSize {
|
|
midsCp := make([]int64, len(mids))
|
|
copy(midsCp, mids)
|
|
mids = []int64{}
|
|
group.Go(func() (e error) {
|
|
ts, valid, e := s.tokensByMids(t, midsCp)
|
|
if e != nil {
|
|
log.Error("s.tokensByMids(%v) error(%v)", t.ID, e)
|
|
return
|
|
}
|
|
tcopy := make(map[int][]string)
|
|
mu.Lock()
|
|
midValid += valid
|
|
for p, v := range ts {
|
|
tokens[p] = append(tokens[p], v...)
|
|
if len(tokens[p]) >= s.c.Job.LimitPerTask {
|
|
tcopy[p] = append(tcopy[p], tokens[p]...)
|
|
tokens[p] = []string{}
|
|
}
|
|
}
|
|
mu.Unlock()
|
|
for p, v := range tcopy {
|
|
if path, err = s.saveFile(v); err != nil {
|
|
log.Error("pretreatTaskMid s.saveFild error(%v)", err)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("pretreatTaskMid(%v) saveTokens error(%v)", t.ID, err))
|
|
})
|
|
return
|
|
}
|
|
task := *t
|
|
task.MidFile = path
|
|
task.PlatformID = p
|
|
s.addTaskCh <- &task
|
|
}
|
|
return
|
|
})
|
|
counter++
|
|
if counter == s.c.Job.PushPartChanSize {
|
|
group.Wait()
|
|
counter = 0
|
|
}
|
|
}
|
|
}
|
|
if counter > 0 {
|
|
group.Wait()
|
|
}
|
|
if len(mids) > 0 {
|
|
var (
|
|
valid int64
|
|
ts map[int][]string
|
|
)
|
|
if ts, valid, err = s.tokensByMids(t, mids); err == nil {
|
|
midValid += valid
|
|
for p, v := range ts {
|
|
tokens[p] = append(tokens[p], v...)
|
|
}
|
|
} else {
|
|
log.Error("s.tokensByMids(%+v) error(%v)", t, err)
|
|
}
|
|
}
|
|
s.cache.Save(func() {
|
|
arg := &pb.AddMidProgressRequest{Task: t.ID, MidTotal: midTotal, MidValid: midValid}
|
|
if _, e := s.pushRPC.AddMidProgress(context.Background(), arg); e != nil {
|
|
log.Error("s.pushRPC.AddMidProgress(%+v) error(%v)", arg, e)
|
|
}
|
|
})
|
|
for p, v := range tokens {
|
|
if len(v) == 0 {
|
|
continue
|
|
}
|
|
if path, err = s.saveFile(v); err != nil {
|
|
log.Error("pretreatTaskMid s.saveFild error(%v)", err)
|
|
return
|
|
}
|
|
task := *t
|
|
task.MidFile = path
|
|
task.PlatformID = p
|
|
s.addTaskCh <- &task
|
|
}
|
|
log.Info("pretreatTaskMid task(%+v)", t)
|
|
return
|
|
}
|
|
|
|
func (s *Service) pretreatTaskToken(t *pushmdl.Task) (err error) {
|
|
f, err := os.Open(t.MidFile)
|
|
if err != nil {
|
|
log.Error("pretreatTaskToken(%+v) open file error(%v)", t, err)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
var (
|
|
exit bool
|
|
plat int
|
|
line string
|
|
token string
|
|
path string
|
|
tokens = make(map[int][]string)
|
|
reader = bufio.NewReader(f)
|
|
)
|
|
for {
|
|
if exit {
|
|
break
|
|
}
|
|
if line, err = reader.ReadString('\n'); err != nil {
|
|
if err == io.EOF {
|
|
exit = true // no 'continue', solve the last line whitout '\n'
|
|
} else {
|
|
log.Error("read file error(%v)", err)
|
|
continue
|
|
}
|
|
}
|
|
if plat, token, err = parseTokenLine(line); err != nil {
|
|
log.Error("parse token line(%s) error(%v)", line, err)
|
|
continue
|
|
}
|
|
tokens[plat] = append(tokens[plat], token)
|
|
if len(tokens[plat]) >= s.c.Job.LimitPerTask {
|
|
if path, err = s.saveFile(tokens[plat]); err != nil {
|
|
log.Error("pretreatTaskToken s.saveFile error(%v)", err)
|
|
s.cache.Save(func() {
|
|
s.dao.SendWechat(fmt.Sprintf("pretreatTaskToken(%v) saveTokens error(%v)", t.ID, err))
|
|
})
|
|
return
|
|
}
|
|
tokens[plat] = []string{}
|
|
task := *t
|
|
task.MidFile = path
|
|
task.PlatformID = plat
|
|
s.addTaskCh <- &task
|
|
}
|
|
}
|
|
for p, v := range tokens {
|
|
if len(v) == 0 {
|
|
continue
|
|
}
|
|
if path, err = s.saveFile(v); err == nil {
|
|
task := *t
|
|
task.MidFile = path
|
|
task.PlatformID = p
|
|
s.addTaskCh <- &task
|
|
}
|
|
}
|
|
log.Info("pretreatTaskToken task(%+v)", t)
|
|
return
|
|
}
|
|
|
|
func parseTokenLine(line string) (plat int, token string, err error) {
|
|
line = strings.Trim(line, " \r\n")
|
|
if line == "" {
|
|
err = errEmptyLine
|
|
return
|
|
}
|
|
res := strings.Split(line, "\t")
|
|
if len(res) != 2 {
|
|
err = errInvalidToken
|
|
return
|
|
}
|
|
if res[0] == "" || res[1] == "" {
|
|
err = errInvalidToken
|
|
return
|
|
}
|
|
if plat, err = strconv.Atoi(res[0]); err != nil || plat <= 0 {
|
|
err = errInvalidToken
|
|
return
|
|
}
|
|
token = res[1]
|
|
return
|
|
}
|
|
|
|
func parseMidLine(line string) (mid int64, err error) {
|
|
line = strings.Trim(line, " \r\t\n")
|
|
if line == "" {
|
|
err = errEmptyLine
|
|
return
|
|
}
|
|
if mid, err = strconv.ParseInt(line, 10, 64); err != nil || mid <= 0 {
|
|
err = errInvalidMid
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *Service) saveFile(tokens []string) (path string, err error) {
|
|
name := strconv.FormatInt(time.Now().UnixNano(), 10) + tokens[0]
|
|
data := []byte(strings.Join(tokens, "\n"))
|
|
for i := 0; i < _retry; i++ {
|
|
if path, err = s.saveNASFile(name, data); err == nil {
|
|
break
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return
|
|
}
|
|
|
|
// saveNASFile writes data into NAS.
|
|
func (s *Service) saveNASFile(name string, data []byte) (path string, err error) {
|
|
name = fmt.Sprintf("%x", md5.Sum([]byte(name)))
|
|
dir := fmt.Sprintf("%s/%s/%s", strings.TrimSuffix(s.c.Job.MountDir, "/"), time.Now().Format("20060102"), name[:2])
|
|
if _, err = os.Stat(dir); err != nil {
|
|
if !os.IsNotExist(err) {
|
|
log.Error("os.IsNotExist(%s) error(%v)", dir, err)
|
|
return
|
|
}
|
|
if err = os.MkdirAll(dir, 0777); err != nil {
|
|
log.Error("os.MkdirAll(%s) error(%v)", dir, err)
|
|
return
|
|
}
|
|
}
|
|
path = fmt.Sprintf("%s/%s", dir, name)
|
|
f, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
log.Error("s.saveNASFile(%s) OpenFile() error(%v)", path, err)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
if _, err = f.Write(data); err != nil {
|
|
log.Error("s.saveNASFile(%s) f.Write() error(%v)", err)
|
|
}
|
|
return
|
|
}
|