From 12db4692972ca4892ce781e1c7352cd7bc75382c Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 15 Jun 2023 12:53:34 +0530 Subject: [PATCH] Better tracking of signalling connection. (#1794) * Better tracking of signalling connection. - Reason for closing signaling channel. - ConnectionID attached to request source/response sink * Tests --- pkg/routing/interfaces.go | 2 + pkg/routing/localrouter.go | 10 +-- pkg/routing/messagechannel.go | 21 ++++-- pkg/routing/messagechannel_test.go | 2 +- pkg/routing/redis.go | 20 +++++- pkg/routing/redisrouter.go | 6 +- pkg/routing/routingfakes/fake_message_sink.go | 66 +++++++++++++++++++ .../routingfakes/fake_message_source.go | 66 +++++++++++++++++++ pkg/routing/signal.go | 8 ++- pkg/rtc/participant.go | 20 ++++-- pkg/rtc/participant_internal_test.go | 2 +- pkg/rtc/participant_signal.go | 5 +- pkg/rtc/room.go | 2 +- pkg/rtc/types/interfaces.go | 45 ++++++++++++- .../typesfakes/fake_local_participant.go | 21 ++++-- pkg/service/roommanager.go | 4 +- pkg/service/rtcservice.go | 11 ++-- pkg/service/signal.go | 13 ++-- 18 files changed, 275 insertions(+), 49 deletions(-) diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 566d2a2b4..3bc54d9c9 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -23,6 +23,7 @@ type MessageSink interface { WriteMessage(msg proto.Message) error IsClosed() bool Close() + ConnectionID() livekit.ConnectionID } //counterfeiter:generate . MessageSource @@ -31,6 +32,7 @@ type MessageSource interface { ReadChan() <-chan proto.Message IsClosed() bool Close() + ConnectionID() livekit.ConnectionID } type ParticipantInit struct { diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index da43566db..0b604a73d 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -38,7 +38,7 @@ func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRout signalClient: signalClient, requestChannels: make(map[string]*MessageChannel), responseChannels: make(map[string]*MessageChannel), - rtcMessageChan: NewMessageChannel(localRTCChannelSize), + rtcMessageChan: NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize), } } @@ -93,7 +93,7 @@ func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, room logger.Errorw("could not handle new participant", err, "room", roomName, "participant", pi.Identity, - "connectionID", connectionID, + "connID", connectionID, ) } return @@ -103,7 +103,7 @@ func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro r.lock.Lock() if r.rtcMessageChan.IsClosed() { // create a new one - r.rtcMessageChan = NewMessageChannel(localRTCChannelSize) + r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize) } r.lock.Unlock() msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity)) @@ -121,7 +121,7 @@ func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTC r.lock.Lock() if r.rtcMessageChan.IsClosed() { // create a new one - r.rtcMessageChan = NewMessageChannel(localRTCChannelSize) + r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize) } r.lock.Unlock() return r.writeRTCMessage(r.rtcMessageChan, msg) @@ -254,7 +254,7 @@ func (r *LocalRouter) getOrCreateMessageChannel(target map[string]*MessageChanne return mc } - mc = NewMessageChannel(DefaultMessageChannelSize) + mc = NewMessageChannel(livekit.ConnectionID(key), DefaultMessageChannelSize) mc.OnClose(func() { r.lock.Lock() delete(target, key) diff --git a/pkg/routing/messagechannel.go b/pkg/routing/messagechannel.go index de5bd9075..bf914e1aa 100644 --- a/pkg/routing/messagechannel.go +++ b/pkg/routing/messagechannel.go @@ -3,24 +3,27 @@ package routing import ( "sync" + "github.com/livekit/protocol/livekit" "google.golang.org/protobuf/proto" ) const DefaultMessageChannelSize = 200 type MessageChannel struct { - msgChan chan proto.Message - onClose func() - isClosed bool - lock sync.RWMutex + connectionID livekit.ConnectionID + msgChan chan proto.Message + onClose func() + isClosed bool + lock sync.RWMutex } -func NewDefaultMessageChannel() *MessageChannel { - return NewMessageChannel(DefaultMessageChannelSize) +func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel { + return NewMessageChannel(connectionID, DefaultMessageChannelSize) } -func NewMessageChannel(size int) *MessageChannel { +func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel { return &MessageChannel{ + connectionID: connectionID, // allow some buffer to avoid blocked writes msgChan: make(chan proto.Message, size), } @@ -71,3 +74,7 @@ func (m *MessageChannel) Close() { m.onClose() } } + +func (m *MessageChannel) ConnectionID() livekit.ConnectionID { + return m.connectionID +} diff --git a/pkg/routing/messagechannel_test.go b/pkg/routing/messagechannel_test.go index b79a00795..25bf7fdf1 100644 --- a/pkg/routing/messagechannel_test.go +++ b/pkg/routing/messagechannel_test.go @@ -11,7 +11,7 @@ import ( func TestMessageChannel_WriteMessageClosed(t *testing.T) { // ensure it doesn't panic when written to after closing - m := routing.NewMessageChannel(routing.DefaultMessageChannelSize) + m := routing.NewMessageChannel(livekit.ConnectionID("test"), routing.DefaultMessageChannelSize) go func() { for msg := range m.ReadChan() { if msg == nil { diff --git a/pkg/routing/redis.go b/pkg/routing/redis.go index db781cbe9..054613e61 100644 --- a/pkg/routing/redis.go +++ b/pkg/routing/redis.go @@ -98,16 +98,24 @@ func publishSignalMessage(rc redis.UniversalClient, nodeID livekit.NodeID, conne type RTCNodeSink struct { rc redis.UniversalClient nodeID livekit.NodeID + connectionID livekit.ConnectionID participantKey livekit.ParticipantKey participantKeyB62 livekit.ParticipantKey isClosed atomic.Bool onClose func() } -func NewRTCNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) *RTCNodeSink { +func NewRTCNodeSink( + rc redis.UniversalClient, + nodeID livekit.NodeID, + connectionID livekit.ConnectionID, + participantKey livekit.ParticipantKey, + participantKeyB62 livekit.ParticipantKey, +) *RTCNodeSink { return &RTCNodeSink{ rc: rc, nodeID: nodeID, + connectionID: connectionID, participantKey: participantKey, participantKeyB62: participantKeyB62, } @@ -137,6 +145,12 @@ func (s *RTCNodeSink) OnClose(f func()) { s.onClose = f } +func (s *RTCNodeSink) ConnectionID() livekit.ConnectionID { + return s.connectionID +} + +// ---------------------------------------------------------------------- + type SignalNodeSink struct { rc redis.UniversalClient nodeID livekit.NodeID @@ -177,3 +191,7 @@ func (s *SignalNodeSink) IsClosed() bool { func (s *SignalNodeSink) OnClose(f func()) { s.onClose = f } + +func (s *SignalNodeSink) ConnectionID() livekit.ConnectionID { + return s.connectionID +} diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index c5da2a54b..d31a13638 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -174,7 +174,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek // set up response channel before sending StartSession and be ready to receive responses. resChan := r.getOrCreateMessageChannel(r.responseChannels, string(connectionID)) - sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), pKey, pKeyB62) + sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), connectionID, pKey, pKeyB62) // serialize claims ss, err := pi.ToStartSession(roomName, connectionID) @@ -199,7 +199,7 @@ func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro return err } - rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), pkey, pkeyB62) + rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), livekit.ConnectionID("ephemeral"), pkey, pkeyB62) msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity)) msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity)) return r.writeRTCMessage(rtcSink, msg) @@ -216,7 +216,7 @@ func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam } func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { - rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNodeID), livekit.ParticipantKey(msg.ParticipantKey), livekit.ParticipantKey(msg.ParticipantKeyB62)) + rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNodeID), livekit.ConnectionID("ephemeral"), livekit.ParticipantKey(msg.ParticipantKey), livekit.ParticipantKey(msg.ParticipantKeyB62)) return r.writeRTCMessage(rtcSink, msg) } diff --git a/pkg/routing/routingfakes/fake_message_sink.go b/pkg/routing/routingfakes/fake_message_sink.go index ec53c4309..9359b2ac2 100644 --- a/pkg/routing/routingfakes/fake_message_sink.go +++ b/pkg/routing/routingfakes/fake_message_sink.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/protocol/livekit" "google.golang.org/protobuf/reflect/protoreflect" ) @@ -13,6 +14,16 @@ type FakeMessageSink struct { closeMutex sync.RWMutex closeArgsForCall []struct { } + ConnectionIDStub func() livekit.ConnectionID + connectionIDMutex sync.RWMutex + connectionIDArgsForCall []struct { + } + connectionIDReturns struct { + result1 livekit.ConnectionID + } + connectionIDReturnsOnCall map[int]struct { + result1 livekit.ConnectionID + } IsClosedStub func() bool isClosedMutex sync.RWMutex isClosedArgsForCall []struct { @@ -62,6 +73,59 @@ func (fake *FakeMessageSink) CloseCalls(stub func()) { fake.CloseStub = stub } +func (fake *FakeMessageSink) ConnectionID() livekit.ConnectionID { + fake.connectionIDMutex.Lock() + ret, specificReturn := fake.connectionIDReturnsOnCall[len(fake.connectionIDArgsForCall)] + fake.connectionIDArgsForCall = append(fake.connectionIDArgsForCall, struct { + }{}) + stub := fake.ConnectionIDStub + fakeReturns := fake.connectionIDReturns + fake.recordInvocation("ConnectionID", []interface{}{}) + fake.connectionIDMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeMessageSink) ConnectionIDCallCount() int { + fake.connectionIDMutex.RLock() + defer fake.connectionIDMutex.RUnlock() + return len(fake.connectionIDArgsForCall) +} + +func (fake *FakeMessageSink) ConnectionIDCalls(stub func() livekit.ConnectionID) { + fake.connectionIDMutex.Lock() + defer fake.connectionIDMutex.Unlock() + fake.ConnectionIDStub = stub +} + +func (fake *FakeMessageSink) ConnectionIDReturns(result1 livekit.ConnectionID) { + fake.connectionIDMutex.Lock() + defer fake.connectionIDMutex.Unlock() + fake.ConnectionIDStub = nil + fake.connectionIDReturns = struct { + result1 livekit.ConnectionID + }{result1} +} + +func (fake *FakeMessageSink) ConnectionIDReturnsOnCall(i int, result1 livekit.ConnectionID) { + fake.connectionIDMutex.Lock() + defer fake.connectionIDMutex.Unlock() + fake.ConnectionIDStub = nil + if fake.connectionIDReturnsOnCall == nil { + fake.connectionIDReturnsOnCall = make(map[int]struct { + result1 livekit.ConnectionID + }) + } + fake.connectionIDReturnsOnCall[i] = struct { + result1 livekit.ConnectionID + }{result1} +} + func (fake *FakeMessageSink) IsClosed() bool { fake.isClosedMutex.Lock() ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)] @@ -181,6 +245,8 @@ func (fake *FakeMessageSink) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() + fake.connectionIDMutex.RLock() + defer fake.connectionIDMutex.RUnlock() fake.isClosedMutex.RLock() defer fake.isClosedMutex.RUnlock() fake.writeMessageMutex.RLock() diff --git a/pkg/routing/routingfakes/fake_message_source.go b/pkg/routing/routingfakes/fake_message_source.go index acfe7606c..40c48eb56 100644 --- a/pkg/routing/routingfakes/fake_message_source.go +++ b/pkg/routing/routingfakes/fake_message_source.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/protocol/livekit" "google.golang.org/protobuf/reflect/protoreflect" ) @@ -13,6 +14,16 @@ type FakeMessageSource struct { closeMutex sync.RWMutex closeArgsForCall []struct { } + ConnectionIDStub func() livekit.ConnectionID + connectionIDMutex sync.RWMutex + connectionIDArgsForCall []struct { + } + connectionIDReturns struct { + result1 livekit.ConnectionID + } + connectionIDReturnsOnCall map[int]struct { + result1 livekit.ConnectionID + } IsClosedStub func() bool isClosedMutex sync.RWMutex isClosedArgsForCall []struct { @@ -61,6 +72,59 @@ func (fake *FakeMessageSource) CloseCalls(stub func()) { fake.CloseStub = stub } +func (fake *FakeMessageSource) ConnectionID() livekit.ConnectionID { + fake.connectionIDMutex.Lock() + ret, specificReturn := fake.connectionIDReturnsOnCall[len(fake.connectionIDArgsForCall)] + fake.connectionIDArgsForCall = append(fake.connectionIDArgsForCall, struct { + }{}) + stub := fake.ConnectionIDStub + fakeReturns := fake.connectionIDReturns + fake.recordInvocation("ConnectionID", []interface{}{}) + fake.connectionIDMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeMessageSource) ConnectionIDCallCount() int { + fake.connectionIDMutex.RLock() + defer fake.connectionIDMutex.RUnlock() + return len(fake.connectionIDArgsForCall) +} + +func (fake *FakeMessageSource) ConnectionIDCalls(stub func() livekit.ConnectionID) { + fake.connectionIDMutex.Lock() + defer fake.connectionIDMutex.Unlock() + fake.ConnectionIDStub = stub +} + +func (fake *FakeMessageSource) ConnectionIDReturns(result1 livekit.ConnectionID) { + fake.connectionIDMutex.Lock() + defer fake.connectionIDMutex.Unlock() + fake.ConnectionIDStub = nil + fake.connectionIDReturns = struct { + result1 livekit.ConnectionID + }{result1} +} + +func (fake *FakeMessageSource) ConnectionIDReturnsOnCall(i int, result1 livekit.ConnectionID) { + fake.connectionIDMutex.Lock() + defer fake.connectionIDMutex.Unlock() + fake.ConnectionIDStub = nil + if fake.connectionIDReturnsOnCall == nil { + fake.connectionIDReturnsOnCall = make(map[int]struct { + result1 livekit.ConnectionID + }) + } + fake.connectionIDReturnsOnCall[i] = struct { + result1 livekit.ConnectionID + }{result1} +} + func (fake *FakeMessageSource) IsClosed() bool { fake.isClosedMutex.Lock() ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)] @@ -172,6 +236,8 @@ func (fake *FakeMessageSource) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() + fake.connectionIDMutex.RLock() + defer fake.connectionIDMutex.RUnlock() fake.isClosedMutex.RLock() defer fake.isClosedMutex.RUnlock() fake.readChanMutex.RLock() diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index fab96ef7c..fad31191b 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -105,8 +105,9 @@ func (r *signalClient) StartParticipantSignal( Writer: signalRequestMessageWriter{}, CloseOnFailure: true, BlockOnClose: true, + ConnectionID: connectionID, }) - resChan := NewDefaultMessageChannel() + resChan := NewDefaultMessageChannel(connectionID) go func() { r.active.Inc() @@ -230,6 +231,7 @@ type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct { Writer SignalMessageWriter[SendType] CloseOnFailure bool BlockOnClose bool + ConnectionID livekit.ConnectionID } func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink { @@ -348,3 +350,7 @@ func (s *signalMessageSink[SendType, RecvType]) WriteMessage(msg proto.Message) } return nil } + +func (s *signalMessageSink[SendType, RecvType]) ConnectionID() livekit.ConnectionID { + return s.SignalSinkParams.ConnectionID +} diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 5c3cdef26..5971d00e9 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -688,7 +688,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.updateState(livekit.ParticipantInfo_DISCONNECTED) // ensure this is synchronized - p.CloseSignalConnection() + p.CloseSignalConnection(types.SignallingCloseReasonParticipantClose) p.lock.RLock() onClose := p.onClose p.lock.RUnlock() @@ -741,7 +741,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool { onStart() } - p.CloseSignalConnection() + p.CloseSignalConnection(types.SignallingCloseReasonMigration) // // On subscriber peer connection, remote side will try ICE on both @@ -1349,7 +1349,7 @@ func (p *ParticipantImpl) setupDisconnectTimer() { func (p *ParticipantImpl) onAnyTransportFailed() { // clients support resuming of connections when websocket becomes disconnected - p.CloseSignalConnection() + p.CloseSignalConnection(types.SignallingCloseReasonTransportFailure) // detect when participant has actually left. p.setupDisconnectTimer() @@ -2068,7 +2068,17 @@ func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason }, }, }) - p.CloseSignalConnection() + + scr := types.SignallingCloseReasonUnknown + switch reason { + case types.ParticipantCloseReasonPublicationError: + scr = types.SignallingCloseReasonFullReconnectPublicationError + case types.ParticipantCloseReasonSubscriptionError: + scr = types.SignallingCloseReasonFullReconnectSubscriptionError + case types.ParticipantCloseReasonNegotiateFailed: + scr = types.SignallingCloseReasonFullReconnectNegotiateFailed + } + p.CloseSignalConnection(scr) // on a full reconnect, no need to supervise this participant anymore p.supervisor.Stop() @@ -2101,7 +2111,7 @@ func (p *ParticipantImpl) onSubscriptionError(trackID livekit.TrackID, fatal boo if p.params.ReconnectOnSubscriptionError && fatal { p.params.Logger.Infow("issuing full reconnect on subscription error", "trackID", trackID) - p.IssueFullReconnect(types.ParticipantCloseReasonPublicationError) + p.IssueFullReconnect(types.ParticipantCloseReasonSubscriptionError) } } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index adbe5862c..8dce027b5 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -229,7 +229,7 @@ func TestOutOfOrderUpdates(t *testing.T) { func TestDisconnectTiming(t *testing.T) { t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) { p := newParticipantForTest("test") - msg := routing.NewMessageChannel(routing.DefaultMessageChannelSize) + msg := routing.NewMessageChannel(livekit.ConnectionID("test"), routing.DefaultMessageChannelSize) p.params.Sink = msg go func() { for msg := range msg.ReadChan() { diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index e09ff85ab..192ef0b48 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -11,6 +11,7 @@ import ( "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" ) func (p *ParticipantImpl) getResponseSink() routing.MessageSink { @@ -280,10 +281,10 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { } // closes signal connection to notify client to resume/reconnect -func (p *ParticipantImpl) CloseSignalConnection() { +func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) { sink := p.getResponseSink() if sink != nil { - p.params.Logger.Infow("closing signal connection") + p.params.Logger.Infow("closing signal connection", "reason", reason, "connID", sink.ConnectionID()) sink.Close() p.SetResponseSink(nil) } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 0441a56b4..4344c6787 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -400,7 +400,7 @@ func (r *Room) GetParticipantRequestSource(identity livekit.ParticipantIdentity) func (r *Room) ResumeParticipant(p types.LocalParticipant, requestSource routing.MessageSource, responseSink routing.MessageSink, iceServers []*livekit.ICEServer, reason livekit.ReconnectReason) error { r.ReplaceParticipantRequestSource(p.Identity(), requestSource) // close previous sink, and link to new one - p.CloseSignalConnection() + p.CloseSignalConnection(types.SignallingCloseReasonResume) p.SetResponseSink(responseSink) p.SetSignalSourceValid(true) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 2501d7013..d9b887041 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -86,6 +86,7 @@ const ( ParticipantCloseReasonMigrationRequested ParticipantCloseReasonOvercommitted ParticipantCloseReasonPublicationError + ParticipantCloseReasonSubscriptionError ) func (p ParticipantCloseReason) String() string { @@ -130,6 +131,8 @@ func (p ParticipantCloseReason) String() string { return "OVERCOMMITTED" case ParticipantCloseReasonPublicationError: return "PUBLICATION_ERROR" + case ParticipantCloseReasonSubscriptionError: + return "SUBSCRIPTION_ERROR" default: return fmt.Sprintf("%d", int(p)) } @@ -160,7 +163,7 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason { return livekit.DisconnectReason_SERVER_SHUTDOWN case ParticipantCloseReasonOvercommitted: return livekit.DisconnectReason_SERVER_SHUTDOWN - case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError: + case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError, ParticipantCloseReasonSubscriptionError: return livekit.DisconnectReason_STATE_MISMATCH default: // the other types will map to unknown reason @@ -170,6 +173,44 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason { // --------------------------------------------- +type SignallingCloseReason int + +const ( + SignallingCloseReasonUnknown SignallingCloseReason = iota + SignallingCloseReasonMigration + SignallingCloseReasonResume + SignallingCloseReasonTransportFailure + SignallingCloseReasonFullReconnectPublicationError + SignallingCloseReasonFullReconnectSubscriptionError + SignallingCloseReasonFullReconnectNegotiateFailed + SignallingCloseReasonParticipantClose +) + +func (s SignallingCloseReason) String() string { + switch s { + case SignallingCloseReasonUnknown: + return "UNKNOWN" + case SignallingCloseReasonMigration: + return "MIGRATION" + case SignallingCloseReasonResume: + return "RESUME" + case SignallingCloseReasonTransportFailure: + return "TRANSPORT_FAILURE" + case SignallingCloseReasonFullReconnectPublicationError: + return "FULL_RECONNECT_PUBLICATION_ERROR" + case SignallingCloseReasonFullReconnectSubscriptionError: + return "FULL_RECONNECT_SUBSCRIPTION_ERROR" + case SignallingCloseReasonFullReconnectNegotiateFailed: + return "FULL_RECONNECT_NEGOTIATE_FAILED" + case SignallingCloseReasonParticipantClose: + return "PARTICIPANT_CLOSE" + default: + return fmt.Sprintf("%d", int(s)) + } +} + +// --------------------------------------------- + //counterfeiter:generate . Participant type Participant interface { ID() livekit.ParticipantID @@ -249,7 +290,7 @@ type LocalParticipant interface { GetBufferFactory() *buffer.Factory SetResponseSink(sink routing.MessageSink) - CloseSignalConnection() + CloseSignalConnection(reason SignallingCloseReason) UpdateLastSeenSignal() SetSignalSourceValid(valid bool) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index e4ddfea5f..235994fa9 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -131,9 +131,10 @@ type FakeLocalParticipant struct { closeReturnsOnCall map[int]struct { result1 error } - CloseSignalConnectionStub func() + CloseSignalConnectionStub func(types.SignallingCloseReason) closeSignalConnectionMutex sync.RWMutex closeSignalConnectionArgsForCall []struct { + arg1 types.SignallingCloseReason } ConnectedAtStub func() time.Time connectedAtMutex sync.RWMutex @@ -1429,15 +1430,16 @@ func (fake *FakeLocalParticipant) CloseReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeLocalParticipant) CloseSignalConnection() { +func (fake *FakeLocalParticipant) CloseSignalConnection(arg1 types.SignallingCloseReason) { fake.closeSignalConnectionMutex.Lock() fake.closeSignalConnectionArgsForCall = append(fake.closeSignalConnectionArgsForCall, struct { - }{}) + arg1 types.SignallingCloseReason + }{arg1}) stub := fake.CloseSignalConnectionStub - fake.recordInvocation("CloseSignalConnection", []interface{}{}) + fake.recordInvocation("CloseSignalConnection", []interface{}{arg1}) fake.closeSignalConnectionMutex.Unlock() if stub != nil { - fake.CloseSignalConnectionStub() + fake.CloseSignalConnectionStub(arg1) } } @@ -1447,12 +1449,19 @@ func (fake *FakeLocalParticipant) CloseSignalConnectionCallCount() int { return len(fake.closeSignalConnectionArgsForCall) } -func (fake *FakeLocalParticipant) CloseSignalConnectionCalls(stub func()) { +func (fake *FakeLocalParticipant) CloseSignalConnectionCalls(stub func(types.SignallingCloseReason)) { fake.closeSignalConnectionMutex.Lock() defer fake.closeSignalConnectionMutex.Unlock() fake.CloseSignalConnectionStub = stub } +func (fake *FakeLocalParticipant) CloseSignalConnectionArgsForCall(i int) types.SignallingCloseReason { + fake.closeSignalConnectionMutex.RLock() + defer fake.closeSignalConnectionMutex.RUnlock() + argsForCall := fake.closeSignalConnectionArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) ConnectedAt() time.Time { fake.connectedAtMutex.Lock() ret, specificReturn := fake.connectedAtReturnsOnCall[len(fake.connectedAtArgsForCall)] diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 056b1b976..4e3997d15 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -485,7 +485,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa false, ) defer func() { - pLogger.Debugw("RTC session finishing") + pLogger.Debugw("RTC session finishing", "connID", requestSource.ConnectionID()) requestSource.Close() }() @@ -511,7 +511,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa case <-tokenTicker.C: // refresh token with the first API Key/secret pair if err := r.refreshToken(participant); err != nil { - pLogger.Errorw("could not refresh token", err) + pLogger.Errorw("could not refresh token", err, "connID", requestSource.ConnectionID()) } case obj := <-requestSource.ReadChan(): // In single node mode, the request source is directly tied to the signal message channel diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index cd6bc93da..3888dfc24 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -285,6 +285,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return case msg := <-cr.ResponseSource.ReadChan(): if msg == nil { + pLogger.Infow("nothing to read from response source", "connID", cr.ConnectionID) return } res, ok := msg.(*livekit.SignalResponse) @@ -323,16 +324,15 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // handle incoming requests from websocket for { req, count, err := sigConn.ReadRequest() - // normal closure if err != nil { + // normal/expected closure if err == io.EOF || strings.HasSuffix(err.Error(), "use of closed network connection") || websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { - pLogger.Debugw("exit ws read loop for closed connection", "connID", cr.ConnectionID) - return + pLogger.Infow("exit ws read loop for closed connection", "connID", cr.ConnectionID, "wsError", err) } else { pLogger.Errorw("error reading from websocket", err) - return } + return } if signalStats != nil { signalStats.AddBytes(uint64(count), false) @@ -374,8 +374,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if err := cr.RequestSink.WriteMessage(req); err != nil { - pLogger.Warnw("error writing to request sink", err, - "connID", cr.ConnectionID) + pLogger.Warnw("error writing to request sink", err, "connID", cr.ConnectionID) } } } diff --git a/pkg/service/signal.go b/pkg/service/signal.go index a8ce00811..c6907dadb 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -136,17 +136,18 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe l := logger.GetLogger().WithValues( "room", ss.RoomName, "participant", ss.Identity, - "connectionID", ss.ConnectionId, + "connID", ss.ConnectionId, ) - reqChan := routing.NewDefaultMessageChannel() + reqChan := routing.NewDefaultMessageChannel(livekit.ConnectionID(ss.ConnectionId)) defer reqChan.Close() sink := routing.NewSignalMessageSink(routing.SignalSinkParams[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]{ - Logger: l, - Stream: stream, - Config: r.config, - Writer: signalResponseMessageWriter{}, + Logger: l, + Stream: stream, + Config: r.config, + Writer: signalResponseMessageWriter{}, + ConnectionID: livekit.ConnectionID(ss.ConnectionId), }) err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink)