From 79cdc2df2ea5bd6004fbfc84e8be9c7ad6709c34 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 25 Jan 2024 01:24:09 +0530 Subject: [PATCH] Unify muted and unmuted migration paths. (#2406) * Unify muted and unmuted migration paths. If dynacast had disabled all layers, after a migration, the client did not restart publish (it is akin to muted track). That failed migration because migration state machine waits for unmuted tracks to be published (i. e. server has to receive packets). If a migrating track is in muted state, server does not wait for packets. It synthesises the published event and catches up later when packets actually come in. Just treating all migrations as the erstwhile muted case. Sythesise publish whether track is muted or not. In the unmuted case, packets might arrive soon after whereas in muted case, it will depend on when unmute happens. This is tricky stuff. So, will need good testing. * use muted from track info --- pkg/rtc/mediatrack.go | 3 +- pkg/rtc/participant.go | 97 ++++++++++++++++++------------------------ 2 files changed, 43 insertions(+), 57 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 3cc45dba6..ba714228d 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -125,6 +125,7 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { func (t *MediaTrack) OnSubscribedMaxQualityChange( f func( trackID livekit.TrackID, + trackInfo *livekit.TrackInfo, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality, ) error, @@ -135,7 +136,7 @@ func (t *MediaTrack) OnSubscribedMaxQualityChange( handler := func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) { if f != nil && !t.IsMuted() { - _ = f(t.ID(), subscribedQualities, maxSubscribedQualities) + _ = f(t.ID(), t.ToProto(), subscribedQualities, maxSubscribedQualities) } for _, q := range maxSubscribedQualities { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index de62da35e..b1c9857c5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -161,8 +161,8 @@ type ParticipantImpl struct { pendingTracksLock utils.RWMutex pendingTracks map[string]*pendingTrackInfo pendingPublishingTracks map[livekit.TrackID]*pendingTrackInfo - // migrated in muted tracks are not fired need close at participant close - mutedTrackNotFired []*MediaTrack + // migrated in tracks are not fired need close at participant close + pendingMigratedTracks []*MediaTrack // supported codecs enabledPublishCodecs []*livekit.Codec @@ -632,13 +632,15 @@ func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) er } if p.MigrateState() == types.MigrateStateSync { - go p.handleMigrateMutedTrack() + go p.handleMigrateTracks() } return nil } -func (p *ParticipantImpl) handleMigrateMutedTrack() { - // muted track won't send rtp packet, so we add mediatrack manually +func (p *ParticipantImpl) handleMigrateTracks() { + // muted track won't send rtp packet, so it is required to add mediatrack manually. + // But, synthesising track publish for unmuted tracks keeps a consistent path. + // In both csaes (muted and unmuted), when publisher sends media packets, OnTrack would register and go from there. var addedTracks []*MediaTrack p.pendingTracksLock.Lock() for cid, pti := range p.pendingTracks { @@ -650,17 +652,14 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { p.pubLogger.Warnw("too many pending migrated tracks", nil, "trackID", pti.trackInfos[0].Sid, "count", len(pti.trackInfos), "cid", cid) } - ti := pti.trackInfos[0] - if ti.Muted { - mt := p.addMigrateMutedTrack(cid, ti) - if mt != nil { - addedTracks = append(addedTracks, mt) - } else { - p.pubLogger.Warnw("could not find migrated muted track", nil, "cid", cid) - } + mt := p.addMigratedTrack(cid, pti.trackInfos[0]) + if mt != nil { + addedTracks = append(addedTracks, mt) + } else { + p.pubLogger.Warnw("could not find migrated muted track", nil, "cid", cid) } } - p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...) + p.pendingMigratedTracks = append(p.pendingMigratedTracks, addedTracks...) if len(addedTracks) != 0 { p.dirty.Store(true) @@ -676,12 +675,12 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { }() } -func (p *ParticipantImpl) removeMutedTrackNotFired(mt *MediaTrack) { +func (p *ParticipantImpl) removePendingMigratedTrack(mt *MediaTrack) { p.pendingTracksLock.Lock() - for i, t := range p.mutedTrackNotFired { + for i, t := range p.pendingMigratedTracks { if t == mt { - p.mutedTrackNotFired[i] = p.mutedTrackNotFired[len(p.mutedTrackNotFired)-1] - p.mutedTrackNotFired = p.mutedTrackNotFired[:len(p.mutedTrackNotFired)-1] + p.pendingMigratedTracks[i] = p.pendingMigratedTracks[len(p.pendingMigratedTracks)-1] + p.pendingMigratedTracks = p.pendingMigratedTracks[:len(p.pendingMigratedTracks)-1] break } } @@ -766,11 +765,11 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.pendingTracksLock.Lock() p.pendingTracks = make(map[string]*pendingTrackInfo) - closeMutedTrack := p.mutedTrackNotFired - p.mutedTrackNotFired = p.mutedTrackNotFired[:0] + pendingMigratedTracksToClose := p.pendingMigratedTracks + p.pendingMigratedTracks = p.pendingMigratedTracks[:0] p.pendingTracksLock.Unlock() - for _, t := range closeMutedTrack { + for _, t := range pendingMigratedTracksToClose { t.Close(isExpectedToResume) } @@ -1552,7 +1551,12 @@ func (p *ParticipantImpl) onStreamStateChange(update *streamallocator.StreamStat }) } -func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error { +func (p *ParticipantImpl) onSubscribedMaxQualityChange( + trackID livekit.TrackID, + trackInfo *livekit.TrackInfo, + subscribedQualities []*livekit.SubscribedCodec, + maxSubscribedQualities []types.SubscribedCodecQuality, +) error { if p.params.DisableDynacast { return nil } @@ -1561,6 +1565,17 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, return nil } + // send layer info about max subscription changes to telemetry + for _, maxSubscribedQuality := range maxSubscribedQualities { + p.params.Telemetry.TrackMaxSubscribedVideoQuality( + context.Background(), + p.ID(), + trackInfo, + maxSubscribedQuality.CodecMime, + maxSubscribedQuality.Quality, + ) + } + // normalize the codec name for _, subscribedQuality := range subscribedQualities { subscribedQuality.Codec = strings.ToLower(strings.TrimLeft(subscribedQuality.Codec, "video/")) @@ -1572,36 +1587,6 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, SubscribedCodecs: subscribedQualities, } - // send layer info about max subscription changes to telemetry - track := p.UpTrackManager.GetPublishedTrack(trackID) - var layerInfo map[livekit.VideoQuality]*livekit.VideoLayer - if track != nil { - layers := track.ToProto().Layers - layerInfo = make(map[livekit.VideoQuality]*livekit.VideoLayer, len(layers)) - for _, layer := range layers { - layerInfo[layer.Quality] = layer - } - } - - for _, maxSubscribedQuality := range maxSubscribedQualities { - ti := &livekit.TrackInfo{ - Sid: string(trackID), - Type: livekit.TrackType_VIDEO, - } - if info, ok := layerInfo[maxSubscribedQuality.Quality]; ok { - ti.Width = info.Width - ti.Height = info.Height - } - - p.params.Telemetry.TrackMaxSubscribedVideoQuality( - context.Background(), - p.ID(), - ti, - maxSubscribedQuality.CodecMime, - maxSubscribedQuality.Quality, - ) - } - p.pubLogger.Debugw( "sending max subscribed quality", "trackID", trackID, @@ -1864,7 +1849,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei p.pendingTracksLock.Unlock() if mt.AddReceiver(rtpReceiver, track, p.twcc, mid) { - p.removeMutedTrackNotFired(mt) + p.removePendingMigratedTrack(mt) } if newTrack { @@ -1881,8 +1866,8 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei return mt, newTrack } -func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo) *MediaTrack { - p.pubLogger.Infow("add migrate muted track", "cid", cid, "trackID", ti.Sid, "track", logger.Proto(ti)) +func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *MediaTrack { + p.pubLogger.Infow("add migrated track", "cid", cid, "trackID", ti.Sid, "track", logger.Proto(ti)) rtpReceiver := p.TransportManager.GetPublisherRTPReceiver(ti.Mid) if rtpReceiver == nil { p.pubLogger.Errorw("could not find receiver for migrated track", nil, "trackID", ti.Sid) @@ -1929,7 +1914,7 @@ func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo } } mt.SetSimulcast(ti.Simulcast) - mt.SetMuted(true) + mt.SetMuted(ti.Muted) return mt }