bilibili-backup/app/service/ops/log-agent/pkg/httpstream/httpstream.go
2019-04-22 02:59:20 +00:00

145 lines
4.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package httpstream
import (
"net/http"
"sync"
"regexp"
"bytes"
"strconv"
"context"
"go-common/app/service/ops/log-agent/event"
)
// HttpStream fans log events out to the HTTP clients connected to the /logs
// endpoint registered by NewHttpStream.
type HttpStream struct {
// c is the configuration passed to NewHttpStream.
c *Config
// logstreams maps the channel of each connected request to that request's filter rule.
logstreams map[chan *event.ProcessorEvent]*filterRule
// l is held by add and remove while they modify logstreams.
l sync.Mutex
}
// filterRule holds the filter criteria parsed from the query string of one
// /logs request.
type filterRule struct {
// maxLines comes from the max_lines parameter; 0 means no limit.
maxLines int
// appId comes from the app_id parameter; events with a different AppId are skipped.
appId string
// instanceId comes from the instance_id parameter; when non-empty, an event
// is only sent if its bytes contain this value.
instanceId string
// reg is compiled from the regexp parameter; when non-nil, an event is only
// sent if its bytes match.
reg *regexp.Regexp
}
// LogSourceChan is the unbuffered channel that route reads from; every event
// sent on it is forwarded to all registered log streams.
var LogSourceChan = make(chan *event.ProcessorEvent)
// NewHttpStream validates config, builds an HttpStream and starts serving it.
//
// On success it registers the /logs handler on the default net/http mux,
// starts the routing goroutine and starts an HTTP server on config.Addr in a
// background goroutine. If validation fails, the validation error is returned
// together with a nil stream and nothing is started.
//
// NOTE(review): the error returned by http.ListenAndServe is discarded, so a
// failure to bind the address goes unnoticed. Because the handler is
// registered on the default mux, a second call panics on the duplicate
// pattern.
func NewHttpStream(config *Config) (httpStream *HttpStream, err error) {
	if verr := config.ConfigValidate(); verr != nil {
		return nil, verr
	}

	stream := &HttpStream{
		c:          config,
		logstreams: make(map[chan *event.ProcessorEvent]*filterRule),
	}

	http.HandleFunc("/logs", stream.LogStreamer())
	go stream.route()
	go http.ListenAndServe(stream.c.Addr, nil)

	return stream, nil
}
// route forwards every event received from LogSourceChan to all registered
// log streams. It returns when LogSourceChan is closed.
//
// The map of streams is modified by add and remove from HTTP handler
// goroutines, so it must never be iterated without holding s.l. For each
// event the registered channels are copied while the lock is held, and each
// channel is re-checked under the lock right before its send so that a stream
// removed in the meantime is skipped. The lock is never held across a send,
// which keeps add and remove from blocking behind a slow consumer.
//
// NOTE(review): the send itself is still a blocking send on an unbuffered
// channel. A consumer that stops receiving between the registration check and
// the send stalls routing for every stream; fixing that needs a per-stream
// done signal or a send timeout.
func (s *HttpStream) route() {
	// Reused across events to avoid one allocation per log line.
	var targets []chan *event.ProcessorEvent

	for buf := range LogSourceChan {
		s.l.Lock()
		targets = targets[:0]
		for logstream := range s.logstreams {
			targets = append(targets, logstream)
		}
		s.l.Unlock()

		for _, logstream := range targets {
			s.l.Lock()
			_, registered := s.logstreams[logstream]
			s.l.Unlock()
			if !registered {
				continue
			}
			logstream <- buf
		}
	}
}
// LogStreamer returns the HTTP handler that serves the /logs endpoint.
//
// The handler reads the following query parameters:
//   - app_id (required, non-empty): only events with this AppId are sent.
//   - regexp (optional): only events matching this regular expression are sent.
//   - instance_id (optional): only events containing this text are sent.
//   - max_lines (optional, non-negative integer): number of lines to send
//     before the stream ends; 0 or absent means no limit.
//
// A missing or invalid parameter is answered with 400 Bad Request and a
// one-line message. Otherwise a new stream is registered and the handler
// blocks in httpStreamer until the request context is done or the line limit
// is reached.
func (s *HttpStream) LogStreamer() func(w http.ResponseWriter, req *http.Request) {
	return func(w http.ResponseWriter, req *http.Request) {
		f := new(filterRule)
		params := req.URL.Query()

		// Get returns "" both for a missing key and for "app_id=", so an empty
		// value cannot slip through and disable the mandatory app filter.
		f.appId = params.Get("app_id")
		if f.appId == "" {
			http.Error(w, "必须指定app_id", http.StatusBadRequest)
			return
		}

		if reg, ok := params["regexp"]; ok {
			filterReg, err := regexp.Compile(reg[0])
			if err != nil {
				http.Error(w, "正则表达式格式错误", http.StatusBadRequest)
				return
			}
			f.reg = filterReg
		}

		f.instanceId = params.Get("instance_id")

		if maxLines, ok := params["max_lines"]; ok {
			n, err := strconv.Atoi(maxLines[0])
			// A negative limit has no sensible meaning, so it is rejected
			// together with non-numeric input.
			if err != nil || n < 0 {
				http.Error(w, "max_lines格式错误", http.StatusBadRequest)
				return
			}
			f.maxLines = n
		}

		// Register only after validation so rejected requests never touch the
		// shared stream map.
		logstream := make(chan *event.ProcessorEvent)
		s.add(logstream, f)
		// Unregister on every exit path of the handler, including a panic in
		// the streaming loop. Removing twice is harmless.
		defer s.remove(logstream)

		// Unregister as soon as the client goes away, even if the streaming
		// loop is busy writing at that moment.
		go func() {
			<-req.Context().Done()
			s.remove(logstream)
		}()

		s.httpStreamer(req.Context(), w, req, logstream, f)
	}
}
// httpStreamer filters the events arriving on logstream with f and writes
// each accepted event to w as one line, flushing after every line.
//
// Parameters:
//   - ctx: streaming stops when this context is done.
//   - w: response writer; it is flushed after each line when it implements
//     http.Flusher.
//   - req: the originating request (not used by the loop itself).
//   - logstream: channel on which route delivers events for this request.
//   - f: filter rule; an event is written only if it passes the appId, reg
//     and instanceId checks.
//
// The function returns when ctx is done, when a write to w fails, or as soon
// as f.maxLines lines have been written (0 means unlimited). On the last two
// paths it unregisters logstream before returning.
func (s *HttpStream) httpStreamer(ctx context.Context, w http.ResponseWriter, req *http.Request, logstream chan *event.ProcessorEvent, f *filterRule) {
	w.Header().Add("Content-Type", "text/plain; charset=utf-8")

	// Not every ResponseWriter implements http.Flusher (for example one
	// wrapped by middleware). Skip flushing in that case instead of panicking
	// on a failed type assertion.
	flush := func() {}
	if flusher, ok := w.(http.Flusher); ok {
		flush = flusher.Flush
	}

	if f.instanceId != "" {
		//w.Write([]byte(fmt.Sprintf(
		// "\x1b[1;31m%s\x1b[0m\n", "注意caster中只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出\n")))
		if _, err := w.Write([]byte("注意caster中只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出 \n")); err != nil {
			s.remove(logstream)
			return
		}
		flush()
	}

	sent := 0
	// out is reused for every line so the hot loop does not allocate.
	var out []byte
	for {
		select {
		case e := <-logstream:
			if f.appId != "" && string(e.AppId) != f.appId {
				continue
			}
			// NOTE(review): assumes e.Bytes() is a side-effect-free accessor,
			// so it is read once instead of once per check — confirm against
			// the event package.
			line := e.Bytes()
			if f.reg != nil && !f.reg.Match(line) {
				continue
			}
			if f.instanceId != "" && !bytes.Contains(line, []byte(f.instanceId)) {
				continue
			}
			// Kept from the original so that a negative limit still ends the
			// stream on the first matching event without writing it.
			if f.maxLines != 0 && sent >= f.maxLines {
				s.remove(logstream)
				return
			}
			// TODO event recycle
			// Copy into our own buffer instead of appending to line: route
			// hands the same event pointer to every registered stream, so an
			// in-place append could write into memory other handlers read.
			out = append(append(out[:0], line...), '\n')
			if _, err := w.Write(out); err != nil {
				// The client is gone or the connection is broken.
				s.remove(logstream)
				return
			}
			flush()
			sent++
			// End the stream as soon as the limit is reached rather than
			// waiting for (and discarding) one more matching event.
			if f.maxLines > 0 && sent >= f.maxLines {
				s.remove(logstream)
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
// add registers logstream together with its filter rule f in s.logstreams.
// The map is updated while holding s.l.
func (s *HttpStream) add(logstream chan *event.ProcessorEvent, f *filterRule) {
s.l.Lock()
defer s.l.Unlock()
s.logstreams[logstream] = f
}
// remove unregisters logstream from s.logstreams while holding s.l.
// Removing a channel that is not registered is a no-op.
func (s *HttpStream) remove(logstream chan *event.ProcessorEvent) {
s.l.Lock()
defer s.l.Unlock()
delete(s.logstreams, logstream)
}