Connection quality clean up (#766)

* WIP commit

* WIP commit

* Remove debug

* Revert to reduce diff

* Fix tests

* Determine spatial layer from track info quality if non-simulcast

* Adjust for invalid layer on no rid, previously that function was returning 0 for no rid case

* Fall back to top level width/height if there are no layers

* Use duration from RTPDeltaInfo
This commit is contained in:
Raja Subramanian
2022-06-18 21:58:47 +05:30
committed by GitHub
parent 651d2aee4d
commit 9032db857c
12 changed files with 351 additions and 407 deletions

View File

@@ -203,6 +203,10 @@ func (t *MediaTrackReceiver) SetLayerSsrc(mime string, rid string, ssrc uint32)
defer t.lock.Unlock()
layer := sfu.RidToLayer(rid)
if layer == sfu.InvalidLayerSpatial {
// non-simulcast case will not have `rid`
layer = 0
}
for _, receiver := range t.receiversShadow {
if strings.EqualFold(receiver.Codec().MimeType, mime) && int(layer) < len(receiver.layerSSRCs) {
receiver.layerSSRCs[layer] = ssrc
@@ -671,29 +675,3 @@ func (t *MediaTrackReceiver) OnSubscribedMaxQualityChange(f func(trackID livekit
}
// ---------------------------
func QualityForSpatialLayer(layer int32) livekit.VideoQuality {
switch layer {
case 0:
return livekit.VideoQuality_LOW
case 1:
return livekit.VideoQuality_MEDIUM
case 2:
return livekit.VideoQuality_HIGH
case sfu.InvalidLayerSpatial:
return livekit.VideoQuality_OFF
default:
return livekit.VideoQuality_OFF
}
}
func VideoQualityToRID(q livekit.VideoQuality) string {
switch q {
case livekit.VideoQuality_HIGH:
return sfu.FullResolution
case livekit.VideoQuality_MEDIUM:
return sfu.HalfResolution
default:
return sfu.QuarterResolution
}
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/utils"
)
const (
@@ -224,7 +225,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
})
downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) {
go t.notifySubscriberMaxQuality(subscriberID, dt.Codec(), QualityForSpatialLayer(layer))
go t.notifySubscriberMaxQuality(subscriberID, dt.Codec(), utils.QualityForSpatialLayer(layer))
})
downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) {

View File

@@ -8,7 +8,6 @@ import (
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
@@ -224,27 +223,17 @@ func TestMuteSetting(t *testing.T) {
}
func TestConnectionQuality(t *testing.T) {
videoScore := func(totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams,
codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) float32 {
return connectionquality.VideoConnectionScore(1*time.Second, totalBytes, totalFrames, qualityParam, codec,
expectedHeight, expectedWidth, actualHeight, actualWidth)
}
testPublishedVideoTrack := func(totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams,
codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) *typesfakes.FakeLocalMediaTrack {
testPublishedVideoTrack := func(params connectionquality.TrackScoreParams) *typesfakes.FakeLocalMediaTrack {
tr := &typesfakes.FakeLocalMediaTrack{}
score := videoScore(totalBytes, totalFrames, qualityParam, codec, expectedHeight, expectedWidth,
actualHeight, actualWidth)
score := connectionquality.VideoTrackScore(params)
t.Log("video score: ", score)
tr.GetConnectionScoreReturns(score)
return tr
}
testPublishedAudioTrack := func(totalBytes int64, qualityParam *buffer.ConnectionQualityParams,
dtxDisabled bool) *typesfakes.FakeLocalMediaTrack {
testPublishedAudioTrack := func(params connectionquality.TrackScoreParams) *typesfakes.FakeLocalMediaTrack {
tr := &typesfakes.FakeLocalMediaTrack{}
score := connectionquality.AudioConnectionScore(1*time.Second, totalBytes, qualityParam, dtxDisabled)
score := connectionquality.AudioTrackScore(params)
t.Log("audio score: ", score)
tr.GetConnectionScoreReturns(score)
return tr
@@ -257,11 +246,33 @@ func TestConnectionQuality(t *testing.T) {
p := newParticipantForTest("test")
// >2Mbps, 30fps, expected/actual video size = 1280x720
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(290000, 30, &buffer.ConnectionQualityParams{},
"", 720, 1280, 720, 1280)
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
PacketsExpected: 100,
PacketsLost: 0,
Bytes: 290000,
Frames: 30,
Jitter: 0.0,
Rtt: 0,
ExpectedWidth: 1280,
ExpectedHeight: 720,
ActualWidth: 1280,
ActualHeight: 720,
}
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(params)
// no packet loss
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{}, false)
params = connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
Codec: "opus",
PacketsExpected: 100,
PacketsLost: 0,
Bytes: 1000,
Jitter: 0.0,
Rtt: 0,
DtxDisabled: false,
}
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(params)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, p.GetConnectionQuality().GetQuality())
})
@@ -270,11 +281,33 @@ func TestConnectionQuality(t *testing.T) {
p := newParticipantForTest("test")
// 1Mbps, 15fps, expected = 1280x720, actual = 640 x 480
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(25000, 15, &buffer.ConnectionQualityParams{},
"", 720, 1280, 480, 640)
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
PacketsExpected: 100,
PacketsLost: 0,
Bytes: 125000,
Frames: 15,
Jitter: 0.0,
Rtt: 0,
ExpectedWidth: 1280,
ExpectedHeight: 720,
ActualWidth: 640,
ActualHeight: 480,
}
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(params)
// packet loss of 10%
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{LossPercentage: 5}, false)
// packet loss of 5%
params = connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
Codec: "opus",
PacketsExpected: 100,
PacketsLost: 5,
Bytes: 1000,
Jitter: 0.0,
Rtt: 0,
DtxDisabled: false,
}
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(params)
require.Equal(t, livekit.ConnectionQuality_GOOD, p.GetConnectionQuality().GetQuality())
})
@@ -282,20 +315,51 @@ func TestConnectionQuality(t *testing.T) {
t.Run("audio smooth publishing", func(t *testing.T) {
p := newParticipantForTest("test")
// no packet loss
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{}, false)
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
Codec: "opus",
PacketsExpected: 100,
PacketsLost: 0,
Bytes: 1000,
Jitter: 0.0,
Rtt: 0,
DtxDisabled: false,
}
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(params)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, p.GetConnectionQuality().GetQuality())
})
t.Run("audio reduced publishing", func(t *testing.T) {
p := newParticipantForTest("test")
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{LossPercentage: 5}, false)
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
Codec: "opus",
PacketsExpected: 100,
PacketsLost: 5,
Bytes: 1000,
Jitter: 0.0,
Rtt: 0,
DtxDisabled: false,
}
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(params)
require.Equal(t, livekit.ConnectionQuality_GOOD, p.GetConnectionQuality().GetQuality())
})
t.Run("audio bad publishing", func(t *testing.T) {
p := newParticipantForTest("test")
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{LossPercentage: 20}, false)
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
Codec: "opus",
PacketsExpected: 100,
PacketsLost: 20,
Bytes: 1000,
Jitter: 0.0,
Rtt: 0,
DtxDisabled: false,
}
p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(params)
require.Equal(t, livekit.ConnectionQuality_POOR, p.GetConnectionQuality().GetQuality())
})
@@ -304,26 +368,64 @@ func TestConnectionQuality(t *testing.T) {
p := newParticipantForTest("test")
// >2Mbps, 30fps, expected/actual video size = 1280x720
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(290000, 30, &buffer.ConnectionQualityParams{},
"", 720, 1280, 720, 1280)
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
PacketsExpected: 100,
PacketsLost: 0,
Bytes: 290000,
Frames: 30,
Jitter: 0.0,
Rtt: 0,
ExpectedWidth: 1280,
ExpectedHeight: 720,
ActualWidth: 1280,
ActualHeight: 720,
}
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(params)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, p.GetConnectionQuality().GetQuality())
})
t.Run("video reduced publishing", func(t *testing.T) {
p := newParticipantForTest("test")
// 1Mbps, 15fps, expected = 1280x720, actual = 640 x 480
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(100000, 15, &buffer.ConnectionQualityParams{},
"", 720, 1280, 480, 640)
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
PacketsExpected: 100,
PacketsLost: 0,
Bytes: 125000,
Frames: 15,
Jitter: 0.0,
Rtt: 0,
ExpectedWidth: 1280,
ExpectedHeight: 720,
ActualWidth: 640,
ActualHeight: 480,
}
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(params)
require.Equal(t, livekit.ConnectionQuality_GOOD, p.GetConnectionQuality().GetQuality())
})
t.Run("video poor publishing", func(t *testing.T) {
p := newParticipantForTest("test")
// 20kbps, 8fps, expected = 1280x720, actual = 640 x 480
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(2500, 8, &buffer.ConnectionQualityParams{},
"", 720, 1280, 426, 240)
// 20kbps, 8fps, expected = 1280x720, actual = 240x426
params := connectionquality.TrackScoreParams{
Duration: 1 * time.Second,
PacketsExpected: 100,
PacketsLost: 0,
Bytes: 2500,
Frames: 8,
Jitter: 0.0,
Rtt: 0,
ExpectedWidth: 1280,
ExpectedHeight: 720,
ActualWidth: 240,
ActualHeight: 426,
}
p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(params)
require.Equal(t, livekit.ConnectionQuality_POOR, p.GetConnectionQuality().GetQuality())
})

