Refactor DynacastQuality & MediaLossProxy into separate modules (#894)

* WIP commit

* Refactor media loss proxy

* Use DynacastQuality and MediaLossProxy from MediaTrack

* fix test

* Remove unused param

* Remove unused interfaces

* Move interface methods to local

* Split out DynacastManager

* have to add codec to dynacast manager

* RUnlock

* fix restart

* Adding API to force quality and also maintain closed state

* Address PR comments
This commit is contained in:
Raja Subramanian
2022-08-09 11:47:06 +05:30
committed by GitHub
parent 21ffc03a0c
commit 4d7df612ec
14 changed files with 933 additions and 1058 deletions
+275
View File
@@ -0,0 +1,275 @@
package rtc
import (
"sync"
"time"
"github.com/bep/debounce"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils"
)
type DynacastManagerParams struct {
DynacastPauseDelay time.Duration
Logger logger.Logger
}
type DynacastManager struct {
params DynacastManagerParams
lock sync.RWMutex
dynacastQuality map[string]*DynacastQuality // mime type => DynacastQuality
maxSubscribedQuality map[string]livekit.VideoQuality
committedMaxSubscribedQuality map[string]livekit.VideoQuality
maxSubscribedQualityDebounce func(func())
qualityNotifyOpQueue *utils.OpsQueue
isClosed bool
onSubscribedMaxQualityChange func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)
}
func NewDynacastManager(params DynacastManagerParams) *DynacastManager {
d := &DynacastManager{
params: params,
dynacastQuality: make(map[string]*DynacastQuality),
maxSubscribedQuality: make(map[string]livekit.VideoQuality),
committedMaxSubscribedQuality: make(map[string]livekit.VideoQuality),
maxSubscribedQualityDebounce: debounce.New(params.DynacastPauseDelay),
qualityNotifyOpQueue: utils.NewOpsQueue(params.Logger, "quality-notify", 100),
}
d.qualityNotifyOpQueue.Start()
return d
}
func (d *DynacastManager) OnSubscribedMaxQualityChange(f func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)) {
d.lock.Lock()
d.onSubscribedMaxQualityChange = f
d.lock.Unlock()
}
func (d *DynacastManager) AddCodec(mime string) {
d.getOrCreateDynacastQuality(mime)
}
func (d *DynacastManager) Restart() {
d.lock.RLock()
dqs := d.getDynacastQualitiesLocked()
d.lock.RUnlock()
for _, dq := range dqs {
dq.Restart()
d.committedMaxSubscribedQuality[dq.MimeType()] = dq.MaxSubscribedQuality()
}
}
func (d *DynacastManager) Close() {
d.qualityNotifyOpQueue.Stop()
d.lock.Lock()
dqs := d.getDynacastQualitiesLocked()
d.dynacastQuality = make(map[string]*DynacastQuality)
d.isClosed = true
d.lock.Unlock()
for _, dq := range dqs {
dq.Stop()
}
}
//
// THere are situations like track unmute or streaming from a sifferent node
// where subscribed quality needs to sent to the provider immediately.
// This bypasses any debouncing and forces a subscribed quality update
// with immediate effect.
//
func (d *DynacastManager) ForceUpdate() {
d.update(true)
}
//
// It is possible for tracks to be in pending close state. When track
// is waiting to be closed, a node is not streaming a track. This can
// be used to force an update announcing that subscribed quality is OFF,
// i.e. indicating not pulling track any more.
//
func (d *DynacastManager) ForceQuality(quality livekit.VideoQuality) {
d.lock.Lock()
defer d.lock.Unlock()
for mime := range d.committedMaxSubscribedQuality {
d.committedMaxSubscribedQuality[mime] = quality
}
d.enqueueSubscribedQualityChange()
}
func (d *DynacastManager) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, mime string, quality livekit.VideoQuality) {
dq := d.getOrCreateDynacastQuality(mime)
if dq != nil {
dq.NotifySubscriberMaxQuality(subscriberID, quality)
}
}
func (d *DynacastManager) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) {
for _, quality := range qualities {
dq := d.getOrCreateDynacastQuality(quality.CodecMime)
if dq != nil {
dq.NotifySubscriberNodeMaxQuality(nodeID, quality.Quality)
}
}
}
func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuality {
d.lock.Lock()
defer d.lock.Unlock()
if d.isClosed {
return nil
}
if dq := d.dynacastQuality[mime]; dq != nil {
return dq
}
dq := NewDynacastQuality(DynacastQualityParams{
MimeType: mime,
DynacastPauseDelay: d.params.DynacastPauseDelay,
Logger: d.params.Logger,
})
dq.OnSubscribedMaxQualityChange(func(maxQuality livekit.VideoQuality) {
d.updateMaxQualityForMime(mime, maxQuality)
})
dq.Start()
d.dynacastQuality[mime] = dq
d.committedMaxSubscribedQuality[mime] = dq.MaxSubscribedQuality()
return dq
}
func (d *DynacastManager) getDynacastQualitiesLocked() []*DynacastQuality {
dqs := make([]*DynacastQuality, 0, len(d.dynacastQuality))
for _, dq := range d.dynacastQuality {
dqs = append(dqs, dq)
}
return dqs
}
func (d *DynacastManager) updateMaxQualityForMime(mime string, maxQuality livekit.VideoQuality) {
d.lock.Lock()
d.maxSubscribedQuality[mime] = maxQuality
d.lock.Unlock()
d.update(false)
}
func (d *DynacastManager) update(force bool) {
d.lock.Lock()
// add or remove of a mime triggers an update
changed := len(d.maxSubscribedQuality) != len(d.committedMaxSubscribedQuality)
downgradesOnly := !changed
if !changed {
for mime, quality := range d.maxSubscribedQuality {
if cq, ok := d.committedMaxSubscribedQuality[mime]; ok {
if cq != quality {
changed = true
}
if (cq == livekit.VideoQuality_OFF && quality != livekit.VideoQuality_OFF) || (cq != livekit.VideoQuality_OFF && quality != livekit.VideoQuality_OFF && cq < quality) {
downgradesOnly = false
}
}
}
}
if !force {
if !changed {
d.lock.Unlock()
return
}
if downgradesOnly {
d.params.Logger.Debugw("debouncing quality downgrade",
"committedMaxSubscribedQuality", d.committedMaxSubscribedQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
)
d.maxSubscribedQualityDebounce(func() {
d.update(true)
})
d.lock.Unlock()
return
}
}
// clear debounce on send
d.maxSubscribedQualityDebounce(func() {})
d.params.Logger.Infow("committing quality change",
"force", force,
"committedMaxSubscribedQuality", d.committedMaxSubscribedQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
)
// commit change
d.committedMaxSubscribedQuality = make(map[string]livekit.VideoQuality, len(d.maxSubscribedQuality))
for mime, quality := range d.maxSubscribedQuality {
d.committedMaxSubscribedQuality[mime] = quality
}
d.enqueueSubscribedQualityChange()
d.lock.Unlock()
}
func (d *DynacastManager) enqueueSubscribedQualityChange() {
if d.isClosed || d.onSubscribedMaxQualityChange == nil {
return
}
subscribedCodecs := make([]*livekit.SubscribedCodec, 0, len(d.committedMaxSubscribedQuality))
maxSubscribedQualities := make([]types.SubscribedCodecQuality, 0, len(d.committedMaxSubscribedQuality))
for mime, quality := range d.committedMaxSubscribedQuality {
maxSubscribedQualities = append(maxSubscribedQualities, types.SubscribedCodecQuality{
CodecMime: mime,
Quality: quality,
})
if quality == livekit.VideoQuality_OFF {
subscribedCodecs = append(subscribedCodecs, &livekit.SubscribedCodec{
Codec: mime,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
})
} else {
var subscribedQualities []*livekit.SubscribedQuality
for q := livekit.VideoQuality_LOW; q <= livekit.VideoQuality_HIGH; q++ {
subscribedQualities = append(subscribedQualities, &livekit.SubscribedQuality{
Quality: q,
Enabled: q <= quality,
})
}
subscribedCodecs = append(subscribedCodecs, &livekit.SubscribedCodec{
Codec: mime,
Qualities: subscribedQualities,
})
}
}
d.params.Logger.Debugw("subscribedMaxQualityChange",
"subscribedCodecs", subscribedCodecs,
"maxSubscribedQualities", maxSubscribedQualities)
d.qualityNotifyOpQueue.Enqueue(func() {
d.onSubscribedMaxQualityChange(subscribedCodecs, maxSubscribedQualities)
})
}
+271
View File
@@ -0,0 +1,271 @@
package rtc
import (
"sort"
"sync"
"testing"
"time"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/protocol/livekit"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
)
func TestSubscribedMaxQuality(t *testing.T) {
subscribedCodecsAsString := func(c1 []*livekit.SubscribedCodec) string {
sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec })
var s1 string
for _, c := range c1 {
s1 += c.String()
}
return s1
}
t.Run("subscribers muted", func(t *testing.T) {
dm := NewDynacastManager(DynacastManagerParams{})
var lock sync.Mutex
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
})
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_HIGH)
dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeAV1, livekit.VideoQuality_HIGH)
// mute all subscribers of vp8
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_OFF)
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
})
t.Run("subscribers max quality", func(t *testing.T) {
dm := NewDynacastManager(DynacastManagerParams{
DynacastPauseDelay: 100 * time.Millisecond,
})
lock := sync.RWMutex{}
lock.Lock()
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
lock.Unlock()
dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
})
dm.maxSubscribedQuality = map[string]livekit.VideoQuality{
webrtc.MimeTypeVP8: livekit.VideoQuality_LOW,
webrtc.MimeTypeAV1: livekit.VideoQuality_LOW,
}
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_HIGH)
dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_MEDIUM)
dm.NotifySubscriberMaxQuality("s3", webrtc.MimeTypeAV1, livekit.VideoQuality_MEDIUM)
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// "s1" dropping to MEDIUM should disable HIGH layer
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_MEDIUM)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// "s1" , "s2" , "s3" dropping to LOW should disable HIGH & MEDIUM
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_LOW)
dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_LOW)
dm.NotifySubscriberMaxQuality("s3", webrtc.MimeTypeAV1, livekit.VideoQuality_LOW)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// muting "s2" only should not disable all qualities of vp8, no change of expected qualities
dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_OFF)
time.Sleep(100 * time.Millisecond)
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// muting "s1" and s3 also should disable all qualities
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_OFF)
dm.NotifySubscriberMaxQuality("s3", webrtc.MimeTypeAV1, livekit.VideoQuality_OFF)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// unmuting "s1" should enable vp8 previously set max quality
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_LOW)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// a higher quality from a different node should trigger that quality
dm.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{
{CodecMime: webrtc.MimeTypeVP8, Quality: livekit.VideoQuality_HIGH},
{CodecMime: webrtc.MimeTypeAV1, Quality: livekit.VideoQuality_MEDIUM},
})
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
})
}
+162
View File
@@ -0,0 +1,162 @@
package rtc
import (
"sync"
"time"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
const (
initialQualityUpdateWait = 10 * time.Second
)
type DynacastQualityParams struct {
MimeType string
DynacastPauseDelay time.Duration
Logger logger.Logger
}
// DynacastQuality manages max subscribed quality of a single receiver of a media track
type DynacastQuality struct {
params DynacastQualityParams
// quality level enable/disable
lock sync.RWMutex
initialized bool
maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality
maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality
maxSubscribedQuality livekit.VideoQuality
maxQualityTimer *time.Timer
onSubscribedMaxQualityChange func(maxSubscribedQuality livekit.VideoQuality)
}
func NewDynacastQuality(params DynacastQualityParams) *DynacastQuality {
return &DynacastQuality{
params: params,
maxSubscriberQuality: make(map[livekit.ParticipantID]livekit.VideoQuality),
maxSubscriberNodeQuality: make(map[livekit.NodeID]livekit.VideoQuality),
}
}
func (d *DynacastQuality) Start() {
d.reset()
}
func (d *DynacastQuality) Restart() {
d.reset()
}
func (d *DynacastQuality) Stop() {
d.stopMaxQualityTimer()
}
func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(maxSubscribedQuality livekit.VideoQuality)) {
d.onSubscribedMaxQualityChange = f
}
func (d *DynacastQuality) MimeType() string {
return d.params.MimeType
}
func (d *DynacastQuality) MaxSubscribedQuality() livekit.VideoQuality {
d.lock.RLock()
defer d.lock.RUnlock()
return d.maxSubscribedQuality
}
func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) {
d.lock.Lock()
if quality == livekit.VideoQuality_OFF {
delete(d.maxSubscriberQuality, subscriberID)
} else {
d.maxSubscriberQuality[subscriberID] = quality
}
d.lock.Unlock()
d.updateQualityChange()
}
func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) {
d.lock.Lock()
if quality == livekit.VideoQuality_OFF {
delete(d.maxSubscriberNodeQuality, nodeID)
} else {
d.maxSubscriberNodeQuality[nodeID] = quality
}
d.lock.Unlock()
d.updateQualityChange()
}
func (d *DynacastQuality) reset() {
d.lock.Lock()
d.initialized = false
d.maxSubscribedQuality = livekit.VideoQuality_HIGH
d.lock.Unlock()
d.startMaxQualityTimer()
}
func (d *DynacastQuality) updateQualityChange() {
d.lock.Lock()
maxSubscribedQuality := livekit.VideoQuality_OFF
for _, subQuality := range d.maxSubscriberQuality {
if maxSubscribedQuality == livekit.VideoQuality_OFF || (subQuality != livekit.VideoQuality_OFF && subQuality > maxSubscribedQuality) {
maxSubscribedQuality = subQuality
}
}
for _, nodeQuality := range d.maxSubscriberNodeQuality {
if maxSubscribedQuality == livekit.VideoQuality_OFF || (nodeQuality != livekit.VideoQuality_OFF && nodeQuality > maxSubscribedQuality) {
maxSubscribedQuality = nodeQuality
}
}
if maxSubscribedQuality == d.maxSubscribedQuality && d.initialized {
d.lock.Unlock()
return
}
d.initialized = true
d.maxSubscribedQuality = maxSubscribedQuality
d.params.Logger.Infow("notifying quality change",
"mime", d.params.MimeType,
"maxSubscriberQuality", d.maxSubscriberQuality,
"maxSubscriberNodeQuality", d.maxSubscriberNodeQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
)
onSubscribedMaxQualityChange := d.onSubscribedMaxQualityChange
d.lock.Unlock()
if onSubscribedMaxQualityChange != nil {
onSubscribedMaxQualityChange(maxSubscribedQuality)
}
}
func (d *DynacastQuality) startMaxQualityTimer() {
d.lock.Lock()
defer d.lock.Unlock()
if d.maxQualityTimer != nil {
d.maxQualityTimer.Stop()
d.maxQualityTimer = nil
}
d.maxQualityTimer = time.AfterFunc(initialQualityUpdateWait, func() {
d.stopMaxQualityTimer()
d.updateQualityChange()
})
}
func (d *DynacastQuality) stopMaxQualityTimer() {
d.lock.Lock()
defer d.lock.Unlock()
if d.maxQualityTimer != nil {
d.maxQualityTimer.Stop()
d.maxQualityTimer = nil
}
}
+87
View File
@@ -0,0 +1,87 @@
package rtc
import (
"sync"
"time"
"github.com/pion/rtcp"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu"
)
const (
downLostUpdateDelta = time.Second
)
type MediaLossProxyParams struct {
Logger logger.Logger
}
type MediaLossProxy struct {
params MediaLossProxyParams
lock sync.Mutex
maxDownFracLost uint8
maxDownFracLostTs time.Time
onMediaLossUpdate func(fractionalLoss uint8)
}
func NewMediaLossProxy(params MediaLossProxyParams) *MediaLossProxy {
return &MediaLossProxy{params: params}
}
func (m *MediaLossProxy) OnMediaLossUpdate(f func(fractionalLoss uint8)) {
m.lock.Lock()
m.onMediaLossUpdate = f
m.lock.Unlock()
}
func (m *MediaLossProxy) HandleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) {
m.lock.Lock()
for _, rr := range report.Reports {
if m.maxDownFracLost < rr.FractionLost {
m.maxDownFracLost = rr.FractionLost
}
}
m.lock.Unlock()
m.maybeUpdateLoss()
}
func (m *MediaLossProxy) NotifySubscriberNodeMediaLoss(_nodeID livekit.NodeID, fractionalLoss uint8) {
m.lock.Lock()
if m.maxDownFracLost < fractionalLoss {
m.maxDownFracLost = fractionalLoss
}
m.lock.Unlock()
m.maybeUpdateLoss()
}
func (m *MediaLossProxy) maybeUpdateLoss() {
var (
shouldUpdate bool
maxLost uint8
)
m.lock.Lock()
now := time.Now()
if now.Sub(m.maxDownFracLostTs) > downLostUpdateDelta {
shouldUpdate = true
maxLost = m.maxDownFracLost
m.maxDownFracLost = 0
m.maxDownFracLostTs = now
}
onMediaLossUpdate := m.onMediaLossUpdate
m.lock.Unlock()
if shouldUpdate {
if onMediaLossUpdate != nil {
onMediaLossUpdate(maxLost)
}
}
}
+69 -8
View File
@@ -14,10 +14,12 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/twcc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/utils"
)
// MediaTrack represents a WebRTC track that needs to be forwarded
@@ -28,6 +30,9 @@ type MediaTrack struct {
buffer *buffer.Buffer
*MediaTrackReceiver
*MediaLossProxy
dynacastManager *DynacastManager
lock sync.RWMutex
}
@@ -55,6 +60,10 @@ type MediaTrackParams struct {
func NewMediaTrack(params MediaTrackParams) *MediaTrack {
t := &MediaTrack{
params: params,
dynacastManager: NewDynacastManager(DynacastManagerParams{
DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay,
Logger: params.Logger,
}),
}
t.MediaTrackReceiver = NewMediaTrackReceiver(MediaTrackReceiverParams{
@@ -66,16 +75,9 @@ func NewMediaTrack(params MediaTrackParams) *MediaTrack {
BufferFactory: params.BufferFactory,
ReceiverConfig: params.ReceiverConfig,
SubscriberConfig: params.SubscriberConfig,
VideoConfig: params.VideoConfig,
Telemetry: params.Telemetry,
Logger: params.Logger,
})
t.MediaTrackReceiver.OnMediaLossUpdate(func(fractionalLoss uint8) {
if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO {
// ok to access buffer since receivers are added before subscribers
t.buffer.SetLastFractionLostReport(fractionalLoss)
}
})
t.MediaTrackReceiver.OnVideoLayerUpdate(func(layers []*livekit.VideoLayer) {
t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(),
&livekit.TrackInfo{
@@ -87,9 +89,45 @@ func NewMediaTrack(params MediaTrackParams) *MediaTrack {
})
})
t.MediaLossProxy = NewMediaLossProxy(MediaLossProxyParams{
Logger: params.Logger,
})
t.MediaLossProxy.OnMediaLossUpdate(func(fractionalLoss uint8) {
if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO {
// ok to access buffer since receivers are added before subscribers
t.buffer.SetLastFractionLostReport(fractionalLoss)
}
})
t.MediaTrackReceiver.OnSetupReceiver(func(mime string) {
t.dynacastManager.AddCodec(mime)
})
t.MediaTrackReceiver.OnSubscriberMaxQualityChange(func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32) {
t.dynacastManager.NotifySubscriberMaxQuality(subscriberID, codec.MimeType, utils.QualityForSpatialLayer(layer))
})
t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback)
return t
}
func (t *MediaTrack) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error) {
t.dynacastManager.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) {
if f != nil && !t.IsMuted() {
_ = f(t.ID(), subscribedQualities, maxSubscribedQualities)
}
for _, q := range maxSubscribedQualities {
receiver := t.Receiver(q.CodecMime)
if receiver != nil {
receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality))
}
}
})
}
func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) {
t.dynacastManager.NotifySubscriberNodeMaxQuality(nodeID, qualities)
}
func (t *MediaTrack) SignalCid() string {
return t.params.SignalCid
}
@@ -130,7 +168,6 @@ func (t *MediaTrack) SetPendingCodecSid(codecs []*livekit.SimulcastCodec) {
// AddReceiver adds a new RTP receiver to the track, returns true when receiver represents a new codec
func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, twcc *twcc.Responder, mid string) bool {
var newCodec bool
buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC()))
if buff == nil || rtcpReader == nil {
@@ -183,6 +220,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
t.RemoveAllSubscribers(false)
t.MediaTrackReceiver.ClearReceiver(mime)
if t.MediaTrackReceiver.TryClose() {
t.dynacastManager.Close()
t.params.Telemetry.TrackUnpublished(
context.Background(),
t.PublisherID(),
@@ -281,3 +319,26 @@ func (t *MediaTrack) OnMaxLayerChange(maxLayer int32) {
}
t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), ti)
}
func (t *MediaTrack) Restart() {
t.MediaTrackReceiver.Restart()
t.dynacastManager.Restart()
}
func (t *MediaTrack) Close() {
t.dynacastManager.Close()
t.MediaTrackReceiver.Close()
}
func (t *MediaTrack) SetMuted(muted bool) {
// update quality based on subscription if unmuting.
// This will queue up the current state, but subscriber
// driven changes could update it.
if !muted {
t.dynacastManager.ForceUpdate()
}
t.MediaTrackReceiver.SetMuted(muted)
}
-336
View File
@@ -1,18 +1,11 @@
package rtc
import (
"sort"
"sync"
"testing"
"time"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
func TestTrackInfo(t *testing.T) {
@@ -106,332 +99,3 @@ func TestGetQualityForDimension(t *testing.T) {
require.Equal(t, livekit.VideoQuality_HIGH, mt.GetQualityForDimension(1000, 700))
})
}
func TestSubscribedMaxQuality(t *testing.T) {
subscribedCodecsAsString := func(c1 []*livekit.SubscribedCodec) string {
sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec })
var s1 string
for _, c := range c1 {
s1 += c.String()
}
return s1
}
t.Run("subscribers muted", func(t *testing.T) {
mt := NewMediaTrack(MediaTrackParams{
TrackInfo: &livekit.TrackInfo{
Sid: "v1",
Type: livekit.TrackType_VIDEO,
Width: 1080,
Height: 720,
Layers: []*livekit.VideoLayer{
{
Quality: livekit.VideoQuality_LOW,
Width: 480,
Height: 270,
},
{
Quality: livekit.VideoQuality_MEDIUM,
Width: 960,
Height: 540,
},
{
Quality: livekit.VideoQuality_HIGH,
Width: 1080,
Height: 720,
},
},
},
})
mt.Start()
var lock sync.Mutex
actualTrackID := livekit.TrackID("")
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
mt.OnSubscribedMaxQualityChange(func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) error {
lock.Lock()
actualTrackID = trackID
actualSubscribedQualities = subscribedQualities
lock.Unlock()
return nil
})
mt.AddCodec(webrtc.MimeTypeVP8)
mt.AddCodec(webrtc.MimeTypeAV1)
mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_HIGH)
mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_HIGH)
// mute all subscribers of vp8
mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_OFF)
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
})
t.Run("subscribers max quality", func(t *testing.T) {
mt := NewMediaTrack(MediaTrackParams{
TrackInfo: &livekit.TrackInfo{
Sid: "v1",
Type: livekit.TrackType_VIDEO,
Width: 1080,
Height: 720,
Layers: []*livekit.VideoLayer{
{
Quality: livekit.VideoQuality_LOW,
Width: 480,
Height: 270,
},
{
Quality: livekit.VideoQuality_MEDIUM,
Width: 960,
Height: 540,
},
{
Quality: livekit.VideoQuality_HIGH,
Width: 1080,
Height: 720,
},
},
},
VideoConfig: config.VideoConfig{
DynacastPauseDelay: 100 * time.Millisecond,
},
})
mt.Start()
mt.AddCodec(webrtc.MimeTypeVP8)
mt.AddCodec(webrtc.MimeTypeAV1)
lock := sync.RWMutex{}
lock.Lock()
actualTrackID := livekit.TrackID("")
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
lock.Unlock()
mt.OnSubscribedMaxQualityChange(func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) error {
lock.Lock()
actualTrackID = trackID
actualSubscribedQualities = subscribedQualities
lock.Unlock()
return nil
})
mt.maxSubscribedQuality = map[string]livekit.VideoQuality{
webrtc.MimeTypeVP8: livekit.VideoQuality_LOW,
webrtc.MimeTypeAV1: livekit.VideoQuality_LOW,
}
mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_HIGH)
mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_MEDIUM)
mt.notifySubscriberMaxQuality("s3", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_MEDIUM)
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// "s1" dropping to MEDIUM should disable HIGH layer
mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_MEDIUM)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// "s1" , "s2" , "s3" dropping to LOW should disable HIGH & MEDIUM
mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_LOW)
mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_LOW)
mt.notifySubscriberMaxQuality("s3", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_LOW)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// muting "s2" only should not disable all qualities of vp8, no change of expected qualities
mt.notifySubscriberMaxQuality("s2", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_OFF)
time.Sleep(100 * time.Millisecond)
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// muting "s1" and s3 also should disable all qualities
mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_OFF)
mt.notifySubscriberMaxQuality("s3", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1}, livekit.VideoQuality_OFF)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// unmuting "s1" should enable vp8 previously set max quality
mt.notifySubscriberMaxQuality("s1", webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, livekit.VideoQuality_LOW)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// a higher quality from a different node should trigger that quality
mt.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{
{CodecMime: webrtc.MimeTypeVP8, Quality: livekit.VideoQuality_HIGH},
{CodecMime: webrtc.MimeTypeAV1, Quality: livekit.VideoQuality_MEDIUM},
})
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
{
Codec: webrtc.MimeTypeAV1,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return livekit.TrackID("v1") == actualTrackID &&
subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
})
}
+20 -88
View File
@@ -5,7 +5,6 @@ import (
"sort"
"strings"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
@@ -15,7 +14,6 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -24,7 +22,6 @@ import (
)
const (
downLostUpdateDelta = time.Second
layerSelectionTolerance = 0.9
)
@@ -51,7 +48,6 @@ type MediaTrackReceiverParams struct {
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
SubscriberConfig DirectionConfig
VideoConfig config.VideoConfig
Telemetry telemetry.TelemetryService
Logger logger.Logger
}
@@ -69,14 +65,10 @@ type MediaTrackReceiver struct {
potentialCodecs []webrtc.RTPCodecParameters
pendingSubscribeOp map[livekit.ParticipantID]int
// track audio fraction lost
downFracLostLock sync.Mutex
maxDownFracLost uint8
maxDownFracLostTs time.Time
onMediaLossUpdate func(fractionalLoss uint8)
onVideoLayerUpdate func(layers []*livekit.VideoLayer)
onClose []func()
onSetupReceiver func(mime string)
onMediaLossFeedback func(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
onVideoLayerUpdate func(layers []*livekit.VideoLayer)
onClose []func()
*MediaTrackSubscriptions
}
@@ -94,7 +86,6 @@ func NewMediaTrackReceiver(params MediaTrackReceiverParams) *MediaTrackReceiver
BufferFactory: params.BufferFactory,
ReceiverConfig: params.ReceiverConfig,
SubscriberConfig: params.SubscriberConfig,
VideoConfig: t.params.VideoConfig,
Telemetry: params.Telemetry,
Logger: params.Logger,
})
@@ -124,8 +115,12 @@ func (t *MediaTrackReceiver) Restart() {
for _, receiver := range receivers {
receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(livekit.VideoQuality_HIGH))
}
}
t.MediaTrackSubscriptions.Restart()
func (t *MediaTrackReceiver) OnSetupReceiver(f func(mime string)) {
t.lock.Lock()
t.onSetupReceiver = f
t.lock.Unlock()
}
func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority int, mid string) {
@@ -172,12 +167,13 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority
t.shadowReceiversLocked()
onSetupReceiver := t.onSetupReceiver
t.params.Logger.Debugw("setup receiver", "mime", receiver.Codec().MimeType, "priority", priority, "receivers", t.receiversShadow)
t.lock.Unlock()
t.MediaTrackSubscriptions.AddCodec(receiver.Codec().MimeType)
t.MediaTrackSubscriptions.Start()
if onSetupReceiver != nil {
onSetupReceiver(receiver.Codec().MimeType)
}
}
func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParameters, headers []webrtc.RTPHeaderExtensionParameter) {
@@ -238,13 +234,7 @@ func (t *MediaTrackReceiver) ClearReceiver(mime string) {
}
t.shadowReceiversLocked()
stopSubscription := len(t.receiversShadow) == 0
t.lock.Unlock()
if stopSubscription {
t.MediaTrackSubscriptions.Stop()
}
}
func (t *MediaTrackReceiver) ClearAllReceivers() {
@@ -252,11 +242,10 @@ func (t *MediaTrackReceiver) ClearAllReceivers() {
t.receivers = t.receivers[:0]
t.receiversShadow = nil
t.lock.Unlock()
t.MediaTrackSubscriptions.Stop()
}
func (t *MediaTrackReceiver) OnMediaLossUpdate(f func(fractionalLoss uint8)) {
t.onMediaLossUpdate = f
func (t *MediaTrackReceiver) OnMediaLossFeedback(f func(dt *sfu.DownTrack, rr *rtcp.ReceiverReport)) {
t.onMediaLossFeedback = f
}
func (t *MediaTrackReceiver) OnVideoLayerUpdate(f func(layers []*livekit.VideoLayer)) {
@@ -280,7 +269,6 @@ func (t *MediaTrackReceiver) Close() {
onclose := t.onClose
t.lock.RUnlock()
t.MediaTrackSubscriptions.Close()
for _, f := range onclose {
f()
}
@@ -662,53 +650,11 @@ func (t *MediaTrackReceiver) GetAudioLevel() (float64, bool) {
func (t *MediaTrackReceiver) onDownTrackCreated(downTrack *sfu.DownTrack) {
if t.Kind() == livekit.TrackType_AUDIO {
downTrack.AddReceiverReportListener(t.handleMaxLossFeedback)
}
}
// handles max loss for audio streams
func (t *MediaTrackReceiver) handleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) {
t.downFracLostLock.Lock()
for _, rr := range report.Reports {
if t.maxDownFracLost < rr.FractionLost {
t.maxDownFracLost = rr.FractionLost
}
}
t.downFracLostLock.Unlock()
t.maybeUpdateLoss()
}
func (t *MediaTrackReceiver) NotifySubscriberNodeMediaLoss(_nodeID livekit.NodeID, fractionalLoss uint8) {
t.downFracLostLock.Lock()
if t.maxDownFracLost < fractionalLoss {
t.maxDownFracLost = fractionalLoss
}
t.downFracLostLock.Unlock()
t.maybeUpdateLoss()
}
func (t *MediaTrackReceiver) maybeUpdateLoss() {
var (
shouldUpdate bool
maxLost uint8
)
t.downFracLostLock.Lock()
now := time.Now()
if now.Sub(t.maxDownFracLostTs) > downLostUpdateDelta {
shouldUpdate = true
maxLost = t.maxDownFracLost
t.maxDownFracLost = 0
t.maxDownFracLostTs = now
}
t.downFracLostLock.Unlock()
if shouldUpdate {
if t.onMediaLossUpdate != nil {
t.onMediaLossUpdate(maxLost)
}
downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, rr *rtcp.ReceiverReport) {
if t.onMediaLossFeedback != nil {
t.onMediaLossFeedback(dt, rr)
}
})
}
}
@@ -782,20 +728,6 @@ func (t *MediaTrackReceiver) SetRTT(rtt uint32) {
}
}
func (t *MediaTrackReceiver) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error) {
t.MediaTrackSubscriptions.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) {
if f != nil && !t.IsMuted() {
_ = f(t.ID(), subscribedQualities, maxSubscribedQualities)
}
for _, q := range maxSubscribedQualities {
receiver := t.Receiver(q.CodecMime)
if receiver != nil {
receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality))
}
}
})
}
// ---------------------------
func VideoQualityToRID(q livekit.VideoQuality) string {
+11 -311
View File
@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/bep/debounce"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/rtcerr"
@@ -15,16 +14,10 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/utils"
)
const (
initialQualityUpdateWait = 10 * time.Second
)
var (
@@ -41,19 +34,9 @@ type MediaTrackSubscriptions struct {
subscribedTracksMu sync.RWMutex
subscribedTracks map[livekit.ParticipantID]types.SubscribedTrack
// quality level enable/disable
maxQualityLock sync.RWMutex
maxSubscriberQuality map[livekit.ParticipantID]*types.SubscribedCodecQuality
maxSubscriberNodeQuality map[livekit.NodeID][]types.SubscribedCodecQuality
maxSubscribedQuality map[string]livekit.VideoQuality // codec mime -> quality
maxSubscribedQualityDebounce func(func())
onSubscribedMaxQualityChange func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)
maxQualityTimer *time.Timer
qualityNotifyOpQueue *utils.OpsQueue
onDownTrackCreated func(downTrack *sfu.DownTrack)
onSubscriptionOperationComplete func(sub types.LocalParticipant)
onSubscriberMaxQualityChange func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32)
}
type MediaTrackSubscriptionsParams struct {
@@ -62,7 +45,6 @@ type MediaTrackSubscriptionsParams struct {
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
SubscriberConfig DirectionConfig
VideoConfig config.VideoConfig
Telemetry telemetry.TelemetryService
@@ -70,35 +52,10 @@ type MediaTrackSubscriptionsParams struct {
}
func NewMediaTrackSubscriptions(params MediaTrackSubscriptionsParams) *MediaTrackSubscriptions {
t := &MediaTrackSubscriptions{
params: params,
subscribedTracks: make(map[livekit.ParticipantID]types.SubscribedTrack),
maxSubscriberQuality: make(map[livekit.ParticipantID]*types.SubscribedCodecQuality),
maxSubscriberNodeQuality: make(map[livekit.NodeID][]types.SubscribedCodecQuality),
maxSubscribedQuality: make(map[string]livekit.VideoQuality),
maxSubscribedQualityDebounce: debounce.New(params.VideoConfig.DynacastPauseDelay),
qualityNotifyOpQueue: utils.NewOpsQueue(params.Logger, "quality-notify", 100),
return &MediaTrackSubscriptions{
params: params,
subscribedTracks: make(map[livekit.ParticipantID]types.SubscribedTrack),
}
return t
}
func (t *MediaTrackSubscriptions) Start() {
t.qualityNotifyOpQueue.Start()
t.startMaxQualityTimer(false)
}
func (t *MediaTrackSubscriptions) Restart() {
t.startMaxQualityTimer(true)
}
func (t *MediaTrackSubscriptions) Stop() {
t.stopMaxQualityTimer()
}
func (t *MediaTrackSubscriptions) Close() {
t.qualityNotifyOpQueue.Stop()
t.stopMaxQualityTimer()
}
func (t *MediaTrackSubscriptions) OnDownTrackCreated(f func(downTrack *sfu.DownTrack)) {
@@ -109,14 +66,11 @@ func (t *MediaTrackSubscriptions) OnSubscriptionOperationComplete(f func(sub typ
t.onSubscriptionOperationComplete = f
}
func (t *MediaTrackSubscriptions) SetMuted(muted bool) {
// update quality based on subscription if unmuting.
// This will queue up the current state, but subscriber
// driven changes could update it.
if !muted {
t.UpdateQualityChange(true)
}
func (t *MediaTrackSubscriptions) OnSubscriberMaxQualityChange(f func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32)) {
t.onSubscriberMaxQualityChange = f
}
func (t *MediaTrackSubscriptions) SetMuted(muted bool) {
// update mute of all subscribed tracks
for _, st := range t.getAllSubscribedTracks() {
st.SetPublisherMuted(muted)
@@ -131,12 +85,6 @@ func (t *MediaTrackSubscriptions) IsSubscriber(subID livekit.ParticipantID) bool
return ok
}
func (t *MediaTrackSubscriptions) AddCodec(mime string) {
t.subscribedTracksMu.Lock()
t.maxSubscribedQuality[mime] = livekit.VideoQuality_HIGH
t.subscribedTracksMu.Unlock()
}
// AddSubscriber subscribes sub to current mediaTrack
func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *WrappedReceiver) error {
trackID := t.params.MediaTrack.ID()
@@ -209,8 +157,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
// when down track is bound, start loop to send reports
go t.sendDownTrackBindingReports(sub)
// initialize to default layer
t.notifySubscriberMaxQuality(subscriberID, downTrack.Codec(), livekit.VideoQuality_HIGH)
subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted())
})
@@ -299,7 +245,9 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
})
downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) {
t.notifySubscriberMaxQuality(subscriberID, dt.Codec(), utils.QualityForSpatialLayer(layer))
if t.onSubscriberMaxQualityChange != nil {
t.onSubscriberMaxQualityChange(subscriberID, dt.Codec(), layer)
}
})
downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) {
@@ -474,254 +422,6 @@ func (t *MediaTrackSubscriptions) DebugInfo() []map[string]interface{} {
return subscribedTrackInfo
}
func (t *MediaTrackSubscriptions) OnSubscribedMaxQualityChange(f func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)) {
t.onSubscribedMaxQualityChange = f
}
func (t *MediaTrackSubscriptions) notifySubscriberMaxQuality(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, quality livekit.VideoQuality) {
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
t.params.Logger.Debugw("notifying subscriber max quality", "subscriberID", subscriberID, "codec", codec, "quality", quality)
if codec.MimeType == "" {
t.params.Logger.Errorw("codec mime type is empty", nil)
}
t.maxQualityLock.Lock()
if quality == livekit.VideoQuality_OFF {
_, ok := t.maxSubscriberQuality[subscriberID]
if !ok {
t.maxQualityLock.Unlock()
return
}
delete(t.maxSubscriberQuality, subscriberID)
} else {
maxQuality, ok := t.maxSubscriberQuality[subscriberID]
if ok {
if maxQuality.Quality == quality && maxQuality.CodecMime == codec.MimeType {
t.maxQualityLock.Unlock()
return
}
maxQuality.CodecMime = codec.MimeType
maxQuality.Quality = quality
} else {
t.maxSubscriberQuality[subscriberID] = &types.SubscribedCodecQuality{
Quality: quality,
CodecMime: codec.MimeType,
}
}
}
t.maxQualityLock.Unlock()
go t.UpdateQualityChange(false)
}
func (t *MediaTrackSubscriptions) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) {
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
if len(qualities) == 1 && qualities[0].CodecMime == "" {
// for old version msg don't have codec mime, use first mime type
t.maxQualityLock.RLock()
for mime := range t.maxSubscribedQuality {
qualities[0].CodecMime = mime
break
}
t.maxQualityLock.RUnlock()
}
t.maxQualityLock.Lock()
if len(qualities) == 0 {
if _, ok := t.maxSubscriberNodeQuality[nodeID]; !ok {
t.maxQualityLock.Unlock()
return
}
delete(t.maxSubscriberNodeQuality, nodeID)
} else {
if maxQualities, ok := t.maxSubscriberNodeQuality[nodeID]; ok {
var matchCounter int
for _, quality := range qualities {
for _, maxQuality := range maxQualities {
if quality == maxQuality {
matchCounter++
break
}
}
}
if matchCounter == len(qualities) && matchCounter == len(maxQualities) {
t.maxQualityLock.Unlock()
return
}
}
t.maxSubscriberNodeQuality[nodeID] = qualities
}
t.maxQualityLock.Unlock()
go t.UpdateQualityChange(false)
}
func (t *MediaTrackSubscriptions) UpdateQualityChange(force bool) {
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
t.maxQualityLock.Lock()
t.params.Logger.Debugw("updating quality change",
"force", force,
"maxSubscriberQuality", t.maxSubscriberQuality,
"maxSubscriberNodeQuality", t.maxSubscriberNodeQuality,
"maxSubscribedQuality", t.maxSubscribedQuality)
maxSubscribedQuality := make(map[string]livekit.VideoQuality, len(t.maxSubscribedQuality))
var changed bool
// reset maxSubscribedQuality
for mime := range t.maxSubscribedQuality {
maxSubscribedQuality[mime] = livekit.VideoQuality_OFF
}
for _, subQuality := range t.maxSubscriberQuality {
if q, ok := maxSubscribedQuality[subQuality.CodecMime]; ok {
if q == livekit.VideoQuality_OFF || (subQuality.Quality != livekit.VideoQuality_OFF && subQuality.Quality > q) {
maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality
}
} else {
maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality
}
}
for _, subQualities := range t.maxSubscriberNodeQuality {
for _, subQuality := range subQualities {
if q, ok := maxSubscribedQuality[subQuality.CodecMime]; ok {
if q == livekit.VideoQuality_OFF || (subQuality.Quality != livekit.VideoQuality_OFF && subQuality.Quality > q) {
maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality
}
} else {
maxSubscribedQuality[subQuality.CodecMime] = subQuality.Quality
}
}
}
qualityDowngrades := make(map[string]livekit.VideoQuality, len(t.maxSubscribedQuality))
noChangeCount := 0
for mime, q := range maxSubscribedQuality {
origin, ok := t.maxSubscribedQuality[mime]
if !ok {
origin = livekit.VideoQuality_OFF
}
if origin != q {
if q == livekit.VideoQuality_OFF || (origin != livekit.VideoQuality_OFF && origin > q) {
// quality downgrade (or become off), delay notify to publisher
qualityDowngrades[mime] = origin
if force {
t.maxSubscribedQuality[mime] = q
}
} else {
// quality upgrade, update immediately
t.maxSubscribedQuality[mime] = q
}
changed = true
} else {
noChangeCount++
}
}
t.params.Logger.Debugw("updated quality change",
"changed", changed,
"maxSubscribedQuality", maxSubscribedQuality,
"t.maxSubscribedQuality", t.maxSubscribedQuality,
"qualityDowngrades", qualityDowngrades)
if !changed && !force {
t.maxQualityLock.Unlock()
return
}
// if quality downgrade (or become OFF), delay notify to publisher if needed
if len(qualityDowngrades) > 0 && !force {
t.maxSubscribedQualityDebounce(func() {
t.UpdateQualityChange(true)
})
// no quality upgrades
if len(qualityDowngrades)+noChangeCount == len(t.maxSubscribedQuality) {
t.maxQualityLock.Unlock()
return
}
}
subscribedCodec := make([]*livekit.SubscribedCodec, 0, len(t.maxSubscribedQuality))
maxSubscribedQualities := make([]types.SubscribedCodecQuality, 0, len(t.maxSubscribedQuality))
for mime, maxQuality := range t.maxSubscribedQuality {
maxSubscribedQualities = append(maxSubscribedQualities, types.SubscribedCodecQuality{
CodecMime: mime,
Quality: maxQuality,
})
if maxQuality == livekit.VideoQuality_OFF {
subscribedCodec = append(subscribedCodec, &livekit.SubscribedCodec{
Codec: mime,
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
})
} else {
var subscribedQualities []*livekit.SubscribedQuality
for q := livekit.VideoQuality_LOW; q <= livekit.VideoQuality_HIGH; q++ {
subscribedQualities = append(subscribedQualities, &livekit.SubscribedQuality{
Quality: q,
Enabled: q <= maxQuality,
})
}
subscribedCodec = append(subscribedCodec, &livekit.SubscribedCodec{
Codec: mime,
Qualities: subscribedQualities,
})
}
}
if t.onSubscribedMaxQualityChange != nil {
t.params.Logger.Debugw("subscribedMaxQualityChange",
"subscribedCodec", subscribedCodec,
"maxSubscribedQualities", maxSubscribedQualities)
t.qualityNotifyOpQueue.Enqueue(func() {
t.onSubscribedMaxQualityChange(subscribedCodec, maxSubscribedQualities)
})
}
t.maxQualityLock.Unlock()
}
func (t *MediaTrackSubscriptions) startMaxQualityTimer(force bool) {
t.maxQualityLock.Lock()
defer t.maxQualityLock.Unlock()
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
if t.maxQualityTimer != nil {
t.maxQualityTimer.Stop()
t.maxQualityTimer = nil
}
t.maxQualityTimer = time.AfterFunc(initialQualityUpdateWait, func() {
t.stopMaxQualityTimer()
t.UpdateQualityChange(force)
})
}
func (t *MediaTrackSubscriptions) stopMaxQualityTimer() {
t.maxQualityLock.Lock()
defer t.maxQualityLock.Unlock()
if t.maxQualityTimer != nil {
t.maxQualityTimer.Stop()
t.maxQualityTimer = nil
}
}
func (t *MediaTrackSubscriptions) downTrackClosed(
sub types.LocalParticipant,
subTrack types.SubscribedTrack,
+27 -2
View File
@@ -857,7 +857,9 @@ func (p *ParticipantImpl) ICERestart(iceConfig *types.IceConfig) error {
return nil
}
p.UpTrackManager.Restart()
for _, t := range p.GetPublishedTracks() {
t.(types.LocalMediaTrack).Restart()
}
return p.subscriber.CreateAndSendOffer(&webrtc.OfferOptions{
ICERestart: true,
@@ -1621,7 +1623,8 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID,
SubscribedQualities: subscribedQualities[0].Qualities, // for compatible with old client
SubscribedCodecs: subscribedQualities,
}
// get track's layer dimensions
// send layer info about max subscription changes to telemetry
track := p.UpTrackManager.GetPublishedTrack(trackID)
var layerInfo map[livekit.VideoQuality]*livekit.VideoLayer
if track != nil {
@@ -2333,3 +2336,25 @@ func (p *ParticipantImpl) ClearInProgressAndProcessSubscriptionRequestsQueue(tra
go p.ProcessSubscriptionRequestsQueue(trackID)
}
func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error {
track := p.GetPublishedTrack(trackID)
if track == nil {
p.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find published track")
}
track.(types.LocalMediaTrack).NotifySubscriberNodeMaxQuality(nodeID, maxQualities)
return nil
}
func (p *ParticipantImpl) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error {
track := p.GetPublishedTrack(trackID)
if track == nil {
p.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find published track")
}
track.(types.LocalMediaTrack).NotifySubscriberNodeMediaLoss(nodeID, uint8(fractionalLoss))
return nil
}
+8 -7
View File
@@ -191,8 +191,6 @@ type Participant interface {
resolverBySid func(participantID livekit.ParticipantID) LocalParticipant,
) error
UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error
UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error
UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error
DebugInfo() map[string]interface{}
}
@@ -295,6 +293,9 @@ type LocalParticipant interface {
SetICEConfig(iceConfig IceConfig)
OnICEConfigChanged(callback func(participant LocalParticipant, iceConfig IceConfig))
UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error
UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error
}
// Room is a container of participants, and can provide room-level actions
@@ -331,8 +332,6 @@ type MediaTrack interface {
UpdateVideoLayers(layers []*livekit.VideoLayer)
IsSimulcast() bool
Restart()
// callbacks
AddOnClose(func())
@@ -348,9 +347,6 @@ type MediaTrack interface {
// returns quality information that's appropriate for width & height
GetQualityForDimension(width, height uint32) livekit.VideoQuality
NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualites []SubscribedCodecQuality)
NotifySubscriberNodeMediaLoss(nodeID livekit.NodeID, fractionalLoss uint8)
Receivers() []sfu.TrackReceiver
}
@@ -358,6 +354,8 @@ type MediaTrack interface {
type LocalMediaTrack interface {
MediaTrack
Restart()
SignalCid() string
HasSdpCid(cid string) bool
@@ -365,6 +363,9 @@ type LocalMediaTrack interface {
GetConnectionScore() float32
SetRTT(rtt uint32)
NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []SubscribedCodecQuality)
NotifySubscriberNodeMediaLoss(nodeID livekit.NodeID, fractionalLoss uint8)
}
// MediaTrack is the main interface representing a track published to the room
@@ -119,18 +119,6 @@ type FakeMediaTrack struct {
nameReturnsOnCall map[int]struct {
result1 string
}
NotifySubscriberNodeMaxQualityStub func(livekit.NodeID, []types.SubscribedCodecQuality)
notifySubscriberNodeMaxQualityMutex sync.RWMutex
notifySubscriberNodeMaxQualityArgsForCall []struct {
arg1 livekit.NodeID
arg2 []types.SubscribedCodecQuality
}
NotifySubscriberNodeMediaLossStub func(livekit.NodeID, uint8)
notifySubscriberNodeMediaLossMutex sync.RWMutex
notifySubscriberNodeMediaLossArgsForCall []struct {
arg1 livekit.NodeID
arg2 uint8
}
PublisherIDStub func() livekit.ParticipantID
publisherIDMutex sync.RWMutex
publisherIDArgsForCall []struct {
@@ -182,10 +170,6 @@ type FakeMediaTrack struct {
arg1 livekit.ParticipantID
arg2 bool
}
RestartStub func()
restartMutex sync.RWMutex
restartArgsForCall []struct {
}
RevokeDisallowedSubscribersStub func([]livekit.ParticipantIdentity) []livekit.ParticipantIdentity
revokeDisallowedSubscribersMutex sync.RWMutex
revokeDisallowedSubscribersArgsForCall []struct {
@@ -818,77 +802,6 @@ func (fake *FakeMediaTrack) NameReturnsOnCall(i int, result1 string) {
}{result1}
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQuality(arg1 livekit.NodeID, arg2 []types.SubscribedCodecQuality) {
var arg2Copy []types.SubscribedCodecQuality
if arg2 != nil {
arg2Copy = make([]types.SubscribedCodecQuality, len(arg2))
copy(arg2Copy, arg2)
}
fake.notifySubscriberNodeMaxQualityMutex.Lock()
fake.notifySubscriberNodeMaxQualityArgsForCall = append(fake.notifySubscriberNodeMaxQualityArgsForCall, struct {
arg1 livekit.NodeID
arg2 []types.SubscribedCodecQuality
}{arg1, arg2Copy})
stub := fake.NotifySubscriberNodeMaxQualityStub
fake.recordInvocation("NotifySubscriberNodeMaxQuality", []interface{}{arg1, arg2Copy})
fake.notifySubscriberNodeMaxQualityMutex.Unlock()
if stub != nil {
fake.NotifySubscriberNodeMaxQualityStub(arg1, arg2)
}
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQualityCallCount() int {
fake.notifySubscriberNodeMaxQualityMutex.RLock()
defer fake.notifySubscriberNodeMaxQualityMutex.RUnlock()
return len(fake.notifySubscriberNodeMaxQualityArgsForCall)
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQualityCalls(stub func(livekit.NodeID, []types.SubscribedCodecQuality)) {
fake.notifySubscriberNodeMaxQualityMutex.Lock()
defer fake.notifySubscriberNodeMaxQualityMutex.Unlock()
fake.NotifySubscriberNodeMaxQualityStub = stub
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQualityArgsForCall(i int) (livekit.NodeID, []types.SubscribedCodecQuality) {
fake.notifySubscriberNodeMaxQualityMutex.RLock()
defer fake.notifySubscriberNodeMaxQualityMutex.RUnlock()
argsForCall := fake.notifySubscriberNodeMaxQualityArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLoss(arg1 livekit.NodeID, arg2 uint8) {
fake.notifySubscriberNodeMediaLossMutex.Lock()
fake.notifySubscriberNodeMediaLossArgsForCall = append(fake.notifySubscriberNodeMediaLossArgsForCall, struct {
arg1 livekit.NodeID
arg2 uint8
}{arg1, arg2})
stub := fake.NotifySubscriberNodeMediaLossStub
fake.recordInvocation("NotifySubscriberNodeMediaLoss", []interface{}{arg1, arg2})
fake.notifySubscriberNodeMediaLossMutex.Unlock()
if stub != nil {
fake.NotifySubscriberNodeMediaLossStub(arg1, arg2)
}
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossCallCount() int {
fake.notifySubscriberNodeMediaLossMutex.RLock()
defer fake.notifySubscriberNodeMediaLossMutex.RUnlock()
return len(fake.notifySubscriberNodeMediaLossArgsForCall)
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossCalls(stub func(livekit.NodeID, uint8)) {
fake.notifySubscriberNodeMediaLossMutex.Lock()
defer fake.notifySubscriberNodeMediaLossMutex.Unlock()
fake.NotifySubscriberNodeMediaLossStub = stub
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossArgsForCall(i int) (livekit.NodeID, uint8) {
fake.notifySubscriberNodeMediaLossMutex.RLock()
defer fake.notifySubscriberNodeMediaLossMutex.RUnlock()
argsForCall := fake.notifySubscriberNodeMediaLossArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeMediaTrack) PublisherID() livekit.ParticipantID {
fake.publisherIDMutex.Lock()
ret, specificReturn := fake.publisherIDReturnsOnCall[len(fake.publisherIDArgsForCall)]
@@ -1166,30 +1079,6 @@ func (fake *FakeMediaTrack) RemoveSubscriberArgsForCall(i int) (livekit.Particip
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeMediaTrack) Restart() {
fake.restartMutex.Lock()
fake.restartArgsForCall = append(fake.restartArgsForCall, struct {
}{})
stub := fake.RestartStub
fake.recordInvocation("Restart", []interface{}{})
fake.restartMutex.Unlock()
if stub != nil {
fake.RestartStub()
}
}
func (fake *FakeMediaTrack) RestartCallCount() int {
fake.restartMutex.RLock()
defer fake.restartMutex.RUnlock()
return len(fake.restartArgsForCall)
}
func (fake *FakeMediaTrack) RestartCalls(stub func()) {
fake.restartMutex.Lock()
defer fake.restartMutex.Unlock()
fake.RestartStub = stub
}
func (fake *FakeMediaTrack) RevokeDisallowedSubscribers(arg1 []livekit.ParticipantIdentity) []livekit.ParticipantIdentity {
var arg1Copy []livekit.ParticipantIdentity
if arg1 != nil {
@@ -1456,10 +1345,6 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} {
defer fake.kindMutex.RUnlock()
fake.nameMutex.RLock()
defer fake.nameMutex.RUnlock()
fake.notifySubscriberNodeMaxQualityMutex.RLock()
defer fake.notifySubscriberNodeMaxQualityMutex.RUnlock()
fake.notifySubscriberNodeMediaLossMutex.RLock()
defer fake.notifySubscriberNodeMediaLossMutex.RUnlock()
fake.publisherIDMutex.RLock()
defer fake.publisherIDMutex.RUnlock()
fake.publisherIdentityMutex.RLock()
@@ -1472,8 +1357,6 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} {
defer fake.removeAllSubscribersMutex.RUnlock()
fake.removeSubscriberMutex.RLock()
defer fake.removeSubscriberMutex.RUnlock()
fake.restartMutex.RLock()
defer fake.restartMutex.RUnlock()
fake.revokeDisallowedSubscribersMutex.RLock()
defer fake.revokeDisallowedSubscribersMutex.RUnlock()
fake.setMutedMutex.RLock()
@@ -144,32 +144,6 @@ type FakeParticipant struct {
toProtoReturnsOnCall map[int]struct {
result1 *livekit.ParticipantInfo
}
UpdateMediaLossStub func(livekit.NodeID, livekit.TrackID, uint32) error
updateMediaLossMutex sync.RWMutex
updateMediaLossArgsForCall []struct {
arg1 livekit.NodeID
arg2 livekit.TrackID
arg3 uint32
}
updateMediaLossReturns struct {
result1 error
}
updateMediaLossReturnsOnCall map[int]struct {
result1 error
}
UpdateSubscribedQualityStub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error
updateSubscribedQualityMutex sync.RWMutex
updateSubscribedQualityArgsForCall []struct {
arg1 livekit.NodeID
arg2 livekit.TrackID
arg3 []types.SubscribedCodecQuality
}
updateSubscribedQualityReturns struct {
result1 error
}
updateSubscribedQualityReturnsOnCall map[int]struct {
result1 error
}
UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, *livekit.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error
updateSubscriptionPermissionMutex sync.RWMutex
updateSubscriptionPermissionArgsForCall []struct {
@@ -904,137 +878,6 @@ func (fake *FakeParticipant) ToProtoReturnsOnCall(i int, result1 *livekit.Partic
}{result1}
}
func (fake *FakeParticipant) UpdateMediaLoss(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 uint32) error {
fake.updateMediaLossMutex.Lock()
ret, specificReturn := fake.updateMediaLossReturnsOnCall[len(fake.updateMediaLossArgsForCall)]
fake.updateMediaLossArgsForCall = append(fake.updateMediaLossArgsForCall, struct {
arg1 livekit.NodeID
arg2 livekit.TrackID
arg3 uint32
}{arg1, arg2, arg3})
stub := fake.UpdateMediaLossStub
fakeReturns := fake.updateMediaLossReturns
fake.recordInvocation("UpdateMediaLoss", []interface{}{arg1, arg2, arg3})
fake.updateMediaLossMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) UpdateMediaLossCallCount() int {
fake.updateMediaLossMutex.RLock()
defer fake.updateMediaLossMutex.RUnlock()
return len(fake.updateMediaLossArgsForCall)
}
func (fake *FakeParticipant) UpdateMediaLossCalls(stub func(livekit.NodeID, livekit.TrackID, uint32) error) {
fake.updateMediaLossMutex.Lock()
defer fake.updateMediaLossMutex.Unlock()
fake.UpdateMediaLossStub = stub
}
func (fake *FakeParticipant) UpdateMediaLossArgsForCall(i int) (livekit.NodeID, livekit.TrackID, uint32) {
fake.updateMediaLossMutex.RLock()
defer fake.updateMediaLossMutex.RUnlock()
argsForCall := fake.updateMediaLossArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeParticipant) UpdateMediaLossReturns(result1 error) {
fake.updateMediaLossMutex.Lock()
defer fake.updateMediaLossMutex.Unlock()
fake.UpdateMediaLossStub = nil
fake.updateMediaLossReturns = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) UpdateMediaLossReturnsOnCall(i int, result1 error) {
fake.updateMediaLossMutex.Lock()
defer fake.updateMediaLossMutex.Unlock()
fake.UpdateMediaLossStub = nil
if fake.updateMediaLossReturnsOnCall == nil {
fake.updateMediaLossReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.updateMediaLossReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) UpdateSubscribedQuality(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 []types.SubscribedCodecQuality) error {
var arg3Copy []types.SubscribedCodecQuality
if arg3 != nil {
arg3Copy = make([]types.SubscribedCodecQuality, len(arg3))
copy(arg3Copy, arg3)
}
fake.updateSubscribedQualityMutex.Lock()
ret, specificReturn := fake.updateSubscribedQualityReturnsOnCall[len(fake.updateSubscribedQualityArgsForCall)]
fake.updateSubscribedQualityArgsForCall = append(fake.updateSubscribedQualityArgsForCall, struct {
arg1 livekit.NodeID
arg2 livekit.TrackID
arg3 []types.SubscribedCodecQuality
}{arg1, arg2, arg3Copy})
stub := fake.UpdateSubscribedQualityStub
fakeReturns := fake.updateSubscribedQualityReturns
fake.recordInvocation("UpdateSubscribedQuality", []interface{}{arg1, arg2, arg3Copy})
fake.updateSubscribedQualityMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) UpdateSubscribedQualityCallCount() int {
fake.updateSubscribedQualityMutex.RLock()
defer fake.updateSubscribedQualityMutex.RUnlock()
return len(fake.updateSubscribedQualityArgsForCall)
}
func (fake *FakeParticipant) UpdateSubscribedQualityCalls(stub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error) {
fake.updateSubscribedQualityMutex.Lock()
defer fake.updateSubscribedQualityMutex.Unlock()
fake.UpdateSubscribedQualityStub = stub
}
func (fake *FakeParticipant) UpdateSubscribedQualityArgsForCall(i int) (livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) {
fake.updateSubscribedQualityMutex.RLock()
defer fake.updateSubscribedQualityMutex.RUnlock()
argsForCall := fake.updateSubscribedQualityArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeParticipant) UpdateSubscribedQualityReturns(result1 error) {
fake.updateSubscribedQualityMutex.Lock()
defer fake.updateSubscribedQualityMutex.Unlock()
fake.UpdateSubscribedQualityStub = nil
fake.updateSubscribedQualityReturns = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) UpdateSubscribedQualityReturnsOnCall(i int, result1 error) {
fake.updateSubscribedQualityMutex.Lock()
defer fake.updateSubscribedQualityMutex.Unlock()
fake.UpdateSubscribedQualityStub = nil
if fake.updateSubscribedQualityReturnsOnCall == nil {
fake.updateSubscribedQualityReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.updateSubscribedQualityReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 *livekit.TimedVersion, arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, arg4 func(participantID livekit.ParticipantID) types.LocalParticipant) error {
fake.updateSubscriptionPermissionMutex.Lock()
ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)]
@@ -1191,10 +1034,6 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.subscriptionPermissionMutex.RUnlock()
fake.toProtoMutex.RLock()
defer fake.toProtoMutex.RUnlock()
fake.updateMediaLossMutex.RLock()
defer fake.updateMediaLossMutex.RUnlock()
fake.updateSubscribedQualityMutex.RLock()
defer fake.updateSubscribedQualityMutex.RUnlock()
fake.updateSubscriptionPermissionMutex.RLock()
defer fake.updateSubscriptionPermissionMutex.RUnlock()
fake.updateVideoLayersMutex.RLock()
-28
View File
@@ -58,12 +58,6 @@ func (u *UpTrackManager) Start() {
u.opsQueue.Start()
}
func (u *UpTrackManager) Restart() {
for _, t := range u.GetPublishedTracks() {
t.Restart()
}
}
func (u *UpTrackManager) Close(willBeResumed bool) {
u.opsQueue.Stop()
@@ -288,28 +282,6 @@ func (u *UpTrackManager) UpdateVideoLayers(updateVideoLayers *livekit.UpdateVide
return nil
}
func (u *UpTrackManager) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error {
track := u.GetPublishedTrack(trackID)
if track == nil {
u.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find published track")
}
track.NotifySubscriberNodeMaxQuality(nodeID, maxQualities)
return nil
}
func (u *UpTrackManager) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error {
track := u.GetPublishedTrack(trackID)
if track == nil {
u.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find published track")
}
track.NotifySubscriberNodeMediaLoss(nodeID, uint8(fractionalLoss))
return nil
}
func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) {
u.lock.Lock()
if _, ok := u.publishedTracks[track.ID()]; !ok {
+3
View File
@@ -306,6 +306,9 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.bound.Store(true)
d.bindLock.Unlock()
if d.onMaxLayerChanged != nil {
d.onMaxLayerChanged(d, d.MaxLayers().Spatial)
}
d.connectionStats.SetTrackSource(d.receiver.TrackSource())
d.connectionStats.Start()
d.logger.Debugw("downtrack bound")