Do not forward media till peer connection is connected. (#1194)

There were some failures with missing media. The only thing I could
see between working and non-working case is when media forwarding
starts. So, delay media forwarding till peer connection is connected.

Also, add a subscribe op only if a subscribe/unsubscribe queuing is
successful. There was a recent change to not queue a subscribe when
the participant is closed/disconnected. This got the subscribe op
counter out of whack.
This commit is contained in:
Raja Subramanian
2022-11-26 21:42:19 +05:30
committed by GitHub
parent 55718724a9
commit 086009f05a
6 changed files with 105 additions and 22 deletions
+6 -7
View File
@@ -430,10 +430,9 @@ func (t *MediaTrackReceiver) removePendingSubscribeOp(subscriberID livekit.Parti
// AddSubscriber subscribes sub to current mediaTrack
func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) error {
t.addPendingSubscribeOp(sub.ID())
trackID := t.ID()
sub.EnqueueSubscribeTrack(trackID, t.params.IsRelayed, t.addSubscriber)
if sub.EnqueueSubscribeTrack(t.ID(), t.params.IsRelayed, t.addSubscriber) {
t.addPendingSubscribeOp(sub.ID())
}
return nil
}
@@ -509,9 +508,9 @@ func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID
}
sub := subTrack.Subscriber()
t.addPendingSubscribeOp(sub.ID())
sub.EnqueueUnsubscribeTrack(subTrack.ID(), t.params.IsRelayed, willBeResumed, t.removeSubscriber)
if sub.EnqueueUnsubscribeTrack(subTrack.ID(), t.params.IsRelayed, willBeResumed, t.removeSubscriber) {
t.addPendingSubscribeOp(sub.ID())
}
}
func (t *MediaTrackReceiver) removeSubscriber(subscriberID livekit.ParticipantID, willBeResumed bool) (err error) {
+10 -3
View File
@@ -1948,10 +1948,10 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() {
p.supervisor.Stop()
}
func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub types.LocalParticipant) error) {
func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub types.LocalParticipant) error) bool {
// do not queue subscription is participant is already closed/disconnected
if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED {
return
return false
}
p.params.Logger.Debugw("queuing subscribe", "trackID", trackID, "relayed", isRelayed)
@@ -1966,9 +1966,15 @@ func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, isRelay
p.lock.Unlock()
go p.ProcessSubscriptionRequestsQueue(trackID)
return true
}
func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, isRelayed bool, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) {
func (p *ParticipantImpl) EnqueueUnsubscribeTrack(
trackID livekit.TrackID,
isRelayed bool,
willBeResumed bool,
f func(subscriberID livekit.ParticipantID, willBeResumed bool) error,
) bool {
p.params.Logger.Debugw("queuing unsubscribe", "trackID", trackID, "relayed", isRelayed)
p.supervisor.UpdateSubscription(trackID, false)
@@ -1982,6 +1988,7 @@ func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, isRel
p.lock.Unlock()
go p.ProcessSubscriptionRequestsQueue(trackID)
return true
}
func (p *ParticipantImpl) ProcessSubscriptionRequestsQueue(trackID livekit.TrackID) {
+7 -2
View File
@@ -319,8 +319,13 @@ type LocalParticipant interface {
UncacheDownTrack(rtpTransceiver *webrtc.RTPTransceiver)
GetCachedDownTrack(trackID livekit.TrackID) (*webrtc.RTPTransceiver, sfu.DownTrackState)
EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub LocalParticipant) error)
EnqueueUnsubscribeTrack(trackID livekit.TrackID, isRelayed bool, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error)
EnqueueSubscribeTrack(trackID livekit.TrackID, isRelayed bool, f func(sub LocalParticipant) error) bool
EnqueueUnsubscribeTrack(
trackID livekit.TrackID,
isRelayed bool,
willBeResumed bool,
f func(subscriberID livekit.ParticipantID, willBeResumed bool) error,
) bool
ProcessSubscriptionRequestsQueue(trackID livekit.TrackID)
ClearInProgressAndProcessSubscriptionRequestsQueue(trackID livekit.TrackID)
@@ -166,14 +166,20 @@ type FakeLocalParticipant struct {
debugInfoReturnsOnCall map[int]struct {
result1 map[string]interface{}
}
EnqueueSubscribeTrackStub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error)
EnqueueSubscribeTrackStub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error) bool
enqueueSubscribeTrackMutex sync.RWMutex
enqueueSubscribeTrackArgsForCall []struct {
arg1 livekit.TrackID
arg2 bool
arg3 func(sub types.LocalParticipant) error
}
EnqueueUnsubscribeTrackStub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error)
enqueueSubscribeTrackReturns struct {
result1 bool
}
enqueueSubscribeTrackReturnsOnCall map[int]struct {
result1 bool
}
EnqueueUnsubscribeTrackStub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error) bool
enqueueUnsubscribeTrackMutex sync.RWMutex
enqueueUnsubscribeTrackArgsForCall []struct {
arg1 livekit.TrackID
@@ -181,6 +187,12 @@ type FakeLocalParticipant struct {
arg3 bool
arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error
}
enqueueUnsubscribeTrackReturns struct {
result1 bool
}
enqueueUnsubscribeTrackReturnsOnCall map[int]struct {
result1 bool
}
GetAdaptiveStreamStub func() bool
getAdaptiveStreamMutex sync.RWMutex
getAdaptiveStreamArgsForCall []struct {
@@ -1533,19 +1545,25 @@ func (fake *FakeLocalParticipant) DebugInfoReturnsOnCall(i int, result1 map[stri
}{result1}
}
func (fake *FakeLocalParticipant) EnqueueSubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 func(sub types.LocalParticipant) error) {
func (fake *FakeLocalParticipant) EnqueueSubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 func(sub types.LocalParticipant) error) bool {
fake.enqueueSubscribeTrackMutex.Lock()
ret, specificReturn := fake.enqueueSubscribeTrackReturnsOnCall[len(fake.enqueueSubscribeTrackArgsForCall)]
fake.enqueueSubscribeTrackArgsForCall = append(fake.enqueueSubscribeTrackArgsForCall, struct {
arg1 livekit.TrackID
arg2 bool
arg3 func(sub types.LocalParticipant) error
}{arg1, arg2, arg3})
stub := fake.EnqueueSubscribeTrackStub
fakeReturns := fake.enqueueSubscribeTrackReturns
fake.recordInvocation("EnqueueSubscribeTrack", []interface{}{arg1, arg2, arg3})
fake.enqueueSubscribeTrackMutex.Unlock()
if stub != nil {
fake.EnqueueSubscribeTrackStub(arg1, arg2, arg3)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCallCount() int {
@@ -1554,7 +1572,7 @@ func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCallCount() int {
return len(fake.enqueueSubscribeTrackArgsForCall)
}
func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCalls(stub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error)) {
func (fake *FakeLocalParticipant) EnqueueSubscribeTrackCalls(stub func(livekit.TrackID, bool, func(sub types.LocalParticipant) error) bool) {
fake.enqueueSubscribeTrackMutex.Lock()
defer fake.enqueueSubscribeTrackMutex.Unlock()
fake.EnqueueSubscribeTrackStub = stub
@@ -1567,8 +1585,32 @@ func (fake *FakeLocalParticipant) EnqueueSubscribeTrackArgsForCall(i int) (livek
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 bool, arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error) {
func (fake *FakeLocalParticipant) EnqueueSubscribeTrackReturns(result1 bool) {
fake.enqueueSubscribeTrackMutex.Lock()
defer fake.enqueueSubscribeTrackMutex.Unlock()
fake.EnqueueSubscribeTrackStub = nil
fake.enqueueSubscribeTrackReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) EnqueueSubscribeTrackReturnsOnCall(i int, result1 bool) {
fake.enqueueSubscribeTrackMutex.Lock()
defer fake.enqueueSubscribeTrackMutex.Unlock()
fake.EnqueueSubscribeTrackStub = nil
if fake.enqueueSubscribeTrackReturnsOnCall == nil {
fake.enqueueSubscribeTrackReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.enqueueSubscribeTrackReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrack(arg1 livekit.TrackID, arg2 bool, arg3 bool, arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error) bool {
fake.enqueueUnsubscribeTrackMutex.Lock()
ret, specificReturn := fake.enqueueUnsubscribeTrackReturnsOnCall[len(fake.enqueueUnsubscribeTrackArgsForCall)]
fake.enqueueUnsubscribeTrackArgsForCall = append(fake.enqueueUnsubscribeTrackArgsForCall, struct {
arg1 livekit.TrackID
arg2 bool
@@ -1576,11 +1618,16 @@ func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrack(arg1 livekit.TrackID,
arg4 func(subscriberID livekit.ParticipantID, willBeResumed bool) error
}{arg1, arg2, arg3, arg4})
stub := fake.EnqueueUnsubscribeTrackStub
fakeReturns := fake.enqueueUnsubscribeTrackReturns
fake.recordInvocation("EnqueueUnsubscribeTrack", []interface{}{arg1, arg2, arg3, arg4})
fake.enqueueUnsubscribeTrackMutex.Unlock()
if stub != nil {
fake.EnqueueUnsubscribeTrackStub(arg1, arg2, arg3, arg4)
return stub(arg1, arg2, arg3, arg4)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCallCount() int {
@@ -1589,7 +1636,7 @@ func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCallCount() int {
return len(fake.enqueueUnsubscribeTrackArgsForCall)
}
func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCalls(stub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error)) {
func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackCalls(stub func(livekit.TrackID, bool, bool, func(subscriberID livekit.ParticipantID, willBeResumed bool) error) bool) {
fake.enqueueUnsubscribeTrackMutex.Lock()
defer fake.enqueueUnsubscribeTrackMutex.Unlock()
fake.EnqueueUnsubscribeTrackStub = stub
@@ -1602,6 +1649,29 @@ func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackArgsForCall(i int) (liv
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackReturns(result1 bool) {
fake.enqueueUnsubscribeTrackMutex.Lock()
defer fake.enqueueUnsubscribeTrackMutex.Unlock()
fake.EnqueueUnsubscribeTrackStub = nil
fake.enqueueUnsubscribeTrackReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) EnqueueUnsubscribeTrackReturnsOnCall(i int, result1 bool) {
fake.enqueueUnsubscribeTrackMutex.Lock()
defer fake.enqueueUnsubscribeTrackMutex.Unlock()
fake.EnqueueUnsubscribeTrackStub = nil
if fake.enqueueUnsubscribeTrackReturnsOnCall == nil {
fake.enqueueUnsubscribeTrackReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.enqueueUnsubscribeTrackReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) GetAdaptiveStream() bool {
fake.getAdaptiveStreamMutex.Lock()
ret, specificReturn := fake.getAdaptiveStreamReturnsOnCall[len(fake.getAdaptiveStreamArgsForCall)]
+3 -1
View File
@@ -495,7 +495,9 @@ func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.Tr
}
if sendUpdate && (forceUpdate || found) {
u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID)
if found {
u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID)
}
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
})
+1 -1
View File
@@ -453,7 +453,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
}
}()
if !d.bound.Load() {
if !d.bound.Load() || !d.connected.Load() {
return nil
}