2019-04-22 02:59:20 +00:00

156 lines
2.7 KiB
Go

package deliver
import (
"encoding/binary"
"fmt"
"math/rand"
"net"
"sync"
"time"
"go-common/library/log"
)
var (
// _magicBuf is the two-byte marker that warpData writes at the start of
// every frame.
_magicBuf = []byte{0xAC, 0xBE}
// _bufpool recycles the byte slices used to build frames. Its New function
// is installed in init; buffers are taken with getBuf and returned with
// freeBuf.
_bufpool sync.Pool
)
// init seeds math/rand with the current time (used when picking a server)
// and installs the allocator for the frame buffer pool: empty byte slices
// with 4096 bytes of capacity.
func init() {
	rand.Seed(time.Now().UnixNano())
	_bufpool.New = func() interface{} {
		return make([]byte, 0, 4096)
	}
}
// freeBuf hands buf back to the buffer pool. The slice is truncated to zero
// length first so the next getBuf caller starts with an empty buffer that
// keeps the existing capacity.
func freeBuf(buf []byte) {
	_bufpool.Put(buf[:0])
}
// getBuf takes a byte slice from the buffer pool.
func getBuf() []byte {
	pooled := _bufpool.Get()
	return pooled.([]byte)
}
// Deliver delivers spans to dapper-service over TCP.
//
// Payloads are pulled from readFn by the fetch goroutine, handed over
// dataCh to the loop goroutine, framed by warpData and written to conn.
type Deliver struct {
// servers is the list of candidate addresses; dial picks one at random.
servers []string
// readFn is called repeatedly by fetch to obtain the next payload.
readFn func() ([]byte, error)
// conn is the current TCP connection; it is replaced by dial/reDial.
conn *net.TCPConn
// dataCh is the unbuffered channel carrying payloads from fetch to loop
// (and to Close for the final payload).
dataCh chan []byte
// closeCh (buffer size 1) tells loop to stop; Close sends on it.
closeCh chan struct{}
// closed is set by Close and checked by fetch and Close.
closed bool
}
// New creates a Deliver that sends payloads obtained from readFn to one of
// servers and starts it.
//
// servers must contain at least one address and readFn must be non-nil;
// otherwise an error is returned and the Deliver is nil. A nil readFn is
// rejected here because fetch would otherwise panic in its own goroutine on
// the first call.
//
// If the initial dial fails, the error from start is returned together with
// the constructed (but not started) Deliver, as before.
func New(servers []string, readFn func() ([]byte, error)) (*Deliver, error) {
	if len(servers) == 0 {
		return nil, fmt.Errorf("no server provide")
	}
	if readFn == nil {
		return nil, fmt.Errorf("no read function provide")
	}
	d := &Deliver{
		servers: servers,
		readFn:  readFn,
		closeCh: make(chan struct{}, 1),
		dataCh:  make(chan []byte),
	}
	return d, d.start()
}
// start dials a server and, only if that succeeds, launches the fetch and
// loop goroutines. The dial error, if any, is returned unchanged.
func (d *Deliver) start() error {
	err := d.dial()
	if err == nil {
		go d.fetch()
		go d.loop()
	}
	return err
}
// fetch repeatedly calls readFn and forwards each payload on dataCh until
// the deliver is marked closed. Read errors are logged and the read is
// attempted again immediately.
func (d *Deliver) fetch() {
	for !d.closed {
		payload, err := d.readFn()
		if err == nil {
			// Blocks until loop (or Close) receives the payload.
			d.dataCh <- payload
			continue
		}
		log.Error("deliver read data error: %s", err)
	}
}
// loop is the sender goroutine. It waits for either a close signal or a
// payload; each payload is framed with warpData and written to the current
// connection. When a write fails the connection is re-dialed and the same
// frame is written again, repeating until a write succeeds. The frame buffer
// is returned to the pool after a successful write.
func (d *Deliver) loop() {
	for {
		select {
		case <-d.closeCh:
			return
		case payload := <-d.dataCh:
			frame := warpData(payload)
			for {
				if _, err := d.conn.Write(frame); err == nil {
					break
				}
				d.reDial()
			}
			freeBuf(frame)
		}
	}
}
// Close stops the deliver and makes a best-effort attempt to flush one
// pending payload.
//
// It marks the deliver as closed (so fetch stops before its next read),
// signals loop to exit, and then waits up to 50ms for a payload that fetch
// may still be handing over on dataCh. Such a payload is framed with
// warpData, exactly as loop does, and written to the current connection.
//
// Close returns an error if the deliver was already closed or if that final
// write fails; otherwise it returns nil. The connection itself is not closed
// here.
func (d *Deliver) Close() error {
	if d.closed {
		return fmt.Errorf("already closed")
	}
	d.closed = true
	d.closeCh <- struct{}{}

	timer := time.NewTimer(50 * time.Millisecond)
	defer timer.Stop()

	select {
	case data := <-d.dataCh:
		// Write the last payload with the same framing loop uses.
		frame := warpData(data)
		_, err := d.conn.Write(frame)
		freeBuf(frame)
		if err != nil {
			return fmt.Errorf("write last data error: %s", err)
		}
		return nil
	case <-timer.C:
		return nil
	}
}
// reDial closes the current connection, if any, and keeps dialing until a
// new connection is established. Each failed attempt is logged and followed
// by a one-second pause before the next try. It returns only once dial has
// succeeded, so d.conn is a fresh connection on return.
func (d *Deliver) reDial() {
	if d.conn != nil {
		d.conn.Close()
	}
	for {
		err := d.dial()
		if err == nil {
			return
		}
		log.Error("redial error: %s, retry after second", err)
		time.Sleep(time.Second)
	}
}
// dial connects over TCP to a server picked at random from d.servers. On
// success it enables keep-alive on the connection and stores it in d.conn;
// on failure it returns an error naming the server and leaves d.conn
// untouched.
func (d *Deliver) dial() error {
	addr := chioceServer(d.servers)
	c, err := net.Dial("tcp", addr)
	if err != nil {
		return fmt.Errorf("dial tcp://%s error: %s", addr, err)
	}
	tcpConn := c.(*net.TCPConn)
	d.conn = tcpConn
	tcpConn.SetKeepAlive(true)
	return nil
}
// chioceServer returns one element of servers chosen uniformly at random
// using math/rand. servers must be non-empty.
func chioceServer(servers []string) string {
	idx := rand.Intn(len(servers))
	return servers[idx]
}
// warpData builds a frame around data in a buffer taken from the pool and
// returns it. The layout is:
//
//	bytes 0-1  _magicBuf
//	bytes 2-5  big-endian uint32 holding len(data)+2
//	bytes 6-7  two zero bytes
//	bytes 8-   data
//
// The caller should return the frame with freeBuf once it has been written.
func warpData(data []byte) []byte {
	frame := append(getBuf(), _magicBuf...)
	frame = append(frame, 0, 0, 0, 0, 0, 0)
	binary.BigEndian.PutUint32(frame[2:6], uint32(len(data)+2))
	return append(frame, data...)
}