From 8a67dd1b9f40ee089b3193ef3dc0967189a26548 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 3 Apr 2026 21:50:59 +0530 Subject: [PATCH] Do not close publisher peer connection to aid migration. (#4427) --- pkg/rtc/participant.go | 4 +-- pkg/rtc/signalling/signallerasync.go | 44 ++++++++++++++++------------ pkg/rtc/transportmanager.go | 4 +++ pkg/service/rtcservice.go | 3 +- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 4efd90f23..62f579677 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1513,7 +1513,7 @@ func (p *ParticipantImpl) setupMigrationTimerLocked() { if p.IsClosed() || p.IsDisconnected() { return } - p.subLogger.Debugw("closing peer connection(s) to aid migration") + p.subLogger.Debugw("closing subscriber peer connection to aid migration") // // Close all down tracks before closing subscriber peer connection. @@ -1523,7 +1523,7 @@ func (p *ParticipantImpl) setupMigrationTimerLocked() { // p.SubscriptionManager.Close(true) - p.TransportManager.Close() + p.TransportManager.SubscriberClose() }) } diff --git a/pkg/rtc/signalling/signallerasync.go b/pkg/rtc/signalling/signallerasync.go index 15fb5814e..a25486faa 100644 --- a/pkg/rtc/signalling/signallerasync.go +++ b/pkg/rtc/signalling/signallerasync.go @@ -53,7 +53,19 @@ func (s *signallerAsync) WriteMessage(msg proto.Message) error { return nil } + getMessageType := func(msg proto.Message) string { + messageType := "unknown" + if typed, ok := msg.(*livekit.SignalResponse); ok { + messageType = fmt.Sprintf("%T", typed.Message) + } + return messageType + } + if s.params.Participant.IsDisconnected() { + s.params.Logger.Debugw( + "counld not send message to participant, participant disconnected", + "messageType", getMessageType(msg), + ) return nil } @@ -72,33 +84,27 @@ func (s *signallerAsync) WriteMessage(msg proto.Message) error { sink := s.GetResponseSink() if sink == nil { - if typed, ok := msg.(*livekit.SignalResponse); ok { - s.params.Logger.Debugw( - "could not send message to participant", - "messageType", fmt.Sprintf("%T", typed.Message), - ) - } + s.params.Logger.Debugw( + "could not send message to participant, no sink", + "messageType", getMessageType(msg), + ) return nil } err := sink.WriteMessage(msg) if err != nil { if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) { - if typed, ok := msg.(*livekit.SignalResponse); ok { - s.params.Logger.Debugw( - "could not send message to participant", - "error", err, - "messageType", fmt.Sprintf("%T", typed.Message), - ) - } + s.params.Logger.Debugw( + "could not send message to participant", + "error", err, + "messageType", getMessageType(msg), + ) return nil } else { - if typed, ok := msg.(*livekit.SignalResponse); ok { - s.params.Logger.Warnw( - "could not send message to participant", err, - "messageType", fmt.Sprintf("%T", typed.Message), - ) - } + s.params.Logger.Warnw( + "could not send message to participant", err, + "messageType", getMessageType(msg), + ) return err } } else { diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 80e5fac87..25cacf8fe 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -211,6 +211,10 @@ func (t *TransportManager) Close() { } } +func (t *TransportManager) SubscriberClose() { + t.subscriber.Close() +} + func (t *TransportManager) HasPublisherEverConnected() bool { return t.publisher.HasEverConnected() } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index e96901dbd..e8c225518 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -335,7 +335,7 @@ func (s *RTCService) serve(w http.ResponseWriter, r *http.Request, needsJoinRequ roomName, pi, code, err = s.validateInternal(pLogger, r, needsJoinRequest, false) if err != nil { - HandleError(w, r, code, err) + HandleError(w, r, code, err, getLoggerFields()...) return } @@ -343,6 +343,7 @@ func (s *RTCService) serve(w http.ResponseWriter, r *http.Request, needsJoinRequ if pi.ID != "" { pID = pi.ID } + pLogger.Debugw("join request validated", append(getLoggerFields(), "participantInit", &pi)) // give it a few attempts to start session var cr connectionResult