Unify muted and unmuted migration paths. (#2406)

* Unify muted and unmuted migration paths.

If dynacast had disabled all layers, after a migration, the client did
not restart publish (it is akin to muted track). That failed migration
because migration state machine waits for unmuted tracks to be published
(i. e. server has to receive packets).

If a migrating track is in muted state, server does not wait for
packets. It synthesises the published event and catches up later when
packets actually come in.

Just treating all migrations as the erstwhile muted case. Sythesise
publish whether track is muted or not. In the unmuted case, packets
might arrive soon after whereas in muted case, it will depend on when
unmute happens.

This is tricky stuff. So, will need good testing.

* use muted from track info
This commit is contained in:
Raja Subramanian
2024-01-25 01:24:09 +05:30
committed by GitHub
parent 89c7cec2ad
commit 79cdc2df2e
2 changed files with 43 additions and 57 deletions

View File

@@ -125,6 +125,7 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack {
func (t *MediaTrack) OnSubscribedMaxQualityChange(
f func(
trackID livekit.TrackID,
trackInfo *livekit.TrackInfo,
subscribedQualities []*livekit.SubscribedCodec,
maxSubscribedQualities []types.SubscribedCodecQuality,
) error,
@@ -135,7 +136,7 @@ func (t *MediaTrack) OnSubscribedMaxQualityChange(
handler := func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) {
if f != nil && !t.IsMuted() {
_ = f(t.ID(), subscribedQualities, maxSubscribedQualities)
_ = f(t.ID(), t.ToProto(), subscribedQualities, maxSubscribedQualities)
}
for _, q := range maxSubscribedQualities {

View File

@@ -161,8 +161,8 @@ type ParticipantImpl struct {
pendingTracksLock utils.RWMutex
pendingTracks map[string]*pendingTrackInfo
pendingPublishingTracks map[livekit.TrackID]*pendingTrackInfo
// migrated in muted tracks are not fired need close at participant close
mutedTrackNotFired []*MediaTrack
// migrated in tracks are not fired need close at participant close
pendingMigratedTracks []*MediaTrack
// supported codecs
enabledPublishCodecs []*livekit.Codec
@@ -632,13 +632,15 @@ func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) er
}
if p.MigrateState() == types.MigrateStateSync {
go p.handleMigrateMutedTrack()
go p.handleMigrateTracks()
}
return nil
}
func (p *ParticipantImpl) handleMigrateMutedTrack() {
// muted track won't send rtp packet, so we add mediatrack manually
func (p *ParticipantImpl) handleMigrateTracks() {
// muted track won't send rtp packet, so it is required to add mediatrack manually.
// But, synthesising track publish for unmuted tracks keeps a consistent path.
// In both csaes (muted and unmuted), when publisher sends media packets, OnTrack would register and go from there.
var addedTracks []*MediaTrack
p.pendingTracksLock.Lock()
for cid, pti := range p.pendingTracks {
@@ -650,17 +652,14 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() {
p.pubLogger.Warnw("too many pending migrated tracks", nil, "trackID", pti.trackInfos[0].Sid, "count", len(pti.trackInfos), "cid", cid)
}
ti := pti.trackInfos[0]
if ti.Muted {
mt := p.addMigrateMutedTrack(cid, ti)
if mt != nil {
addedTracks = append(addedTracks, mt)
} else {
p.pubLogger.Warnw("could not find migrated muted track", nil, "cid", cid)
}
mt := p.addMigratedTrack(cid, pti.trackInfos[0])
if mt != nil {
addedTracks = append(addedTracks, mt)
} else {
p.pubLogger.Warnw("could not find migrated muted track", nil, "cid", cid)
}
}
p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...)
p.pendingMigratedTracks = append(p.pendingMigratedTracks, addedTracks...)
if len(addedTracks) != 0 {
p.dirty.Store(true)
@@ -676,12 +675,12 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() {
}()
}
func (p *ParticipantImpl) removeMutedTrackNotFired(mt *MediaTrack) {
func (p *ParticipantImpl) removePendingMigratedTrack(mt *MediaTrack) {
p.pendingTracksLock.Lock()
for i, t := range p.mutedTrackNotFired {
for i, t := range p.pendingMigratedTracks {
if t == mt {
p.mutedTrackNotFired[i] = p.mutedTrackNotFired[len(p.mutedTrackNotFired)-1]
p.mutedTrackNotFired = p.mutedTrackNotFired[:len(p.mutedTrackNotFired)-1]
p.pendingMigratedTracks[i] = p.pendingMigratedTracks[len(p.pendingMigratedTracks)-1]
p.pendingMigratedTracks = p.pendingMigratedTracks[:len(p.pendingMigratedTracks)-1]
break
}
}
@@ -766,11 +765,11 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.pendingTracksLock.Lock()
p.pendingTracks = make(map[string]*pendingTrackInfo)
closeMutedTrack := p.mutedTrackNotFired
p.mutedTrackNotFired = p.mutedTrackNotFired[:0]
pendingMigratedTracksToClose := p.pendingMigratedTracks
p.pendingMigratedTracks = p.pendingMigratedTracks[:0]
p.pendingTracksLock.Unlock()
for _, t := range closeMutedTrack {
for _, t := range pendingMigratedTracksToClose {
t.Close(isExpectedToResume)
}
@@ -1552,7 +1551,12 @@ func (p *ParticipantImpl) onStreamStateChange(update *streamallocator.StreamStat
})
}
func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error {
func (p *ParticipantImpl) onSubscribedMaxQualityChange(
trackID livekit.TrackID,
trackInfo *livekit.TrackInfo,
subscribedQualities []*livekit.SubscribedCodec,
maxSubscribedQualities []types.SubscribedCodecQuality,
) error {
if p.params.DisableDynacast {
return nil
}
@@ -1561,6 +1565,17 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID,
return nil
}
// send layer info about max subscription changes to telemetry
for _, maxSubscribedQuality := range maxSubscribedQualities {
p.params.Telemetry.TrackMaxSubscribedVideoQuality(
context.Background(),
p.ID(),
trackInfo,
maxSubscribedQuality.CodecMime,
maxSubscribedQuality.Quality,
)
}
// normalize the codec name
for _, subscribedQuality := range subscribedQualities {
subscribedQuality.Codec = strings.ToLower(strings.TrimLeft(subscribedQuality.Codec, "video/"))
@@ -1572,36 +1587,6 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID,
SubscribedCodecs: subscribedQualities,
}
// send layer info about max subscription changes to telemetry
track := p.UpTrackManager.GetPublishedTrack(trackID)
var layerInfo map[livekit.VideoQuality]*livekit.VideoLayer
if track != nil {
layers := track.ToProto().Layers
layerInfo = make(map[livekit.VideoQuality]*livekit.VideoLayer, len(layers))
for _, layer := range layers {
layerInfo[layer.Quality] = layer
}
}
for _, maxSubscribedQuality := range maxSubscribedQualities {
ti := &livekit.TrackInfo{
Sid: string(trackID),
Type: livekit.TrackType_VIDEO,
}
if info, ok := layerInfo[maxSubscribedQuality.Quality]; ok {
ti.Width = info.Width
ti.Height = info.Height
}
p.params.Telemetry.TrackMaxSubscribedVideoQuality(
context.Background(),
p.ID(),
ti,
maxSubscribedQuality.CodecMime,
maxSubscribedQuality.Quality,
)
}
p.pubLogger.Debugw(
"sending max subscribed quality",
"trackID", trackID,
@@ -1864,7 +1849,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
p.pendingTracksLock.Unlock()
if mt.AddReceiver(rtpReceiver, track, p.twcc, mid) {
p.removeMutedTrackNotFired(mt)
p.removePendingMigratedTrack(mt)
}
if newTrack {
@@ -1881,8 +1866,8 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
return mt, newTrack
}
func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo) *MediaTrack {
p.pubLogger.Infow("add migrate muted track", "cid", cid, "trackID", ti.Sid, "track", logger.Proto(ti))
func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *MediaTrack {
p.pubLogger.Infow("add migrated track", "cid", cid, "trackID", ti.Sid, "track", logger.Proto(ti))
rtpReceiver := p.TransportManager.GetPublisherRTPReceiver(ti.Mid)
if rtpReceiver == nil {
p.pubLogger.Errorw("could not find receiver for migrated track", nil, "trackID", ti.Sid)
@@ -1929,7 +1914,7 @@ func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo
}
}
mt.SetSimulcast(ti.Simulcast)
mt.SetMuted(true)
mt.SetMuted(ti.Muted)
return mt
}