156 lines
2.7 KiB
Go
156 lines
2.7 KiB
Go
package deliver
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"go-common/library/log"
|
|
)
|
|
|
|
var (
|
|
_magicBuf = []byte{0xAC, 0xBE}
|
|
_bufpool sync.Pool
|
|
)
|
|
|
|
func init() {
|
|
rand.Seed(time.Now().UnixNano())
|
|
_bufpool = sync.Pool{New: func() interface{} {
|
|
return make([]byte, 0, 4096)
|
|
}}
|
|
}
|
|
|
|
func freeBuf(buf []byte) {
|
|
buf = buf[:0]
|
|
_bufpool.Put(buf)
|
|
}
|
|
|
|
func getBuf() []byte {
|
|
return _bufpool.Get().([]byte)
|
|
}
|
|
|
|
// Deliver deliver span to dapper-service through tcp
|
|
type Deliver struct {
|
|
servers []string
|
|
readFn func() ([]byte, error)
|
|
conn *net.TCPConn
|
|
dataCh chan []byte
|
|
closeCh chan struct{}
|
|
closed bool
|
|
}
|
|
|
|
// New Deliver
|
|
func New(servers []string, readFn func() ([]byte, error)) (*Deliver, error) {
|
|
if len(servers) == 0 {
|
|
return nil, fmt.Errorf("no server provide")
|
|
}
|
|
d := &Deliver{
|
|
servers: servers,
|
|
readFn: readFn,
|
|
closeCh: make(chan struct{}, 1),
|
|
dataCh: make(chan []byte),
|
|
}
|
|
return d, d.start()
|
|
}
|
|
|
|
func (d *Deliver) start() error {
|
|
if err := d.dial(); err != nil {
|
|
return err
|
|
}
|
|
go d.fetch()
|
|
go d.loop()
|
|
return nil
|
|
}
|
|
|
|
func (d *Deliver) fetch() {
|
|
for {
|
|
if d.closed {
|
|
return
|
|
}
|
|
data, err := d.readFn()
|
|
if err != nil {
|
|
log.Error("deliver read data error: %s", err)
|
|
continue
|
|
}
|
|
d.dataCh <- data
|
|
}
|
|
}
|
|
|
|
func (d *Deliver) loop() {
|
|
for {
|
|
select {
|
|
case <-d.closeCh:
|
|
return
|
|
case data := <-d.dataCh:
|
|
data = warpData(data)
|
|
send:
|
|
_, err := d.conn.Write(data)
|
|
if err == nil {
|
|
freeBuf(data)
|
|
continue
|
|
}
|
|
d.reDial()
|
|
goto send
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close deliver
|
|
func (d *Deliver) Close() error {
|
|
if d.closed {
|
|
return fmt.Errorf("already closed")
|
|
}
|
|
d.closed = true
|
|
d.closeCh <- struct{}{}
|
|
timer := time.NewTimer(50 * time.Millisecond)
|
|
select {
|
|
case data := <-d.dataCh:
|
|
// write last data to conn
|
|
_, err := d.conn.Write(data)
|
|
return fmt.Errorf("write last data error: %s", err)
|
|
case <-timer.C:
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Deliver) reDial() {
|
|
if d.conn != nil {
|
|
d.conn.Close()
|
|
}
|
|
for {
|
|
if err := d.dial(); err != nil {
|
|
log.Error("redial error: %s, retry after second", err)
|
|
time.Sleep(time.Second)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
func (d *Deliver) dial() error {
|
|
server := chioceServer(d.servers)
|
|
conn, err := net.Dial("tcp", server)
|
|
if err != nil {
|
|
return fmt.Errorf("dial tcp://%s error: %s", server, err)
|
|
}
|
|
d.conn = conn.(*net.TCPConn)
|
|
d.conn.SetKeepAlive(true)
|
|
return nil
|
|
}
|
|
|
|
func chioceServer(servers []string) string {
|
|
return servers[rand.Intn(len(servers))]
|
|
}
|
|
|
|
func warpData(data []byte) []byte {
|
|
buf := getBuf()
|
|
buf = append(buf, _magicBuf...)
|
|
buf = append(buf, []byte{0, 0, 0, 0, 0, 0}...)
|
|
binary.BigEndian.PutUint32(buf[2:6], uint32(len(data)+2))
|
|
buf = append(buf, data...)
|
|
return buf
|
|
}
|