From 4d7df612ec6d539b449ec34e07da0bf70ab0c019 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 9 Aug 2022 11:47:06 +0530 Subject: [PATCH] Refactor DynacastQuality & MediaLossProxy into separate modules (#894) * WIP commit * Refactor media loss proxy * Use DynacastQuality and MediaLossProxy from MediaTrack * fix test * Remove unused param * Remove unused interfaces * Move interface methods to local * Split out DynacastManager * have to add codec to dynacast manager * RUnlock * fix restart * Adding API to force quality and also maintain closed state * Address PR comments --- pkg/rtc/dynacastmanager.go | 275 +++++++++++++++ pkg/rtc/dynacastmanager_test.go | 271 +++++++++++++++ pkg/rtc/dynacastquality.go | 162 +++++++++ pkg/rtc/medialossproxy.go | 87 +++++ pkg/rtc/mediatrack.go | 77 ++++- pkg/rtc/mediatrack_test.go | 336 ------------------- pkg/rtc/mediatrackreceiver.go | 108 ++---- pkg/rtc/mediatracksubscriptions.go | 322 +----------------- pkg/rtc/participant.go | 29 +- pkg/rtc/types/interfaces.go | 15 +- pkg/rtc/types/typesfakes/fake_media_track.go | 117 ------- pkg/rtc/types/typesfakes/fake_participant.go | 161 --------- pkg/rtc/uptrackmanager.go | 28 -- pkg/sfu/downtrack.go | 3 + 14 files changed, 933 insertions(+), 1058 deletions(-) create mode 100644 pkg/rtc/dynacastmanager.go create mode 100644 pkg/rtc/dynacastmanager_test.go create mode 100644 pkg/rtc/dynacastquality.go create mode 100644 pkg/rtc/medialossproxy.go diff --git a/pkg/rtc/dynacastmanager.go b/pkg/rtc/dynacastmanager.go new file mode 100644 index 000000000..e671ea4f2 --- /dev/null +++ b/pkg/rtc/dynacastmanager.go @@ -0,0 +1,275 @@ +package rtc + +import ( + "sync" + "time" + + "github.com/bep/debounce" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/utils" +) + +type DynacastManagerParams struct { + DynacastPauseDelay time.Duration + Logger logger.Logger +} + +type DynacastManager struct { + params DynacastManagerParams + + lock sync.RWMutex + dynacastQuality map[string]*DynacastQuality // mime type => DynacastQuality + maxSubscribedQuality map[string]livekit.VideoQuality + committedMaxSubscribedQuality map[string]livekit.VideoQuality + + maxSubscribedQualityDebounce func(func()) + + qualityNotifyOpQueue *utils.OpsQueue + + isClosed bool + + onSubscribedMaxQualityChange func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) +} + +func NewDynacastManager(params DynacastManagerParams) *DynacastManager { + d := &DynacastManager{ + params: params, + dynacastQuality: make(map[string]*DynacastQuality), + maxSubscribedQuality: make(map[string]livekit.VideoQuality), + committedMaxSubscribedQuality: make(map[string]livekit.VideoQuality), + maxSubscribedQualityDebounce: debounce.New(params.DynacastPauseDelay), + qualityNotifyOpQueue: utils.NewOpsQueue(params.Logger, "quality-notify", 100), + } + d.qualityNotifyOpQueue.Start() + return d +} + +func (d *DynacastManager) OnSubscribedMaxQualityChange(f func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)) { + d.lock.Lock() + d.onSubscribedMaxQualityChange = f + d.lock.Unlock() +} + +func (d *DynacastManager) AddCodec(mime string) { + d.getOrCreateDynacastQuality(mime) +} + +func (d *DynacastManager) Restart() { + d.lock.RLock() + dqs := d.getDynacastQualitiesLocked() + d.lock.RUnlock() + + for _, dq := range dqs { + dq.Restart() + d.committedMaxSubscribedQuality[dq.MimeType()] = dq.MaxSubscribedQuality() + } +} + +func (d *DynacastManager) Close() { + d.qualityNotifyOpQueue.Stop() + + d.lock.Lock() + dqs := d.getDynacastQualitiesLocked() + d.dynacastQuality = make(map[string]*DynacastQuality) + + d.isClosed = true + d.lock.Unlock() + + for _, dq := range dqs { + dq.Stop() + } +} + +// +// THere are situations like track unmute or streaming from a sifferent node +// where subscribed quality needs to sent to the provider immediately. +// This bypasses any debouncing and forces a subscribed quality update +// with immediate effect. +// +func (d *DynacastManager) ForceUpdate() { + d.update(true) +} + +// +// It is possible for tracks to be in pending close state. When track +// is waiting to be closed, a node is not streaming a track. This can +// be used to force an update announcing that subscribed quality is OFF, +// i.e. indicating not pulling track any more. +// +func (d *DynacastManager) ForceQuality(quality livekit.VideoQuality) { + d.lock.Lock() + defer d.lock.Unlock() + + for mime := range d.committedMaxSubscribedQuality { + d.committedMaxSubscribedQuality[mime] = quality + } + + d.enqueueSubscribedQualityChange() +} + +func (d *DynacastManager) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, mime string, quality livekit.VideoQuality) { + dq := d.getOrCreateDynacastQuality(mime) + if dq != nil { + dq.NotifySubscriberMaxQuality(subscriberID, quality) + } +} + +func (d *DynacastManager) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) { + for _, quality := range qualities { + dq := d.getOrCreateDynacastQuality(quality.CodecMime) + if dq != nil { + dq.NotifySubscriberNodeMaxQuality(nodeID, quality.Quality) + } + } +} + +func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuality { + d.lock.Lock() + defer d.lock.Unlock() + + if d.isClosed { + return nil + } + + if dq := d.dynacastQuality[mime]; dq != nil { + return dq + } + + dq := NewDynacastQuality(DynacastQualityParams{ + MimeType: mime, + DynacastPauseDelay: d.params.DynacastPauseDelay, + Logger: d.params.Logger, + }) + dq.OnSubscribedMaxQualityChange(func(maxQuality livekit.VideoQuality) { + d.updateMaxQualityForMime(mime, maxQuality) + }) + dq.Start() + + d.dynacastQuality[mime] = dq + d.committedMaxSubscribedQuality[mime] = dq.MaxSubscribedQuality() + + return dq +} + +func (d *DynacastManager) getDynacastQualitiesLocked() []*DynacastQuality { + dqs := make([]*DynacastQuality, 0, len(d.dynacastQuality)) + for _, dq := range d.dynacastQuality { + dqs = append(dqs, dq) + } + + return dqs +} + +func (d *DynacastManager) updateMaxQualityForMime(mime string, maxQuality livekit.VideoQuality) { + d.lock.Lock() + d.maxSubscribedQuality[mime] = maxQuality + d.lock.Unlock() + + d.update(false) +} + +func (d *DynacastManager) update(force bool) { + d.lock.Lock() + + // add or remove of a mime triggers an update + changed := len(d.maxSubscribedQuality) != len(d.committedMaxSubscribedQuality) + downgradesOnly := !changed + if !changed { + for mime, quality := range d.maxSubscribedQuality { + if cq, ok := d.committedMaxSubscribedQuality[mime]; ok { + if cq != quality { + changed = true + } + + if (cq == livekit.VideoQuality_OFF && quality != livekit.VideoQuality_OFF) || (cq != livekit.VideoQuality_OFF && quality != livekit.VideoQuality_OFF && cq < quality) { + downgradesOnly = false + } + } + } + } + + if !force { + if !changed { + d.lock.Unlock() + return + } + + if downgradesOnly { + d.params.Logger.Debugw("debouncing quality downgrade", + "committedMaxSubscribedQuality", d.committedMaxSubscribedQuality, + "maxSubscribedQuality", d.maxSubscribedQuality, + ) + d.maxSubscribedQualityDebounce(func() { + d.update(true) + }) + d.lock.Unlock() + return + } + } + + // clear debounce on send + d.maxSubscribedQualityDebounce(func() {}) + + d.params.Logger.Infow("committing quality change", + "force", force, + "committedMaxSubscribedQuality", d.committedMaxSubscribedQuality, + "maxSubscribedQuality", d.maxSubscribedQuality, + ) + + // commit change + d.committedMaxSubscribedQuality = make(map[string]livekit.VideoQuality, len(d.maxSubscribedQuality)) + for mime, quality := range d.maxSubscribedQuality { + d.committedMaxSubscribedQuality[mime] = quality + } + + d.enqueueSubscribedQualityChange() + d.lock.Unlock() +} + +func (d *DynacastManager) enqueueSubscribedQualityChange() { + if d.isClosed || d.onSubscribedMaxQualityChange == nil { + return + } + + subscribedCodecs := make([]*livekit.SubscribedCodec, 0, len(d.committedMaxSubscribedQuality)) + maxSubscribedQualities := make([]types.SubscribedCodecQuality, 0, len(d.committedMaxSubscribedQuality)) + for mime, quality := range d.committedMaxSubscribedQuality { + maxSubscribedQualities = append(maxSubscribedQualities, types.SubscribedCodecQuality{ + CodecMime: mime, + Quality: quality, + }) + + if quality == livekit.VideoQuality_OFF { + subscribedCodecs = append(subscribedCodecs, &livekit.SubscribedCodec{ + Codec: mime, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: false}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }) + } else { + var subscribedQualities []*livekit.SubscribedQuality + for q := livekit.VideoQuality_LOW; q <= livekit.VideoQuality_HIGH; q++ { + subscribedQualities = append(subscribedQualities, &livekit.SubscribedQuality{ + Quality: q, + Enabled: q <= quality, + }) + } + subscribedCodecs = append(subscribedCodecs, &livekit.SubscribedCodec{ + Codec: mime, + Qualities: subscribedQualities, + }) + } + } + + d.params.Logger.Debugw("subscribedMaxQualityChange", + "subscribedCodecs", subscribedCodecs, + "maxSubscribedQualities", maxSubscribedQualities) + d.qualityNotifyOpQueue.Enqueue(func() { + d.onSubscribedMaxQualityChange(subscribedCodecs, maxSubscribedQualities) + }) +} diff --git a/pkg/rtc/dynacastmanager_test.go b/pkg/rtc/dynacastmanager_test.go new file mode 100644 index 000000000..ae600575a --- /dev/null +++ b/pkg/rtc/dynacastmanager_test.go @@ -0,0 +1,271 @@ +package rtc + +import ( + "sort" + "sync" + "testing" + "time" + + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/protocol/livekit" + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" +) + +func TestSubscribedMaxQuality(t *testing.T) { + subscribedCodecsAsString := func(c1 []*livekit.SubscribedCodec) string { + sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec }) + var s1 string + for _, c := range c1 { + s1 += c.String() + } + return s1 + } + t.Run("subscribers muted", func(t *testing.T) { + dm := NewDynacastManager(DynacastManagerParams{}) + var lock sync.Mutex + actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) + dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) { + lock.Lock() + actualSubscribedQualities = subscribedQualities + lock.Unlock() + }) + + dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_HIGH) + dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeAV1, livekit.VideoQuality_HIGH) + + // mute all subscribers of vp8 + dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_OFF) + + expectedSubscribedQualities := []*livekit.SubscribedCodec{ + { + Codec: webrtc.MimeTypeVP8, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: false}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + { + Codec: webrtc.MimeTypeAV1, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: true}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + }) + + t.Run("subscribers max quality", func(t *testing.T) { + dm := NewDynacastManager(DynacastManagerParams{ + DynacastPauseDelay: 100 * time.Millisecond, + }) + + lock := sync.RWMutex{} + lock.Lock() + actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) + lock.Unlock() + dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) { + lock.Lock() + actualSubscribedQualities = subscribedQualities + lock.Unlock() + }) + + dm.maxSubscribedQuality = map[string]livekit.VideoQuality{ + webrtc.MimeTypeVP8: livekit.VideoQuality_LOW, + webrtc.MimeTypeAV1: livekit.VideoQuality_LOW, + } + dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_HIGH) + dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_MEDIUM) + dm.NotifySubscriberMaxQuality("s3", webrtc.MimeTypeAV1, livekit.VideoQuality_MEDIUM) + + expectedSubscribedQualities := []*livekit.SubscribedCodec{ + { + Codec: webrtc.MimeTypeVP8, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: true}, + }, + }, + { + Codec: webrtc.MimeTypeAV1, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + // "s1" dropping to MEDIUM should disable HIGH layer + dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_MEDIUM) + + expectedSubscribedQualities = []*livekit.SubscribedCodec{ + { + Codec: webrtc.MimeTypeVP8, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + { + Codec: webrtc.MimeTypeAV1, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + // "s1" , "s2" , "s3" dropping to LOW should disable HIGH & MEDIUM + dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_LOW) + dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_LOW) + dm.NotifySubscriberMaxQuality("s3", webrtc.MimeTypeAV1, livekit.VideoQuality_LOW) + + expectedSubscribedQualities = []*livekit.SubscribedCodec{ + { + Codec: webrtc.MimeTypeVP8, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + { + Codec: webrtc.MimeTypeAV1, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + // muting "s2" only should not disable all qualities of vp8, no change of expected qualities + dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_OFF) + + time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + // muting "s1" and s3 also should disable all qualities + dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_OFF) + dm.NotifySubscriberMaxQuality("s3", webrtc.MimeTypeAV1, livekit.VideoQuality_OFF) + + expectedSubscribedQualities = []*livekit.SubscribedCodec{ + { + Codec: webrtc.MimeTypeVP8, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: false}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + { + Codec: webrtc.MimeTypeAV1, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: false}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + // unmuting "s1" should enable vp8 previously set max quality + dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_LOW) + + expectedSubscribedQualities = []*livekit.SubscribedCodec{ + { + Codec: webrtc.MimeTypeVP8, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + { + Codec: webrtc.MimeTypeAV1, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: false}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + // a higher quality from a different node should trigger that quality + dm.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{ + {CodecMime: webrtc.MimeTypeVP8, Quality: livekit.VideoQuality_HIGH}, + {CodecMime: webrtc.MimeTypeAV1, Quality: livekit.VideoQuality_MEDIUM}, + }) + + expectedSubscribedQualities = []*livekit.SubscribedCodec{ + { + Codec: webrtc.MimeTypeVP8, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: true}, + }, + }, + { + Codec: webrtc.MimeTypeAV1, + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + }) +} diff --git a/pkg/rtc/dynacastquality.go b/pkg/rtc/dynacastquality.go new file mode 100644 index 000000000..059e468a9 --- /dev/null +++ b/pkg/rtc/dynacastquality.go @@ -0,0 +1,162 @@ +package rtc + +import ( + "sync" + "time" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + initialQualityUpdateWait = 10 * time.Second +) + +type DynacastQualityParams struct { + MimeType string + DynacastPauseDelay time.Duration + Logger logger.Logger +} + +// DynacastQuality manages max subscribed quality of a single receiver of a media track +type DynacastQuality struct { + params DynacastQualityParams + + // quality level enable/disable + lock sync.RWMutex + initialized bool + maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality + maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality + maxSubscribedQuality livekit.VideoQuality + maxQualityTimer *time.Timer + + onSubscribedMaxQualityChange func(maxSubscribedQuality livekit.VideoQuality) +} + +func NewDynacastQuality(params DynacastQualityParams) *DynacastQuality { + return &DynacastQuality{ + params: params, + maxSubscriberQuality: make(map[livekit.ParticipantID]livekit.VideoQuality), + maxSubscriberNodeQuality: make(map[livekit.NodeID]livekit.VideoQuality), + } +} + +func (d *DynacastQuality) Start() { + d.reset() +} + +func (d *DynacastQuality) Restart() { + d.reset() +} + +func (d *DynacastQuality) Stop() { + d.stopMaxQualityTimer() +} + +func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(maxSubscribedQuality livekit.VideoQuality)) { + d.onSubscribedMaxQualityChange = f +} + +func (d *DynacastQuality) MimeType() string { + return d.params.MimeType +} + +func (d *DynacastQuality) MaxSubscribedQuality() livekit.VideoQuality { + d.lock.RLock() + defer d.lock.RUnlock() + + return d.maxSubscribedQuality +} + +func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) { + d.lock.Lock() + if quality == livekit.VideoQuality_OFF { + delete(d.maxSubscriberQuality, subscriberID) + } else { + d.maxSubscriberQuality[subscriberID] = quality + } + d.lock.Unlock() + + d.updateQualityChange() +} + +func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) { + d.lock.Lock() + if quality == livekit.VideoQuality_OFF { + delete(d.maxSubscriberNodeQuality, nodeID) + } else { + d.maxSubscriberNodeQuality[nodeID] = quality + } + d.lock.Unlock() + + d.updateQualityChange() +} + +func (d *DynacastQuality) reset() { + d.lock.Lock() + d.initialized = false + d.maxSubscribedQuality = livekit.VideoQuality_HIGH + d.lock.Unlock() + + d.startMaxQualityTimer() +} + +func (d *DynacastQuality) updateQualityChange() { + d.lock.Lock() + maxSubscribedQuality := livekit.VideoQuality_OFF + for _, subQuality := range d.maxSubscriberQuality { + if maxSubscribedQuality == livekit.VideoQuality_OFF || (subQuality != livekit.VideoQuality_OFF && subQuality > maxSubscribedQuality) { + maxSubscribedQuality = subQuality + } + } + for _, nodeQuality := range d.maxSubscriberNodeQuality { + if maxSubscribedQuality == livekit.VideoQuality_OFF || (nodeQuality != livekit.VideoQuality_OFF && nodeQuality > maxSubscribedQuality) { + maxSubscribedQuality = nodeQuality + } + } + + if maxSubscribedQuality == d.maxSubscribedQuality && d.initialized { + d.lock.Unlock() + return + } + + d.initialized = true + d.maxSubscribedQuality = maxSubscribedQuality + d.params.Logger.Infow("notifying quality change", + "mime", d.params.MimeType, + "maxSubscriberQuality", d.maxSubscriberQuality, + "maxSubscriberNodeQuality", d.maxSubscriberNodeQuality, + "maxSubscribedQuality", d.maxSubscribedQuality, + ) + onSubscribedMaxQualityChange := d.onSubscribedMaxQualityChange + d.lock.Unlock() + + if onSubscribedMaxQualityChange != nil { + onSubscribedMaxQualityChange(maxSubscribedQuality) + } +} + +func (d *DynacastQuality) startMaxQualityTimer() { + d.lock.Lock() + defer d.lock.Unlock() + + if d.maxQualityTimer != nil { + d.maxQualityTimer.Stop() + d.maxQualityTimer = nil + } + + d.maxQualityTimer = time.AfterFunc(initialQualityUpdateWait, func() { + d.stopMaxQualityTimer() + d.updateQualityChange() + }) +} + +func (d *DynacastQuality) stopMaxQualityTimer() { + d.lock.Lock() + defer d.lock.Unlock() + + if d.maxQualityTimer != nil { + d.maxQualityTimer.Stop() + d.maxQualityTimer = nil + } +} diff --git a/pkg/rtc/medialossproxy.go b/pkg/rtc/medialossproxy.go new file mode 100644 index 000000000..ec47092fc --- /dev/null +++ b/pkg/rtc/medialossproxy.go @@ -0,0 +1,87 @@ +package rtc + +import ( + "sync" + "time" + + "github.com/pion/rtcp" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + + "github.com/livekit/livekit-server/pkg/sfu" +) + +const ( + downLostUpdateDelta = time.Second +) + +type MediaLossProxyParams struct { + Logger logger.Logger +} + +type MediaLossProxy struct { + params MediaLossProxyParams + + lock sync.Mutex + maxDownFracLost uint8 + maxDownFracLostTs time.Time + + onMediaLossUpdate func(fractionalLoss uint8) +} + +func NewMediaLossProxy(params MediaLossProxyParams) *MediaLossProxy { + return &MediaLossProxy{params: params} +} + +func (m *MediaLossProxy) OnMediaLossUpdate(f func(fractionalLoss uint8)) { + m.lock.Lock() + m.onMediaLossUpdate = f + m.lock.Unlock() +} + +func (m *MediaLossProxy) HandleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) { + m.lock.Lock() + for _, rr := range report.Reports { + if m.maxDownFracLost < rr.FractionLost { + m.maxDownFracLost = rr.FractionLost + } + } + m.lock.Unlock() + + m.maybeUpdateLoss() +} + +func (m *MediaLossProxy) NotifySubscriberNodeMediaLoss(_nodeID livekit.NodeID, fractionalLoss uint8) { + m.lock.Lock() + if m.maxDownFracLost < fractionalLoss { + m.maxDownFracLost = fractionalLoss + } + m.lock.Unlock() + + m.maybeUpdateLoss() +} + +func (m *MediaLossProxy) maybeUpdateLoss() { + var ( + shouldUpdate bool + maxLost uint8 + ) + + m.lock.Lock() + now := time.Now() + if now.Sub(m.maxDownFracLostTs) > downLostUpdateDelta { + shouldUpdate = true + maxLost = m.maxDownFracLost + m.maxDownFracLost = 0 + m.maxDownFracLostTs = now + } + onMediaLossUpdate := m.onMediaLossUpdate + m.lock.Unlock() + + if shouldUpdate { + if onMediaLossUpdate != nil { + onMediaLossUpdate(maxLost) + } + } +} diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 88cfa0293..221cc393b 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -14,10 +14,12 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/utils" ) // MediaTrack represents a WebRTC track that needs to be forwarded @@ -28,6 +30,9 @@ type MediaTrack struct { buffer *buffer.Buffer *MediaTrackReceiver + *MediaLossProxy + + dynacastManager *DynacastManager lock sync.RWMutex } @@ -55,6 +60,10 @@ type MediaTrackParams struct { func NewMediaTrack(params MediaTrackParams) *MediaTrack { t := &MediaTrack{ params: params, + dynacastManager: NewDynacastManager(DynacastManagerParams{ + DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay, + Logger: params.Logger, + }), } t.MediaTrackReceiver = NewMediaTrackReceiver(MediaTrackReceiverParams{ @@ -66,16 +75,9 @@ func NewMediaTrack(params MediaTrackParams) *MediaTrack { BufferFactory: params.BufferFactory, ReceiverConfig: params.ReceiverConfig, SubscriberConfig: params.SubscriberConfig, - VideoConfig: params.VideoConfig, Telemetry: params.Telemetry, Logger: params.Logger, }) - t.MediaTrackReceiver.OnMediaLossUpdate(func(fractionalLoss uint8) { - if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO { - // ok to access buffer since receivers are added before subscribers - t.buffer.SetLastFractionLostReport(fractionalLoss) - } - }) t.MediaTrackReceiver.OnVideoLayerUpdate(func(layers []*livekit.VideoLayer) { t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), &livekit.TrackInfo{ @@ -87,9 +89,45 @@ func NewMediaTrack(params MediaTrackParams) *MediaTrack { }) }) + t.MediaLossProxy = NewMediaLossProxy(MediaLossProxyParams{ + Logger: params.Logger, + }) + t.MediaLossProxy.OnMediaLossUpdate(func(fractionalLoss uint8) { + if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO { + // ok to access buffer since receivers are added before subscribers + t.buffer.SetLastFractionLostReport(fractionalLoss) + } + }) + + t.MediaTrackReceiver.OnSetupReceiver(func(mime string) { + t.dynacastManager.AddCodec(mime) + }) + t.MediaTrackReceiver.OnSubscriberMaxQualityChange(func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32) { + t.dynacastManager.NotifySubscriberMaxQuality(subscriberID, codec.MimeType, utils.QualityForSpatialLayer(layer)) + }) + t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback) + return t } +func (t *MediaTrack) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error) { + t.dynacastManager.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) { + if f != nil && !t.IsMuted() { + _ = f(t.ID(), subscribedQualities, maxSubscribedQualities) + } + for _, q := range maxSubscribedQualities { + receiver := t.Receiver(q.CodecMime) + if receiver != nil { + receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality)) + } + } + }) +} + +func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) { + t.dynacastManager.NotifySubscriberNodeMaxQuality(nodeID, qualities) +} + func (t *MediaTrack) SignalCid() string { return t.params.SignalCid } @@ -130,7 +168,6 @@ func (t *MediaTrack) SetPendingCodecSid(codecs []*livekit.SimulcastCodec) { // AddReceiver adds a new RTP receiver to the track, returns true when receiver represents a new codec func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, twcc *twcc.Responder, mid string) bool { - var newCodec bool buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC())) if buff == nil || rtcpReader == nil { @@ -183,6 +220,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.RemoveAllSubscribers(false) t.MediaTrackReceiver.ClearReceiver(mime) if t.MediaTrackReceiver.TryClose() { + t.dynacastManager.Close() t.params.Telemetry.TrackUnpublished( context.Background(), t.PublisherID(), @@ -281,3 +319,26 @@ func (t *MediaTrack) OnMaxLayerChange(maxLayer int32) { } t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), ti) } + +func (t *MediaTrack) Restart() { + t.MediaTrackReceiver.Restart() + + t.dynacastManager.Restart() +} + +func (t *MediaTrack) Close() { + t.dynacastManager.Close() + + t.MediaTrackReceiver.Close() +} + +func (t *MediaTrack) SetMuted(muted bool) { + // update quality based on subscription if unmuting. + // This will queue up the current state, but subscriber + // driven changes could update it. + if !muted { + t.dynacastManager.ForceUpdate() + } + + t.MediaTrackReceiver.SetMuted(muted) +} diff --git a/pkg/rtc/mediatrack_test.go b/pkg/rtc/mediatrack_test.go index 0afbe62c0..5def4e0b0 100644 --- a/pkg/rtc/mediatrack_test.go +++ b/pkg/rtc/mediatrack_test.go @@ -1,18 +1,11 @@ package rtc import ( - "sort" - "sync" "testing" - "time" - "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" "github.com/livekit/protocol/livekit" - - "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/rtc/types" ) func TestTrackInfo(t *testing.T) { @@ -106,332 +99,3 @@ func TestGetQualityForDimension(t *testing.T) { require.Equal(t, livekit.VideoQuality_HIGH, mt.GetQualityForDimension(1000, 700)) }) } - -func TestSubscribedMaxQuality(t *testing.T) { - subscribedCodecsAsString := func(c1 []*livekit.SubscribedCodec) string { - sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec }) - var s1 string - for _, c := range c1 { - s1 += c.String() - } - return s1 - } - t.Run("subscribers muted", func(t *testing.T) { - mt := NewMediaTrack(MediaTrackParams{ - TrackInfo: &livekit.TrackInfo{ - Sid: "v1", - Type: livekit.TrackType_VIDEO, - Width: 1080, - Height: 720, - Layers: []*livekit.VideoLayer{ - { - Quality: livekit.VideoQuality_LOW, - Width: 480, - Height: 270, - }, - { - Quality: livekit.VideoQuality_MEDIUM, - Width: 960, - Height: 540, - }, - { - Quality: livekit.VideoQuality_HIGH, - Width: 1080, - Height: 720, - }, - }, - }, - }) - mt.Start() - var lock sync.Mutex - actualTrackID := livekit.TrackID("") - actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) - mt.OnSubscribedMaxQualityChange(func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) error { - lock.Lock() - actualTrackID = trackID - actualSubscribedQualities = subscribedQualities - lock.Unlock() - return nil - }) - - mt.AddCodec(webrtc.MimeTypeVP8) - mt.AddCodec(webrtc.MimeTypeAV1) - - mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_HIGH) - mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_HIGH) - - // mute all subscribers of vp8 - mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_OFF) - - expectedSubscribedQualities := []*livekit.SubscribedCodec{ - { - Codec: webrtc.MimeTypeVP8, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: false}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - { - Codec: webrtc.MimeTypeAV1, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: true}, - }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - }) - - t.Run("subscribers max quality", func(t *testing.T) { - mt := NewMediaTrack(MediaTrackParams{ - TrackInfo: &livekit.TrackInfo{ - Sid: "v1", - Type: livekit.TrackType_VIDEO, - Width: 1080, - Height: 720, - Layers: []*livekit.VideoLayer{ - { - Quality: livekit.VideoQuality_LOW, - Width: 480, - Height: 270, - }, - { - Quality: livekit.VideoQuality_MEDIUM, - Width: 960, - Height: 540, - }, - { - Quality: livekit.VideoQuality_HIGH, - Width: 1080, - Height: 720, - }, - }, - }, - VideoConfig: config.VideoConfig{ - DynacastPauseDelay: 100 * time.Millisecond, - }, - }) - mt.Start() - - mt.AddCodec(webrtc.MimeTypeVP8) - mt.AddCodec(webrtc.MimeTypeAV1) - - lock := sync.RWMutex{} - lock.Lock() - actualTrackID := livekit.TrackID("") - actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) - lock.Unlock() - mt.OnSubscribedMaxQualityChange(func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) error { - lock.Lock() - actualTrackID = trackID - actualSubscribedQualities = subscribedQualities - lock.Unlock() - return nil - }) - - mt.maxSubscribedQuality = map[string]livekit.VideoQuality{ - webrtc.MimeTypeVP8: livekit.VideoQuality_LOW, - webrtc.MimeTypeAV1: livekit.VideoQuality_LOW, - } - mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_HIGH) - mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_MEDIUM) - mt.notifySubscriberMaxQuality("s3", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_MEDIUM) - - expectedSubscribedQualities := []*livekit.SubscribedCodec{ - { - Codec: webrtc.MimeTypeVP8, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: true}, - }, - }, - { - Codec: webrtc.MimeTypeAV1, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - // "s1" dropping to MEDIUM should disable HIGH layer - mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_MEDIUM) - - expectedSubscribedQualities = []*livekit.SubscribedCodec{ - { - Codec: webrtc.MimeTypeVP8, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - { - Codec: webrtc.MimeTypeAV1, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - // "s1" , "s2" , "s3" dropping to LOW should disable HIGH & MEDIUM - mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_LOW) - mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_LOW) - mt.notifySubscriberMaxQuality("s3", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_LOW) - - expectedSubscribedQualities = []*livekit.SubscribedCodec{ - { - Codec: webrtc.MimeTypeVP8, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - { - Codec: webrtc.MimeTypeAV1, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - // muting "s2" only should not disable all qualities of vp8, no change of expected qualities - mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_OFF) - - time.Sleep(100 * time.Millisecond) - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - // muting "s1" and s3 also should disable all qualities - mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_OFF) - mt.notifySubscriberMaxQuality("s3", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_OFF) - - expectedSubscribedQualities = []*livekit.SubscribedCodec{ - { - Codec: webrtc.MimeTypeVP8, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: false}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - { - Codec: webrtc.MimeTypeAV1, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: false}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - // unmuting "s1" should enable vp8 previously set max quality - mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_LOW) - - expectedSubscribedQualities = []*livekit.SubscribedCodec{ - { - Codec: webrtc.MimeTypeVP8, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - { - Codec: webrtc.MimeTypeAV1, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: false}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - // a higher quality from a different node should trigger that quality - mt.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{ - {CodecMime: webrtc.MimeTypeVP8, Quality: livekit.VideoQuality_HIGH}, - {CodecMime: webrtc.MimeTypeAV1, Quality: livekit.VideoQuality_MEDIUM}, - }) - - expectedSubscribedQualities = []*livekit.SubscribedCodec{ - { - Codec: webrtc.MimeTypeVP8, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: true}, - }, - }, - { - Codec: webrtc.MimeTypeAV1, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return livekit.TrackID("v1") == actualTrackID && - subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - }) -} diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 42c21d3ef..56bfa2cff 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -5,7 +5,6 @@ import ( "sort" "strings" "sync" - "time" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" @@ -15,7 +14,6 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -24,7 +22,6 @@ import ( ) const ( - downLostUpdateDelta = time.Second layerSelectionTolerance = 0.9 ) @@ -51,7 +48,6 @@ type MediaTrackReceiverParams struct { BufferFactory *buffer.Factory ReceiverConfig ReceiverConfig SubscriberConfig DirectionConfig - VideoConfig config.VideoConfig Telemetry telemetry.TelemetryService Logger logger.Logger } @@ -69,14 +65,10 @@ type MediaTrackReceiver struct { potentialCodecs []webrtc.RTPCodecParameters pendingSubscribeOp map[livekit.ParticipantID]int - // track audio fraction lost - downFracLostLock sync.Mutex - maxDownFracLost uint8 - maxDownFracLostTs time.Time - onMediaLossUpdate func(fractionalLoss uint8) - onVideoLayerUpdate func(layers []*livekit.VideoLayer) - - onClose []func() + onSetupReceiver func(mime string) + onMediaLossFeedback func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) + onVideoLayerUpdate func(layers []*livekit.VideoLayer) + onClose []func() *MediaTrackSubscriptions } @@ -94,7 +86,6 @@ func NewMediaTrackReceiver(params MediaTrackReceiverParams) *MediaTrackReceiver BufferFactory: params.BufferFactory, ReceiverConfig: params.ReceiverConfig, SubscriberConfig: params.SubscriberConfig, - VideoConfig: t.params.VideoConfig, Telemetry: params.Telemetry, Logger: params.Logger, }) @@ -124,8 +115,12 @@ func (t *MediaTrackReceiver) Restart() { for _, receiver := range receivers { receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(livekit.VideoQuality_HIGH)) } +} - t.MediaTrackSubscriptions.Restart() +func (t *MediaTrackReceiver) OnSetupReceiver(f func(mime string)) { + t.lock.Lock() + t.onSetupReceiver = f + t.lock.Unlock() } func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority int, mid string) { @@ -172,12 +167,13 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority t.shadowReceiversLocked() + onSetupReceiver := t.onSetupReceiver t.params.Logger.Debugw("setup receiver", "mime", receiver.Codec().MimeType, "priority", priority, "receivers", t.receiversShadow) t.lock.Unlock() - t.MediaTrackSubscriptions.AddCodec(receiver.Codec().MimeType) - - t.MediaTrackSubscriptions.Start() + if onSetupReceiver != nil { + onSetupReceiver(receiver.Codec().MimeType) + } } func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParameters, headers []webrtc.RTPHeaderExtensionParameter) { @@ -238,13 +234,7 @@ func (t *MediaTrackReceiver) ClearReceiver(mime string) { } t.shadowReceiversLocked() - - stopSubscription := len(t.receiversShadow) == 0 t.lock.Unlock() - - if stopSubscription { - t.MediaTrackSubscriptions.Stop() - } } func (t *MediaTrackReceiver) ClearAllReceivers() { @@ -252,11 +242,10 @@ func (t *MediaTrackReceiver) ClearAllReceivers() { t.receivers = t.receivers[:0] t.receiversShadow = nil t.lock.Unlock() - t.MediaTrackSubscriptions.Stop() } -func (t *MediaTrackReceiver) OnMediaLossUpdate(f func(fractionalLoss uint8)) { - t.onMediaLossUpdate = f +func (t *MediaTrackReceiver) OnMediaLossFeedback(f func(dt *sfu.DownTrack, rr *rtcp.ReceiverReport)) { + t.onMediaLossFeedback = f } func (t *MediaTrackReceiver) OnVideoLayerUpdate(f func(layers []*livekit.VideoLayer)) { @@ -280,7 +269,6 @@ func (t *MediaTrackReceiver) Close() { onclose := t.onClose t.lock.RUnlock() - t.MediaTrackSubscriptions.Close() for _, f := range onclose { f() } @@ -662,53 +650,11 @@ func (t *MediaTrackReceiver) GetAudioLevel() (float64, bool) { func (t *MediaTrackReceiver) onDownTrackCreated(downTrack *sfu.DownTrack) { if t.Kind() == livekit.TrackType_AUDIO { - downTrack.AddReceiverReportListener(t.handleMaxLossFeedback) - } -} - -// handles max loss for audio streams -func (t *MediaTrackReceiver) handleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) { - t.downFracLostLock.Lock() - for _, rr := range report.Reports { - if t.maxDownFracLost < rr.FractionLost { - t.maxDownFracLost = rr.FractionLost - } - } - t.downFracLostLock.Unlock() - - t.maybeUpdateLoss() -} - -func (t *MediaTrackReceiver) NotifySubscriberNodeMediaLoss(_nodeID livekit.NodeID, fractionalLoss uint8) { - t.downFracLostLock.Lock() - if t.maxDownFracLost < fractionalLoss { - t.maxDownFracLost = fractionalLoss - } - t.downFracLostLock.Unlock() - - t.maybeUpdateLoss() -} - -func (t *MediaTrackReceiver) maybeUpdateLoss() { - var ( - shouldUpdate bool - maxLost uint8 - ) - - t.downFracLostLock.Lock() - now := time.Now() - if now.Sub(t.maxDownFracLostTs) > downLostUpdateDelta { - shouldUpdate = true - maxLost = t.maxDownFracLost - t.maxDownFracLost = 0 - t.maxDownFracLostTs = now - } - t.downFracLostLock.Unlock() - - if shouldUpdate { - if t.onMediaLossUpdate != nil { - t.onMediaLossUpdate(maxLost) - } + downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, rr *rtcp.ReceiverReport) { + if t.onMediaLossFeedback != nil { + t.onMediaLossFeedback(dt, rr) + } + }) } } @@ -782,20 +728,6 @@ func (t *MediaTrackReceiver) SetRTT(rtt uint32) { } } -func (t *MediaTrackReceiver) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error) { - t.MediaTrackSubscriptions.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) { - if f != nil && !t.IsMuted() { - _ = f(t.ID(), subscribedQualities, maxSubscribedQualities) - } - for _, q := range maxSubscribedQualities { - receiver := t.Receiver(q.CodecMime) - if receiver != nil { - receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality)) - } - } - }) -} - // --------------------------- func VideoQualityToRID(q livekit.VideoQuality) string { diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index fb0aaf444..c00bedaf9 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/bep/debounce" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/rtcerr" @@ -15,16 +14,10 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry" - "github.com/livekit/livekit-server/pkg/utils" -) - -const ( - initialQualityUpdateWait = 10 * time.Second ) var ( @@ -41,19 +34,9 @@ type MediaTrackSubscriptions struct { subscribedTracksMu sync.RWMutex subscribedTracks map[livekit.ParticipantID]types.SubscribedTrack - // quality level enable/disable - maxQualityLock sync.RWMutex - maxSubscriberQuality map[livekit.ParticipantID]*types.SubscribedCodecQuality - maxSubscriberNodeQuality map[livekit.NodeID][]types.SubscribedCodecQuality - maxSubscribedQuality map[string]livekit.VideoQuality // codec mime -> quality - maxSubscribedQualityDebounce func(func()) - onSubscribedMaxQualityChange func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) - maxQualityTimer *time.Timer - - qualityNotifyOpQueue *utils.OpsQueue - onDownTrackCreated func(downTrack *sfu.DownTrack) onSubscriptionOperationComplete func(sub types.LocalParticipant) + onSubscriberMaxQualityChange func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32) } type MediaTrackSubscriptionsParams struct { @@ -62,7 +45,6 @@ type MediaTrackSubscriptionsParams struct { BufferFactory *buffer.Factory ReceiverConfig ReceiverConfig SubscriberConfig DirectionConfig - VideoConfig config.VideoConfig Telemetry telemetry.TelemetryService @@ -70,35 +52,10 @@ type MediaTrackSubscriptionsParams struct { } func NewMediaTrackSubscriptions(params MediaTrackSubscriptionsParams) *MediaTrackSubscriptions { - t := &MediaTrackSubscriptions{ - params: params, - subscribedTracks: make(map[livekit.ParticipantID]types.SubscribedTrack), - maxSubscriberQuality: make(map[livekit.ParticipantID]*types.SubscribedCodecQuality), - maxSubscriberNodeQuality: make(map[livekit.NodeID][]types.SubscribedCodecQuality), - maxSubscribedQuality: make(map[string]livekit.VideoQuality), - maxSubscribedQualityDebounce: debounce.New(params.VideoConfig.DynacastPauseDelay), - qualityNotifyOpQueue: utils.NewOpsQueue(params.Logger, "quality-notify", 100), + return &MediaTrackSubscriptions{ + params: params, + subscribedTracks: make(map[livekit.ParticipantID]types.SubscribedTrack), } - - return t -} - -func (t *MediaTrackSubscriptions) Start() { - t.qualityNotifyOpQueue.Start() - t.startMaxQualityTimer(false) -} - -func (t *MediaTrackSubscriptions) Restart() { - t.startMaxQualityTimer(true) -} - -func (t *MediaTrackSubscriptions) Stop() { - t.stopMaxQualityTimer() -} - -func (t *MediaTrackSubscriptions) Close() { - t.qualityNotifyOpQueue.Stop() - t.stopMaxQualityTimer() } func (t *MediaTrackSubscriptions) OnDownTrackCreated(f func(downTrack *sfu.DownTrack)) { @@ -109,14 +66,11 @@ func (t *MediaTrackSubscriptions) OnSubscriptionOperationComplete(f func(sub typ t.onSubscriptionOperationComplete = f } -func (t *MediaTrackSubscriptions) SetMuted(muted bool) { - // update quality based on subscription if unmuting. - // This will queue up the current state, but subscriber - // driven changes could update it. - if !muted { - t.UpdateQualityChange(true) - } +func (t *MediaTrackSubscriptions) OnSubscriberMaxQualityChange(f func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32)) { + t.onSubscriberMaxQualityChange = f +} +func (t *MediaTrackSubscriptions) SetMuted(muted bool) { // update mute of all subscribed tracks for _, st := range t.getAllSubscribedTracks() { st.SetPublisherMuted(muted) @@ -131,12 +85,6 @@ func (t *MediaTrackSubscriptions) IsSubscriber(subID livekit.ParticipantID) bool return ok } -func (t *MediaTrackSubscriptions) AddCodec(mime string) { - t.subscribedTracksMu.Lock() - t.maxSubscribedQuality[mime] = livekit.VideoQuality_HIGH - t.subscribedTracksMu.Unlock() -} - // AddSubscriber subscribes sub to current mediaTrack func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *WrappedReceiver) error { trackID := t.params.MediaTrack.ID() @@ -209,8 +157,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * // when down track is bound, start loop to send reports go t.sendDownTrackBindingReports(sub) - // initialize to default layer - t.notifySubscriberMaxQuality(subscriberID, downTrack.Codec(), livekit.VideoQuality_HIGH) subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted()) }) @@ -299,7 +245,9 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * }) downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) { - t.notifySubscriberMaxQuality(subscriberID, dt.Codec(), utils.QualityForSpatialLayer(layer)) + if t.onSubscriberMaxQualityChange != nil { + t.onSubscriberMaxQualityChange(subscriberID, dt.Codec(), layer) + } }) downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) { @@ -474,254 +422,6 @@ func (t *MediaTrackSubscriptions) DebugInfo() []map[string]interface{} { return subscribedTrackInfo } -func (t *MediaTrackSubscriptions) OnSubscribedMaxQualityChange(f func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)) { - t.onSubscribedMaxQualityChange = f -} - -func (t *MediaTrackSubscriptions) notifySubscriberMaxQuality(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, quality livekit.VideoQuality) { - if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO { - return - } - t.params.Logger.Debugw("notifying subscriber max quality", "subscriberID", subscriberID, "codec", codec, "quality", quality) - - if codec.MimeType == "" { - t.params.Logger.Errorw("codec mime type is empty", nil) - } - - t.maxQualityLock.Lock() - if quality == livekit.VideoQuality_OFF { - _, ok := t.maxSubscriberQuality[subscriberID] - if !ok { - t.maxQualityLock.Unlock() - return - } - - delete(t.maxSubscriberQuality, subscriberID) - } else { - maxQuality, ok := t.maxSubscriberQuality[subscriberID] - if ok { - if maxQuality.Quality == quality && maxQuality.CodecMime == codec.MimeType { - t.maxQualityLock.Unlock() - return - } - maxQuality.CodecMime = codec.MimeType - maxQuality.Quality = quality - } else { - t.maxSubscriberQuality[subscriberID] = &types.SubscribedCodecQuality{ - Quality: quality, - CodecMime: codec.MimeType, - } - } - } - t.maxQualityLock.Unlock() - - go t.UpdateQualityChange(false) -} - -func (t *MediaTrackSubscriptions) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) { - if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO { - return - } - - if len(qualities) == 1 && qualities[0].CodecMime == "" { - // for old version msg don't have codec mime, use first mime type - t.maxQualityLock.RLock() - for mime := range t.maxSubscribedQuality { - qualities[0].CodecMime = mime - break - } - t.maxQualityLock.RUnlock() - } - - t.maxQualityLock.Lock() - if len(qualities) == 0 { - if _, ok := t.maxSubscriberNodeQuality[nodeID]; !ok { - t.maxQualityLock.Unlock() - return - } - delete(t.maxSubscriberNodeQuality, nodeID) - } else { - if maxQualities, ok := t.maxSubscriberNodeQuality[nodeID]; ok { - var matchCounter int - for _, quality := range qualities { - for _, maxQuality := range maxQualities { - if quality == maxQuality { - matchCounter++ - break - } - } - } - - if matchCounter == len(qualities) && matchCounter == len(maxQualities) { - t.maxQualityLock.Unlock() - return - } - } - t.maxSubscriberNodeQuality[nodeID] = qualities - } - t.maxQualityLock.Unlock() - - go t.UpdateQualityChange(false) -} - -func (t *MediaTrackSubscriptions) UpdateQualityChange(force bool) { - if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO { - return - } - - t.maxQualityLock.Lock() - t.params.Logger.Debugw("updating quality change", - "force", force, - "maxSubscriberQuality", t.maxSubscriberQuality, - "maxSubscriberNodeQuality", t.maxSubscriberNodeQuality, - "maxSubscribedQuality", t.maxSubscribedQuality) - - maxSubscribedQuality := make(map[string]livekit.VideoQuality, len(t.maxSubscribedQuality)) - var changed bool - // reset maxSubscribedQuality - for mime := range t.maxSubscribedQuality { - maxSubscribedQuality[mime] = livekit.VideoQuality_OFF - } - - for _, subQuality := range t.maxSubscriberQuality { - if q, ok := maxSubscribedQuality[subQuality.CodecMime]; ok { - if q == livekit.VideoQuality_OFF || (subQuality.Quality != livekit.VideoQuality_OFF && subQuality.Quality > q) { - maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality - } - } else { - maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality - } - } - for _, subQualities := range t.maxSubscriberNodeQuality { - for _, subQuality := range subQualities { - if q, ok := maxSubscribedQuality[subQuality.CodecMime]; ok { - if q == livekit.VideoQuality_OFF || (subQuality.Quality != livekit.VideoQuality_OFF && subQuality.Quality > q) { - maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality - } - } else { - maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality - } - } - } - - qualityDowngrades := make(map[string]livekit.VideoQuality, len(t.maxSubscribedQuality)) - noChangeCount := 0 - for mime, q := range maxSubscribedQuality { - origin, ok := t.maxSubscribedQuality[mime] - if !ok { - origin = livekit.VideoQuality_OFF - } - if origin != q { - if q == livekit.VideoQuality_OFF || (origin != livekit.VideoQuality_OFF && origin > q) { - // quality downgrade (or become off), delay notify to publisher - qualityDowngrades[mime] = origin - if force { - t.maxSubscribedQuality[mime] = q - } - } else { - // quality upgrade, update immediately - t.maxSubscribedQuality[mime] = q - } - changed = true - } else { - noChangeCount++ - } - } - t.params.Logger.Debugw("updated quality change", - "changed", changed, - "maxSubscribedQuality", maxSubscribedQuality, - "t.maxSubscribedQuality", t.maxSubscribedQuality, - "qualityDowngrades", qualityDowngrades) - - if !changed && !force { - t.maxQualityLock.Unlock() - return - } - - // if quality downgrade (or become OFF), delay notify to publisher if needed - if len(qualityDowngrades) > 0 && !force { - t.maxSubscribedQualityDebounce(func() { - t.UpdateQualityChange(true) - }) - - // no quality upgrades - if len(qualityDowngrades)+noChangeCount == len(t.maxSubscribedQuality) { - t.maxQualityLock.Unlock() - return - } - } - - subscribedCodec := make([]*livekit.SubscribedCodec, 0, len(t.maxSubscribedQuality)) - maxSubscribedQualities := make([]types.SubscribedCodecQuality, 0, len(t.maxSubscribedQuality)) - for mime, maxQuality := range t.maxSubscribedQuality { - maxSubscribedQualities = append(maxSubscribedQualities, types.SubscribedCodecQuality{ - CodecMime: mime, - Quality: maxQuality, - }) - - if maxQuality == livekit.VideoQuality_OFF { - subscribedCodec = append(subscribedCodec, &livekit.SubscribedCodec{ - Codec: mime, - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: false}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, - }, - }) - } else { - var subscribedQualities []*livekit.SubscribedQuality - for q := livekit.VideoQuality_LOW; q <= livekit.VideoQuality_HIGH; q++ { - subscribedQualities = append(subscribedQualities, &livekit.SubscribedQuality{ - Quality: q, - Enabled: q <= maxQuality, - }) - } - subscribedCodec = append(subscribedCodec, &livekit.SubscribedCodec{ - Codec: mime, - Qualities: subscribedQualities, - }) - } - } - if t.onSubscribedMaxQualityChange != nil { - t.params.Logger.Debugw("subscribedMaxQualityChange", - "subscribedCodec", subscribedCodec, - "maxSubscribedQualities", maxSubscribedQualities) - t.qualityNotifyOpQueue.Enqueue(func() { - t.onSubscribedMaxQualityChange(subscribedCodec, maxSubscribedQualities) - }) - } - t.maxQualityLock.Unlock() -} - -func (t *MediaTrackSubscriptions) startMaxQualityTimer(force bool) { - t.maxQualityLock.Lock() - defer t.maxQualityLock.Unlock() - - if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO { - return - } - - if t.maxQualityTimer != nil { - t.maxQualityTimer.Stop() - t.maxQualityTimer = nil - } - - t.maxQualityTimer = time.AfterFunc(initialQualityUpdateWait, func() { - t.stopMaxQualityTimer() - t.UpdateQualityChange(force) - }) -} - -func (t *MediaTrackSubscriptions) stopMaxQualityTimer() { - t.maxQualityLock.Lock() - defer t.maxQualityLock.Unlock() - - if t.maxQualityTimer != nil { - t.maxQualityTimer.Stop() - t.maxQualityTimer = nil - } -} - func (t *MediaTrackSubscriptions) downTrackClosed( sub types.LocalParticipant, subTrack types.SubscribedTrack, diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 2ef4132a7..9482a97a1 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -857,7 +857,9 @@ func (p *ParticipantImpl) ICERestart(iceConfig *types.IceConfig) error { return nil } - p.UpTrackManager.Restart() + for _, t := range p.GetPublishedTracks() { + t.(types.LocalMediaTrack).Restart() + } return p.subscriber.CreateAndSendOffer(&webrtc.OfferOptions{ ICERestart: true, @@ -1621,7 +1623,8 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, SubscribedQualities: subscribedQualities[0].Qualities, // for compatible with old client SubscribedCodecs: subscribedQualities, } - // get track's layer dimensions + + // send layer info about max subscription changes to telemetry track := p.UpTrackManager.GetPublishedTrack(trackID) var layerInfo map[livekit.VideoQuality]*livekit.VideoLayer if track != nil { @@ -2333,3 +2336,25 @@ func (p *ParticipantImpl) ClearInProgressAndProcessSubscriptionRequestsQueue(tra go p.ProcessSubscriptionRequestsQueue(trackID) } + +func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error { + track := p.GetPublishedTrack(trackID) + if track == nil { + p.params.Logger.Warnw("could not find track", nil, "trackID", trackID) + return errors.New("could not find published track") + } + + track.(types.LocalMediaTrack).NotifySubscriberNodeMaxQuality(nodeID, maxQualities) + return nil +} + +func (p *ParticipantImpl) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error { + track := p.GetPublishedTrack(trackID) + if track == nil { + p.params.Logger.Warnw("could not find track", nil, "trackID", trackID) + return errors.New("could not find published track") + } + + track.(types.LocalMediaTrack).NotifySubscriberNodeMediaLoss(nodeID, uint8(fractionalLoss)) + return nil +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index cf5d3d33d..a3928d1a7 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -191,8 +191,6 @@ type Participant interface { resolverBySid func(participantID livekit.ParticipantID) LocalParticipant, ) error UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error - UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error - UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error DebugInfo() map[string]interface{} } @@ -295,6 +293,9 @@ type LocalParticipant interface { SetICEConfig(iceConfig IceConfig) OnICEConfigChanged(callback func(participant LocalParticipant, iceConfig IceConfig)) + + UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error + UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error } // Room is a container of participants, and can provide room-level actions @@ -331,8 +332,6 @@ type MediaTrack interface { UpdateVideoLayers(layers []*livekit.VideoLayer) IsSimulcast() bool - Restart() - // callbacks AddOnClose(func()) @@ -348,9 +347,6 @@ type MediaTrack interface { // returns quality information that's appropriate for width & height GetQualityForDimension(width, height uint32) livekit.VideoQuality - NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualites []SubscribedCodecQuality) - NotifySubscriberNodeMediaLoss(nodeID livekit.NodeID, fractionalLoss uint8) - Receivers() []sfu.TrackReceiver } @@ -358,6 +354,8 @@ type MediaTrack interface { type LocalMediaTrack interface { MediaTrack + Restart() + SignalCid() string HasSdpCid(cid string) bool @@ -365,6 +363,9 @@ type LocalMediaTrack interface { GetConnectionScore() float32 SetRTT(rtt uint32) + + NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []SubscribedCodecQuality) + NotifySubscriberNodeMediaLoss(nodeID livekit.NodeID, fractionalLoss uint8) } // MediaTrack is the main interface representing a track published to the room diff --git a/pkg/rtc/types/typesfakes/fake_media_track.go b/pkg/rtc/types/typesfakes/fake_media_track.go index 2593f2786..04ce00dc8 100644 --- a/pkg/rtc/types/typesfakes/fake_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_media_track.go @@ -119,18 +119,6 @@ type FakeMediaTrack struct { nameReturnsOnCall map[int]struct { result1 string } - NotifySubscriberNodeMaxQualityStub func(livekit.NodeID, []types.SubscribedCodecQuality) - notifySubscriberNodeMaxQualityMutex sync.RWMutex - notifySubscriberNodeMaxQualityArgsForCall []struct { - arg1 livekit.NodeID - arg2 []types.SubscribedCodecQuality - } - NotifySubscriberNodeMediaLossStub func(livekit.NodeID, uint8) - notifySubscriberNodeMediaLossMutex sync.RWMutex - notifySubscriberNodeMediaLossArgsForCall []struct { - arg1 livekit.NodeID - arg2 uint8 - } PublisherIDStub func() livekit.ParticipantID publisherIDMutex sync.RWMutex publisherIDArgsForCall []struct { @@ -182,10 +170,6 @@ type FakeMediaTrack struct { arg1 livekit.ParticipantID arg2 bool } - RestartStub func() - restartMutex sync.RWMutex - restartArgsForCall []struct { - } RevokeDisallowedSubscribersStub func([]livekit.ParticipantIdentity) []livekit.ParticipantIdentity revokeDisallowedSubscribersMutex sync.RWMutex revokeDisallowedSubscribersArgsForCall []struct { @@ -818,77 +802,6 @@ func (fake *FakeMediaTrack) NameReturnsOnCall(i int, result1 string) { }{result1} } -func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQuality(arg1 livekit.NodeID, arg2 []types.SubscribedCodecQuality) { - var arg2Copy []types.SubscribedCodecQuality - if arg2 != nil { - arg2Copy = make([]types.SubscribedCodecQuality, len(arg2)) - copy(arg2Copy, arg2) - } - fake.notifySubscriberNodeMaxQualityMutex.Lock() - fake.notifySubscriberNodeMaxQualityArgsForCall = append(fake.notifySubscriberNodeMaxQualityArgsForCall, struct { - arg1 livekit.NodeID - arg2 []types.SubscribedCodecQuality - }{arg1, arg2Copy}) - stub := fake.NotifySubscriberNodeMaxQualityStub - fake.recordInvocation("NotifySubscriberNodeMaxQuality", []interface{}{arg1, arg2Copy}) - fake.notifySubscriberNodeMaxQualityMutex.Unlock() - if stub != nil { - fake.NotifySubscriberNodeMaxQualityStub(arg1, arg2) - } -} - -func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQualityCallCount() int { - fake.notifySubscriberNodeMaxQualityMutex.RLock() - defer fake.notifySubscriberNodeMaxQualityMutex.RUnlock() - return len(fake.notifySubscriberNodeMaxQualityArgsForCall) -} - -func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQualityCalls(stub func(livekit.NodeID, []types.SubscribedCodecQuality)) { - fake.notifySubscriberNodeMaxQualityMutex.Lock() - defer fake.notifySubscriberNodeMaxQualityMutex.Unlock() - fake.NotifySubscriberNodeMaxQualityStub = stub -} - -func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQualityArgsForCall(i int) (livekit.NodeID, []types.SubscribedCodecQuality) { - fake.notifySubscriberNodeMaxQualityMutex.RLock() - defer fake.notifySubscriberNodeMaxQualityMutex.RUnlock() - argsForCall := fake.notifySubscriberNodeMaxQualityArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLoss(arg1 livekit.NodeID, arg2 uint8) { - fake.notifySubscriberNodeMediaLossMutex.Lock() - fake.notifySubscriberNodeMediaLossArgsForCall = append(fake.notifySubscriberNodeMediaLossArgsForCall, struct { - arg1 livekit.NodeID - arg2 uint8 - }{arg1, arg2}) - stub := fake.NotifySubscriberNodeMediaLossStub - fake.recordInvocation("NotifySubscriberNodeMediaLoss", []interface{}{arg1, arg2}) - fake.notifySubscriberNodeMediaLossMutex.Unlock() - if stub != nil { - fake.NotifySubscriberNodeMediaLossStub(arg1, arg2) - } -} - -func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossCallCount() int { - fake.notifySubscriberNodeMediaLossMutex.RLock() - defer fake.notifySubscriberNodeMediaLossMutex.RUnlock() - return len(fake.notifySubscriberNodeMediaLossArgsForCall) -} - -func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossCalls(stub func(livekit.NodeID, uint8)) { - fake.notifySubscriberNodeMediaLossMutex.Lock() - defer fake.notifySubscriberNodeMediaLossMutex.Unlock() - fake.NotifySubscriberNodeMediaLossStub = stub -} - -func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossArgsForCall(i int) (livekit.NodeID, uint8) { - fake.notifySubscriberNodeMediaLossMutex.RLock() - defer fake.notifySubscriberNodeMediaLossMutex.RUnlock() - argsForCall := fake.notifySubscriberNodeMediaLossArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - func (fake *FakeMediaTrack) PublisherID() livekit.ParticipantID { fake.publisherIDMutex.Lock() ret, specificReturn := fake.publisherIDReturnsOnCall[len(fake.publisherIDArgsForCall)] @@ -1166,30 +1079,6 @@ func (fake *FakeMediaTrack) RemoveSubscriberArgsForCall(i int) (livekit.Particip return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeMediaTrack) Restart() { - fake.restartMutex.Lock() - fake.restartArgsForCall = append(fake.restartArgsForCall, struct { - }{}) - stub := fake.RestartStub - fake.recordInvocation("Restart", []interface{}{}) - fake.restartMutex.Unlock() - if stub != nil { - fake.RestartStub() - } -} - -func (fake *FakeMediaTrack) RestartCallCount() int { - fake.restartMutex.RLock() - defer fake.restartMutex.RUnlock() - return len(fake.restartArgsForCall) -} - -func (fake *FakeMediaTrack) RestartCalls(stub func()) { - fake.restartMutex.Lock() - defer fake.restartMutex.Unlock() - fake.RestartStub = stub -} - func (fake *FakeMediaTrack) RevokeDisallowedSubscribers(arg1 []livekit.ParticipantIdentity) []livekit.ParticipantIdentity { var arg1Copy []livekit.ParticipantIdentity if arg1 != nil { @@ -1456,10 +1345,6 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} { defer fake.kindMutex.RUnlock() fake.nameMutex.RLock() defer fake.nameMutex.RUnlock() - fake.notifySubscriberNodeMaxQualityMutex.RLock() - defer fake.notifySubscriberNodeMaxQualityMutex.RUnlock() - fake.notifySubscriberNodeMediaLossMutex.RLock() - defer fake.notifySubscriberNodeMediaLossMutex.RUnlock() fake.publisherIDMutex.RLock() defer fake.publisherIDMutex.RUnlock() fake.publisherIdentityMutex.RLock() @@ -1472,8 +1357,6 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} { defer fake.removeAllSubscribersMutex.RUnlock() fake.removeSubscriberMutex.RLock() defer fake.removeSubscriberMutex.RUnlock() - fake.restartMutex.RLock() - defer fake.restartMutex.RUnlock() fake.revokeDisallowedSubscribersMutex.RLock() defer fake.revokeDisallowedSubscribersMutex.RUnlock() fake.setMutedMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index f560b7f19..70400a314 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -144,32 +144,6 @@ type FakeParticipant struct { toProtoReturnsOnCall map[int]struct { result1 *livekit.ParticipantInfo } - UpdateMediaLossStub func(livekit.NodeID, livekit.TrackID, uint32) error - updateMediaLossMutex sync.RWMutex - updateMediaLossArgsForCall []struct { - arg1 livekit.NodeID - arg2 livekit.TrackID - arg3 uint32 - } - updateMediaLossReturns struct { - result1 error - } - updateMediaLossReturnsOnCall map[int]struct { - result1 error - } - UpdateSubscribedQualityStub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error - updateSubscribedQualityMutex sync.RWMutex - updateSubscribedQualityArgsForCall []struct { - arg1 livekit.NodeID - arg2 livekit.TrackID - arg3 []types.SubscribedCodecQuality - } - updateSubscribedQualityReturns struct { - result1 error - } - updateSubscribedQualityReturnsOnCall map[int]struct { - result1 error - } UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, *livekit.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error updateSubscriptionPermissionMutex sync.RWMutex updateSubscriptionPermissionArgsForCall []struct { @@ -904,137 +878,6 @@ func (fake *FakeParticipant) ToProtoReturnsOnCall(i int, result1 *livekit.Partic }{result1} } -func (fake *FakeParticipant) UpdateMediaLoss(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 uint32) error { - fake.updateMediaLossMutex.Lock() - ret, specificReturn := fake.updateMediaLossReturnsOnCall[len(fake.updateMediaLossArgsForCall)] - fake.updateMediaLossArgsForCall = append(fake.updateMediaLossArgsForCall, struct { - arg1 livekit.NodeID - arg2 livekit.TrackID - arg3 uint32 - }{arg1, arg2, arg3}) - stub := fake.UpdateMediaLossStub - fakeReturns := fake.updateMediaLossReturns - fake.recordInvocation("UpdateMediaLoss", []interface{}{arg1, arg2, arg3}) - fake.updateMediaLossMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeParticipant) UpdateMediaLossCallCount() int { - fake.updateMediaLossMutex.RLock() - defer fake.updateMediaLossMutex.RUnlock() - return len(fake.updateMediaLossArgsForCall) -} - -func (fake *FakeParticipant) UpdateMediaLossCalls(stub func(livekit.NodeID, livekit.TrackID, uint32) error) { - fake.updateMediaLossMutex.Lock() - defer fake.updateMediaLossMutex.Unlock() - fake.UpdateMediaLossStub = stub -} - -func (fake *FakeParticipant) UpdateMediaLossArgsForCall(i int) (livekit.NodeID, livekit.TrackID, uint32) { - fake.updateMediaLossMutex.RLock() - defer fake.updateMediaLossMutex.RUnlock() - argsForCall := fake.updateMediaLossArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeParticipant) UpdateMediaLossReturns(result1 error) { - fake.updateMediaLossMutex.Lock() - defer fake.updateMediaLossMutex.Unlock() - fake.UpdateMediaLossStub = nil - fake.updateMediaLossReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeParticipant) UpdateMediaLossReturnsOnCall(i int, result1 error) { - fake.updateMediaLossMutex.Lock() - defer fake.updateMediaLossMutex.Unlock() - fake.UpdateMediaLossStub = nil - if fake.updateMediaLossReturnsOnCall == nil { - fake.updateMediaLossReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.updateMediaLossReturnsOnCall[i] = struct { - result1 error - }{result1} -} - -func (fake *FakeParticipant) UpdateSubscribedQuality(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 []types.SubscribedCodecQuality) error { - var arg3Copy []types.SubscribedCodecQuality - if arg3 != nil { - arg3Copy = make([]types.SubscribedCodecQuality, len(arg3)) - copy(arg3Copy, arg3) - } - fake.updateSubscribedQualityMutex.Lock() - ret, specificReturn := fake.updateSubscribedQualityReturnsOnCall[len(fake.updateSubscribedQualityArgsForCall)] - fake.updateSubscribedQualityArgsForCall = append(fake.updateSubscribedQualityArgsForCall, struct { - arg1 livekit.NodeID - arg2 livekit.TrackID - arg3 []types.SubscribedCodecQuality - }{arg1, arg2, arg3Copy}) - stub := fake.UpdateSubscribedQualityStub - fakeReturns := fake.updateSubscribedQualityReturns - fake.recordInvocation("UpdateSubscribedQuality", []interface{}{arg1, arg2, arg3Copy}) - fake.updateSubscribedQualityMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeParticipant) UpdateSubscribedQualityCallCount() int { - fake.updateSubscribedQualityMutex.RLock() - defer fake.updateSubscribedQualityMutex.RUnlock() - return len(fake.updateSubscribedQualityArgsForCall) -} - -func (fake *FakeParticipant) UpdateSubscribedQualityCalls(stub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error) { - fake.updateSubscribedQualityMutex.Lock() - defer fake.updateSubscribedQualityMutex.Unlock() - fake.UpdateSubscribedQualityStub = stub -} - -func (fake *FakeParticipant) UpdateSubscribedQualityArgsForCall(i int) (livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) { - fake.updateSubscribedQualityMutex.RLock() - defer fake.updateSubscribedQualityMutex.RUnlock() - argsForCall := fake.updateSubscribedQualityArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeParticipant) UpdateSubscribedQualityReturns(result1 error) { - fake.updateSubscribedQualityMutex.Lock() - defer fake.updateSubscribedQualityMutex.Unlock() - fake.UpdateSubscribedQualityStub = nil - fake.updateSubscribedQualityReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeParticipant) UpdateSubscribedQualityReturnsOnCall(i int, result1 error) { - fake.updateSubscribedQualityMutex.Lock() - defer fake.updateSubscribedQualityMutex.Unlock() - fake.UpdateSubscribedQualityStub = nil - if fake.updateSubscribedQualityReturnsOnCall == nil { - fake.updateSubscribedQualityReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.updateSubscribedQualityReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 *livekit.TimedVersion, arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, arg4 func(participantID livekit.ParticipantID) types.LocalParticipant) error { fake.updateSubscriptionPermissionMutex.Lock() ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)] @@ -1191,10 +1034,6 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.subscriptionPermissionMutex.RUnlock() fake.toProtoMutex.RLock() defer fake.toProtoMutex.RUnlock() - fake.updateMediaLossMutex.RLock() - defer fake.updateMediaLossMutex.RUnlock() - fake.updateSubscribedQualityMutex.RLock() - defer fake.updateSubscribedQualityMutex.RUnlock() fake.updateSubscriptionPermissionMutex.RLock() defer fake.updateSubscriptionPermissionMutex.RUnlock() fake.updateVideoLayersMutex.RLock() diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index 5edec1f44..40a19e9ed 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -58,12 +58,6 @@ func (u *UpTrackManager) Start() { u.opsQueue.Start() } -func (u *UpTrackManager) Restart() { - for _, t := range u.GetPublishedTracks() { - t.Restart() - } -} - func (u *UpTrackManager) Close(willBeResumed bool) { u.opsQueue.Stop() @@ -288,28 +282,6 @@ func (u *UpTrackManager) UpdateVideoLayers(updateVideoLayers *livekit.UpdateVide return nil } -func (u *UpTrackManager) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error { - track := u.GetPublishedTrack(trackID) - if track == nil { - u.params.Logger.Warnw("could not find track", nil, "trackID", trackID) - return errors.New("could not find published track") - } - - track.NotifySubscriberNodeMaxQuality(nodeID, maxQualities) - return nil -} - -func (u *UpTrackManager) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error { - track := u.GetPublishedTrack(trackID) - if track == nil { - u.params.Logger.Warnw("could not find track", nil, "trackID", trackID) - return errors.New("could not find published track") - } - - track.NotifySubscriberNodeMediaLoss(nodeID, uint8(fractionalLoss)) - return nil -} - func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) { u.lock.Lock() if _, ok := u.publishedTracks[track.ID()]; !ok { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index e99f83e5c..f831d9564 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -306,6 +306,9 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.bound.Store(true) d.bindLock.Unlock() + if d.onMaxLayerChanged != nil { + d.onMaxLayerChanged(d, d.MaxLayers().Spatial) + } d.connectionStats.SetTrackSource(d.receiver.TrackSource()) d.connectionStats.Start() d.logger.Debugw("downtrack bound")