Declare migration complete only after publish callback finishes. (#1442)

The following sequence caused early migration complete declaration
1. Audio track received
2. Audio track published callback in progress
3. Video track received, this clears the pending track
4. Audio track published callback finishes. This checks for pending
   tracks. As nothing is pending migration complete declared.
5. Due to the above, the remote video track is closed as not resuming.
   That causes an unsubscription.

Fix
- Wait till publish callback to finish to remove a track from pending
  fully.
- Introducing a new map as pending tracks is used in OnClose too. So,
  did not want to delay removing from it as a close could happen while
  publish callback is happening.

Also, moving the publish callback to a go routine (just like the recent
change for running those in a go routine for migrated muted tracks)
This commit is contained in:
Raja Subramanian
2023-02-18 12:08:43 +05:30
committed by GitHub
parent 7a2d9b3d61
commit bdc515774e

View File

@@ -105,8 +105,9 @@ type ParticipantImpl struct {
twcc *twcc.Responder
// client intended to publish, yet to be reconciled
pendingTracksLock utils.RWMutex
pendingTracks map[string]*pendingTrackInfo
pendingTracksLock utils.RWMutex
pendingTracks map[string]*pendingTrackInfo
pendingPublishingTracks map[livekit.TrackID]*pendingTrackInfo
// migrated in muted tracks are not fired need close at participant close
mutedTrackNotFired []*MediaTrack
@@ -170,6 +171,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
params: params,
rtcpCh: make(chan []rtcp.Packet, 100),
pendingTracks: make(map[string]*pendingTrackInfo),
pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo),
disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID),
connectedAt: time.Now(),
rttUpdatedAt: time.Now(),
@@ -546,7 +548,7 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() {
p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...)
p.pendingTracksLock.Unlock()
// launch callbacks in goroutine since they could block
// launch callbacks in goroutine since they could block.
// callbacks handle webhooks as well as db persistence
go func() {
for _, t := range addedTracks {
@@ -1580,7 +1582,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
if mt.AddReceiver(rtpReceiver, track, p.twcc, mid) {
p.removeMutedTrackNotFired(mt)
if newTrack {
p.handleTrackPublished(mt)
go p.handleTrackPublished(mt)
}
}
@@ -1648,6 +1650,17 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt)
p.UpTrackManager.AddPublishedTrack(mt)
pti := p.pendingTracks[signalCid]
if pti != nil {
if p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] != nil {
p.params.Logger.Infow("unexpected pending publish track", "trackID", ti.Sid)
}
p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] = &pendingTrackInfo{
trackInfos: []*livekit.TrackInfo{pti.trackInfos[0]},
migrated: pti.migrated,
}
}
p.pendingTracks[signalCid].trackInfos = p.pendingTracks[signalCid].trackInfos[1:]
if len(p.pendingTracks[signalCid].trackInfos) == 0 {
delete(p.pendingTracks, signalCid)
@@ -1707,6 +1720,10 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) {
track.ToProto(),
)
p.pendingTracksLock.Lock()
delete(p.pendingPublishingTracks, track.ID())
p.pendingTracksLock.Unlock()
if !p.hasPendingMigratedTrack() {
p.SetMigrateState(types.MigrateStateComplete)
}
@@ -1722,6 +1739,12 @@ func (p *ParticipantImpl) hasPendingMigratedTrack() bool {
}
}
for _, t := range p.pendingPublishingTracks {
if t.migrated {
return true
}
}
return false
}