From bbadd4ccb6ad418fb60dc9355d411ca6ffd3400a Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 12 May 2021 23:40:41 -0700 Subject: [PATCH] properly cleanup downtracks on subscriber disconnect --- pkg/rtc/participant.go | 16 ++++++++++++++-- pkg/rtc/transport.go | 3 +++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 93e595606..59b3c98f5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/pion/ion-sfu/pkg/sfu" "github.com/pion/ion-sfu/pkg/twcc" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" @@ -367,8 +368,19 @@ func (p *ParticipantImpl) Close() error { t.OnClose(nil) t.RemoveAllSubscribers() } + + var downtracksToClose []*sfu.DownTrack + for _, tracks := range p.subscribedTracks { + for _, st := range tracks { + downtracksToClose = append(downtracksToClose, st.DownTrack()) + } + } p.lock.Unlock() + for _, dt := range downtracksToClose { + dt.Close() + } + p.updateState(livekit.ParticipantInfo_DISCONNECTED) p.subscriber.pc.OnDataChannel(nil) p.subscriber.pc.OnICECandidate(nil) @@ -571,7 +583,7 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack { // AddSubscribedTrack adds a track to the participant's subscribed list func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.SubscribedTrack) { logger.Debugw("added subscribedTrack", "srcParticipant", pubId, - "participant", p.Identity()) + "participant", p.Identity(), "track", subTrack.ID()) p.lock.Lock() p.subscribedTracks[pubId] = append(p.subscribedTracks[pubId], subTrack) p.lock.Unlock() @@ -580,7 +592,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.Subscr // RemoveSubscribedTrack removes a track to the participant's subscribed list func (p *ParticipantImpl) RemoveSubscribedTrack(pubId string, subTrack types.SubscribedTrack) { logger.Debugw("removed subscribedTrack", "srcParticipant", pubId, - "participant", p.Identity()) + "participant", p.Identity(), "track", subTrack.ID()) p.lock.Lock() defer p.lock.Unlock() tracks := make([]types.SubscribedTrack, 0, len(p.subscribedTracks[pubId])) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index b1cd18369..a492750a7 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -156,6 +156,9 @@ func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error { if t.onOffer == nil { return nil } + if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed { + return nil + } state := t.negotiationState.Load().(int) // when there's an ongoing negotiation, let it finish and not disrupt its state