diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 3cc45dba6..ba714228d 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -125,6 +125,7 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { func (t *MediaTrack) OnSubscribedMaxQualityChange( f func( trackID livekit.TrackID, + trackInfo *livekit.TrackInfo, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality, ) error, @@ -135,7 +136,7 @@ func (t *MediaTrack) OnSubscribedMaxQualityChange( handler := func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) { if f != nil && !t.IsMuted() { - _ = f(t.ID(), subscribedQualities, maxSubscribedQualities) + _ = f(t.ID(), t.ToProto(), subscribedQualities, maxSubscribedQualities) } for _, q := range maxSubscribedQualities { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index de62da35e..b1c9857c5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -161,8 +161,8 @@ type ParticipantImpl struct { pendingTracksLock utils.RWMutex pendingTracks map[string]*pendingTrackInfo pendingPublishingTracks map[livekit.TrackID]*pendingTrackInfo - // migrated in muted tracks are not fired need close at participant close - mutedTrackNotFired []*MediaTrack + // migrated in tracks are not fired need close at participant close + pendingMigratedTracks []*MediaTrack // supported codecs enabledPublishCodecs []*livekit.Codec @@ -632,13 +632,15 @@ func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) er } if p.MigrateState() == types.MigrateStateSync { - go p.handleMigrateMutedTrack() + go p.handleMigrateTracks() } return nil } -func (p *ParticipantImpl) handleMigrateMutedTrack() { - // muted track won't send rtp packet, so we add mediatrack manually +func (p *ParticipantImpl) handleMigrateTracks() { + // muted track won't send rtp packet, so it is required to add mediatrack manually. + // But, synthesising track publish for unmuted tracks keeps a consistent path. + // In both csaes (muted and unmuted), when publisher sends media packets, OnTrack would register and go from there. var addedTracks []*MediaTrack p.pendingTracksLock.Lock() for cid, pti := range p.pendingTracks { @@ -650,17 +652,14 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { p.pubLogger.Warnw("too many pending migrated tracks", nil, "trackID", pti.trackInfos[0].Sid, "count", len(pti.trackInfos), "cid", cid) } - ti := pti.trackInfos[0] - if ti.Muted { - mt := p.addMigrateMutedTrack(cid, ti) - if mt != nil { - addedTracks = append(addedTracks, mt) - } else { - p.pubLogger.Warnw("could not find migrated muted track", nil, "cid", cid) - } + mt := p.addMigratedTrack(cid, pti.trackInfos[0]) + if mt != nil { + addedTracks = append(addedTracks, mt) + } else { + p.pubLogger.Warnw("could not find migrated muted track", nil, "cid", cid) } } - p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...) + p.pendingMigratedTracks = append(p.pendingMigratedTracks, addedTracks...) if len(addedTracks) != 0 { p.dirty.Store(true) @@ -676,12 +675,12 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { }() } -func (p *ParticipantImpl) removeMutedTrackNotFired(mt *MediaTrack) { +func (p *ParticipantImpl) removePendingMigratedTrack(mt *MediaTrack) { p.pendingTracksLock.Lock() - for i, t := range p.mutedTrackNotFired { + for i, t := range p.pendingMigratedTracks { if t == mt { - p.mutedTrackNotFired[i] = p.mutedTrackNotFired[len(p.mutedTrackNotFired)-1] - p.mutedTrackNotFired = p.mutedTrackNotFired[:len(p.mutedTrackNotFired)-1] + p.pendingMigratedTracks[i] = p.pendingMigratedTracks[len(p.pendingMigratedTracks)-1] + p.pendingMigratedTracks = p.pendingMigratedTracks[:len(p.pendingMigratedTracks)-1] break } } @@ -766,11 +765,11 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.pendingTracksLock.Lock() p.pendingTracks = make(map[string]*pendingTrackInfo) - closeMutedTrack := p.mutedTrackNotFired - p.mutedTrackNotFired = p.mutedTrackNotFired[:0] + pendingMigratedTracksToClose := p.pendingMigratedTracks + p.pendingMigratedTracks = p.pendingMigratedTracks[:0] p.pendingTracksLock.Unlock() - for _, t := range closeMutedTrack { + for _, t := range pendingMigratedTracksToClose { t.Close(isExpectedToResume) } @@ -1552,7 +1551,12 @@ func (p *ParticipantImpl) onStreamStateChange(update *streamallocator.StreamStat }) } -func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error { +func (p *ParticipantImpl) onSubscribedMaxQualityChange( + trackID livekit.TrackID, + trackInfo *livekit.TrackInfo, + subscribedQualities []*livekit.SubscribedCodec, + maxSubscribedQualities []types.SubscribedCodecQuality, +) error { if p.params.DisableDynacast { return nil } @@ -1561,6 +1565,17 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, return nil } + // send layer info about max subscription changes to telemetry + for _, maxSubscribedQuality := range maxSubscribedQualities { + p.params.Telemetry.TrackMaxSubscribedVideoQuality( + context.Background(), + p.ID(), + trackInfo, + maxSubscribedQuality.CodecMime, + maxSubscribedQuality.Quality, + ) + } + // normalize the codec name for _, subscribedQuality := range subscribedQualities { subscribedQuality.Codec = strings.ToLower(strings.TrimLeft(subscribedQuality.Codec, "video/")) @@ -1572,36 +1587,6 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, SubscribedCodecs: subscribedQualities, } - // send layer info about max subscription changes to telemetry - track := p.UpTrackManager.GetPublishedTrack(trackID) - var layerInfo map[livekit.VideoQuality]*livekit.VideoLayer - if track != nil { - layers := track.ToProto().Layers - layerInfo = make(map[livekit.VideoQuality]*livekit.VideoLayer, len(layers)) - for _, layer := range layers { - layerInfo[layer.Quality] = layer - } - } - - for _, maxSubscribedQuality := range maxSubscribedQualities { - ti := &livekit.TrackInfo{ - Sid: string(trackID), - Type: livekit.TrackType_VIDEO, - } - if info, ok := layerInfo[maxSubscribedQuality.Quality]; ok { - ti.Width = info.Width - ti.Height = info.Height - } - - p.params.Telemetry.TrackMaxSubscribedVideoQuality( - context.Background(), - p.ID(), - ti, - maxSubscribedQuality.CodecMime, - maxSubscribedQuality.Quality, - ) - } - p.pubLogger.Debugw( "sending max subscribed quality", "trackID", trackID, @@ -1864,7 +1849,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei p.pendingTracksLock.Unlock() if mt.AddReceiver(rtpReceiver, track, p.twcc, mid) { - p.removeMutedTrackNotFired(mt) + p.removePendingMigratedTrack(mt) } if newTrack { @@ -1881,8 +1866,8 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei return mt, newTrack } -func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo) *MediaTrack { - p.pubLogger.Infow("add migrate muted track", "cid", cid, "trackID", ti.Sid, "track", logger.Proto(ti)) +func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *MediaTrack { + p.pubLogger.Infow("add migrated track", "cid", cid, "trackID", ti.Sid, "track", logger.Proto(ti)) rtpReceiver := p.TransportManager.GetPublisherRTPReceiver(ti.Mid) if rtpReceiver == nil { p.pubLogger.Errorw("could not find receiver for migrated track", nil, "trackID", ti.Sid) @@ -1929,7 +1914,7 @@ func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo } } mt.SetSimulcast(ti.Simulcast) - mt.SetMuted(true) + mt.SetMuted(ti.Muted) return mt }