diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index cfab5d862..1ea1b0df5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -105,8 +105,9 @@ type ParticipantImpl struct { twcc *twcc.Responder // client intended to publish, yet to be reconciled - pendingTracksLock utils.RWMutex - pendingTracks map[string]*pendingTrackInfo + pendingTracksLock utils.RWMutex + pendingTracks map[string]*pendingTrackInfo + pendingPublishingTracks map[livekit.TrackID]*pendingTrackInfo // migrated in muted tracks are not fired need close at participant close mutedTrackNotFired []*MediaTrack @@ -170,6 +171,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { params: params, rtcpCh: make(chan []rtcp.Packet, 100), pendingTracks: make(map[string]*pendingTrackInfo), + pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo), disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), connectedAt: time.Now(), rttUpdatedAt: time.Now(), @@ -546,7 +548,7 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...) p.pendingTracksLock.Unlock() - // launch callbacks in goroutine since they could block + // launch callbacks in goroutine since they could block. // callbacks handle webhooks as well as db persistence go func() { for _, t := range addedTracks { @@ -1580,7 +1582,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei if mt.AddReceiver(rtpReceiver, track, p.twcc, mid) { p.removeMutedTrackNotFired(mt) if newTrack { - p.handleTrackPublished(mt) + go p.handleTrackPublished(mt) } } @@ -1648,6 +1650,17 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) p.UpTrackManager.AddPublishedTrack(mt) + pti := p.pendingTracks[signalCid] + if pti != nil { + if p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] != nil { + p.params.Logger.Infow("unexpected pending publish track", "trackID", ti.Sid) + } + p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] = &pendingTrackInfo{ + trackInfos: []*livekit.TrackInfo{pti.trackInfos[0]}, + migrated: pti.migrated, + } + } + p.pendingTracks[signalCid].trackInfos = p.pendingTracks[signalCid].trackInfos[1:] if len(p.pendingTracks[signalCid].trackInfos) == 0 { delete(p.pendingTracks, signalCid) @@ -1707,6 +1720,10 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) { track.ToProto(), ) + p.pendingTracksLock.Lock() + delete(p.pendingPublishingTracks, track.ID()) + p.pendingTracksLock.Unlock() + if !p.hasPendingMigratedTrack() { p.SetMigrateState(types.MigrateStateComplete) } @@ -1722,6 +1739,12 @@ func (p *ParticipantImpl) hasPendingMigratedTrack() bool { } } + for _, t := range p.pendingPublishingTracks { + if t.migrated { + return true + } + } + return false }