From 0a2943bbc503c7a9891e10000733dec10e4c0e75 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 28 Nov 2025 10:30:39 +0530 Subject: [PATCH] Clean up bits added to debug peer connection close hang. (#4114) --- pkg/rtc/participant.go | 20 ------ pkg/rtc/subscriptionmanager.go | 15 ----- pkg/rtc/transport.go | 109 ++------------------------------- pkg/rtc/transportmanager.go | 14 ----- 4 files changed, 5 insertions(+), 153 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 2aa199edb..8f501a54e 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1560,31 +1560,11 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea // Close peer connections without blocking participant Close. If peer connections are gathering candidates // Close will block. go func() { - var smClosed atomic.Bool - var tmClosed atomic.Bool - var mcClosed atomic.Bool - var mrClosed atomic.Bool - time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP - if !smClosed.Load() || !tmClosed.Load() || !mcClosed.Load() || !mrClosed.Load() { - p.params.Logger.Infow( - "participant close timeout", - "smClosed", smClosed.Load(), - "tmClosed", tmClosed.Load(), - "mcClosed", mcClosed.Load(), - "mrClosed", mrClosed.Load(), - ) - } - }) - p.SubscriptionManager.Close(isExpectedToResume) - smClosed.Store(true) p.TransportManager.Close() - tmClosed.Store(true) p.metricsCollector.Stop() - mcClosed.Store(true) p.metricsReporter.Stop() - mrClosed.Store(true) }() p.dataChannelStats.Stop() diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index d60451427..eb7fcb45c 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -104,21 +104,7 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) { close(m.closeCh) m.lock.Unlock() - var done atomic.Bool - var downTracksClosed atomic.Bool - time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP - if !done.Load() || !downTracksClosed.Load() { - m.params.Logger.Infow( - "subscription maanager close timeout", - "done", done.Load(), - "downTracksClosed", downTracksClosed.Load(), - "numSubscribedTracks", len(m.GetSubscribedTracks()), - ) - } - }) - <-m.doneCh - done.Store(true) prometheus.RecordTrackSubscribeCancels(int32(m.getNumCancellations())) @@ -143,7 +129,6 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) { go dt.CloseWithFlush(true) } } - downTracksClosed.Store(true) } func (m *SubscriptionManager) isClosed() bool { diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 63197745a..8a4368016 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -192,12 +192,10 @@ type trackDescription struct { // PCTransport is a wrapper around PeerConnection, with some helper methods type PCTransport struct { - params TransportParams - pc *webrtc.PeerConnection - sctpTransport *webrtc.SCTPTransport // CLOSE-DEBUG-CLEANUP - dtlsTransport *webrtc.DTLSTransport // CLOSE-DEBUG-CLEANUP - iceTransport *webrtc.ICETransport - me *webrtc.MediaEngine + params TransportParams + pc *webrtc.PeerConnection + iceTransport *webrtc.ICETransport + me *webrtc.MediaEngine lock sync.RWMutex @@ -282,12 +280,6 @@ type PCTransport struct { signalStateCheckTimer *time.Timer currentOfferIceCredential string // ice user:pwd, for publish side ice restart checking pendingRestartIceOffer *webrtc.SessionDescription - - // CLOSE-DEBUG-CLEANUP - iceGatheringState atomic.Value // webrtc.ICEGatheringState - iceConnectionState atomic.Value // webrtc.ICEConnectionState - iceConnectionStateChangeAt atomic.Time - peerConnectionState atomic.Value // webrtc.PeerConnectionState } type TransportParams struct { @@ -522,9 +514,6 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { connectionDetails: types.NewICEConnectionDetails(params.Transport, params.Logger), lastNegotiate: time.Now(), } - t.iceGatheringState.Store(webrtc.ICEGatheringStateUnknown) - t.iceConnectionState.Store(webrtc.ICEConnectionStateUnknown) - t.peerConnectionState.Store(webrtc.PeerConnectionStateUnknown) t.localOfferId.Store(uint32(rand.Intn(1<<8) + 1)) bwe, err := t.createPeerConnection() @@ -596,8 +585,6 @@ func (t *PCTransport) createPeerConnection() (cc.BandwidthEstimator, error) { t.pc.OnDataChannel(t.onDataChannel) t.pc.OnTrack(t.params.Handler.OnTrack) - t.sctpTransport = t.pc.SCTP() - t.dtlsTransport = t.sctpTransport.Transport() t.iceTransport = t.pc.SCTP().Transport().ICETransport() if t.iceTransport == nil { return bwe, ErrNoICETransport @@ -781,7 +768,6 @@ func (t *PCTransport) setConnectedAt(at time.Time) bool { } func (t *PCTransport) onICEGatheringStateChange(state webrtc.ICEGatheringState) { - t.iceGatheringState.Store(state) t.params.Logger.Debugw("ice gathering state change", "state", state.String()) if state != webrtc.ICEGatheringStateComplete { return @@ -813,8 +799,6 @@ func (t *PCTransport) handleConnectionFailed(forceShortConn bool) { } func (t *PCTransport) onICEConnectionStateChange(state webrtc.ICEConnectionState) { - t.iceConnectionState.Store(state) - t.iceConnectionStateChangeAt.Store(time.Now()) t.params.Logger.Debugw("ice connection state change", "state", state.String()) switch state { case webrtc.ICEConnectionStateConnected: @@ -826,7 +810,6 @@ func (t *PCTransport) onICEConnectionStateChange(state webrtc.ICEConnectionState } func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionState) { - t.peerConnectionState.Store(state) t.params.Logger.Debugw("peer connection state change", "state", state.String()) switch state { case webrtc.PeerConnectionStateConnected: @@ -1454,94 +1437,16 @@ func (t *PCTransport) Close() { return } - var eventsQueueDone atomic.Bool - var streamAllocatorStopped atomic.Bool - var pacerStopped atomic.Bool - var reliableDataChannelClosed atomic.Bool - var lossyDataChannelClosed atomic.Bool - var unlabeledDataChannelClosed atomic.Bool - var peerConnectionClosed atomic.Bool - time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP - if !eventsQueueDone.Load() || !streamAllocatorStopped.Load() || !pacerStopped.Load() || !reliableDataChannelClosed.Load() || !lossyDataChannelClosed.Load() || !unlabeledDataChannelClosed.Load() || !peerConnectionClosed.Load() { - t.lock.Lock() - iceStartedAt := t.iceStartedAt - iceConnectedAt := t.iceConnectedAt - iceStats := t.mayFailedICEStats - t.lock.Unlock() - - t.params.Logger.Infow( - "transport close timeout", - "eventsQueueDone", eventsQueueDone.Load(), - "streamAllocatorStopped", streamAllocatorStopped.Load(), - "pacerStopped", pacerStopped.Load(), - "reliableDataChannelClosed", reliableDataChannelClosed.Load(), - "lossyDataChannelClosed", lossyDataChannelClosed.Load(), - "unlabeledDataChannelClosed", unlabeledDataChannelClosed.Load(), - "peerConnectionClosed", peerConnectionClosed.Load(), - "iceStartedAt", iceStartedAt, - "iceConnectedAt", iceConnectedAt, - "iceGatheringState", t.iceGatheringState.Load().(webrtc.ICEGatheringState).String(), - "iceConnectionState", t.iceConnectionState.Load().(webrtc.ICEConnectionState).String(), - "icceConnectionStateChangedAt", t.iceConnectionStateChangeAt.Load(), - "peerConnectionState", t.peerConnectionState.Load().(webrtc.PeerConnectionState).String(), - "iceStats", iceCandidatePairStatsEncoder{iceStats}, - "iceConnectionInfo", t.GetICEConnectionInfo(), - "connectionType", t.connectionDetails.GetConnectionType(), - ) - - t.params.Logger.Infow( - "transport close timeout - signalling state", - "signalingState", t.pc.SignalingState().String(), - ) - - t.params.Logger.Infow( - "transport close timeout - iceTransport state", - "iceTransportState", t.iceTransport.State().String(), - ) - - if t.sctpTransport != nil { - t.params.Logger.Infow( - "transport close timeout - sctpTransport state", - "sctpTransportState", t.sctpTransport.State().String(), - ) - } - - numActiveTransceivers := 0 - numInactiveTransceivers := 0 - for _, tr := range t.pc.GetTransceivers() { - if tr.Direction() == webrtc.RTPTransceiverDirectionInactive { - numInactiveTransceivers++ - } else { - numActiveTransceivers++ - } - } - t.params.Logger.Infow( - "transport close timeout - transceivers state", - "numInactiveTransceivers", numInactiveTransceivers, - "numActiveTransceivers", numActiveTransceivers, - ) - - if t.dtlsTransport != nil { - t.params.Logger.Infow( - "transport close timeout - dtlsTransport state", - "dltsTransportState", t.dtlsTransport.State().String(), - ) - } - } - }) - <-t.eventsQueue.Stop() - eventsQueueDone.Store(true) t.clearSignalStateCheckTimer() if t.streamAllocator != nil { t.streamAllocator.Stop() } - streamAllocatorStopped.Store(true) + if t.pacer != nil { t.pacer.Stop() } - pacerStopped.Store(true) t.clearConnTimer() @@ -1555,25 +1460,21 @@ func (t *PCTransport) Close() { t.reliableDC.Close() t.reliableDC = nil } - reliableDataChannelClosed.Store(true) if t.lossyDC != nil { t.lossyDC.Close() t.lossyDC = nil } - lossyDataChannelClosed.Store(true) for _, dc := range t.unlabeledDataChannels { dc.Close() } t.unlabeledDataChannels = nil - unlabeledDataChannelClosed.Store(true) t.lock.Unlock() if err := t.pc.Close(); err != nil { t.params.Logger.Warnw("unclean close of peer connection", err) } - peerConnectionClosed.Store(true) t.outputAndClearICEStats() } diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 8a0dbc3cb..02a421c42 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -199,26 +199,12 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro } func (t *TransportManager) Close() { - var publisherClosed atomic.Bool - var subscriberClosed atomic.Bool - time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP - if !publisherClosed.Load() || !subscriberClosed.Load() { - t.params.Logger.Infow( - "transport maanager close timeout", - "publisherClosed", publisherClosed.Load(), - "subscriberClosed", subscriberClosed.Load(), - ) - } - }) - if t.publisher != nil { t.publisher.Close() } - publisherClosed.Store(true) if t.subscriber != nil { t.subscriber.Close() } - subscriberClosed.Store(true) } func (t *TransportManager) SubscriberClose() {