Use windowing for NACK monitoring (#557)

* Use windowing for NACK monitoring

* Fix test
This commit is contained in:
Raja Subramanian
2022-03-23 13:42:29 +05:30
committed by GitHub
parent b98b828618
commit 757a59fbcd
2 changed files with 84 additions and 67 deletions
+34 -21
View File
@@ -2,7 +2,9 @@ package rtc
import (
"testing"
"time"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/stretchr/testify/require"
)
@@ -148,29 +150,34 @@ func TestSubscribedMaxQuality(t *testing.T) {
})
t.Run("subscribers max quality", func(t *testing.T) {
mt := NewMediaTrack(MediaTrackParams{TrackInfo: &livekit.TrackInfo{
Sid: "v1",
Type: livekit.TrackType_VIDEO,
Width: 1080,
Height: 720,
Layers: []*livekit.VideoLayer{
{
Quality: livekit.VideoQuality_LOW,
Width: 480,
Height: 270,
},
{
Quality: livekit.VideoQuality_MEDIUM,
Width: 960,
Height: 540,
},
{
Quality: livekit.VideoQuality_HIGH,
Width: 1080,
Height: 720,
mt := NewMediaTrack(MediaTrackParams{
TrackInfo: &livekit.TrackInfo{
Sid: "v1",
Type: livekit.TrackType_VIDEO,
Width: 1080,
Height: 720,
Layers: []*livekit.VideoLayer{
{
Quality: livekit.VideoQuality_LOW,
Width: 480,
Height: 270,
},
{
Quality: livekit.VideoQuality_MEDIUM,
Width: 960,
Height: 540,
},
{
Quality: livekit.VideoQuality_HIGH,
Width: 1080,
Height: 720,
},
},
},
}})
VideoConfig: config.VideoConfig{
SubscribedQualityUpdateThrottle: 100 * time.Millisecond,
},
})
actualTrackID := livekit.TrackID("")
actualSubscribedQualities := make([]*livekit.SubscribedQuality, 0)
@@ -194,6 +201,9 @@ func TestSubscribedMaxQuality(t *testing.T) {
// "s1" dropping to MEDIUM should disable HIGH layer
mt.notifySubscriberMaxQuality("s1", livekit.VideoQuality_MEDIUM)
// wait for throttle to kick in
time.Sleep(110 * time.Millisecond)
expectedSubscribedQualities = []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -206,6 +216,9 @@ func TestSubscribedMaxQuality(t *testing.T) {
mt.notifySubscriberMaxQuality("s1", livekit.VideoQuality_LOW)
mt.notifySubscriberMaxQuality("s2", livekit.VideoQuality_LOW)
// wait for throttle to kick in
time.Sleep(110 * time.Millisecond)
expectedSubscribedQualities = []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
+50 -46
View File
@@ -20,18 +20,6 @@ import (
const (
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
NumRequiredEstimatesNonProbe = 8
NumRequiredEstimatesProbe = 3
DownwardTrendThresholdNonProbe = -0.5
DownwardTrendThresholdProbe = 0.0
NackWindowDurationProbe = 0 * time.Second
NackWindowDurationNonProbe = 2 * time.Second
NackRatioThresholdNonProbe = 0.08
NackRatioThresholdProbe = 0.04
NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
ProbeWaitBase = 5 * time.Second
@@ -51,6 +39,28 @@ const (
PriorityDefaultVideo = PriorityMin
)
var (
ChannelObserverParamsProbe = ChannelObserverParams{
Name: "probe",
EstimateRequiredSamples: 3,
EstimateDownwardTrendThreshold: 0.0,
EstimateCollapseValues: false,
NackWindowMinDuration: 500 * time.Millisecond,
NackWindowMaxDuration: 1 * time.Second,
NackRatioThreshold: 0.04,
}
ChannelObserverParamsNonProbe = ChannelObserverParams{
Name: "non-probe",
EstimateRequiredSamples: 8,
EstimateDownwardTrendThreshold: -0.5,
EstimateCollapseValues: true,
NackWindowMinDuration: 1 * time.Second,
NackWindowMaxDuration: 2 * time.Second,
NackRatioThreshold: 0.08,
}
)
type State int
const (
@@ -709,11 +719,8 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
switch reason {
case ChannelCongestionReasonLoss:
estimateToCommit = expectedBandwidthUsage
nackRatio = s.channelObserver.GetNackRatio()
if nackRatio > NackRatioThresholdNonProbe {
estimateToCommit = int64(float64(estimateToCommit) * (1.0 - NackRatioAttenuator*nackRatio))
}
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*nackRatio))
default:
estimateToCommit = s.lastReceivedEstimate
}
@@ -1033,27 +1040,11 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
}
func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver {
return NewChannelObserver(ChannelObserverParams{
Name: "probe",
Logger: s.params.Logger,
EstimateRequiredSamples: NumRequiredEstimatesProbe,
EstimateDownwardTrendThreshold: DownwardTrendThresholdProbe,
EstimateCollapseValues: false,
NackWindowDuration: NackWindowDurationProbe,
NackRatioThreshold: NackRatioThresholdProbe,
})
return NewChannelObserver(ChannelObserverParamsProbe, s.params.Logger)
}
func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
return NewChannelObserver(ChannelObserverParams{
Name: "non-probe",
Logger: s.params.Logger,
EstimateRequiredSamples: NumRequiredEstimatesNonProbe,
EstimateDownwardTrendThreshold: DownwardTrendThresholdNonProbe,
EstimateCollapseValues: true,
NackWindowDuration: NackWindowDurationNonProbe,
NackRatioThreshold: NackRatioThresholdNonProbe,
})
return NewChannelObserver(ChannelObserverParamsNonProbe, s.params.Logger)
}
func (s *StreamAllocator) initProbe(probeRateBps int64) {
@@ -1528,16 +1519,17 @@ func (c ChannelCongestionReason) String() string {
type ChannelObserverParams struct {
Name string
Logger logger.Logger
EstimateRequiredSamples int
EstimateDownwardTrendThreshold float64
EstimateCollapseValues bool
NackWindowDuration time.Duration
NackWindowMinDuration time.Duration
NackWindowMaxDuration time.Duration
NackRatioThreshold float64
}
type ChannelObserver struct {
params ChannelObserverParams
logger logger.Logger
estimateTrend *TrendDetector
@@ -1546,17 +1538,17 @@ type ChannelObserver struct {
repeatedNacks uint32
}
func NewChannelObserver(params ChannelObserverParams) *ChannelObserver {
func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver {
return &ChannelObserver{
params: params,
logger: logger,
estimateTrend: NewTrendDetector(TrendDetectorParams{
Name: params.Name + "-estimate",
Logger: params.Logger,
Logger: logger,
RequiredSamples: params.EstimateRequiredSamples,
DownwardTrendThreshold: params.EstimateDownwardTrendThreshold,
CollapseValues: params.EstimateCollapseValues,
}),
nackWindowStartTime: time.Now(),
}
}
@@ -1574,14 +1566,26 @@ func (c *ChannelObserver) AddEstimate(estimate int64) {
}
func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) {
if c.params.NackWindowDuration != 0 && time.Since(c.nackWindowStartTime) > c.params.NackWindowDuration {
c.nackWindowStartTime = time.Now()
if c.params.NackWindowMaxDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMaxDuration {
c.nackWindowStartTime = time.Time{}
c.packets = 0
c.repeatedNacks = 0
}
c.packets += packets
c.repeatedNacks += repeatedNacks
//
// Start NACK monitoring window only when a repeated NACK happens.
// This allows locking tightly to when NACKs start happening and
// check if the NACKs keep adding up (potentially a sign of congestion)
// or isolated losses
//
if c.repeatedNacks == 0 && repeatedNacks != 0 {
c.nackWindowStartTime = time.Now()
}
if !c.nackWindowStartTime.IsZero() {
c.packets += packets
c.repeatedNacks += repeatedNacks
}
}
func (c *ChannelObserver) GetLowestEstimate() int64 {
@@ -1610,14 +1614,14 @@ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
switch {
case estimateDirection == TrendDirectionDownward:
c.params.Logger.Debugw(
c.logger.Debugw(
"channel observer: estimate is trending downward",
"name", c.params.Name,
"estimate", c.estimateTrend.ToString(),
)
return ChannelTrendCongesting, ChannelCongestionReasonEstimate
case nackRatio > c.params.NackRatioThreshold:
c.params.Logger.Debugw(
case c.params.NackWindowMinDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMinDuration && nackRatio > c.params.NackRatioThreshold:
c.logger.Debugw(
"channel observer: high rate of repeated NACKs",
"name", c.params.Name,
"ratio", nackRatio,