From 59112b92009fbfa330256314501ccfbee6e105cc Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 20 Apr 2022 16:34:00 -0700 Subject: [PATCH] Increase default message channel from 200 -> 10k (#638) --- pkg/routing/localrouter.go | 11 +++++++---- pkg/routing/messagechannel.go | 6 ++++-- pkg/routing/messagechannel_test.go | 2 +- pkg/rtc/config.go | 6 +++--- pkg/rtc/participant_internal_test.go | 2 +- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index bd17df269..f35ed7918 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -12,6 +12,9 @@ import ( "github.com/livekit/protocol/logger" ) +// aggregated channel for all participants +const localRTCChannelSize = 10000 + // a router of messages on the same node, basic implementation for local testing type LocalRouter struct { currentNode LocalNode @@ -32,7 +35,7 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter { currentNode: currentNode, requestChannels: make(map[string]*MessageChannel), responseChannels: make(map[string]*MessageChannel), - rtcMessageChan: NewMessageChannel(), + rtcMessageChan: NewMessageChannel(localRTCChannelSize), } } @@ -114,7 +117,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livek func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { if r.rtcMessageChan.IsClosed() { // create a new one - r.rtcMessageChan = NewMessageChannel() + r.rtcMessageChan = NewMessageChannel(localRTCChannelSize) } msg.ParticipantKey = string(participantKey(roomName, identity)) return r.writeRTCMessage(r.rtcMessageChan, msg) @@ -128,7 +131,7 @@ func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error { if r.rtcMessageChan.IsClosed() { // create a new one - r.rtcMessageChan = NewMessageChannel() + r.rtcMessageChan = NewMessageChannel(localRTCChannelSize) } return r.writeRTCMessage(r.rtcMessageChan, msg) } @@ -228,7 +231,7 @@ func (r *LocalRouter) getOrCreateMessageChannel(target map[string]*MessageChanne return mc } - mc = NewMessageChannel() + mc = NewMessageChannel(DefaultMessageChannelSize) mc.OnClose(func() { r.lock.Lock() delete(target, key) diff --git a/pkg/routing/messagechannel.go b/pkg/routing/messagechannel.go index 074d18ec5..8359d15f9 100644 --- a/pkg/routing/messagechannel.go +++ b/pkg/routing/messagechannel.go @@ -4,16 +4,18 @@ import ( "google.golang.org/protobuf/proto" ) +const DefaultMessageChannelSize = 200 + type MessageChannel struct { msgChan chan proto.Message closed chan struct{} onClose func() } -func NewMessageChannel() *MessageChannel { +func NewMessageChannel(size int) *MessageChannel { return &MessageChannel{ // allow some buffer to avoid blocked writes - msgChan: make(chan proto.Message, 200), + msgChan: make(chan proto.Message, size), closed: make(chan struct{}), } } diff --git a/pkg/routing/messagechannel_test.go b/pkg/routing/messagechannel_test.go index 22d8c9f2e..b79a00795 100644 --- a/pkg/routing/messagechannel_test.go +++ b/pkg/routing/messagechannel_test.go @@ -11,7 +11,7 @@ import ( func TestMessageChannel_WriteMessageClosed(t *testing.T) { // ensure it doesn't panic when written to after closing - m := routing.NewMessageChannel() + m := routing.NewMessageChannel(routing.DefaultMessageChannelSize) go func() { for msg := range m.ReadChan() { if msg == nil { diff --git a/pkg/rtc/config.go b/pkg/rtc/config.go index a83c2c886..65b43a17c 100644 --- a/pkg/rtc/config.go +++ b/pkg/rtc/config.go @@ -178,12 +178,12 @@ func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, err if rtcConf.UseICELite { s.SetLite(true) - } else if !conf.RTC.UseExternalIP { + } else if !rtcConf.UseExternalIP { // use STUN servers for server to support NAT // when deployed in production, we expect UseExternalIP to be used, and ports accessible // this is not compatible with ICE Lite - if len(conf.RTC.STUNServers) > 0 { - c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(conf.RTC.STUNServers)} + if len(rtcConf.STUNServers) > 0 { + c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(rtcConf.STUNServers)} } else { c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(config.DefaultStunServers)} } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index b5345a00c..808fb8bd5 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -173,7 +173,7 @@ func TestOutOfOrderUpdates(t *testing.T) { func TestDisconnectTiming(t *testing.T) { t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) { p := newParticipantForTest("test") - msg := routing.NewMessageChannel() + msg := routing.NewMessageChannel(routing.DefaultMessageChannelSize) p.params.Sink = msg go func() { for msg := range msg.ReadChan() {