107 lines
2.1 KiB
Go
107 lines
2.1 KiB
Go
package service
|
|
|
|
import (
|
|
"time"
|
|
|
|
"go-common/app/service/main/broadcast/libs/bytes"
|
|
"go-common/app/service/main/broadcast/model"
|
|
"go-common/library/log"
|
|
)
|
|
|
|
// RoomOptions holds the batching parameters handed to NewRoom.
type RoomOptions struct {
	// BatchNum is passed to the room's push goroutine as its batch size;
	// NewRoom also sizes the room's proto channel at BatchNum*2.
	BatchNum int
	// SignalTime is passed to the room's push goroutine as the delay used
	// for its flush timer.
	SignalTime time.Duration
}
|
|
|
|
// Room room.
|
|
type Room struct {
|
|
s *Service
|
|
id string
|
|
proto chan *model.Proto
|
|
}
|
|
|
|
var (
|
|
roomReadyProto = new(model.Proto)
|
|
)
|
|
|
|
// NewRoom new a room struct, store channel room info.
|
|
func NewRoom(s *Service, id string, options RoomOptions) (r *Room) {
|
|
r = &Room{
|
|
s: s,
|
|
id: id,
|
|
proto: make(chan *model.Proto, options.BatchNum*2),
|
|
}
|
|
go r.pushproc(options.BatchNum, options.SignalTime)
|
|
return
|
|
}
|
|
|
|
// Push push msg to the room, if chan full discard it.
|
|
func (r *Room) Push(op int32, msg []byte, contentType int32) (err error) {
|
|
var p = &model.Proto{
|
|
Ver: 1,
|
|
Operation: op,
|
|
ContentType: contentType,
|
|
Body: msg,
|
|
}
|
|
select {
|
|
case r.proto <- p:
|
|
default:
|
|
err = ErrRoomFull
|
|
}
|
|
return
|
|
}
|
|
|
|
// pushproc merge proto and push msgs in batch.
|
|
func (r *Room) pushproc(batch int, sigTime time.Duration) {
|
|
var (
|
|
n int
|
|
last time.Time
|
|
p *model.Proto
|
|
buf = bytes.NewWriterSize(int(model.MaxBodySize))
|
|
)
|
|
log.Info("start room:%s goroutine", r.id)
|
|
td := time.AfterFunc(sigTime, func() {
|
|
select {
|
|
case r.proto <- roomReadyProto:
|
|
default:
|
|
}
|
|
})
|
|
defer td.Stop()
|
|
for {
|
|
if p = <-r.proto; p == nil {
|
|
break // exit
|
|
} else if p != roomReadyProto {
|
|
// merge buffer ignore error, always nil
|
|
p.WriteTo(buf)
|
|
if n++; n == 1 {
|
|
last = time.Now()
|
|
td.Reset(sigTime)
|
|
continue
|
|
} else if n < batch {
|
|
if sigTime > time.Since(last) {
|
|
continue
|
|
}
|
|
}
|
|
} else {
|
|
if n == 0 {
|
|
break
|
|
}
|
|
}
|
|
r.s.broadcastRoomRawBytes(r.id, buf.Buffer())
|
|
// TODO use reset buffer
|
|
// after push to room channel, renew a buffer, let old buffer gc
|
|
buf = bytes.NewWriterSize(buf.Size())
|
|
n = 0
|
|
if r.s.conf.Room.Idle != 0 {
|
|
td.Reset(time.Duration(r.s.conf.Room.Idle))
|
|
} else {
|
|
td.Reset(time.Minute)
|
|
}
|
|
}
|
|
r.s.roomsMutex.Lock()
|
|
delete(r.s.rooms, r.id)
|
|
r.s.roomsMutex.Unlock()
|
|
log.Info("room:%s goroutine exit", r.id)
|
|
}
|