View File

@@ -73,10 +73,9 @@ type Buffer struct {
pliThrottle int64
rtpStats *RTPStats
rrSnapshotId uint32
connectionQualitySnapshotId uint32
deltaStatsSnapshotId uint32
rtpStats *RTPStats
rrSnapshotId uint32
deltaStatsSnapshotId uint32
lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only
@@ -148,7 +147,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
Logger: b.logger,
})
b.rrSnapshotId = b.rtpStats.NewSnapshotId()
b.connectionQualitySnapshotId = b.rtpStats.NewSnapshotId()
b.deltaStatsSnapshotId = b.rtpStats.NewSnapshotId()
b.clockRate = codec.ClockRate
@@ -613,17 +611,6 @@ func (b *Buffer) GetStats() *livekit.RTPStats {
return b.rtpStats.ToProto()
}
func (b *Buffer) GetQualityInfo() *RTPSnapshotInfo {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return nil
}
return b.rtpStats.SnapshotInfo(b.connectionQualitySnapshotId)
}
func (b *Buffer) GetDeltaStats() *StreamStatsWithLayers {
b.RLock()
defer b.RUnlock()
@@ -637,16 +624,11 @@ func (b *Buffer) GetDeltaStats() *StreamStatsWithLayers {
return nil
}
layers := make(map[int]LayerStats)
layers[0] = LayerStats{
Packets: deltaStats.Packets + deltaStats.PacketsDuplicate + deltaStats.PacketsPadding,
Bytes: deltaStats.Bytes + deltaStats.BytesDuplicate + deltaStats.BytesPadding,
Frames: deltaStats.Frames,
}
return &StreamStatsWithLayers{
RTPStats: deltaStats,
Layers: layers,
Layers: map[int32]*RTPDeltaInfo{
0: deltaStats,
},
}
}

