Clean up bits added to debug peer connection close hang. (#4114)

This commit is contained in:
Raja Subramanian
2025-11-28 10:30:39 +05:30
committed by GitHub
parent 9c483a693a
commit 0a2943bbc5
4 changed files with 5 additions and 153 deletions

View File

@@ -1560,31 +1560,11 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
// Close peer connections without blocking participant Close. If peer connections are gathering candidates
// Close will block.
go func() {
var smClosed atomic.Bool
var tmClosed atomic.Bool
var mcClosed atomic.Bool
var mrClosed atomic.Bool
time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP
if !smClosed.Load() || !tmClosed.Load() || !mcClosed.Load() || !mrClosed.Load() {
p.params.Logger.Infow(
"participant close timeout",
"smClosed", smClosed.Load(),
"tmClosed", tmClosed.Load(),
"mcClosed", mcClosed.Load(),
"mrClosed", mrClosed.Load(),
)
}
})
p.SubscriptionManager.Close(isExpectedToResume)
smClosed.Store(true)
p.TransportManager.Close()
tmClosed.Store(true)
p.metricsCollector.Stop()
mcClosed.Store(true)
p.metricsReporter.Stop()
mrClosed.Store(true)
}()
p.dataChannelStats.Stop()

View File

@@ -104,21 +104,7 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) {
close(m.closeCh)
m.lock.Unlock()
var done atomic.Bool
var downTracksClosed atomic.Bool
time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP
if !done.Load() || !downTracksClosed.Load() {
m.params.Logger.Infow(
"subscription maanager close timeout",
"done", done.Load(),
"downTracksClosed", downTracksClosed.Load(),
"numSubscribedTracks", len(m.GetSubscribedTracks()),
)
}
})
<-m.doneCh
done.Store(true)
prometheus.RecordTrackSubscribeCancels(int32(m.getNumCancellations()))
@@ -143,7 +129,6 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) {
go dt.CloseWithFlush(true)
}
}
downTracksClosed.Store(true)
}
func (m *SubscriptionManager) isClosed() bool {

View File

@@ -192,12 +192,10 @@ type trackDescription struct {
// PCTransport is a wrapper around PeerConnection, with some helper methods
type PCTransport struct {
params TransportParams
pc *webrtc.PeerConnection
sctpTransport *webrtc.SCTPTransport // CLOSE-DEBUG-CLEANUP
dtlsTransport *webrtc.DTLSTransport // CLOSE-DEBUG-CLEANUP
iceTransport *webrtc.ICETransport
me *webrtc.MediaEngine
params TransportParams
pc *webrtc.PeerConnection
iceTransport *webrtc.ICETransport
me *webrtc.MediaEngine
lock sync.RWMutex
@@ -282,12 +280,6 @@ type PCTransport struct {
signalStateCheckTimer *time.Timer
currentOfferIceCredential string // ice user:pwd, for publish side ice restart checking
pendingRestartIceOffer *webrtc.SessionDescription
// CLOSE-DEBUG-CLEANUP
iceGatheringState atomic.Value // webrtc.ICEGatheringState
iceConnectionState atomic.Value // webrtc.ICEConnectionState
iceConnectionStateChangeAt atomic.Time
peerConnectionState atomic.Value // webrtc.PeerConnectionState
}
type TransportParams struct {
@@ -522,9 +514,6 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
connectionDetails: types.NewICEConnectionDetails(params.Transport, params.Logger),
lastNegotiate: time.Now(),
}
t.iceGatheringState.Store(webrtc.ICEGatheringStateUnknown)
t.iceConnectionState.Store(webrtc.ICEConnectionStateUnknown)
t.peerConnectionState.Store(webrtc.PeerConnectionStateUnknown)
t.localOfferId.Store(uint32(rand.Intn(1<<8) + 1))
bwe, err := t.createPeerConnection()
@@ -596,8 +585,6 @@ func (t *PCTransport) createPeerConnection() (cc.BandwidthEstimator, error) {
t.pc.OnDataChannel(t.onDataChannel)
t.pc.OnTrack(t.params.Handler.OnTrack)
t.sctpTransport = t.pc.SCTP()
t.dtlsTransport = t.sctpTransport.Transport()
t.iceTransport = t.pc.SCTP().Transport().ICETransport()
if t.iceTransport == nil {
return bwe, ErrNoICETransport
@@ -781,7 +768,6 @@ func (t *PCTransport) setConnectedAt(at time.Time) bool {
}
func (t *PCTransport) onICEGatheringStateChange(state webrtc.ICEGatheringState) {
t.iceGatheringState.Store(state)
t.params.Logger.Debugw("ice gathering state change", "state", state.String())
if state != webrtc.ICEGatheringStateComplete {
return
@@ -813,8 +799,6 @@ func (t *PCTransport) handleConnectionFailed(forceShortConn bool) {
}
func (t *PCTransport) onICEConnectionStateChange(state webrtc.ICEConnectionState) {
t.iceConnectionState.Store(state)
t.iceConnectionStateChangeAt.Store(time.Now())
t.params.Logger.Debugw("ice connection state change", "state", state.String())
switch state {
case webrtc.ICEConnectionStateConnected:
@@ -826,7 +810,6 @@ func (t *PCTransport) onICEConnectionStateChange(state webrtc.ICEConnectionState
}
func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionState) {
t.peerConnectionState.Store(state)
t.params.Logger.Debugw("peer connection state change", "state", state.String())
switch state {
case webrtc.PeerConnectionStateConnected:
@@ -1454,94 +1437,16 @@ func (t *PCTransport) Close() {
return
}
var eventsQueueDone atomic.Bool
var streamAllocatorStopped atomic.Bool
var pacerStopped atomic.Bool
var reliableDataChannelClosed atomic.Bool
var lossyDataChannelClosed atomic.Bool
var unlabeledDataChannelClosed atomic.Bool
var peerConnectionClosed atomic.Bool
time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP
if !eventsQueueDone.Load() || !streamAllocatorStopped.Load() || !pacerStopped.Load() || !reliableDataChannelClosed.Load() || !lossyDataChannelClosed.Load() || !unlabeledDataChannelClosed.Load() || !peerConnectionClosed.Load() {
t.lock.Lock()
iceStartedAt := t.iceStartedAt
iceConnectedAt := t.iceConnectedAt
iceStats := t.mayFailedICEStats
t.lock.Unlock()
t.params.Logger.Infow(
"transport close timeout",
"eventsQueueDone", eventsQueueDone.Load(),
"streamAllocatorStopped", streamAllocatorStopped.Load(),
"pacerStopped", pacerStopped.Load(),
"reliableDataChannelClosed", reliableDataChannelClosed.Load(),
"lossyDataChannelClosed", lossyDataChannelClosed.Load(),
"unlabeledDataChannelClosed", unlabeledDataChannelClosed.Load(),
"peerConnectionClosed", peerConnectionClosed.Load(),
"iceStartedAt", iceStartedAt,
"iceConnectedAt", iceConnectedAt,
"iceGatheringState", t.iceGatheringState.Load().(webrtc.ICEGatheringState).String(),
"iceConnectionState", t.iceConnectionState.Load().(webrtc.ICEConnectionState).String(),
"icceConnectionStateChangedAt", t.iceConnectionStateChangeAt.Load(),
"peerConnectionState", t.peerConnectionState.Load().(webrtc.PeerConnectionState).String(),
"iceStats", iceCandidatePairStatsEncoder{iceStats},
"iceConnectionInfo", t.GetICEConnectionInfo(),
"connectionType", t.connectionDetails.GetConnectionType(),
)
t.params.Logger.Infow(
"transport close timeout - signalling state",
"signalingState", t.pc.SignalingState().String(),
)
t.params.Logger.Infow(
"transport close timeout - iceTransport state",
"iceTransportState", t.iceTransport.State().String(),
)
if t.sctpTransport != nil {
t.params.Logger.Infow(
"transport close timeout - sctpTransport state",
"sctpTransportState", t.sctpTransport.State().String(),
)
}
numActiveTransceivers := 0
numInactiveTransceivers := 0
for _, tr := range t.pc.GetTransceivers() {
if tr.Direction() == webrtc.RTPTransceiverDirectionInactive {
numInactiveTransceivers++
} else {
numActiveTransceivers++
}
}
t.params.Logger.Infow(
"transport close timeout - transceivers state",
"numInactiveTransceivers", numInactiveTransceivers,
"numActiveTransceivers", numActiveTransceivers,
)
if t.dtlsTransport != nil {
t.params.Logger.Infow(
"transport close timeout - dtlsTransport state",
"dltsTransportState", t.dtlsTransport.State().String(),
)
}
}
})
<-t.eventsQueue.Stop()
eventsQueueDone.Store(true)
t.clearSignalStateCheckTimer()
if t.streamAllocator != nil {
t.streamAllocator.Stop()
}
streamAllocatorStopped.Store(true)
if t.pacer != nil {
t.pacer.Stop()
}
pacerStopped.Store(true)
t.clearConnTimer()
@@ -1555,25 +1460,21 @@ func (t *PCTransport) Close() {
t.reliableDC.Close()
t.reliableDC = nil
}
reliableDataChannelClosed.Store(true)
if t.lossyDC != nil {
t.lossyDC.Close()
t.lossyDC = nil
}
lossyDataChannelClosed.Store(true)
for _, dc := range t.unlabeledDataChannels {
dc.Close()
}
t.unlabeledDataChannels = nil
unlabeledDataChannelClosed.Store(true)
t.lock.Unlock()
if err := t.pc.Close(); err != nil {
t.params.Logger.Warnw("unclean close of peer connection", err)
}
peerConnectionClosed.Store(true)
t.outputAndClearICEStats()
}

View File

@@ -199,26 +199,12 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
}
func (t *TransportManager) Close() {
var publisherClosed atomic.Bool
var subscriberClosed atomic.Bool
time.AfterFunc(time.Minute, func() { // CLOSE-DEBUG-CLEANUP
if !publisherClosed.Load() || !subscriberClosed.Load() {
t.params.Logger.Infow(
"transport maanager close timeout",
"publisherClosed", publisherClosed.Load(),
"subscriberClosed", subscriberClosed.Load(),
)
}
})
if t.publisher != nil {
t.publisher.Close()
}
publisherClosed.Store(true)
if t.subscriber != nil {
t.subscriber.Close()
}
subscriberClosed.Store(true)
}
func (t *TransportManager) SubscriberClose() {