diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index db98a89bb..409386e3d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1051,7 +1051,7 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { p.lock.RUnlock() if onStateChange != nil { go func() { - defer Recover() + defer Recover(p.GetLogger()) onStateChange(p, oldState) }() } @@ -1225,7 +1225,7 @@ func (p *ParticipantImpl) onAnyTransportFailed() { // subscriberRTCPWorker sends SenderReports periodically when the participant is subscribed to // other publishedTracks in the room. func (p *ParticipantImpl) subscriberRTCPWorker() { - defer Recover() + defer Recover(p.GetLogger()) for { if p.IsDisconnected() { return @@ -1805,7 +1805,7 @@ func (p *ParticipantImpl) getPublishedTrackBySdpCid(clientId string) types.Media } func (p *ParticipantImpl) publisherRTCPWorker() { - defer Recover() + defer Recover(p.GetLogger()) // read from rtcpChan for pkts := range p.rtcpCh { @@ -1895,7 +1895,7 @@ func (p *ParticipantImpl) GetCachedDownTrack(trackID livekit.TrackID) (*webrtc.R return nil, sfu.DownTrackState{} } -func (p *ParticipantImpl) issueFullReconnect(reason types.ParticipantCloseReason) { +func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason) { _ = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Leave{ Leave: &livekit.LeaveRequest{ @@ -1913,20 +1913,20 @@ func (p *ParticipantImpl) issueFullReconnect(reason types.ParticipantCloseReason func (p *ParticipantImpl) onPublicationError(trackID livekit.TrackID) { if p.params.ReconnectOnPublicationError { p.params.Logger.Infow("issuing full reconnect on publication error", "trackID", trackID) - p.issueFullReconnect(types.ParticipantCloseReasonPublicationError) + p.IssueFullReconnect(types.ParticipantCloseReasonPublicationError) } } func (p *ParticipantImpl) onSubscriptionError(trackID livekit.TrackID) { if p.params.ReconnectOnSubscriptionError { p.params.Logger.Infow("issuing full reconnect on subscription error", "trackID", trackID) - p.issueFullReconnect(types.ParticipantCloseReasonPublicationError) + p.IssueFullReconnect(types.ParticipantCloseReasonPublicationError) } } func (p *ParticipantImpl) onAnyTransportNegotiationFailed() { p.params.Logger.Infow("negotiation failed, starting full reconnect") - p.issueFullReconnect(types.ParticipantCloseReasonNegotiateFailed) + p.IssueFullReconnect(types.ParticipantCloseReasonNegotiateFailed) } func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 89f7f96a2..4eee66f8b 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -295,6 +295,7 @@ type LocalParticipant interface { SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) SendRefreshToken(token string) error SendReconnectResponse(reconnectResponse *livekit.ReconnectResponse) error + IssueFullReconnect(reason ParticipantCloseReason) // callbacks OnStateChange(func(p LocalParticipant, oldState livekit.ParticipantInfo_State)) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 044eba2a3..d7355a871 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -397,6 +397,11 @@ type FakeLocalParticipant struct { isSubscribedToReturnsOnCall map[int]struct { result1 bool } + IssueFullReconnectStub func(types.ParticipantCloseReason) + issueFullReconnectMutex sync.RWMutex + issueFullReconnectArgsForCall []struct { + arg1 types.ParticipantCloseReason + } MaybeStartMigrationStub func(bool, func()) bool maybeStartMigrationMutex sync.RWMutex maybeStartMigrationArgsForCall []struct { @@ -2793,6 +2798,38 @@ func (fake *FakeLocalParticipant) IsSubscribedToReturnsOnCall(i int, result1 boo }{result1} } +func (fake *FakeLocalParticipant) IssueFullReconnect(arg1 types.ParticipantCloseReason) { + fake.issueFullReconnectMutex.Lock() + fake.issueFullReconnectArgsForCall = append(fake.issueFullReconnectArgsForCall, struct { + arg1 types.ParticipantCloseReason + }{arg1}) + stub := fake.IssueFullReconnectStub + fake.recordInvocation("IssueFullReconnect", []interface{}{arg1}) + fake.issueFullReconnectMutex.Unlock() + if stub != nil { + fake.IssueFullReconnectStub(arg1) + } +} + +func (fake *FakeLocalParticipant) IssueFullReconnectCallCount() int { + fake.issueFullReconnectMutex.RLock() + defer fake.issueFullReconnectMutex.RUnlock() + return len(fake.issueFullReconnectArgsForCall) +} + +func (fake *FakeLocalParticipant) IssueFullReconnectCalls(stub func(types.ParticipantCloseReason)) { + fake.issueFullReconnectMutex.Lock() + defer fake.issueFullReconnectMutex.Unlock() + fake.IssueFullReconnectStub = stub +} + +func (fake *FakeLocalParticipant) IssueFullReconnectArgsForCall(i int) types.ParticipantCloseReason { + fake.issueFullReconnectMutex.RLock() + defer fake.issueFullReconnectMutex.RUnlock() + argsForCall := fake.issueFullReconnectArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) MaybeStartMigration(arg1 bool, arg2 func()) bool { fake.maybeStartMigrationMutex.Lock() ret, specificReturn := fake.maybeStartMigrationReturnsOnCall[len(fake.maybeStartMigrationArgsForCall)] @@ -5104,6 +5141,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.isRecorderMutex.RUnlock() fake.isSubscribedToMutex.RLock() defer fake.isSubscribedToMutex.RUnlock() + fake.issueFullReconnectMutex.RLock() + defer fake.issueFullReconnectMutex.RUnlock() fake.maybeStartMigrationMutex.RLock() defer fake.maybeStartMigrationMutex.RUnlock() fake.migrateStateMutex.RLock() diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index c78b9d579..388e12520 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -112,7 +112,10 @@ func RecoverSilent() { recover() } -func Recover() { +func Recover(l logger.Logger) { + if l == nil { + l = logger.GetLogger() + } if r := recover(); r != nil { var err error switch e := r.(type) { @@ -123,7 +126,7 @@ func Recover() { default: err = errors.New("unknown panic") } - logger.Errorw("recovered panic", err, "panic", r) + l.Errorw("recovered panic", err, "panic", r) } } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index dffc16a98..4715df802 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -468,23 +468,17 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room // manages an RTC session for a participant, runs on the RTC node func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalParticipant, requestSource routing.MessageSource) { - defer func() { - logger.Debugw("RTC session finishing", - "participant", participant.Identity(), - "pID", participant.ID(), - "room", room.Name(), - "roomID", room.ID(), - ) - requestSource.Close() - }() - defer rtc.Recover() - pLogger := rtc.LoggerWithParticipant( rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), participant.Identity(), participant.ID(), false, ) + defer func() { + pLogger.Debugw("RTC session finishing") + requestSource.Close() + }() + defer rtc.Recover(pLogger) // send first refresh for cases when client token is close to expiring _ = r.refreshToken(participant) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index f1b19a1bf..4fda2596f 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -261,7 +261,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // we would terminate the signal connection as well _ = conn.Close() }() - defer rtc.Recover() + defer rtc.Recover(pLogger) for { select { case <-done: