From 076eb1c8ae5eedba1d187b123f4567969d3441fa Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 22 Mar 2022 22:23:22 +0530 Subject: [PATCH] Dampen stream allocator (#551) * WIP commit * WIP commit * WIP commit * format * NACK window * Remove layer when it is expected to stop * Remove debug --- pkg/sfu/buffer/rtpstats.go | 9 +- pkg/sfu/downtrack.go | 6 +- pkg/sfu/streamallocator.go | 307 +++++++++++++++++++----------- pkg/sfu/streamtrackermanager.go | 2 +- pkg/telemetry/prometheus/rooms.go | 6 +- 5 files changed, 210 insertions(+), 120 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 67df98436..01ab6d57c 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -264,11 +264,16 @@ func (r *RTPStats) GetTotalPackets() uint32 { return r.getNumPacketsSeen() + r.packetsDuplicate + r.packetsPadding } -func (r *RTPStats) GetTotalPacketsSansDuplicate() uint32 { +func (r *RTPStats) GetTotalPacketsPrimary() uint32 { r.lock.RLock() defer r.lock.RUnlock() - return r.getNumPacketsSeen() + r.packetsPadding + packetsSeen := r.getNumPacketsSeen() + if r.packetsPadding > packetsSeen { + return 0 + } + + return packetsSeen - r.packetsPadding } func (r *RTPStats) GetTotalBytes() uint64 { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index eb1cb2fc1..240c42101 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1056,10 +1056,6 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { nackMisses := uint32(0) for _, meta := range d.sequencer.getPacketsMeta(filtered) { if meta.layer == int8(InvalidLayerSpatial) { - if meta.nacked > 1 { - numRepeatedNACKs++ - } - // padding packet, no RTX for those continue } @@ -1279,7 +1275,7 @@ func (d *DownTrack) getQualityParams() *buffer.ConnectionQualityParams { } func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) { - totalPackets = d.rtpStats.GetTotalPacketsSansDuplicate() + totalPackets = d.rtpStats.GetTotalPacketsPrimary() d.statsLock.RLock() totalRepeatedNACKs = d.totalRepeatedNACKs diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index c495a29d1..9a32a7729 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -23,7 +23,13 @@ const ( NumRequiredEstimatesNonProbe = 8 NumRequiredEstimatesProbe = 3 - NackRatioThresholdNonProbe = 0.06 + DownwardTrendThresholdNonProbe = -0.5 + DownwardTrendThresholdProbe = 0.0 + + NackWindowDurationProbe = 0 * time.Second + NackWindowDurationNonProbe = 2 * time.Second + + NackRatioThresholdNonProbe = 0.08 NackRatioThresholdProbe = 0.04 NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate @@ -145,7 +151,6 @@ type StreamAllocator struct { abortedProbeClusterId ProbeClusterId probeTrendObserved bool probeEndTime time.Time - probeChannelObserver *ChannelObserver prober *Prober @@ -167,9 +172,8 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { prober: NewProber(ProberParams{ Logger: params.Logger, }), - channelObserver: NewChannelObserver("non-probe", params.Logger, NumRequiredEstimatesNonProbe, NackRatioThresholdNonProbe), - videoTracks: make(map[livekit.TrackID]*Track), - eventCh: make(chan Event, 20), + videoTracks: make(map[livekit.TrackID]*Track), + eventCh: make(chan Event, 20), } s.resetState() @@ -248,7 +252,7 @@ func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8) } func (s *StreamAllocator) resetState() { - s.channelObserver.Reset() + s.channelObserver = s.newChannelObserverNonProbe() s.resetProbe() s.state = StateStable @@ -603,7 +607,7 @@ func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) { // ensure probe queue is flushed // LK-TODO: ProbeSettleWait should actually be a certain number of RTTs. - lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.probeChannelObserver.GetLowestEstimate()))) + lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate()))) expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate) queueTime := expectedDuration - float64(info.Duration.Milliseconds()) if queueTime < 0.0 { @@ -648,18 +652,23 @@ func (s *StreamAllocator) handleNewEstimate(receivedEstimate int64) { } func (s *StreamAllocator) handleNewEstimateInProbe() { - s.probeChannelObserver.AddEstimate(s.lastReceivedEstimate) - + // always update NACKs, even if aborted packetDelta, repeatedNackDelta := s.getNackDelta() - s.probeChannelObserver.AddNack(packetDelta, repeatedNackDelta) - trend := s.probeChannelObserver.GetTrend() + if s.abortedProbeClusterId != ProbeClusterIdInvalid { + // waiting for aborted probe to finalize + return + } + + s.channelObserver.AddEstimate(s.lastReceivedEstimate) + s.channelObserver.AddNack(packetDelta, repeatedNackDelta) + + trend, _ := s.channelObserver.GetTrend() if trend != ChannelTrendNeutral { s.probeTrendObserved = true } + switch { - case s.abortedProbeClusterId != ProbeClusterIdInvalid: - return case !s.probeTrendObserved && time.Since(s.lastProbeStartTime) > ProbeTrendWait: // // More of a safety net. @@ -672,13 +681,13 @@ func (s *StreamAllocator) handleNewEstimateInProbe() { // stop immediately if the probe is congesting channel more s.params.Logger.Debugw("probe: aborting, channel is congesting", "cluster", s.probeClusterId) s.abortProbe() - case s.probeChannelObserver.GetHighestEstimate() > s.probeGoalBps: + case s.channelObserver.GetHighestEstimate() > s.probeGoalBps: // reached goal, stop probing s.params.Logger.Debugw( "probe: stopping, goal reached", "cluster", s.probeClusterId, "goal", s.probeGoalBps, - "highest", s.probeChannelObserver.GetHighestEstimate(), + "highest", s.channelObserver.GetHighestEstimate(), ) s.stopProbe() } @@ -690,28 +699,38 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { packetDelta, repeatedNackDelta := s.getNackDelta() s.channelObserver.AddNack(packetDelta, repeatedNackDelta) - trend := s.channelObserver.GetTrend() + trend, reason := s.channelObserver.GetTrend() if trend != ChannelTrendCongesting { return } - nackRatio := s.channelObserver.GetNackRatio() - lossAdjustedEstimate := s.lastReceivedEstimate - if nackRatio > NackRatioThresholdNonProbe { - lossAdjustedEstimate = int64(float64(lossAdjustedEstimate) * (1.0 - NackRatioAttenuator*nackRatio)) + var estimateToCommit int64 + var nackRatio float64 + expectedBandwidthUsage := s.getExpectedBandwidthUsage() + switch reason { + case ChannelCongestionReasonLoss: + estimateToCommit = expectedBandwidthUsage + nackRatio = s.channelObserver.GetNackRatio() + if nackRatio > NackRatioThresholdNonProbe { + estimateToCommit = int64(float64(estimateToCommit) * (1.0 - NackRatioAttenuator*nackRatio)) + } + default: + estimateToCommit = s.lastReceivedEstimate } s.params.Logger.Infow( "channel congestion detected, updating channel capacity", + "reason", reason, "old(bps)", s.committedChannelCapacity, - "new(bps)", lossAdjustedEstimate, + "new(bps)", estimateToCommit, "lastReceived(bps)", s.lastReceivedEstimate, + "expectedUsage(bps)", expectedBandwidthUsage, "nackRatio", nackRatio, ) - s.committedChannelCapacity = lossAdjustedEstimate + s.committedChannelCapacity = estimateToCommit // reset to get new set of samples for next trend - s.channelObserver.Reset() + s.channelObserver = s.newChannelObserverNonProbe() // reset probe to ensure it does not start too soon after a downward trend s.resetProbe() @@ -812,7 +831,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { func (s *StreamAllocator) finalizeProbe() { aborted := s.probeClusterId == s.abortedProbeClusterId - highestEstimateInProbe := s.probeChannelObserver.GetHighestEstimate() + highestEstimateInProbe := s.channelObserver.GetHighestEstimate() s.clearProbe() @@ -827,7 +846,7 @@ func (s *StreamAllocator) finalizeProbe() { // NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as // the send side is in full control of bandwidth estimation. // - s.channelObserver.Reset() + s.channelObserver = s.newChannelObserverNonProbe() if aborted { // failed probe, backoff @@ -1013,6 +1032,30 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) { return aggPacketDelta, aggRepeatedNackDelta } +func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver { + return NewChannelObserver(ChannelObserverParams{ + Name: "probe", + Logger: s.params.Logger, + EstimateRequiredSamples: NumRequiredEstimatesProbe, + EstimateDownwardTrendThreshold: DownwardTrendThresholdProbe, + EstimateCollapseValues: false, + NackWindowDuration: NackWindowDurationProbe, + NackRatioThreshold: NackRatioThresholdProbe, + }) +} + +func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver { + return NewChannelObserver(ChannelObserverParams{ + Name: "non-probe", + Logger: s.params.Logger, + EstimateRequiredSamples: NumRequiredEstimatesNonProbe, + EstimateDownwardTrendThreshold: DownwardTrendThresholdNonProbe, + EstimateCollapseValues: true, + NackWindowDuration: NackWindowDurationNonProbe, + NackRatioThreshold: NackRatioThresholdNonProbe, + }) +} + func (s *StreamAllocator) initProbe(probeRateBps int64) { s.lastProbeStartTime = time.Now() @@ -1025,8 +1068,8 @@ func (s *StreamAllocator) initProbe(probeRateBps int64) { s.probeEndTime = time.Time{} - s.probeChannelObserver = NewChannelObserver("probe", s.params.Logger, NumRequiredEstimatesProbe, NackRatioThresholdProbe) - s.probeChannelObserver.SeedEstimate(s.lastReceivedEstimate) + s.channelObserver = s.newChannelObserverProbe() + s.channelObserver.SeedEstimate(s.lastReceivedEstimate) desiredRateBps := int(probeRateBps) + int(math.Max(float64(s.committedChannelCapacity), float64(expectedBandwidthUsage))) s.probeClusterId = s.prober.AddCluster( @@ -1057,12 +1100,6 @@ func (s *StreamAllocator) resetProbe() { func (s *StreamAllocator) clearProbe() { s.probeClusterId = ProbeClusterIdInvalid s.abortedProbeClusterId = ProbeClusterIdInvalid - - s.probeTrendObserved = false - - s.probeEndTime = time.Time{} - - s.probeChannelObserver = nil } func (s *StreamAllocator) backoffProbeInterval() { @@ -1468,36 +1505,59 @@ func (c ChannelTrend) String() string { } } -type ChannelObserver struct { - name string - logger logger.Logger +type ChannelCongestionReason int - estimateTrend *TrendDetector +const ( + ChannelCongestionReasonNone ChannelCongestionReason = iota + ChannelCongestionReasonEstimate + ChannelCongestionReasonLoss +) - nackRatioThreshold float64 - packets uint32 - repeatedNacks uint32 -} - -func NewChannelObserver( - name string, - logger logger.Logger, - estimateRequiredSamples int, - nackRatioThreshold float64, -) *ChannelObserver { - return &ChannelObserver{ - name: name, - logger: logger, - estimateTrend: NewTrendDetector(name+"-estimate", logger, estimateRequiredSamples), - nackRatioThreshold: nackRatioThreshold, +func (c ChannelCongestionReason) String() string { + switch c { + case ChannelCongestionReasonNone: + return "NONE" + case ChannelCongestionReasonEstimate: + return "ESTIMATE" + case ChannelCongestionReasonLoss: + return "LOSS" + default: + return fmt.Sprintf("%d", int(c)) } } -func (c *ChannelObserver) Reset() { - c.estimateTrend.Reset() +type ChannelObserverParams struct { + Name string + Logger logger.Logger + EstimateRequiredSamples int + EstimateDownwardTrendThreshold float64 + EstimateCollapseValues bool + NackWindowDuration time.Duration + NackRatioThreshold float64 +} - c.packets = 0 - c.repeatedNacks = 0 +type ChannelObserver struct { + params ChannelObserverParams + + estimateTrend *TrendDetector + + nackWindowStartTime time.Time + packets uint32 + repeatedNacks uint32 +} + +func NewChannelObserver(params ChannelObserverParams) *ChannelObserver { + return &ChannelObserver{ + params: params, + estimateTrend: NewTrendDetector(TrendDetectorParams{ + Name: params.Name + "-estimate", + Logger: params.Logger, + RequiredSamples: params.EstimateRequiredSamples, + DownwardTrendThreshold: params.EstimateDownwardTrendThreshold, + CollapseValues: params.EstimateCollapseValues, + }), + nackWindowStartTime: time.Now(), + } } func (c *ChannelObserver) SeedEstimate(estimate int64) { @@ -1514,6 +1574,12 @@ func (c *ChannelObserver) AddEstimate(estimate int64) { } func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) { + if c.params.NackWindowDuration != 0 && time.Since(c.nackWindowStartTime) > c.params.NackWindowDuration { + c.nackWindowStartTime = time.Now() + c.packets = 0 + c.repeatedNacks = 0 + } + c.packets += packets c.repeatedNacks += repeatedNacks } @@ -1538,27 +1604,30 @@ func (c *ChannelObserver) GetNackRatio() float64 { return ratio } -func (c *ChannelObserver) GetTrend() ChannelTrend { +func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) { estimateDirection := c.estimateTrend.GetDirection() nackRatio := c.GetNackRatio() switch { case estimateDirection == TrendDirectionDownward: - c.logger.Debugw( + c.params.Logger.Debugw( "channel observer: estimate is trending downward", - "lowest", c.estimateTrend.GetLowest(), - "highest", c.estimateTrend.GetHighest(), - "estimates", c.estimateTrend.GetValues(), + "name", c.params.Name, + "estimate", c.estimateTrend.ToString(), ) - return ChannelTrendCongesting - case nackRatio > c.nackRatioThreshold: - c.logger.Debugw("channel observer: high rate of repeated NACKs", "ratio", nackRatio) - return ChannelTrendCongesting + return ChannelTrendCongesting, ChannelCongestionReasonEstimate + case nackRatio > c.params.NackRatioThreshold: + c.params.Logger.Debugw( + "channel observer: high rate of repeated NACKs", + "name", c.params.Name, + "ratio", nackRatio, + ) + return ChannelTrendCongesting, ChannelCongestionReasonLoss case estimateDirection == TrendDirectionUpward: - return ChannelTrendClearing + return ChannelTrendClearing, ChannelCongestionReasonNone } - return ChannelTrendNeutral + return ChannelTrendNeutral, ChannelCongestionReasonNone } // ------------------------------------------------ @@ -1584,33 +1653,34 @@ func (t TrendDirection) String() string { } } -type TrendDetector struct { - name string - logger logger.Logger - requiredSamples int +type TrendDetectorParams struct { + Name string + Logger logger.Logger + RequiredSamples int + DownwardTrendThreshold float64 + CollapseValues bool +} +type TrendDetector struct { + params TrendDetectorParams + + startTime time.Time + numSamples int values []int64 - lowestvalue int64 - highestvalue int64 + lowestValue int64 + highestValue int64 direction TrendDirection } -func NewTrendDetector(name string, logger logger.Logger, requiredSamples int) *TrendDetector { +func NewTrendDetector(params TrendDetectorParams) *TrendDetector { return &TrendDetector{ - name: name, - logger: logger, - requiredSamples: requiredSamples, - direction: TrendDirectionNeutral, + params: params, + startTime: time.Now(), + direction: TrendDirectionNeutral, } } -func (t *TrendDetector) Reset() { - t.values = nil - t.lowestvalue = int64(0) - t.highestvalue = int64(0) -} - func (t *TrendDetector) Seed(value int64) { if len(t.values) != 0 { return @@ -1620,14 +1690,20 @@ func (t *TrendDetector) Seed(value int64) { } func (t *TrendDetector) AddValue(value int64) { - if t.lowestvalue == 0 || value < t.lowestvalue { - t.lowestvalue = value + t.numSamples++ + if t.lowestValue == 0 || value < t.lowestValue { + t.lowestValue = value } - if value > t.highestvalue { - t.highestvalue = value + if value > t.highestValue { + t.highestValue = value } - if len(t.values) == t.requiredSamples { + // ignore duplicate values + if t.params.CollapseValues && len(t.values) != 0 && t.values[len(t.values)-1] == value { + return + } + + if len(t.values) == t.params.RequiredSamples { t.values = t.values[1:] } t.values = append(t.values, value) @@ -1636,11 +1712,11 @@ func (t *TrendDetector) AddValue(value int64) { } func (t *TrendDetector) GetLowest() int64 { - return t.lowestvalue + return t.lowestValue } func (t *TrendDetector) GetHighest() int64 { - return t.highestvalue + return t.highestValue } func (t *TrendDetector) GetValues() []int64 { @@ -1651,39 +1727,52 @@ func (t *TrendDetector) GetDirection() TrendDirection { return t.direction } +func (t *TrendDetector) ToString() string { + now := time.Now() + elapsed := now.Sub(t.startTime).Seconds() + str := fmt.Sprintf("n: %s", t.params.Name) + str += fmt.Sprintf(", t: %+v|%+v|%.2fs", t.startTime.Format(time.UnixDate), now.Format(time.UnixDate), elapsed) + str += fmt.Sprintf(", v: %d|%d|%d|%+v|%.2f", t.numSamples, t.lowestValue, t.highestValue, t.values, kendallsTau(t.values)) + return str +} + func (t *TrendDetector) updateDirection() { - if len(t.values) < t.requiredSamples { + if len(t.values) < t.params.RequiredSamples { t.direction = TrendDirectionNeutral return } // using Kendall's Tau to find trend + kt := kendallsTau(t.values) + + t.direction = TrendDirectionNeutral + switch { + case kt > 0: + t.direction = TrendDirectionUpward + case kt < t.params.DownwardTrendThreshold: + t.direction = TrendDirectionDownward + } +} + +// ------------------------------------------------ + +func kendallsTau(values []int64) float64 { concordantPairs := 0 discordantPairs := 0 - for i := 0; i < len(t.values)-1; i++ { - for j := i + 1; j < len(t.values); j++ { - if t.values[i] < t.values[j] { + for i := 0; i < len(values)-1; i++ { + for j := i + 1; j < len(values); j++ { + if values[i] < values[j] { concordantPairs++ - } else if t.values[i] > t.values[j] { + } else if values[i] > values[j] { discordantPairs++ } } } if (concordantPairs + discordantPairs) == 0 { - t.direction = TrendDirectionNeutral - return + return 0.0 } - t.direction = TrendDirectionNeutral - kt := (float64(concordantPairs) - float64(discordantPairs)) / (float64(concordantPairs) + float64(discordantPairs)) - switch { - case kt > 0: - t.direction = TrendDirectionUpward - case kt < 0: - t.direction = TrendDirectionDownward - } + return (float64(concordantPairs) - float64(discordantPairs)) / (float64(concordantPairs) + float64(discordantPairs)) } - -// ------------------------------------------------ diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 695959389..759e45eb1 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -109,7 +109,7 @@ func (s *StreamTrackerManager) AddTracker(layer int32) { break } } - if !exempt { + if !exempt || layer > s.maxExpectedLayer { s.removeAvailableLayer(layer) } else { s.logger.Debugw("not removing exempt layer", "layer", layer) diff --git a/pkg/telemetry/prometheus/rooms.go b/pkg/telemetry/prometheus/rooms.go index 2f1b46131..b41066f3c 100644 --- a/pkg/telemetry/prometheus/rooms.go +++ b/pkg/telemetry/prometheus/rooms.go @@ -22,9 +22,9 @@ var ( func initRoomStats(nodeID string) { promRoomTotal = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: livekitNamespace, - Subsystem: "room", - Name: "total", + Namespace: livekitNamespace, + Subsystem: "room", + Name: "total", ConstLabels: prometheus.Labels{"node_id": nodeID}, }) promRoomDuration = prometheus.NewHistogram(prometheus.HistogramOpts{