2019-04-22 02:59:20 +00:00

107 lines
2.1 KiB
Go

package service
import (
"time"
"go-common/app/service/main/broadcast/libs/bytes"
"go-common/app/service/main/broadcast/model"
"go-common/library/log"
)
// RoomOptions room options.
type RoomOptions struct {
BatchNum int
SignalTime time.Duration
}
// Room room.
type Room struct {
s *Service
id string
proto chan *model.Proto
}
var (
roomReadyProto = new(model.Proto)
)
// NewRoom new a room struct, store channel room info.
func NewRoom(s *Service, id string, options RoomOptions) (r *Room) {
r = &Room{
s: s,
id: id,
proto: make(chan *model.Proto, options.BatchNum*2),
}
go r.pushproc(options.BatchNum, options.SignalTime)
return
}
// Push push msg to the room, if chan full discard it.
func (r *Room) Push(op int32, msg []byte, contentType int32) (err error) {
var p = &model.Proto{
Ver: 1,
Operation: op,
ContentType: contentType,
Body: msg,
}
select {
case r.proto <- p:
default:
err = ErrRoomFull
}
return
}
// pushproc merge proto and push msgs in batch.
func (r *Room) pushproc(batch int, sigTime time.Duration) {
var (
n int
last time.Time
p *model.Proto
buf = bytes.NewWriterSize(int(model.MaxBodySize))
)
log.Info("start room:%s goroutine", r.id)
td := time.AfterFunc(sigTime, func() {
select {
case r.proto <- roomReadyProto:
default:
}
})
defer td.Stop()
for {
if p = <-r.proto; p == nil {
break // exit
} else if p != roomReadyProto {
// merge buffer ignore error, always nil
p.WriteTo(buf)
if n++; n == 1 {
last = time.Now()
td.Reset(sigTime)
continue
} else if n < batch {
if sigTime > time.Since(last) {
continue
}
}
} else {
if n == 0 {
break
}
}
r.s.broadcastRoomRawBytes(r.id, buf.Buffer())
// TODO use reset buffer
// after push to room channel, renew a buffer, let old buffer gc
buf = bytes.NewWriterSize(buf.Size())
n = 0
if r.s.conf.Room.Idle != 0 {
td.Reset(time.Duration(r.s.conf.Room.Idle))
} else {
td.Reset(time.Minute)
}
}
r.s.roomsMutex.Lock()
delete(r.s.rooms, r.id)
r.s.roomsMutex.Unlock()
log.Info("room:%s goroutine exit", r.id)
}