174 lines
4.5 KiB
Go
174 lines
4.5 KiB
Go
package kafkacollect
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
"go-common/app/service/main/dapper/model"
|
|
"go-common/app/service/main/dapper/pkg/collect"
|
|
"go-common/app/service/main/dapper/pkg/process"
|
|
"go-common/library/log"
|
|
"go-common/library/stat/prom"
|
|
)
|
|
|
|
var (
|
|
collectCount = prom.New().WithCounter("dapper_kafka_collect_count", []string{"name"})
|
|
collectErrCount = prom.New().WithCounter("dapper_kafka_collect_err_count", []string{"name"})
|
|
)
|
|
|
|
// Option set option
|
|
type Option func(*option)
|
|
|
|
// option carries the settings a kafka collector is built from.
type option struct {
	group string   // consumer group name passed to the offset manager
	topic string   // topic whose partitions are consumed
	addrs []string // kafka broker addresses used to create the client
}
|
|
|
|
func (o option) saramaConfig() *sarama.Config {
|
|
return nil
|
|
}
|
|
|
|
var defaultOption = option{
|
|
group: "default",
|
|
}
|
|
|
|
//func NewConsumer(addrs []string, config *Config) (Consumer, error)
|
|
|
|
// New kafka collect
|
|
func New(topic string, addrs []string, options ...Option) (collect.Collecter, error) {
|
|
log.V(10).Info("new kafkacollect topic %s addrs: %v", topic, addrs)
|
|
if len(addrs) == 0 {
|
|
return nil, fmt.Errorf("kafka addrs required")
|
|
}
|
|
opt := defaultOption
|
|
for _, fn := range options {
|
|
fn(&opt)
|
|
}
|
|
opt.addrs = addrs
|
|
opt.topic = topic
|
|
clt := &kafkaCollect{opt: opt}
|
|
return clt, nil
|
|
}
|
|
|
|
type kafkaCollect struct {
|
|
wg sync.WaitGroup
|
|
opt option
|
|
ps []process.Processer
|
|
consumers []*consumer
|
|
client sarama.Client
|
|
offsetManager sarama.OffsetManager
|
|
baseConsumer sarama.Consumer
|
|
}
|
|
|
|
func (k *kafkaCollect) RegisterProcess(p process.Processer) {
|
|
k.ps = append(k.ps, p)
|
|
}
|
|
|
|
func (k *kafkaCollect) Start() error {
|
|
var err error
|
|
if k.client, err = sarama.NewClient(k.opt.addrs, k.opt.saramaConfig()); err != nil {
|
|
return fmt.Errorf("new kafka client error: %s", err)
|
|
}
|
|
if k.offsetManager, err = sarama.NewOffsetManagerFromClient(k.opt.group, k.client); err != nil {
|
|
return fmt.Errorf("new offset manager error: %s", err)
|
|
}
|
|
if k.baseConsumer, err = sarama.NewConsumerFromClient(k.client); err != nil {
|
|
return fmt.Errorf("new kafka consumer error: %s", err)
|
|
}
|
|
log.Info("kafkacollect consumer from topic: %s addrs: %s", k.opt.topic, k.opt.topic)
|
|
return k.start()
|
|
}
|
|
|
|
func (k *kafkaCollect) handler(protoSpan *model.ProtoSpan) {
|
|
var err error
|
|
for _, p := range k.ps {
|
|
if err = p.Process(context.Background(), protoSpan); err != nil {
|
|
log.Error("process span error: %s, discard", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (k *kafkaCollect) start() error {
|
|
ps, err := k.client.Partitions(k.opt.topic)
|
|
if err != nil {
|
|
return fmt.Errorf("get partitions error: %s", err)
|
|
}
|
|
for _, p := range ps {
|
|
var pom sarama.PartitionOffsetManager
|
|
if pom, err = k.offsetManager.ManagePartition(k.opt.topic, p); err != nil {
|
|
return fmt.Errorf("new manage partition error: %s", err)
|
|
}
|
|
offset, _ := pom.NextOffset()
|
|
if offset == -1 {
|
|
offset = sarama.OffsetOldest
|
|
}
|
|
var c sarama.PartitionConsumer
|
|
log.V(10).Info("partitions %d start offset %d", p, offset)
|
|
if c, err = k.baseConsumer.ConsumePartition(k.opt.topic, p, offset); err != nil {
|
|
return fmt.Errorf("new consume partition error: %s", err)
|
|
}
|
|
log.V(10).Info("start partition consumer partition: %d, offset: %d", p, offset)
|
|
consumer := newConsumer(k, c, pom)
|
|
k.consumers = append(k.consumers, consumer)
|
|
k.wg.Add(1)
|
|
go consumer.start()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (k *kafkaCollect) Close() error {
|
|
for _, c := range k.consumers {
|
|
if err := c.close(); err != nil {
|
|
log.Warn("close consumer error: %s", err)
|
|
}
|
|
}
|
|
k.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func newConsumer(k *kafkaCollect, c sarama.PartitionConsumer, pom sarama.PartitionOffsetManager) *consumer {
|
|
return &consumer{kafkaCollect: k, consumer: c, pom: pom, closeCh: make(chan struct{}, 1)}
|
|
}
|
|
|
|
type consumer struct {
|
|
*kafkaCollect
|
|
pom sarama.PartitionOffsetManager
|
|
consumer sarama.PartitionConsumer
|
|
closeCh chan struct{}
|
|
}
|
|
|
|
func (c *consumer) close() error {
|
|
c.closeCh <- struct{}{}
|
|
c.pom.Close()
|
|
return c.consumer.Close()
|
|
}
|
|
|
|
func (c *consumer) start() {
|
|
defer c.wg.Done()
|
|
var err error
|
|
var value []byte
|
|
for {
|
|
select {
|
|
case msg := <-c.consumer.Messages():
|
|
collectCount.Incr("count")
|
|
c.pom.MarkOffset(msg.Offset+1, "")
|
|
log.V(10).Info("receive message from kafka topic: %s key: %s content: %s", msg.Key, msg.Topic, msg.Value)
|
|
protoSpan := new(model.ProtoSpan)
|
|
if err = json.Unmarshal(msg.Value, protoSpan); err != nil {
|
|
collectErrCount.Incr("count_error")
|
|
log.Error("unmarshal span from kafka error: %s, value: %v", err, value)
|
|
continue
|
|
}
|
|
c.handler(protoSpan)
|
|
case <-c.closeCh:
|
|
log.V(10).Info("receive closed return")
|
|
return
|
|
}
|
|
}
|
|
}
|