145 lines
4.5 KiB
Go
145 lines
4.5 KiB
Go
|
package httpstream
|
|||
|
|
|||
|
import (
|
|||
|
"net/http"
|
|||
|
"sync"
|
|||
|
"regexp"
|
|||
|
"bytes"
|
|||
|
"strconv"
|
|||
|
"context"
|
|||
|
"go-common/app/service/ops/log-agent/event"
|
|||
|
)
|
|||
|
|
|||
|
type HttpStream struct {
|
|||
|
c *Config
|
|||
|
logstreams map[chan *event.ProcessorEvent]*filterRule
|
|||
|
l sync.Mutex
|
|||
|
}
|
|||
|
type filterRule struct {
|
|||
|
maxLines int
|
|||
|
appId string
|
|||
|
instanceId string
|
|||
|
reg *regexp.Regexp
|
|||
|
}
|
|||
|
|
|||
|
var LogSourceChan = make(chan *event.ProcessorEvent)
|
|||
|
|
|||
|
// initlogStream init log stream
|
|||
|
func NewHttpStream(config *Config) (httpStream *HttpStream, err error) {
|
|||
|
h := new(HttpStream)
|
|||
|
if err := config.ConfigValidate(); err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
h.c = config
|
|||
|
h.logstreams = make(map[chan *event.ProcessorEvent]*filterRule)
|
|||
|
http.HandleFunc("/logs", h.LogStreamer())
|
|||
|
go h.route()
|
|||
|
go http.ListenAndServe(h.c.Addr, nil)
|
|||
|
return h, nil
|
|||
|
}
|
|||
|
|
|||
|
// route 把日志路由到所有注册的logstream
|
|||
|
func (s *HttpStream) route() {
|
|||
|
for buf := range LogSourceChan {
|
|||
|
for logstream, _ := range s.logstreams {
|
|||
|
logstream <- buf
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// LogStreamer 接收请求
|
|||
|
func (s *HttpStream) LogStreamer() func(w http.ResponseWriter, req *http.Request) {
|
|||
|
logsHandler := func(w http.ResponseWriter, req *http.Request) {
|
|||
|
logstream := make(chan *event.ProcessorEvent)
|
|||
|
f := new(filterRule)
|
|||
|
// parse params
|
|||
|
params := req.URL.Query()
|
|||
|
if appId, ok := params["app_id"]; ok {
|
|||
|
f.appId = appId[0]
|
|||
|
} else {
|
|||
|
w.Write(append([]byte("必须指定app_id"), '\n'))
|
|||
|
return
|
|||
|
}
|
|||
|
if reg, ok := params["regexp"]; ok {
|
|||
|
if filterReg, err := regexp.Compile(reg[0]); err == nil {
|
|||
|
f.reg = filterReg
|
|||
|
} else {
|
|||
|
w.Write(append([]byte("正则表达式格式错误"), '\n'))
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
if instanceId, ok := params["instance_id"]; ok {
|
|||
|
f.instanceId = instanceId[0]
|
|||
|
}
|
|||
|
|
|||
|
if maxLines, ok := params["max_lines"]; ok {
|
|||
|
if n, err := strconv.Atoi(maxLines[0]); err == nil {
|
|||
|
f.maxLines = n
|
|||
|
} else {
|
|||
|
w.Write(append([]byte("max_lines格式错误"), '\n'))
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
s.add(logstream, f)
|
|||
|
go func() {
|
|||
|
select {
|
|||
|
case <-req.Context().Done():
|
|||
|
s.remove(logstream)
|
|||
|
}
|
|||
|
}()
|
|||
|
defer s.httpStreamer(req.Context(), w, req, logstream, f)
|
|||
|
}
|
|||
|
return logsHandler
|
|||
|
}
|
|||
|
|
|||
|
// httpStreamer 过滤并输出日志
|
|||
|
func (s *HttpStream) httpStreamer(ctx context.Context, w http.ResponseWriter, req *http.Request, logstream chan *event.ProcessorEvent, f *filterRule) {
|
|||
|
w.Header().Add("Content-Type", "text/plain; charset=utf-8")
|
|||
|
if f.instanceId != "" {
|
|||
|
//w.Write([]byte(fmt.Sprintf(
|
|||
|
// "\x1b[1;31m%s\x1b[0m\n", "注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出\n")))
|
|||
|
w.Write([]byte("注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出 \n"))
|
|||
|
w.(http.Flusher).Flush()
|
|||
|
}
|
|||
|
c := 0
|
|||
|
for {
|
|||
|
select {
|
|||
|
case e := <-logstream:
|
|||
|
if f.appId != "" && string(e.AppId) != f.appId {
|
|||
|
continue
|
|||
|
}
|
|||
|
if f.reg != nil && !f.reg.Match(e.Bytes()) {
|
|||
|
continue
|
|||
|
}
|
|||
|
if f.instanceId != "" && !bytes.Contains(e.Bytes(), []byte(f.instanceId)) {
|
|||
|
continue
|
|||
|
}
|
|||
|
if f.maxLines != 0 && c >= f.maxLines {
|
|||
|
s.remove(logstream)
|
|||
|
return
|
|||
|
}
|
|||
|
c += 1
|
|||
|
// TODO event recycle
|
|||
|
w.Write(append(e.Bytes(), '\n'))
|
|||
|
w.(http.Flusher).Flush()
|
|||
|
case <-ctx.Done():
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// add 注册logstream
|
|||
|
func (s *HttpStream) add(logstream chan *event.ProcessorEvent, f *filterRule) {
|
|||
|
s.l.Lock()
|
|||
|
defer s.l.Unlock()
|
|||
|
s.logstreams[logstream] = f
|
|||
|
}
|
|||
|
|
|||
|
// remove 注销logstream
|
|||
|
func (s *HttpStream) remove(logstream chan *event.ProcessorEvent) {
|
|||
|
s.l.Lock()
|
|||
|
defer s.l.Unlock()
|
|||
|
delete(s.logstreams, logstream)
|
|||
|
}
|