Do not close publisher peer connection to aid migration. (#4427)

This commit is contained in:
Raja Subramanian
2026-04-03 21:50:59 +05:30
committed by GitHub
parent 91e90c1020
commit 8a67dd1b9f
4 changed files with 33 additions and 22 deletions

View File

@@ -1513,7 +1513,7 @@ func (p *ParticipantImpl) setupMigrationTimerLocked() {
if p.IsClosed() || p.IsDisconnected() {
return
}
p.subLogger.Debugw("closing peer connection(s) to aid migration")
p.subLogger.Debugw("closing subscriber peer connection to aid migration")
//
// Close all down tracks before closing subscriber peer connection.
@@ -1523,7 +1523,7 @@ func (p *ParticipantImpl) setupMigrationTimerLocked() {
//
p.SubscriptionManager.Close(true)
p.TransportManager.Close()
p.TransportManager.SubscriberClose()
})
}

View File

@@ -53,7 +53,19 @@ func (s *signallerAsync) WriteMessage(msg proto.Message) error {
return nil
}
getMessageType := func(msg proto.Message) string {
messageType := "unknown"
if typed, ok := msg.(*livekit.SignalResponse); ok {
messageType = fmt.Sprintf("%T", typed.Message)
}
return messageType
}
if s.params.Participant.IsDisconnected() {
s.params.Logger.Debugw(
"counld not send message to participant, participant disconnected",
"messageType", getMessageType(msg),
)
return nil
}
@@ -72,33 +84,27 @@ func (s *signallerAsync) WriteMessage(msg proto.Message) error {
sink := s.GetResponseSink()
if sink == nil {
if typed, ok := msg.(*livekit.SignalResponse); ok {
s.params.Logger.Debugw(
"could not send message to participant",
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
s.params.Logger.Debugw(
"could not send message to participant, no sink",
"messageType", getMessageType(msg),
)
return nil
}
err := sink.WriteMessage(msg)
if err != nil {
if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) {
if typed, ok := msg.(*livekit.SignalResponse); ok {
s.params.Logger.Debugw(
"could not send message to participant",
"error", err,
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
s.params.Logger.Debugw(
"could not send message to participant",
"error", err,
"messageType", getMessageType(msg),
)
return nil
} else {
if typed, ok := msg.(*livekit.SignalResponse); ok {
s.params.Logger.Warnw(
"could not send message to participant", err,
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
s.params.Logger.Warnw(
"could not send message to participant", err,
"messageType", getMessageType(msg),
)
return err
}
} else {

View File

@@ -211,6 +211,10 @@ func (t *TransportManager) Close() {
}
}
func (t *TransportManager) SubscriberClose() {
t.subscriber.Close()
}
func (t *TransportManager) HasPublisherEverConnected() bool {
return t.publisher.HasEverConnected()
}

View File

@@ -335,7 +335,7 @@ func (s *RTCService) serve(w http.ResponseWriter, r *http.Request, needsJoinRequ
roomName, pi, code, err = s.validateInternal(pLogger, r, needsJoinRequest, false)
if err != nil {
HandleError(w, r, code, err)
HandleError(w, r, code, err, getLoggerFields()...)
return
}
@@ -343,6 +343,7 @@ func (s *RTCService) serve(w http.ResponseWriter, r *http.Request, needsJoinRequ
if pi.ID != "" {
pID = pi.ID
}
pLogger.Debugw("join request validated", append(getLoggerFields(), "participantInit", &pi))
// give it a few attempts to start session
var cr connectionResult