Increase default message channel from 200 -> 10k (#638)

This commit is contained in:
David Zhao
2022-04-20 16:34:00 -07:00
committed by GitHub
parent f699499220
commit 59112b9200
5 changed files with 16 additions and 11 deletions
+7 -4
View File
@@ -12,6 +12,9 @@ import (
"github.com/livekit/protocol/logger"
)
// aggregated channel for all participants
const localRTCChannelSize = 10000
// a router of messages on the same node, basic implementation for local testing
type LocalRouter struct {
currentNode LocalNode
@@ -32,7 +35,7 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter {
currentNode: currentNode,
requestChannels: make(map[string]*MessageChannel),
responseChannels: make(map[string]*MessageChannel),
rtcMessageChan: NewMessageChannel(),
rtcMessageChan: NewMessageChannel(localRTCChannelSize),
}
}
@@ -114,7 +117,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livek
func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel()
r.rtcMessageChan = NewMessageChannel(localRTCChannelSize)
}
msg.ParticipantKey = string(participantKey(roomName, identity))
return r.writeRTCMessage(r.rtcMessageChan, msg)
@@ -128,7 +131,7 @@ func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam
func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error {
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel()
r.rtcMessageChan = NewMessageChannel(localRTCChannelSize)
}
return r.writeRTCMessage(r.rtcMessageChan, msg)
}
@@ -228,7 +231,7 @@ func (r *LocalRouter) getOrCreateMessageChannel(target map[string]*MessageChanne
return mc
}
mc = NewMessageChannel()
mc = NewMessageChannel(DefaultMessageChannelSize)
mc.OnClose(func() {
r.lock.Lock()
delete(target, key)
+4 -2
View File
@@ -4,16 +4,18 @@ import (
"google.golang.org/protobuf/proto"
)
const DefaultMessageChannelSize = 200
type MessageChannel struct {
msgChan chan proto.Message
closed chan struct{}
onClose func()
}
func NewMessageChannel() *MessageChannel {
func NewMessageChannel(size int) *MessageChannel {
return &MessageChannel{
// allow some buffer to avoid blocked writes
msgChan: make(chan proto.Message, 200),
msgChan: make(chan proto.Message, size),
closed: make(chan struct{}),
}
}
+1 -1
View File
@@ -11,7 +11,7 @@ import (
func TestMessageChannel_WriteMessageClosed(t *testing.T) {
// ensure it doesn't panic when written to after closing
m := routing.NewMessageChannel()
m := routing.NewMessageChannel(routing.DefaultMessageChannelSize)
go func() {
for msg := range m.ReadChan() {
if msg == nil {
+3 -3
View File
@@ -178,12 +178,12 @@ func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, err
if rtcConf.UseICELite {
s.SetLite(true)
} else if !conf.RTC.UseExternalIP {
} else if !rtcConf.UseExternalIP {
// use STUN servers for server to support NAT
// when deployed in production, we expect UseExternalIP to be used, and ports accessible
// this is not compatible with ICE Lite
if len(conf.RTC.STUNServers) > 0 {
c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(conf.RTC.STUNServers)}
if len(rtcConf.STUNServers) > 0 {
c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(rtcConf.STUNServers)}
} else {
c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(config.DefaultStunServers)}
}
+1 -1
View File
@@ -173,7 +173,7 @@ func TestOutOfOrderUpdates(t *testing.T) {
func TestDisconnectTiming(t *testing.T) {
t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) {
p := newParticipantForTest("test")
msg := routing.NewMessageChannel()
msg := routing.NewMessageChannel(routing.DefaultMessageChannelSize)
p.params.Sink = msg
go func() {
for msg := range msg.ReadChan() {