From 086009f05ae05a4a8f2b451b489d902bbd0afc80 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 26 Nov 2022 21:42:19 +0530 Subject: [PATCH] Do not forward media till peer connection is connected. (#1194) There were some failures with missing media. The only thing I could see between working and non-working case is when media forwarding starts. So, delay media forwarding till peer connection is connected. Also, add a subscribe op only if a subscribe/unsubscribe queuing is successful. There was a recent change to not queue a subscribe when the participant is closed/disconnected. This got the subscribe op counter out of whack. --- pkg/rtc/mediatrackreceiver.go | 13 ++- pkg/rtc/participant.go | 13 ++- pkg/rtc/types/interfaces.go | 9 +- .../typesfakes/fake_local_participant.go | 86 +++++++++++++++++-- pkg/rtc/uptrackmanager.go | 4 +- pkg/sfu/downtrack.go | 2 +- 6 files changed, 105 insertions(+), 22 deletions(-) diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 23669dafe..fcbe6e4ed 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -430,10 +430,9 @@ func (t *MediaTrackReceiver) removePendingSubscribeOp(subscriberID livekit.Parti // AddSubscriber subscribes sub to current mediaTrack func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) error { - t.addPendingSubscribeOp(sub.ID()) - - trackID := t.ID() - sub.EnqueueSubscribeTrack(trackID, t.params.IsRelayed, t.addSubscriber) + if sub.EnqueueSubscribeTrack(t.ID(), t.params.IsRelayed, t.addSubscriber) { + t.addPendingSubscribeOp(sub.ID()) + } return nil } @@ -509,9 +508,9 @@ func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID } sub := subTrack.Subscriber() - t.addPendingSubscribeOp(sub.ID()) - - sub.EnqueueUnsubscribeTrack(subTrack.ID(), t.params.IsRelayed, willBeResumed, t.removeSubscriber) + if sub.EnqueueUnsubscribeTrack(subTrack.ID(), t.params.IsRelayed, willBeResumed, t.removeSubscriber) { + t.addPendingSubscribeOp(sub.ID()) + } } func (t *MediaTrackReceiver) removeSubscriber(subscriberID livekit.ParticipantID, willBeResumed bool) (err error) { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 6d2411c80..14b200219 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1948,10 +1948,10 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() { p.supervisor.Stop() } -func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub types.LocalParticipant) error) { +func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub types.LocalParticipant) error) bool { // do not queue subscription is participant is already closed/disconnected if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED { - return + return false } p.params.Logger.Debugw("queuing subscribe", "trackID", trackID, "relayed", isRelayed) @@ -1966,9 +1966,15 @@ func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, isRelay p.lock.Unlock() go p.ProcessSubscriptionRequestsQueue(trackID) + return true } -func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, isRelayed bool, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) { +func (p *ParticipantImpl) EnqueueUnsubscribeTrack( + trackID livekit.TrackID, + isRelayed bool, + willBeResumed bool, + f func(subscriberID livekit.ParticipantID, willBeResumed bool) error, +) bool { p.params.Logger.Debugw("queuing unsubscribe", "trackID", trackID, "relayed", isRelayed) p.supervisor.UpdateSubscription(trackID, false) @@ -1982,6 +1988,7 @@ func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, isRel p.lock.Unlock() go p.ProcessSubscriptionRequestsQueue(trackID) + return true } func (p *ParticipantImpl) ProcessSubscriptionRequestsQueue(trackID livekit.TrackID) { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 83e2ceb4f..e8974a5bd 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -319,8 +319,13 @@ type LocalParticipant interface { UncacheDownTrack(rtpTransceiver *webrtc.RTPTransceiver) GetCachedDownTrack(trackID livekit.TrackID) (*webrtc.RTPTransceiver, sfu.DownTrackState) - EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub LocalParticipant) error) - EnqueueUnsubscribeTrack(trackID livekit.TrackID, isRelayed bool, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) + EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub LocalParticipant) error) bool + EnqueueUnsubscribeTrack( + trackID livekit.TrackID, + isRelayed bool, + willBeResumed bool, + f func(subscriberID livekit.ParticipantID, willBeResumed bool) error, + ) bool ProcessSubscriptionRequestsQueue(trackID livekit.TrackID) ClearInProgressAndProcessSubscriptionRequestsQueue(trackID livekit.TrackID) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index c49818bd9..bc4a43634 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -166,14 +166,20 @@ type FakeLocalParticipant struct { debugInfoReturnsOnCall map[int]struct { result1 map[string]interface{} } - EnqueueSubscribeTrackStub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error) + EnqueueSubscribeTrackStub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error) bool enqueueSubscribeTrackMutex sync.RWMutex enqueueSubscribeTrackArgsForCall []struct { arg1 livekit.TrackID arg2 bool arg3 func(sub types.LocalParticipant) error } - EnqueueUnsubscribeTrackStub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error) + enqueueSubscribeTrackReturns struct { + result1 bool + } + enqueueSubscribeTrackReturnsOnCall map[int]struct { + result1 bool + } + EnqueueUnsubscribeTrackStub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error) bool enqueueUnsubscribeTrackMutex sync.RWMutex enqueueUnsubscribeTrackArgsForCall []struct { arg1 livekit.TrackID @@ -181,6 +187,12 @@ type FakeLocalParticipant struct { arg3 bool arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error } + enqueueUnsubscribeTrackReturns struct { + result1 bool + } + enqueueUnsubscribeTrackReturnsOnCall map[int]struct { + result1 bool + } GetAdaptiveStreamStub func() bool getAdaptiveStreamMutex sync.RWMutex getAdaptiveStreamArgsForCall []struct { @@ -1533,19 +1545,25 @@ func (fake *FakeLocalParticipant) DebugInfoReturnsOnCall(i int, result1 map[stri }{result1} } -func (fake *FakeLocalParticipant) EnqueueSubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 func(sub types.LocalParticipant) error) { +func (fake *FakeLocalParticipant) EnqueueSubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 func(sub types.LocalParticipant) error) bool { fake.enqueueSubscribeTrackMutex.Lock() + ret, specificReturn := fake.enqueueSubscribeTrackReturnsOnCall[len(fake.enqueueSubscribeTrackArgsForCall)] fake.enqueueSubscribeTrackArgsForCall = append(fake.enqueueSubscribeTrackArgsForCall, struct { arg1 livekit.TrackID arg2 bool arg3 func(sub types.LocalParticipant) error }{arg1, arg2, arg3}) stub := fake.EnqueueSubscribeTrackStub + fakeReturns := fake.enqueueSubscribeTrackReturns fake.recordInvocation("EnqueueSubscribeTrack", []interface{}{arg1, arg2, arg3}) fake.enqueueSubscribeTrackMutex.Unlock() if stub != nil { - fake.EnqueueSubscribeTrackStub(arg1, arg2, arg3) + return stub(arg1, arg2, arg3) } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 } func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCallCount() int { @@ -1554,7 +1572,7 @@ func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCallCount() int { return len(fake.enqueueSubscribeTrackArgsForCall) } -func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCalls(stub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error)) { +func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCalls(stub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error) bool) { fake.enqueueSubscribeTrackMutex.Lock() defer fake.enqueueSubscribeTrackMutex.Unlock() fake.EnqueueSubscribeTrackStub = stub @@ -1567,8 +1585,32 @@ func (fake *FakeLocalParticipant) EnqueueSubscribeTrackArgsForCall(i int) (livek return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 bool, arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error) { +func (fake *FakeLocalParticipant) EnqueueSubscribeTrackReturns(result1 bool) { + fake.enqueueSubscribeTrackMutex.Lock() + defer fake.enqueueSubscribeTrackMutex.Unlock() + fake.EnqueueSubscribeTrackStub = nil + fake.enqueueSubscribeTrackReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) EnqueueSubscribeTrackReturnsOnCall(i int, result1 bool) { + fake.enqueueSubscribeTrackMutex.Lock() + defer fake.enqueueSubscribeTrackMutex.Unlock() + fake.EnqueueSubscribeTrackStub = nil + if fake.enqueueSubscribeTrackReturnsOnCall == nil { + fake.enqueueSubscribeTrackReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.enqueueSubscribeTrackReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 bool, arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error) bool { fake.enqueueUnsubscribeTrackMutex.Lock() + ret, specificReturn := fake.enqueueUnsubscribeTrackReturnsOnCall[len(fake.enqueueUnsubscribeTrackArgsForCall)] fake.enqueueUnsubscribeTrackArgsForCall = append(fake.enqueueUnsubscribeTrackArgsForCall, struct { arg1 livekit.TrackID arg2 bool @@ -1576,11 +1618,16 @@ func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrack(arg1 livekit.TrackID, arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error }{arg1, arg2, arg3, arg4}) stub := fake.EnqueueUnsubscribeTrackStub + fakeReturns := fake.enqueueUnsubscribeTrackReturns fake.recordInvocation("EnqueueUnsubscribeTrack", []interface{}{arg1, arg2, arg3, arg4}) fake.enqueueUnsubscribeTrackMutex.Unlock() if stub != nil { - fake.EnqueueUnsubscribeTrackStub(arg1, arg2, arg3, arg4) + return stub(arg1, arg2, arg3, arg4) } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 } func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCallCount() int { @@ -1589,7 +1636,7 @@ func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCallCount() int { return len(fake.enqueueUnsubscribeTrackArgsForCall) } -func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCalls(stub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error)) { +func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCalls(stub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error) bool) { fake.enqueueUnsubscribeTrackMutex.Lock() defer fake.enqueueUnsubscribeTrackMutex.Unlock() fake.EnqueueUnsubscribeTrackStub = stub @@ -1602,6 +1649,29 @@ func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackArgsForCall(i int) (liv return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } +func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackReturns(result1 bool) { + fake.enqueueUnsubscribeTrackMutex.Lock() + defer fake.enqueueUnsubscribeTrackMutex.Unlock() + fake.EnqueueUnsubscribeTrackStub = nil + fake.enqueueUnsubscribeTrackReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackReturnsOnCall(i int, result1 bool) { + fake.enqueueUnsubscribeTrackMutex.Lock() + defer fake.enqueueUnsubscribeTrackMutex.Unlock() + fake.EnqueueUnsubscribeTrackStub = nil + if fake.enqueueUnsubscribeTrackReturnsOnCall == nil { + fake.enqueueUnsubscribeTrackReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.enqueueUnsubscribeTrackReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalParticipant) GetAdaptiveStream() bool { fake.getAdaptiveStreamMutex.Lock() ret, specificReturn := fake.getAdaptiveStreamReturnsOnCall[len(fake.getAdaptiveStreamArgsForCall)] diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index 9006f6c6f..391d603d1 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -495,7 +495,9 @@ func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.Tr } if sendUpdate && (forceUpdate || found) { - u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID) + if found { + u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID) + } u.opsQueue.Enqueue(func() { sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true) }) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 67d8efca9..8da91bed4 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -453,7 +453,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } }() - if !d.bound.Load() { + if !d.bound.Load() || !d.connected.Load() { return nil }