From bdc515774e995bf08d0c3d0fa64d74e6436cc785 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 18 Feb 2023 12:08:43 +0530 Subject: [PATCH] Declare migration complete only after publish callback finishes. (#1442) The following sequence caused early migration complete declaration 1. Audio track received 2. Audio track published callback in progress 3. Video track received, this clears the pending track 4. Audio track published callback finishes. This checks for pending tracks. As nothing is pending migration complete declared. 5. Due to the above, the remote video track is closed as not resuming. That causes an unsubscription. Fix - Wait till publish callback to finish to remove a track from pending fully. - Introducing a new map as pending tracks is used in OnClose too. So, did not want to delay removing from it as a close could happen while publish callback is happening. Also, moving the publish callback to a go routine (just like the recent change for running those in a go routine for migrated muted tracks) --- pkg/rtc/participant.go | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index cfab5d862..1ea1b0df5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -105,8 +105,9 @@ type ParticipantImpl struct { twcc *twcc.Responder // client intended to publish, yet to be reconciled - pendingTracksLock utils.RWMutex - pendingTracks map[string]*pendingTrackInfo + pendingTracksLock utils.RWMutex + pendingTracks map[string]*pendingTrackInfo + pendingPublishingTracks map[livekit.TrackID]*pendingTrackInfo // migrated in muted tracks are not fired need close at participant close mutedTrackNotFired []*MediaTrack @@ -170,6 +171,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { params: params, rtcpCh: make(chan []rtcp.Packet, 100), pendingTracks: make(map[string]*pendingTrackInfo), + pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo), disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), connectedAt: time.Now(), rttUpdatedAt: time.Now(), @@ -546,7 +548,7 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...) p.pendingTracksLock.Unlock() - // launch callbacks in goroutine since they could block + // launch callbacks in goroutine since they could block. // callbacks handle webhooks as well as db persistence go func() { for _, t := range addedTracks { @@ -1580,7 +1582,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei if mt.AddReceiver(rtpReceiver, track, p.twcc, mid) { p.removeMutedTrackNotFired(mt) if newTrack { - p.handleTrackPublished(mt) + go p.handleTrackPublished(mt) } } @@ -1648,6 +1650,17 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) p.UpTrackManager.AddPublishedTrack(mt) + pti := p.pendingTracks[signalCid] + if pti != nil { + if p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] != nil { + p.params.Logger.Infow("unexpected pending publish track", "trackID", ti.Sid) + } + p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] = &pendingTrackInfo{ + trackInfos: []*livekit.TrackInfo{pti.trackInfos[0]}, + migrated: pti.migrated, + } + } + p.pendingTracks[signalCid].trackInfos = p.pendingTracks[signalCid].trackInfos[1:] if len(p.pendingTracks[signalCid].trackInfos) == 0 { delete(p.pendingTracks, signalCid) @@ -1707,6 +1720,10 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) { track.ToProto(), ) + p.pendingTracksLock.Lock() + delete(p.pendingPublishingTracks, track.ID()) + p.pendingTracksLock.Unlock() + if !p.hasPendingMigratedTrack() { p.SetMigrateState(types.MigrateStateComplete) } @@ -1722,6 +1739,12 @@ func (p *ParticipantImpl) hasPendingMigratedTrack() bool { } } + for _, t := range p.pendingPublishingTracks { + if t.migrated { + return true + } + } + return false }