View File

@@ -15,8 +15,6 @@ import (
const (
GapHistogramNumBins = 101
SequenceNumberMin = uint16(0)
SequenceNumberMax = uint16(65535)
NumSequenceNumbers = 65536
FirstSnapshotId = 1
SnInfoSize = 2048
@@ -29,14 +27,8 @@ type RTPFlowState struct {
LossEndExclusive uint16
}
type RTPSnapshotInfo struct {
PacketsExpected uint32
PacketsLost uint32
JitterMax float64
RttMax uint32
}
type RTPDeltaInfo struct {
Duration time.Duration
Packets uint32
Bytes uint64
PacketsDuplicate uint32
@@ -53,6 +45,7 @@ type RTPDeltaInfo struct {
}
type Snapshot struct {
startTime time.Time
extStartSN uint32
packetsDuplicate uint32
bytesDuplicate uint64
@@ -177,7 +170,7 @@ func (r *RTPStats) NewSnapshotId() uint32 {
id := r.nextSnapshotId
if r.initialized {
r.snapshots[id] = &Snapshot{extStartSN: r.extStartSN}
r.snapshots[id] = &Snapshot{startTime: time.Now(), extStartSN: r.extStartSN}
}
r.nextSnapshotId++
@@ -613,54 +606,6 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
}
}
func (r *RTPStats) SnapshotInfo(snapshotId uint32) *RTPSnapshotInfo {
r.lock.Lock()
then, now := r.getAndResetSnapshot(snapshotId)
r.lock.Unlock()
if now == nil || then == nil {
return nil
}
r.lock.RLock()
defer r.lock.RUnlock()
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > NumSequenceNumbers {
r.logger.Warnw(
"too many packets expected in snapshot",
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSN, now.extStartSN, packetsExpected),
)
return nil
}
if packetsExpected == 0 {
return nil
}
packetsLost := uint32(0)
if r.params.IsReceiverReportDriven {
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
if int32(packetsLost) < 0 {
packetsLost = 0
}
} else {
_, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
}
maxJitter := then.maxJitter
if r.params.IsReceiverReportDriven {
maxJitter = then.maxJitterOverridden
}
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPSnapshotInfo{
PacketsExpected: packetsExpected,
PacketsLost: packetsLost,
JitterMax: maxJitterTime,
RttMax: then.maxRtt,
}
}
func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
r.lock.Lock()
then, now := r.getAndResetSnapshot(snapshotId)
@@ -700,6 +645,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPDeltaInfo{
Duration: now.startTime.Sub(then.startTime),
Packets: packetsExpected - packetsPadding,
Bytes: bytes,
PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate,
@@ -1068,6 +1014,7 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot)
// snapshot now
r.snapshots[snapshotId] = &Snapshot{
startTime: time.Now(),
extStartSN: r.getExtHighestSNAdjusted() + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,

View File

@@ -1,18 +1,6 @@
package buffer
type LayerStats struct {
Packets uint32
Bytes uint64
Frames uint32
}
type StreamStatsWithLayers struct {
RTPStats *RTPDeltaInfo
Layers map[int]LayerStats
}
type ConnectionQualityParams struct {
LossPercentage float32
Jitter float32
Rtt uint32
Layers map[int32]*RTPDeltaInfo
}

View File

@@ -1,8 +1,6 @@
package connectionquality
import (
"math"
"sort"
"sync"
"time"
@@ -24,11 +22,9 @@ type ConnectionStatsParams struct {
UpdateInterval time.Duration
CodecType webrtc.RTPCodecType
CodecName string
DtxDisabled bool
DtxDisabled bool // RAJA-TODO - fix this
MimeType string
GetDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers
GetQualityParams func() *buffer.ConnectionQualityParams
GetIsReducedQuality func() bool
GetLayerDimension func(int32) (uint32, uint32)
GetMaxExpectedLayer func() *livekit.VideoLayer
Logger logger.Logger
@@ -39,8 +35,9 @@ type ConnectionStats struct {
onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat)
lock sync.RWMutex
score float32
lock sync.RWMutex
score float32
lastUpdate time.Time
done chan struct{}
isClosed atomic.Bool
@@ -77,64 +74,65 @@ func (cs *ConnectionStats) GetScore() float32 {
return cs.score
}
func (cs *ConnectionStats) updateScore(streams []*livekit.AnalyticsStream, iteration uint64) float32 {
func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWithLayers) float32 {
cs.lock.Lock()
defer cs.lock.Unlock()
// Initial interval will have partial data
if iteration < 2 {
if cs.lastUpdate.IsZero() {
cs.lastUpdate = time.Now()
cs.score = 5
return cs.score
}
s := cs.params.GetQualityParams()
var qualityParam buffer.ConnectionQualityParams
if s == nil {
cs.lastUpdate = time.Now()
// RAJA-TODO - take mute into account
// RAJA-TODO - maybe a probation period at the start, coming out of mute?
// RAJA-TODO - maybe a probation period when a layer starts/stops?
// RAJA-TODO - maybe layer changes should be notified here directly rather than pulling
// RAJA-TODO - maybe track info should be set here and this can know max published without having to pull every time
maxAvailableLayer, maxAvailableLayerStats := cs.getMaxAvailableLayerStats(streams)
if maxAvailableLayerStats == nil {
// retain old score as stats will not be available when muted
return cs.score
} else {
qualityParam = *s
if math.IsInf(float64(qualityParam.Jitter), 0) {
qualityParam.Jitter = 0
}
if math.IsInf(float64(qualityParam.LossPercentage), 0) {
qualityParam.LossPercentage = 0
}
}
interval := cs.params.UpdateInterval
if interval == 0 {
interval = UpdateInterval
params := TrackScoreParams{
Duration: maxAvailableLayerStats.Duration,
Codec: cs.params.CodecName,
PacketsExpected: maxAvailableLayerStats.Packets + maxAvailableLayerStats.PacketsPadding,
PacketsLost: maxAvailableLayerStats.PacketsLost,
Bytes: maxAvailableLayerStats.Bytes,
Frames: maxAvailableLayerStats.Frames,
Jitter: maxAvailableLayerStats.JitterMax,
Rtt: maxAvailableLayerStats.RttMax,
}
if cs.params.CodecType == webrtc.RTPCodecTypeAudio {
totalBytes, _, _ := cs.getBytesFramesFromStreams(streams)
cs.score = AudioConnectionScore(interval, int64(totalBytes), s, cs.params.DtxDisabled)
} else {
switch cs.params.CodecType {
case webrtc.RTPCodecTypeAudio:
// RAJA-TODO - need to set DTX in params
cs.score = AudioTrackScore(params)
case webrtc.RTPCodecTypeVideo:
// get tracks expected max layer and dimensions
expectedLayer := cs.params.GetMaxExpectedLayer()
if expectedLayer == nil || utils.SpatialLayerForQuality(expectedLayer.Quality) == buffer.InvalidLayerSpatial {
maxExpectedLayer := cs.params.GetMaxExpectedLayer()
if maxExpectedLayer == nil || utils.SpatialLayerForQuality(maxExpectedLayer.Quality) == buffer.InvalidLayerSpatial {
return cs.score
}
// get bytes/frames and max later from actual stream stats
totalBytes, totalFrames, maxLayer := cs.getBytesFramesFromStreams(streams)
var actualHeight uint32
var actualWidth uint32
// if data present, but maxLayer == -1 no layer info available, set actual to expected, else fetch
if maxLayer == buffer.InvalidLayerSpatial && totalBytes > 0 {
actualHeight = expectedLayer.Height
actualWidth = expectedLayer.Width
} else {
actualWidth, actualHeight = cs.params.GetLayerDimension(maxLayer)
if maxAvailableLayerStats != nil {
params.ActualWidth, params.ActualHeight = cs.params.GetLayerDimension(maxAvailableLayer)
}
cs.score = VideoConnectionScore(interval, int64(totalBytes), int64(totalFrames), &qualityParam, cs.params.CodecName,
int32(expectedLayer.Height), int32(expectedLayer.Width), int32(actualHeight), int32(actualWidth))
params.ExpectedWidth = maxExpectedLayer.Width
params.ExpectedHeight = maxExpectedLayer.Height
cs.score = VideoTrackScore(params)
}
return cs.score
}
func (cs *ConnectionStats) getStat(iteration uint64) *livekit.AnalyticsStat {
func (cs *ConnectionStats) getStat() *livekit.AnalyticsStat {
if cs.params.GetDeltaStats == nil {
return nil
}
@@ -155,14 +153,14 @@ func (cs *ConnectionStats) getStat(iteration uint64) *livekit.AnalyticsStat {
//
if cs.params.CodecType == webrtc.RTPCodecTypeVideo && (len(streams) > 1 || len(stream.Layers) > 1) {
for layer, layerStats := range stream.Layers {
as.VideoLayers = append(as.VideoLayers, ToAnalyticsVideoLayer(layer, &layerStats))
as.VideoLayers = append(as.VideoLayers, ToAnalyticsVideoLayer(layer, layerStats))
}
}
analyticsStreams = append(analyticsStreams, as)
}
score := cs.updateScore(analyticsStreams, iteration)
score := cs.updateScore(streams)
return &livekit.AnalyticsStat{
Score: score,
@@ -180,16 +178,13 @@ func (cs *ConnectionStats) updateStatsWorker() {
tk := time.NewTicker(interval)
defer tk.Stop()
// Delay sending scores until 2nd cycle, as 1st will be partial.
counter := uint64(0)
for {
select {
case <-cs.done:
return
case <-tk.C:
stat := cs.getStat(counter)
stat := cs.getStat()
if stat == nil {
continue
}
@@ -197,9 +192,6 @@ func (cs *ConnectionStats) updateStatsWorker() {
if cs.onStatsUpdate != nil {
cs.onStatsUpdate(cs, stat)
}
counter++
}
}
}
@@ -223,49 +215,26 @@ func ToAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An
}
}
func ToAnalyticsVideoLayer(layer int, layerStats *buffer.LayerStats) *livekit.AnalyticsVideoLayer {
func ToAnalyticsVideoLayer(layer int32, layerStats *buffer.RTPDeltaInfo) *livekit.AnalyticsVideoLayer {
return &livekit.AnalyticsVideoLayer{
Layer: int32(layer),
Packets: layerStats.Packets,
Bytes: layerStats.Bytes,
Layer: layer,
Packets: layerStats.Packets + layerStats.PacketsDuplicate + layerStats.PacketsPadding,
Bytes: layerStats.Bytes + layerStats.BytesDuplicate + layerStats.BytesPadding,
Frames: layerStats.Frames,
}
}
func (cs *ConnectionStats) getBytesFramesFromStreams(streams []*livekit.AnalyticsStream) (totalBytes uint64, totalFrames uint32, maxLayer int32) {
layerStats := make(map[int32]buffer.LayerStats)
hasLayers := false
maxLayer = buffer.InvalidLayerSpatial
func (cs *ConnectionStats) getMaxAvailableLayerStats(streams map[uint32]*buffer.StreamStatsWithLayers) (int32, *buffer.RTPDeltaInfo) {
maxAvailableLayer := buffer.InvalidLayerSpatial
var maxAvailableLayerStats *buffer.RTPDeltaInfo
for _, stream := range streams {
// get frames/bytes/packets from video layers if available. Store per layer in layerStats map
if len(stream.VideoLayers) > 0 {
hasLayers = true
layers := stream.VideoLayers
// find max quality 0(LOW), 1(MED), 2(HIGH) . sort on layer.Layer desc
sort.Slice(layers, func(i, j int) bool {
return layers[i].Layer > layers[j].Layer
})
layerStats[layers[0].Layer] = buffer.LayerStats{
Bytes: layers[0].GetBytes(),
Frames: layers[0].GetFrames(),
for layer, layerStats := range stream.Layers {
if int32(layer) > maxAvailableLayer {
maxAvailableLayer = int32(layer)
maxAvailableLayerStats = layerStats
}
if layers[0].Layer > maxLayer {
maxLayer = layers[0].Layer
}
} else {
totalFrames += stream.GetFrames()
totalBytes += stream.GetPrimaryBytes()
}
}
if hasLayers {
if stats, ok := layerStats[maxLayer]; ok {
return stats.Bytes, stats.Frames, maxLayer
} else {
return 0, 0, buffer.InvalidLayerSpatial
}
}
return totalBytes, totalFrames, maxLayer
return maxAvailableLayer, maxAvailableLayerStats
}

View File

@@ -3,7 +3,6 @@ package connectionquality
import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/livekit"
"github.com/livekit/rtcscore-go/pkg/rtcmos"
)
@@ -19,30 +18,55 @@ func Score2Rating(score float32) livekit.ConnectionQuality {
return livekit.ConnectionQuality_POOR
}
func getBitRate(interval float64, totalBytes int64) int32 {
return int32(float64(totalBytes*8) / interval)
func getBitRate(interval float64, bytes uint64) int32 {
return int32(float64(bytes*8) / interval)
}
func getFrameRate(interval float64, totalFrames int64) int32 {
return int32(float64(totalFrames) / interval)
func getFrameRate(interval float64, frames uint32) int32 {
return int32(float64(frames) / interval)
}
func getLossPercentage(expected uint32, lost uint32) float32 {
if expected == 0 {
return 0.0
}
return float32(lost) * 100.0 / float32(expected)
}
func int32Ptr(x int32) *int32 {
return &x
}
func AudioConnectionScore(interval time.Duration, totalBytes int64,
qualityParam *buffer.ConnectionQualityParams, dtxDisabled bool) float32 {
type TrackScoreParams struct {
Duration time.Duration
Codec string
PacketsExpected uint32
PacketsLost uint32
Bytes uint64
Frames uint32
Jitter float64
Rtt uint32
DtxDisabled bool
ActualWidth uint32
ActualHeight uint32
ExpectedWidth uint32
ExpectedHeight uint32
}
stat := rtcmos.Stat{
Bitrate: getBitRate(interval.Seconds(), totalBytes),
PacketLoss: qualityParam.LossPercentage,
RoundTripTime: int32Ptr(int32(qualityParam.Rtt)),
BufferDelay: int32Ptr(int32(qualityParam.Jitter)),
AudioConfig: &rtcmos.AudioConfig{},
func getRtcMosStat(params TrackScoreParams) rtcmos.Stat {
return rtcmos.Stat{
Bitrate: getBitRate(params.Duration.Seconds(), params.Bytes),
PacketLoss: getLossPercentage(params.PacketsExpected, params.PacketsLost),
RoundTripTime: int32Ptr(int32(params.Rtt)),
BufferDelay: int32Ptr(int32(params.Jitter / 1000.0)),
}
}
if dtxDisabled {
func AudioTrackScore(params TrackScoreParams) float32 {
stat := getRtcMosStat(params)
stat.AudioConfig = &rtcmos.AudioConfig{}
if params.DtxDisabled {
flag := false
stat.AudioConfig.Dtx = &flag
}
@@ -54,22 +78,17 @@ func AudioConnectionScore(interval time.Duration, totalBytes int64,
return 0
}
func VideoConnectionScore(interval time.Duration, totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams,
codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) float32 {
stat := rtcmos.Stat{
Bitrate: getBitRate(interval.Seconds(), totalBytes),
PacketLoss: qualityParam.LossPercentage,
RoundTripTime: int32Ptr(int32(qualityParam.Rtt)),
BufferDelay: int32Ptr(int32(qualityParam.Jitter)),
VideoConfig: &rtcmos.VideoConfig{
FrameRate: int32Ptr(getFrameRate(interval.Seconds(), totalFrames)),
Codec: codec,
ExpectedHeight: &expectedHeight,
ExpectedWidth: &expectedWidth,
Height: &actualHeight,
Width: &actualWidth,
},
func VideoTrackScore(params TrackScoreParams) float32 {
stat := getRtcMosStat(params)
stat.VideoConfig = &rtcmos.VideoConfig{
FrameRate: int32Ptr(getFrameRate(params.Duration.Seconds(), params.Frames)),
Codec: params.Codec,
ExpectedWidth: int32Ptr(int32(params.ExpectedWidth)),
ExpectedHeight: int32Ptr(int32(params.ExpectedHeight)),
Width: int32Ptr(int32(params.ActualWidth)),
Height: int32Ptr(int32(params.ActualHeight)),
}
scores := rtcmos.Score([]rtcmos.Stat{stat})
if len(scores) == 1 {
return float32(scores[0].VideoScore)

View File

@@ -21,6 +21,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/utils"
)
// TrackSender defines an interface send media to remote peer
@@ -147,9 +148,8 @@ type DownTrack struct {
blankFramesGeneration atomic.Uint32
connectionStats *connectionquality.ConnectionStats
connectionQualitySnapshotId uint32
deltaStatsSnapshotId uint32
connectionStats *connectionquality.ConnectionStats
deltaStatsSnapshotId uint32
// Debug info
pktsDropped atomic.Uint32
@@ -222,22 +222,20 @@ func NewDownTrack(
d.forwarder = NewForwarder(d.kind, d.logger)
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
CodecType: kind,
GetDeltaStats: d.getDeltaStats,
GetQualityParams: d.getQualityParams,
GetIsReducedQuality: func() bool {
return d.GetForwardingStatus() != ForwardingStatusOptimal
},
GetLayerDimension: func(quality int32) (uint32, uint32) {
CodecType: kind,
GetDeltaStats: d.getDeltaStats,
GetLayerDimension: func(layer int32) (uint32, uint32) {
if d.receiver != nil {
return d.receiver.GetLayerDimension(quality)
return d.receiver.GetLayerDimension(layer)
}
return 0, 0
},
// RAJA-TODO: Check muted, test quality when muted
// RAJA-TODO: this one needs more work as layers are muxed into one layer, maybe need to maitain RTP stats separately for each layer
GetMaxExpectedLayer: func() *livekit.VideoLayer {
quality := d.forwarder.MaxLayers().Spatial
width, height := d.receiver.GetLayerDimension(quality)
return &livekit.VideoLayer{Quality: livekit.VideoQuality(quality), Width: width, Height: height}
maxLayer := d.forwarder.MaxLayers().Spatial
width, height := d.receiver.GetLayerDimension(maxLayer)
return &livekit.VideoLayer{Quality: utils.QualityForSpatialLayer(maxLayer), Width: width, Height: height}
},
Logger: d.logger,
CodecName: getCodecNameFromMime(codecs[0].MimeType),
@@ -253,7 +251,6 @@ func NewDownTrack(
IsReceiverReportDriven: true,
Logger: d.logger,
})
d.connectionQualitySnapshotId = d.rtpStats.NewSnapshotId()
d.deltaStatsSnapshotId = d.rtpStats.NewSnapshotId()
return d, nil
@@ -1383,24 +1380,6 @@ func (d *DownTrack) GetTrackStats() *livekit.RTPStats {
return d.rtpStats.ToProto()
}
func (d *DownTrack) getQualityParams() *buffer.ConnectionQualityParams {
s := d.rtpStats.SnapshotInfo(d.connectionQualitySnapshotId)
if s == nil {
return nil
}
lossPercentage := float32(0.0)
if s.PacketsExpected != 0 {
lossPercentage = float32(s.PacketsLost) * 100.0 / float32(s.PacketsExpected)
}
return &buffer.ConnectionQualityParams{
LossPercentage: lossPercentage,
Jitter: float32(s.JitterMax / 1000.0),
Rtt: s.RttMax,
}
}
func (d *DownTrack) getDeltaStats() map[uint32]*buffer.StreamStatsWithLayers {
streamStats := make(map[uint32]*buffer.StreamStatsWithLayers, 1)
@@ -1409,16 +1388,11 @@ func (d *DownTrack) getDeltaStats() map[uint32]*buffer.StreamStatsWithLayers {
return nil
}
layers := make(map[int]buffer.LayerStats)
layers[0] = buffer.LayerStats{
Packets: deltaStats.Packets + deltaStats.PacketsDuplicate + deltaStats.PacketsPadding,
Bytes: deltaStats.Bytes + deltaStats.BytesDuplicate + deltaStats.BytesPadding,
Frames: deltaStats.Frames,
}
streamStats[d.ssrc] = &buffer.StreamStatsWithLayers{
RTPStats: deltaStats,
Layers: layers,
Layers: map[int32]*buffer.RTPDeltaInfo{
0: deltaStats,
},
}
return streamStats

View File

@@ -54,7 +54,7 @@ type TrackReceiver interface {
DebugInfo() map[string]interface{}
GetLayerDimension(quality int32) (uint32, uint32)
GetLayerDimension(layer int32) (uint32, uint32)
}
// WebRTCReceiver receives a media track
@@ -64,18 +64,19 @@ type WebRTCReceiver struct {
pliThrottleConfig config.PLIThrottleConfig
audioConfig config.AudioConfig
trackID livekit.TrackID
streamID string
kind webrtc.RTPCodecType
receiver *webrtc.RTPReceiver
codec webrtc.RTPCodecParameters
isSimulcast bool
isSVC bool
onCloseHandler func()
closeOnce sync.Once
closed atomic.Bool
useTrackers bool
TrackInfo *livekit.TrackInfo
trackID livekit.TrackID
streamID string
kind webrtc.RTPCodecType
receiver *webrtc.RTPReceiver
codec webrtc.RTPCodecParameters
isSimulcast bool
isSVC bool
onCloseHandler func()
closeOnce sync.Once
closed atomic.Bool
useTrackers bool
trackInfo *livekit.TrackInfo
maxPublishedLayer int32
rtcpCh chan []rtcp.Packet
@@ -109,8 +110,10 @@ func RidToLayer(rid string) int32 {
return 2
case HalfResolution:
return 1
default:
case QuarterResolution:
return 0
default:
return InvalidLayerSpatial
}
}
@@ -182,10 +185,19 @@ func NewWebRTCReceiver(
isSimulcast: len(track.RID()) > 0,
twcc: twcc,
streamTrackerManager: NewStreamTrackerManager(logger, trackInfo.Source),
TrackInfo: trackInfo,
trackInfo: trackInfo,
maxPublishedLayer: 0,
isSVC: IsSvcCodec(track.Codec().MimeType),
}
for _, layer := range w.trackInfo.Layers {
spatialLayer := utils.SpatialLayerForQuality(layer.Quality)
if spatialLayer > w.maxPublishedLayer {
w.maxPublishedLayer = spatialLayer
}
}
w.streamTrackerManager.SetMaxExpectedSpatialLayer(w.maxPublishedLayer)
w.streamTrackerManager.OnMaxLayerChanged(w.onMaxLayerChange)
w.streamTrackerManager.OnAvailableLayersChanged(w.downTrackLayerChange)
w.streamTrackerManager.OnBitrateAvailabilityChanged(w.downTrackBitrateAvailabilityChange)
@@ -200,39 +212,24 @@ func NewWebRTCReceiver(
})
w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
CodecType: w.kind,
GetDeltaStats: w.getDeltaStats,
GetQualityParams: w.getQualityParams,
GetIsReducedQuality: func() bool {
return w.streamTrackerManager.IsReducedQuality()
},
GetLayerDimension: func(quality int32) (uint32, uint32) {
return w.GetLayerDimension(quality)
CodecType: w.kind,
GetDeltaStats: w.getDeltaStats,
GetLayerDimension: func(layer int32) (uint32, uint32) {
return w.GetLayerDimension(layer)
},
GetMaxExpectedLayer: func() *livekit.VideoLayer {
var expectedLayer *livekit.VideoLayer
var maxPublishedLayer *livekit.VideoLayer
// find min of <expected, published> layer
expectedQuality := w.streamTrackerManager.GetMaxExpectedLayer()
maxPublishedQuality := InvalidLayerSpatial
if w.TrackInfo != nil {
for _, layer := range w.TrackInfo.Layers {
if layer.Quality == livekit.VideoQuality_OFF {
continue
}
if expectedQuality == utils.SpatialLayerForQuality(layer.Quality) {
expectedLayer = proto.Clone(layer).(*livekit.VideoLayer)
}
if utils.SpatialLayerForQuality(layer.Quality) > maxPublishedQuality {
maxPublishedQuality = int32(layer.Quality)
maxPublishedLayer = proto.Clone(layer).(*livekit.VideoLayer)
}
maxExpectedSpatialLayer := w.streamTrackerManager.GetMaxExpectedSpatialLayer()
if maxExpectedSpatialLayer > w.maxPublishedLayer {
maxExpectedSpatialLayer = w.maxPublishedLayer
}
for _, layer := range w.trackInfo.Layers {
if maxExpectedSpatialLayer == utils.SpatialLayerForQuality(layer.Quality) {
return proto.Clone(layer).(*livekit.VideoLayer)
}
}
if expectedQuality < maxPublishedQuality {
return expectedLayer
}
return maxPublishedLayer
return nil
},
Logger: w.logger,
CodecName: getCodecNameFromMime(w.codec.MimeType),
@@ -247,15 +244,21 @@ func NewWebRTCReceiver(
return w
}
func (w *WebRTCReceiver) GetLayerDimension(quality int32) (uint32, uint32) {
func (w *WebRTCReceiver) GetLayerDimension(layer int32) (uint32, uint32) {
height := uint32(0)
width := uint32(0)
for _, layer := range w.TrackInfo.Layers {
if layer.Quality == livekit.VideoQuality(quality) {
height = layer.Height
width = layer.Width
break
if len(w.trackInfo.Layers) > 0 {
quality := utils.QualityForSpatialLayer(layer)
for _, layer := range w.trackInfo.Layers {
if layer.Quality == quality {
height = layer.Height
width = layer.Width
break
}
}
} else {
width = w.trackInfo.Width
height = w.trackInfo.Height
}
return width, height
}
@@ -333,6 +336,14 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
}
layer := RidToLayer(track.RID())
if layer == InvalidLayerSpatial {
// check if there is only one layer and if so, assign it to that
if len(w.trackInfo.Layers) == 1 {
layer = utils.SpatialLayerForQuality(w.trackInfo.Layers[0].Quality)
} else {
layer = 0
}
}
buff.SetLogger(logger.Logger(logr.Logger(w.logger).WithValues("layer", layer)))
buff.SetTWCC(w.twcc)
buff.SetAudioLevelParams(audio.AudioLevelParams{
@@ -522,46 +533,6 @@ func (w *WebRTCReceiver) GetAudioLevel() (float64, bool) {
return 0, false
}
func (w *WebRTCReceiver) getQualityParams() *buffer.ConnectionQualityParams {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
packetsExpected := uint32(0)
packetsLost := uint32(0)
maxJitter := 0.0
maxRtt := uint32(0)
for _, buff := range w.buffers {
if buff == nil {
continue
}
q := buff.GetQualityInfo()
if q == nil {
continue
}
packetsExpected += q.PacketsExpected
packetsLost += q.PacketsLost
if q.JitterMax > maxJitter {
maxJitter = q.JitterMax
}
if q.RttMax > maxRtt {
maxRtt = q.RttMax
}
}
lossPercentage := float32(0.0)
if packetsExpected != 0 {
lossPercentage = float32(packetsLost) * 100.0 / float32(packetsExpected)
}
return &buffer.ConnectionQualityParams{
LossPercentage: lossPercentage,
Jitter: float32(maxJitter / 1000.0),
Rtt: maxRtt,
}
}
func (w *WebRTCReceiver) getDeltaStats() map[uint32]*buffer.StreamStatsWithLayers {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
@@ -578,12 +549,10 @@ func (w *WebRTCReceiver) getDeltaStats() map[uint32]*buffer.StreamStatsWithLayer
continue
}
// if simulcast, patch buffer stats with correct layer
if w.isSimulcast {
patched := make(map[int]buffer.LayerStats, 1)
patched[layer] = sswl.Layers[0]
sswl.Layers = patched
}
// patch buffer stats with correct layer
patched := make(map[int32]*buffer.RTPDeltaInfo, 1)
patched[int32(layer)] = sswl.Layers[0]
sswl.Layers = patched
deltaStats[w.SSRC(layer)] = sswl
}

View File

@@ -244,7 +244,7 @@ func (s *StreamTrackerManager) IsReducedQuality() bool {
return int32(len(s.availableLayers)) < (s.maxExpectedLayer + 1)
}
func (s *StreamTrackerManager) GetMaxExpectedLayer() int32 {
func (s *StreamTrackerManager) GetMaxExpectedSpatialLayer() int32 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.maxExpectedLayer

View File

@@ -1,6 +1,8 @@
package utils
import "github.com/livekit/protocol/livekit"
import (
"github.com/livekit/protocol/livekit"
)
func SpatialLayerForQuality(quality livekit.VideoQuality) int32 {
switch quality {
@@ -16,3 +18,16 @@ func SpatialLayerForQuality(quality livekit.VideoQuality) int32 {
return -1
}
}
func QualityForSpatialLayer(layer int32) livekit.VideoQuality {
switch layer {
case 0:
return livekit.VideoQuality_LOW
case 1:
return livekit.VideoQuality_MEDIUM
case 2:
return livekit.VideoQuality_HIGH
default:
return livekit.VideoQuality_OFF
}
}