package lancergrpc

import (
	"bytes"
	"context"
	"fmt"
	"math"
	"strconv"
	"sync"
	"time"

	"go-common/app/service/ops/log-agent/event"
	"go-common/app/service/ops/log-agent/output"
	"go-common/app/service/ops/log-agent/output/cache/file"
	"go-common/app/service/ops/log-agent/output/lancergrpc/lancergateway"
	"go-common/app/service/ops/log-agent/pkg/common"
	"go-common/app/service/ops/log-agent/pkg/flowmonitor"
	"go-common/app/service/ops/log-agent/pkg/lancermonitor"
	"go-common/library/log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

const (
	_appIdKey = `"app_id":`
	_levelKey = `"level":`
	_logTime  = `"time":`
)

var (
	logMagic    = []byte{0xAC, 0xBE}
	logMagicBuf = []byte{0xAC, 0xBE}
	_logType    = []byte{0, 1}
	_logLength  = []byte{0, 0, 0, 0}
	local, _    = time.LoadLocation("Local")
)

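// logDoc is a single serialized log payload, tagged with the logId it
// belongs to, ready to be batched into a lancergateway.EventList.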
type logDoc struct {
	b     []byte
	logId string
}

func init() {
	err := output.Register("lancergrpc", NewLancer)
	if err != nil {
		panic(err)
	}
}

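// Lancer is an output that ships events to the lancer gateway over gRPC.
// Events from the sock input are staged in a file cache first; everything
// else goes straight into the send pipeline.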
type Lancer struct {
	c              *Config
	next           chan string
	i              chan *event.ProcessorEvent
	cache          *file.FileCache
	logAggrBuf     map[string]*bytes.Buffer
	logAggrBufLock sync.Mutex
	sendChan       chan *logDoc
	lancerClient   lancergateway.Gateway2ServerClient
	ctx            context.Context
	cancel         context.CancelFunc
}

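// NewLancer validates the config, initializes the file cache and the
// lancer gateway client, and returns the output ready to Run.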
func NewLancer(ctx context.Context, config interface{}) (output.Output, error) {
	lancer := new(Lancer)
	c, ok := config.(*Config)
	if !ok {
		return nil, fmt.Errorf("error config for Lancer output")
	}
	if err := c.ConfigValidate(); err != nil {
		return nil, err
	}
	lancer.c = c
	if output.OutputRunning(lancer.c.Name) {
		return nil, fmt.Errorf("output %s already running", lancer.c.Name)
	}

	lancer.i = make(chan *event.ProcessorEvent)
	lancer.next = make(chan string, 1)
	lancer.logAggrBuf = make(map[string]*bytes.Buffer)
	lancer.sendChan = make(chan *logDoc)
	cache, err := file.NewFileCache(lancer.c.CacheConfig)
	if err != nil {
		return nil, err
	}
	lancer.cache = cache
	lancer.lancerClient, err = lancergateway.NewClient(lancer.c.LancerGateway)
	if err != nil {
		return nil, err
	}
	lancer.ctx, lancer.cancel = context.WithCancel(ctx)
	return lancer, nil
}

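// InputChan returns the channel that processors write events into.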
func (l *Lancer) InputChan() chan *event.ProcessorEvent {
	return l.i
}

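// Run starts the reader, the cache consumer, the periodic aggregation
// flusher and SendConcurrency sender goroutines, then registers the
// output under its configured name.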
func (l *Lancer) Run() (err error) {
	go l.readFromProcessor()
	go l.consumeCache()
	go l.flushLogAggrPeriodically()
	for i := 0; i < l.c.SendConcurrency; i++ {
		go l.sendToLancer()
	}
	if l.c.Name != "" {
		output.RegisterOutput(l.c.Name, l)
	}
	return nil
}

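// Stop cancels the output's context.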
func (l *Lancer) Stop() {
	l.cancel()
}

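// readFromProcessor drains the input channel: events from the sock input
// are written to the file cache, everything else bypasses the cache.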
func (l *Lancer) readFromProcessor() {
	for e := range l.i {
		// only cache for sock input
		if e.Source == "sock" {
			l.cache.WriteToCache(e)
			continue
		}
		// without cache
		l.preWriteToLancer(e)
	}
}

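// preWriteToLancer records send metrics, then either aggregates the event
// (ops-log) or forwards it to the send channel directly.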
func (l *Lancer) preWriteToLancer(e *event.ProcessorEvent) {
	flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "OK", "write to lancer")
	lancermonitor.IncreaseLogCount("agent.send.success.count", e.LogId)
	if l.c.Name == "lancer-ops-log" {
		l.logAggr(e)
	} else {
		l.sendLogDirectToLancer(e)
	}
}

// consumeCache consumes logs from the file cache, dropping events shorter
// than the lancer header.
func (l *Lancer) consumeCache() {
	for {
		e := l.cache.ReadFromCache()
		if e.Length < _logLancerHeaderLen {
			event.PutEvent(e)
			continue
		}
		// monitor should be called before event recycle
		l.parseOpslog(e)
		l.preWriteToLancer(e)
	}
}

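// parseOpslog extracts app_id and the log timestamp from an ops-log event,
// trying RFC3339Nano first and falling back to local-time layouts, and sets
// TimeRangeKey to the timestamp rounded down to a multiple of 100 seconds.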
func (l *Lancer) parseOpslog(e *event.ProcessorEvent) {
	if l.c.Name == "lancer-ops-log" {
		e.AppId, _ = common.SeekValue([]byte(_appIdKey), e.Bytes())

		if timeValue, err := common.SeekValue([]byte(_logTime), e.Bytes()); err == nil {
			if len(timeValue) >= 19 {
				// parse time, falling back through the supported layouts;
				// if every layout fails, t stays zero and TimeRangeKey is left unset
				var t time.Time
				if t, err = time.Parse(time.RFC3339Nano, string(timeValue)); err != nil {
					if t, err = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue), local); err != nil {
						t, _ = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue[0:19]), local)
					}
				}
				if !t.IsZero() {
					e.TimeRangeKey = strconv.FormatInt(t.Unix()/100*100, 10)
				}
			}
		}
	}
}

// sendLogDirectToLancer sends a log straight to lancer without aggregation,
// copying the payload so the event can be recycled immediately.
func (l *Lancer) sendLogDirectToLancer(e *event.ProcessorEvent) {
	logDoc := new(logDoc)
	logDoc.b = make([]byte, e.Length)
	copy(logDoc.b, e.Bytes())
	logDoc.logId = e.LogId
	event.PutEvent(e)
	l.sendChan <- logDoc
}

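// nextRetry returns the backoff before the next retry: an exponential
// 2^retry multiple of InitialRetryDuration, capped at MaxRetryDuration
// (the cap also guards against overflow once retry exceeds 10).
// For example, with a hypothetical InitialRetryDuration of 100ms and
// MaxRetryDuration of 10s, retries back off 100ms, 200ms, 400ms, ...
// until they plateau at 10s.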
func (l *Lancer) nextRetry(retry int) time.Duration {
	// avoid d getting too large
	if retry > 10 {
		return time.Duration(l.c.MaxRetryDuration)
	}

	d := time.Duration(math.Pow(2, float64(retry))) * time.Duration(l.c.InitialRetryDuration)

	if d > time.Duration(l.c.MaxRetryDuration) {
		return time.Duration(l.c.MaxRetryDuration)
	}

	return d
}

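// bulkSendToLancerWithRetry sends an EventList to the lancer gateway,
// retrying with exponential backoff on transient gRPC errors (Canceled,
// DeadlineExceeded, Unavailable, ResourceExhausted) and on non-success
// responses; any other error drops the batch.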
func (l *Lancer) bulkSendToLancerWithRetry(in *lancergateway.EventList) {
	retry := 0
	for {
		// cancel the timeout context once the call returns so it doesn't leak
		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(l.c.SendBatchTimeout))
		t1 := time.Now()
		resp, err := l.lancerClient.SendList(ctx, in)
		cancel()
		if err == nil {
			if resp.Code == lancergateway.StatusCode_SUCCESS {
				log.Info("get 200 from lancer gateway: size %d, count %d, cost %s", in.Size(), len(in.Events), time.Since(t1).String())
				return
			}
			flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer non-200: %s", resp.Code))
			log.Warn("get non-200 from lancer gateway, retry: %s", resp.Code)
		}

		if err != nil {
			switch grpc.Code(err) {
			case codes.Canceled, codes.DeadlineExceeded, codes.Unavailable, codes.ResourceExhausted:
				flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer failed, retry: %s", err))
				log.Warn("get error from lancer gateway, retry: %s", err)
			default:
				flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer failed, no retry: %s", err))
				log.Warn("get error from lancer gateway, no retry: %s", err)
				return
			}
		}

		time.Sleep(l.nextRetry(retry))
		retry++
	}
}

// sendToLancer batches logDocs from sendChan and flushes a batch to lancer
// when it exceeds SendBatchSize bytes or SendBatchNum events, or when
// SendFlushInterval has passed since the last send.
func (l *Lancer) sendToLancer() {
	eventList := new(lancergateway.EventList)
	eventListLock := sync.Mutex{}
	lastSend := time.Now()
	ticker := time.Tick(time.Second * 1)
	size := 0
	for {
		select {
		case <-ticker:
			if lastSend.Add(time.Duration(l.c.SendFlushInterval)).Before(time.Now()) && len(eventList.Events) > 0 {
				eventListLock.Lock()
				l.bulkSendToLancerWithRetry(eventList)
				eventList.Reset()
				size = 0
				eventListLock.Unlock()
				lastSend = time.Now()
			}
		case logDoc := <-l.sendChan:
			event := new(lancergateway.SimpleEvent)
			event.LogId = logDoc.logId
			event.Header = map[string]string{"timestamp": strconv.FormatInt(time.Now().Unix()/100*100, 10)}
			event.Data = logDoc.b
			size += len(event.Data)
			eventListLock.Lock()
			eventList.Events = append(eventList.Events, event)
			if size > l.c.SendBatchSize || len(eventList.Events) > l.c.SendBatchNum {
				l.bulkSendToLancerWithRetry(eventList)
				eventList.Reset()
				size = 0
				lastSend = time.Now()
			}
			eventListLock.Unlock()
		}
	}
}