diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 2b7fd26ca..bb16917bc 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1018,9 +1018,6 @@ func (t *PCTransport) Close() { if t.pacer != nil { t.pacer.Stop() } - if t.bwe != nil { - t.bwe.Stop() - } _ = t.pc.Close() diff --git a/pkg/sfu/bwe/bwe.go b/pkg/sfu/bwe/bwe.go index e0b5addd4..a47f05bf4 100644 --- a/pkg/sfu/bwe/bwe.go +++ b/pkg/sfu/bwe/bwe.go @@ -65,8 +65,6 @@ type BWE interface { Reset() - Stop() - HandleREMB( receivedEstimate int64, expectedBandwidthUsage int64, diff --git a/pkg/sfu/bwe/null_bwe.go b/pkg/sfu/bwe/null_bwe.go index 3ef5f4787..e0242edcf 100644 --- a/pkg/sfu/bwe/null_bwe.go +++ b/pkg/sfu/bwe/null_bwe.go @@ -28,8 +28,6 @@ func (n *NullBWE) SetBWEListener(_bweListener BWEListener) {} func (n *NullBWE) Reset() {} -func (n *NullBWE) Stop() {} - func (n *NullBWE) RecordPacketSendAndGetSequenceNumber( _atMicro int64, _size int, diff --git a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go index a894313d8..0f16515c5 100644 --- a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go @@ -18,8 +18,6 @@ import ( "sync" "time" - "github.com/frostbyte73/core" - "github.com/gammazero/deque" "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" @@ -40,22 +38,22 @@ func (c CongestionSignalConfig) IsTriggered(numGroups int, duration int64) bool } var ( - DefaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + defaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 1, MinDuration: 100 * time.Millisecond, } - DefaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + defaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 2, MinDuration: 200 * time.Millisecond, } - 
DefaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{ + defaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 3, MinDuration: 300 * time.Millisecond, } - DefaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{ + defaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 5, MinDuration: 600 * time.Millisecond, } @@ -101,7 +99,7 @@ func (p ProbeSignalConfig) ProbeSignal(ppg *probePacketGroup) (ccutils.ProbeSign } var ( - DefaultProbeSignalConfig = ProbeSignalConfig{ + defaultProbeSignalConfig = ProbeSignalConfig{ MinBytesRatio: 0.5, MinDurationRatio: 0.5, @@ -121,10 +119,11 @@ type qdMeasurement struct { jqrMin int64 dqrMax int64 - numGroups int - minSendTime int64 - maxSendTime int64 - isSealed bool + numGroups int + smallestGroupIdx int + minSendTime int64 + maxSendTime int64 + isSealed bool } func newQdMeasurement( @@ -141,11 +140,15 @@ func newQdMeasurement( } } -func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup) { +func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { if q.isSealed { return } + if q.smallestGroupIdx == 0 || q.smallestGroupIdx > groupIdx { + q.smallestGroupIdx = groupIdx + } + pqd, pqdOk := pg.FinalizedPropagatedQueuingDelay() if !pqdOk { return @@ -178,6 +181,10 @@ func (q *qdMeasurement) IsSealed() bool { return q.isSealed } +func (q *qdMeasurement) SmallestGroupIdx() int { + return q.smallestGroupIdx +} + func (q *qdMeasurement) IsEarlyWarningTriggered() bool { return q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) } @@ -192,6 +199,7 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { } e.AddInt("numGroups", q.numGroups) + e.AddInt("smallestGroupIdx", q.smallestGroupIdx) e.AddInt64("minSendTime", q.minSendTime) e.AddInt64("maxSendTime", q.maxSendTime) e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000)) @@ -208,11 +216,13 @@ type 
lossMeasurement struct { congestedConfig CongestionSignalConfig congestionMinLoss float64 - numGroups int - ts *trafficStats + numGroups int + smallestGroupIdx int + ts *trafficStats - earlyWarningWeightedLoss float64 - congestedWeightedLoss float64 + earlyWarningWeightedLoss float64 + earlyWarningWeightedLossDone bool + congestedWeightedLoss float64 isSealed bool } @@ -235,17 +245,22 @@ func newLossMeasurement( } } -func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup) { - if l.isSealed { +func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { + if l.isSealed || !pg.IsFinalized() { return } + if l.smallestGroupIdx == 0 || l.smallestGroupIdx > groupIdx { + l.smallestGroupIdx = groupIdx + } + l.numGroups++ l.ts.Merge(pg.Traffic()) duration := l.ts.Duration() - if l.earlyWarningConfig.IsTriggered(l.numGroups, duration) { + if l.earlyWarningConfig.IsTriggered(l.numGroups, duration) && !l.earlyWarningWeightedLossDone { l.earlyWarningWeightedLoss = l.ts.WeightedLoss() + l.earlyWarningWeightedLossDone = true } if l.congestedConfig.IsTriggered(l.numGroups, duration) { l.congestedWeightedLoss = l.ts.WeightedLoss() @@ -257,6 +272,10 @@ func (l *lossMeasurement) IsSealed() bool { return l.isSealed } +func (l *lossMeasurement) SmallestGroupIdx() int { + return l.smallestGroupIdx +} + func (l *lossMeasurement) IsEarlyWarningTriggered() bool { return l.earlyWarningWeightedLoss > l.congestionMinLoss } @@ -271,6 +290,7 @@ func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { } e.AddInt("numGroups", l.numGroups) + e.AddInt("smallestGroupIdx", l.smallestGroupIdx) e.AddObject("ts", l.ts) e.AddFloat64("earlyWarningWeightedLoss", l.earlyWarningWeightedLoss) e.AddFloat64("congestedWeightedLoss", l.congestedWeightedLoss) @@ -304,32 +324,33 @@ type CongestionDetectorConfig struct { LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"` CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"` - 
RateMeasurementWindowDurationMin time.Duration `yaml:"rate_measurement_window_duration_min,omitempty"` - RateMeasurementWindowDurationMax time.Duration `yaml:"rate_measurement_window_duration_max,omitempty"` - - PeriodicCheckInterval time.Duration `yaml:"periodic_check_interval,omitempty"` - PeriodicCheckIntervalCongested time.Duration `yaml:"periodic_check_interval_congested,omitempty"` - CongestedCTRTrend ccutils.TrendDetectorConfig `yaml:"congested_ctr_trend,omitempty"` - CongestedCTREpsilon float64 `yaml:"congested_ctr_epsilon,omitempty"` + CongestedCTRTrend ccutils.TrendDetectorConfig `yaml:"congested_ctr_trend,omitempty"` + CongestedCTREpsilon float64 `yaml:"congested_ctr_epsilon,omitempty"` + CongestedPacketGroup PacketGroupConfig `yaml:"congested_packet_group,omitempty"` } var ( defaultTrendDetectorConfigCongestedCTR = ccutils.TrendDetectorConfig{ - RequiredSamples: 5, + RequiredSamples: 4, RequiredSamplesMin: 2, DownwardTrendThreshold: -0.5, - DownwardTrendMaxWait: 5 * time.Second, + DownwardTrendMaxWait: 2 * time.Second, CollapseThreshold: 500 * time.Millisecond, ValidityWindow: 10 * time.Second, } - DefaultCongestionDetectorConfig = CongestionDetectorConfig{ - PacketGroup: DefaultPacketGroupConfig, - PacketGroupMaxAge: 15 * time.Second, + defaultCongestedPacketGroupConfig = PacketGroupConfig{ + MinPackets: 20, + MaxWindowDuration: 150 * time.Millisecond, + } - ProbePacketGroup: DefaultPacketGroupConfigProbe, + defaultCongestionDetectorConfig = CongestionDetectorConfig{ + PacketGroup: defaultPacketGroupConfig, + PacketGroupMaxAge: 10 * time.Second, + + ProbePacketGroup: defaultProbePacketGroupConfig, ProbeRegulator: ccutils.DefaultProbeRegulatorConfig, - ProbeSignal: DefaultProbeSignalConfig, + ProbeSignal: defaultProbeSignalConfig, JQRMinDelay: 15 * time.Millisecond, DQRMaxDelay: 5 * time.Millisecond, @@ -337,31 +358,22 @@ var ( WeightedLoss: defaultWeightedLossConfig, CongestionMinLoss: 0.25, - QueuingDelayEarlyWarning: 
DefaultQueuingDelayEarlyWarningCongestionSignalConfig, - LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig, + QueuingDelayEarlyWarning: defaultQueuingDelayEarlyWarningCongestionSignalConfig, + LossEarlyWarning: defaultLossEarlyWarningCongestionSignalConfig, EarlyWarningHangover: 500 * time.Millisecond, - QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig, - LossCongested: DefaultLossCongestedCongestionSignalConfig, + QueuingDelayCongested: defaultQueuingDelayCongestedCongestionSignalConfig, + LossCongested: defaultLossCongestedCongestionSignalConfig, CongestedHangover: 3 * time.Second, - RateMeasurementWindowDurationMin: 800 * time.Millisecond, - RateMeasurementWindowDurationMax: 2 * time.Second, - - PeriodicCheckInterval: 2 * time.Second, - PeriodicCheckIntervalCongested: 200 * time.Millisecond, - CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR, - CongestedCTREpsilon: 0.05, + CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR, + CongestedCTREpsilon: 0.05, + CongestedPacketGroup: defaultCongestedPacketGroupConfig, } ) // ------------------------------------------------------------------------------- -type feedbackReport struct { - at time.Time - report *rtcp.TransportLayerCC -} - type congestionDetectorParams struct { Config CongestionDetectorConfig Logger logger.Logger @@ -370,8 +382,7 @@ type congestionDetectorParams struct { type congestionDetector struct { params congestionDetectorParams - lock sync.RWMutex - feedbackReports deque.Deque[feedbackReport] + lock sync.Mutex rtt float64 @@ -383,14 +394,13 @@ type congestionDetector struct { probePacketGroup *probePacketGroup probeRegulator *ccutils.ProbeRegulator - wake chan struct{} - stop core.Fuse - estimatedAvailableChannelCapacity int64 congestionState bwe.CongestionState congestionStateSwitchedAt time.Time - congestedCTRTrend *ccutils.TrendDetector[float64] - congestedTrafficStats *trafficStats + + congestedCTRTrend *ccutils.TrendDetector[float64] + 
congestedTrafficStats *trafficStats + congestedPacketGroup *packetGroup bweListener bwe.BWEListener } @@ -398,27 +408,29 @@ type congestionDetector struct { func newCongestionDetector(params congestionDetectorParams) *congestionDetector { c := &congestionDetector{ params: params, - rtt: bwe.DefaultRTT, packetTracker: newPacketTracker(packetTrackerParams{Logger: params.Logger}), twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}), - probeRegulator: ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{ - Config: params.Config.ProbeRegulator, - Logger: params.Logger, - }), - wake: make(chan struct{}, 1), - estimatedAvailableChannelCapacity: 100_000_000, - congestionState: bwe.CongestionStateNone, - congestionStateSwitchedAt: mono.Now(), } + c.Reset() - c.feedbackReports.SetMinCapacity(3) - - go c.worker() return c } -func (c *congestionDetector) Stop() { - c.stop.Break() +func (c *congestionDetector) Reset() { + c.lock.Lock() + defer c.lock.Unlock() + + c.rtt = bwe.DefaultRTT + c.packetGroups = nil + c.probePacketGroup = nil + c.probeRegulator = ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{ + Config: c.params.Config.ProbeRegulator, + Logger: c.params.Logger, + }) + c.estimatedAvailableChannelCapacity = 100_000_000 + c.congestionState = bwe.CongestionStateNone + c.congestionStateSwitchedAt = mono.Now() + c.clearCTRTrend() } func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) { @@ -429,25 +441,163 @@ func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) { } func (c *congestionDetector) getBWEListener() bwe.BWEListener { - c.lock.RLock() - defer c.lock.RUnlock() + c.lock.Lock() + defer c.lock.Unlock() return c.bweListener } func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) { c.lock.Lock() - c.feedbackReports.PushBack(feedbackReport{mono.Now(), report}) + recvRefTime, isOutOfOrder := c.twccFeedback.ProcessReport(report, mono.Now()) + if isOutOfOrder { + 
c.params.Logger.Infow("send side bwe: received out-of-order feedback report") + } + + if len(c.packetGroups) == 0 { + c.packetGroups = append( + c.packetGroups, + newPacketGroup( + packetGroupParams{ + Config: c.params.Config.PacketGroup, + WeightedLoss: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }, + 0, + ), + ) + } + + pg := c.packetGroups[len(c.packetGroups)-1] + trackPacketGroup := func(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) { + if pi == nil { + return + } + + c.updateCTRTrend(pi, sendDelta, recvDelta, isLost) + + if c.probePacketGroup != nil { + c.probePacketGroup.Add(pi, sendDelta, recvDelta, isLost) + } + + err := pg.Add(pi, sendDelta, recvDelta, isLost) + if err == nil { + return + } + + if err == errGroupFinalized { + // previous group ended, start a new group + pg = newPacketGroup( + packetGroupParams{ + Config: c.params.Config.PacketGroup, + WeightedLoss: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }, + pg.PropagatedQueuingDelay(), + ) + c.packetGroups = append(c.packetGroups, pg) + + if err = pg.Add(pi, sendDelta, recvDelta, isLost); err != nil { + c.params.Logger.Warnw("send side bwe: could not add packet to new packet group", err, "packetInfo", pi, "packetGroup", pg) + } + return + } + + // try an older group + for idx := len(c.packetGroups) - 2; idx >= 0; idx-- { + opg := c.packetGroups[idx] + if err := opg.Add(pi, sendDelta, recvDelta, isLost); err == nil { + return + } else if err == errGroupFinalized { + c.params.Logger.Infow("send side bwe: unexpected finalized group", "packetInfo", pi, "packetGroup", opg) + } + } + } + + // 1. go through the TWCC feedback report and record receive time as reported by remote + // 2. process acknowledged packet and group them + // + // losses are not recorded if a feedback report is completely lost. + // RFC recommends treating lost reports by ignoring packets that would have been in it. 
+ // ----------------------------------------------------------------------------------- + // | From a congestion control perspective, lost feedback messages are | + // | handled by ignoring packets which would have been reported as lost or | + // | received in the lost feedback messages. This behavior is similar to | + // | how a lost RTCP receiver report is handled. | + // ----------------------------------------------------------------------------------- + // Reference: https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01#page-4 + sequenceNumber := report.BaseSequenceNumber + endSequenceNumberExclusive := sequenceNumber + report.PacketStatusCount + deltaIdx := 0 + for _, chunk := range report.PacketChunks { + if sequenceNumber == endSequenceNumberExclusive { + break + } + + switch chunk := chunk.(type) { + case *rtcp.RunLengthChunk: + for i := uint16(0); i < chunk.RunLength; i++ { + if sequenceNumber == endSequenceNumberExclusive { + break + } + + recvTime := int64(0) + isLost := false + if chunk.PacketStatusSymbol != rtcp.TypeTCCPacketNotReceived { + recvRefTime += report.RecvDeltas[deltaIdx].Delta + deltaIdx++ + + recvTime = recvRefTime + } else { + isLost = true + } + pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime) + if pi.sendTime != 0 { + trackPacketGroup(&pi, sendDelta, recvDelta, isLost) + } + sequenceNumber++ + } + + case *rtcp.StatusVectorChunk: + for _, symbol := range chunk.SymbolList { + if sequenceNumber == endSequenceNumberExclusive { + break + } + + recvTime := int64(0) + isLost := false + if symbol != rtcp.TypeTCCPacketNotReceived { + recvRefTime += report.RecvDeltas[deltaIdx].Delta + deltaIdx++ + + recvTime = recvRefTime + } else { + isLost = true + } + pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime) + if pi.sendTime != 0 { + trackPacketGroup(&pi, sendDelta, recvDelta, isLost) + } + sequenceNumber++ + 
} + } + } + + c.prunePacketGroups() + shouldNotify, state, committedChannelCapacity := c.congestionDetectionStateMachine() c.lock.Unlock() - // notify worker of a new feedback - select { - case c.wake <- struct{}{}: - default: + if shouldNotify { + if bweListener := c.getBWEListener(); bweListener != nil { + bweListener.OnCongestionStateChange(state, committedChannelCapacity) + } } } func (c *congestionDetector) UpdateRTT(rtt float64) { + c.lock.Lock() + defer c.lock.Unlock() + if rtt == 0 { c.rtt = bwe.DefaultRTT } else { @@ -566,23 +716,33 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, var idx int for idx = len(c.packetGroups) - 1; idx >= 0; idx-- { pg := c.packetGroups[idx] - qdMeasurement.ProcessPacketGroup(pg) - lossMeasurement.ProcessPacketGroup(pg) + qdMeasurement.ProcessPacketGroup(pg, idx) + lossMeasurement.ProcessPacketGroup(pg, idx) // if both measurements have enough data to make a decision, stop processing groups if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() { break } + + // if "congested" triggered, can stop as that is the longer duration check and also + // the worst case check, i. e. 
if "congested" is triggered due to any condition, + // there can be nothing else that can trigger + if qdMeasurement.IsCongestedTriggered() || lossMeasurement.IsCongestedTriggered() { + break + } } + oldestContributingGroup := max(0, idx) earlyWarningReason := "" earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered() if earlyWarningTriggered { earlyWarningReason = "queuing-delay" + oldestContributingGroup = qdMeasurement.SmallestGroupIdx() } else { earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered() if earlyWarningTriggered { earlyWarningReason = "loss" + oldestContributingGroup = lossMeasurement.SmallestGroupIdx() } } @@ -590,17 +750,19 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, congestedTriggered := qdMeasurement.IsCongestedTriggered() if congestedTriggered { congestedReason = "queuing-delay" + oldestContributingGroup = qdMeasurement.SmallestGroupIdx() } else { congestedTriggered = lossMeasurement.IsCongestedTriggered() if congestedTriggered { congestedReason = "loss" + oldestContributingGroup = lossMeasurement.SmallestGroupIdx() } } - return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, max(0, idx) + return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup } -func (c *congestionDetector) congestionDetectionStateMachine() { +func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) { state := c.congestionState newState := c.congestionState reason := "" @@ -653,12 +815,39 @@ func (c *congestionDetector) congestionDetectionStateMachine() { } } - c.estimateAvailableChannelCapacity() + c.estimateAvailableChannelCapacity(oldestContributingGroup) // update after running the above estimate as state change callback includes the estimated available channel capacity + shouldNotify := false if newState != state { c.updateCongestionState(newState, reason, oldestContributingGroup) + 
shouldNotify = true } + + if c.congestedCTRTrend != nil && c.congestedCTRTrend.GetDirection() == ccutils.TrendDirectionDownward { + shouldNotify = true + + congestedAckedBitrate := c.congestedTrafficStats.AcknowledgedBitrate() + if congestedAckedBitrate < c.estimatedAvailableChannelCapacity { + c.estimatedAvailableChannelCapacity = congestedAckedBitrate + } + c.params.Logger.Infow( + "send side bwe: captured traffic ratio is trending downward", + "channel", c.congestedCTRTrend, + "trafficStats", c.congestedTrafficStats, + "estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity, + ) + + // reset to get new set of samples for next trend + c.resetCTRTrend() + } + + return shouldNotify, c.congestionState, c.estimatedAvailableChannelCapacity +} + +func (c *congestionDetector) createCTRTrend() { + c.resetCTRTrend() + c.congestedPacketGroup = nil } func (c *congestionDetector) resetCTRTrend() { @@ -676,42 +865,53 @@ func (c *congestionDetector) resetCTRTrend() { func (c *congestionDetector) clearCTRTrend() { c.congestedCTRTrend = nil c.congestedTrafficStats = nil + c.congestedPacketGroup = nil } -func (c *congestionDetector) updateCTRTrend(pg *packetGroup) { +func (c *congestionDetector) updateCTRTrend(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) { if c.congestedCTRTrend == nil { return } - // progressively keep increasing the window and make measurements over longer windows, - // if congestion is not relieving, CTR will trend down - c.congestedTrafficStats.Merge(pg.Traffic()) - ctr := c.congestedTrafficStats.CapturedTrafficRatio() - - // quantise CTR to filter out small changes - c.congestedCTRTrend.AddValue(float64(int((ctr+(c.params.Config.CongestedCTREpsilon/2))/c.params.Config.CongestedCTREpsilon)) * c.params.Config.CongestedCTREpsilon) - - if c.congestedCTRTrend.GetDirection() != ccutils.TrendDirectionDownward { - return + if c.congestedPacketGroup == nil { + c.congestedPacketGroup = newPacketGroup( + packetGroupParams{ + Config: 
c.params.Config.CongestedPacketGroup, + WeightedLoss: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }, + 0, + ) } - c.params.Logger.Infow("send side bwe: captured traffic ratio is trending downward", "channel", c.congestedCTRTrend) + if err := c.congestedPacketGroup.Add(pi, sendDelta, recvDelta, isLost); err == errGroupFinalized { + // progressively keep increasing the window and make measurements over longer windows, + // if congestion is not relieving, CTR will trend down + c.congestedTrafficStats.Merge(c.congestedPacketGroup.Traffic()) - if bweListener := c.getBWEListener(); bweListener != nil { - bweListener.OnCongestionStateChange(c.congestionState, c.estimatedAvailableChannelCapacity) + ts := newTrafficStats(trafficStatsParams{ + Config: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }) + ts.Merge(c.congestedPacketGroup.Traffic()) + ctr := ts.CapturedTrafficRatio() + + // quantise CTR to filter out small changes + c.congestedCTRTrend.AddValue(float64(int((ctr+(c.params.Config.CongestedCTREpsilon/2))/c.params.Config.CongestedCTREpsilon)) * c.params.Config.CongestedCTREpsilon) + + c.congestedPacketGroup = newPacketGroup( + packetGroupParams{ + Config: c.params.Config.CongestedPacketGroup, + WeightedLoss: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }, + c.congestedPacketGroup.PropagatedQueuingDelay(), + ) } - - // reset to get new set of samples for next trend - c.resetCTRTrend() } -func (c *congestionDetector) estimateAvailableChannelCapacity() { - if len(c.packetGroups) == 0 || c.probePacketGroup != nil { - return - } - - threshold, ok := c.packetTracker.BaseSendTimeThreshold(c.params.Config.RateMeasurementWindowDurationMax.Microseconds()) - if !ok { +func (c *congestionDetector) estimateAvailableChannelCapacity(oldestContributingGroup int) { + if len(c.packetGroups) == 0 || c.congestedCTRTrend != nil || c.probePacketGroup != nil { return } @@ -719,26 +919,15 @@ func (c *congestionDetector) 
estimateAvailableChannelCapacity() { Config: c.params.Config.WeightedLoss, Logger: c.params.Logger, }) - var idx int - for idx = len(c.packetGroups) - 1; idx >= 0; idx-- { + for idx := len(c.packetGroups) - 1; idx >= oldestContributingGroup; idx-- { pg := c.packetGroups[idx] - if mst := pg.MinSendTime(); mst != 0 && mst < threshold { - break + if !pg.IsFinalized() { + continue } agg.Merge(pg.Traffic()) } - if agg.Duration() < c.params.Config.RateMeasurementWindowDurationMin.Microseconds() { - c.params.Logger.Infow( - "send side bwe: not enough data to estimate available channel capacity", - "duration", agg.Duration(), - "numGroups", len(c.packetGroups), - "oldestUsed", max(0, idx), - ) - return - } - c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate() } @@ -761,10 +950,6 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, re prevState := c.congestionState c.congestionState = state - if bweListener := c.getBWEListener(); bweListener != nil { - bweListener.OnCongestionStateChange(state, c.estimatedAvailableChannelCapacity) - } - // when in congested state, monitor changes in captured traffic ratio (CTR) // to ensure allocations are in line with latest estimates, it is possible that // the estimate is incorrect when congestion starts and the allocation may be @@ -772,182 +957,8 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, re // on a continuous basis allocations can be adjusted in the direction of // reducing/relieving congestion if state == bwe.CongestionStateCongested && prevState != bwe.CongestionStateCongested { - c.resetCTRTrend() + c.createCTRTrend() } else if state != bwe.CongestionStateCongested { c.clearCTRTrend() } } - -func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { - recvRefTime, isOutOfOrder := c.twccFeedback.ProcessReport(fbr.report, fbr.at) - if isOutOfOrder { - c.params.Logger.Infow("send side bwe: received out-of-order feedback report") - } - - if 
len(c.packetGroups) == 0 { - c.packetGroups = append( - c.packetGroups, - newPacketGroup( - packetGroupParams{ - Config: c.params.Config.PacketGroup, - WeightedLoss: c.params.Config.WeightedLoss, - Logger: c.params.Logger, - }, - 0, - ), - ) - } - - pg := c.packetGroups[len(c.packetGroups)-1] - trackPacketGroup := func(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) { - if pi == nil { - return - } - - if c.probePacketGroup != nil { - c.probePacketGroup.Add(pi, sendDelta, recvDelta, isLost) - } - - err := pg.Add(pi, sendDelta, recvDelta, isLost) - if err == nil { - return - } - - if err == errGroupFinalized { - // previous group ended, start a new group - c.updateCTRTrend(pg) - - pg = newPacketGroup( - packetGroupParams{ - Config: c.params.Config.PacketGroup, - WeightedLoss: c.params.Config.WeightedLoss, - Logger: c.params.Logger, - }, - pg.PropagatedQueuingDelay(), - ) - c.packetGroups = append(c.packetGroups, pg) - - if err = pg.Add(pi, sendDelta, recvDelta, isLost); err != nil { - c.params.Logger.Warnw("send side bwe: could not add packet to new packet group", err, "packetInfo", pi, "packetGroup", pg) - } - return - } - - // try an older group - for idx := len(c.packetGroups) - 2; idx >= 0; idx-- { - opg := c.packetGroups[idx] - if err := opg.Add(pi, sendDelta, recvDelta, isLost); err == nil { - return - } else if err == errGroupFinalized { - c.params.Logger.Infow("send side bwe: unexpected finalized group", "packetInfo", pi, "packetGroup", opg) - } - } - } - - // 1. go through the TWCC feedback report and record recive time as reported by remote - // 2. process acknowledged packet and group them - // - // losses are not recorded if a feedback report is completely lost. - // RFC recommends treating lost reports by ignoring packets that would have been in it. 
- // ----------------------------------------------------------------------------------- - // | From a congestion control perspective, lost feedback messages are | - // | handled by ignoring packets which would have been reported as lost or | - // | received in the lost feedback messages. This behavior is similar to | - // | how a lost RTCP receiver report is handled. | - // ----------------------------------------------------------------------------------- - // Reference: https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01#page-4 - sequenceNumber := fbr.report.BaseSequenceNumber - endSequenceNumberExclusive := sequenceNumber + fbr.report.PacketStatusCount - deltaIdx := 0 - for _, chunk := range fbr.report.PacketChunks { - if sequenceNumber == endSequenceNumberExclusive { - break - } - - switch chunk := chunk.(type) { - case *rtcp.RunLengthChunk: - for i := uint16(0); i < chunk.RunLength; i++ { - if sequenceNumber == endSequenceNumberExclusive { - break - } - - recvTime := int64(0) - isLost := false - if chunk.PacketStatusSymbol != rtcp.TypeTCCPacketNotReceived { - recvRefTime += fbr.report.RecvDeltas[deltaIdx].Delta - deltaIdx++ - - recvTime = recvRefTime - } else { - isLost = true - } - pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime) - if pi.sendTime != 0 { - trackPacketGroup(&pi, sendDelta, recvDelta, isLost) - } - sequenceNumber++ - } - - case *rtcp.StatusVectorChunk: - for _, symbol := range chunk.SymbolList { - if sequenceNumber == endSequenceNumberExclusive { - break - } - - recvTime := int64(0) - isLost := false - if symbol != rtcp.TypeTCCPacketNotReceived { - recvRefTime += fbr.report.RecvDeltas[deltaIdx].Delta - deltaIdx++ - - recvTime = recvRefTime - } else { - isLost = true - } - pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime) - if pi.sendTime != 0 { - trackPacketGroup(&pi, sendDelta, recvDelta, isLost) - } - 
sequenceNumber++ - } - } - } - - c.prunePacketGroups() - c.congestionDetectionStateMachine() -} - -func (c *congestionDetector) worker() { - ticker := time.NewTicker(c.params.Config.PeriodicCheckInterval) - defer ticker.Stop() - - for { - select { - case <-c.wake: - for { - c.lock.Lock() - if c.feedbackReports.Len() == 0 { - c.lock.Unlock() - break - } - fbReport := c.feedbackReports.PopFront() - c.lock.Unlock() - - c.processFeedbackReport(fbReport) - } - - if c.congestionState == bwe.CongestionStateCongested { - ticker.Reset(c.params.Config.PeriodicCheckIntervalCongested) - } else { - ticker.Reset(c.params.Config.PeriodicCheckInterval) - } - - case <-ticker.C: - c.prunePacketGroups() - c.congestionDetectionStateMachine() - - case <-c.stop.Watch(): - return - } - } -} diff --git a/pkg/sfu/bwe/sendsidebwe/packet_group.go b/pkg/sfu/bwe/sendsidebwe/packet_group.go index b3e0f8021..a9b7b1920 100644 --- a/pkg/sfu/bwe/sendsidebwe/packet_group.go +++ b/pkg/sfu/bwe/sendsidebwe/packet_group.go @@ -38,7 +38,7 @@ type PacketGroupConfig struct { } var ( - DefaultPacketGroupConfig = PacketGroupConfig{ + defaultPacketGroupConfig = PacketGroupConfig{ MinPackets: 20, MaxWindowDuration: 500 * time.Millisecond, } @@ -246,6 +246,10 @@ func (p *packetGroup) FinalizedPropagatedQueuingDelay() (int64, bool) { return p.PropagatedQueuingDelay(), true } +func (p *packetGroup) IsFinalized() bool { + return p.isFinalized +} + func (p *packetGroup) Traffic() *trafficStats { return &trafficStats{ minSendTime: p.minSendTime, diff --git a/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go b/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go index 17efc88b6..f47b6fad6 100644 --- a/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go +++ b/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go @@ -35,7 +35,7 @@ type ProbePacketGroupConfig struct { var ( // large numbers to treat a probe packet group as one - DefaultPacketGroupConfigProbe = ProbePacketGroupConfig{ + defaultProbePacketGroupConfig = ProbePacketGroupConfig{ 
PacketGroup: PacketGroupConfig{ MinPackets: 16384, MaxWindowDuration: time.Minute, diff --git a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go index 029a5dcbf..a1cc20e44 100644 --- a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go +++ b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go @@ -59,7 +59,7 @@ type SendSideBWEConfig struct { var ( DefaultSendSideBWEConfig = SendSideBWEConfig{ - CongestionDetector: DefaultCongestionDetectorConfig, + CongestionDetector: defaultCongestionDetectorConfig, } ) @@ -93,14 +93,7 @@ func (s *SendSideBWE) SetBWEListener(bweListener bwe.BWEListener) { } func (s *SendSideBWE) Reset() { - s.congestionDetector = newCongestionDetector(congestionDetectorParams{ - Config: s.params.Config.CongestionDetector, - Logger: s.params.Logger, - }) -} - -func (s *SendSideBWE) Stop() { - s.congestionDetector.Stop() + s.congestionDetector.Reset() } func (s *SendSideBWE) RecordPacketSendAndGetSequenceNumber( diff --git a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go index efe6ac634..63b18e364 100644 --- a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go +++ b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go @@ -25,14 +25,16 @@ import ( // ----------------------------------------------------------- type WeightedLossConfig struct { - MinPacketsForLossValidity int `yaml:"min_packets_for_loss_validity,omitempty"` - LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"` + MinDurationForLossValidity time.Duration `yaml:"min_duration_for_loss_validity,omitempty"` + MinPPSForLossValidity int `yaml:"min_pps_for_loss_validity,omitempty"` + LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"` } var ( defaultWeightedLossConfig = WeightedLossConfig{ - MinPacketsForLossValidity: 20, - LossPenaltyFactor: 0.25, + MinDurationForLossValidity: 200 * time.Millisecond, + MinPPSForLossValidity: 20, + LossPenaltyFactor: 0.25, } ) @@ -101,7 +103,7 @@ func (ts *trafficStats) CapturedTrafficRatio() float64 { } // 
apply a penalty for lost packets, - // tha rationale being packet dropping is a strategy to relieve congestion + // the rationale being packet dropping is a strategy to relieve congestion // and if they were not dropped, they would have increased queuing delay, // as it is not possible to know the reason for the losses, // apply a small penalty to receive delta aggregate to simulate those packets @@ -110,8 +112,14 @@ func (ts *trafficStats) CapturedTrafficRatio() float64 { } func (ts *trafficStats) WeightedLoss() float64 { + durationMicro := ts.Duration() + if time.Duration(durationMicro*1000) < ts.params.Config.MinDurationForLossValidity { + return 0.0 + } + totalPackets := float64(ts.lostPackets + ts.ackedPackets) - if int(totalPackets) < ts.params.Config.MinPacketsForLossValidity { + pps := totalPackets * 1e6 / float64(durationMicro) + if int(pps) < ts.params.Config.MinPPSForLossValidity { return 0.0 } @@ -120,8 +128,6 @@ func (ts *trafficStats) WeightedLoss() float64 { lossRatio = float64(ts.lostPackets) / totalPackets } - pps := totalPackets * 1e6 / float64(ts.Duration()) - // Log10 is used to give higher weight for the same loss ratio at higher packet rates, // for e.g. 
// - 10% loss at 20 pps = 0.1 * log10(20) = 0.130 @@ -144,6 +150,11 @@ func (ts *trafficStats) MarshalLogObject(e zapcore.ObjectEncoder) error { duration := time.Duration(ts.Duration() * 1000) e.AddDuration("duration", duration) + e.AddInt("ackedPackets", ts.ackedPackets) + e.AddInt("ackedBytes", ts.ackedBytes) + e.AddInt("lostPackets", ts.lostPackets) + e.AddInt("lostBytes", ts.lostBytes) + bitrate := float64(0) if duration != 0 { bitrate = float64(ts.ackedBytes*8) / duration.Seconds() diff --git a/pkg/sfu/ccutils/prober.go b/pkg/sfu/ccutils/prober.go index c3068354b..5e7e31eb2 100644 --- a/pkg/sfu/ccutils/prober.go +++ b/pkg/sfu/ccutils/prober.go @@ -353,7 +353,12 @@ func (p ProbeClusterResult) Duration() time.Duration { } func (p ProbeClusterResult) Bitrate() float64 { - return float64(p.Bytes()*8) / p.Duration().Seconds() + duration := p.Duration().Seconds() + if duration != 0 { + return float64(p.Bytes()*8) / duration + } + + return 0 } func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 501c3c732..ab285f69e 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1874,11 +1874,15 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { d.params.Logger.Errorw("could not unmarshal rtp packet in retransmit", err) continue } - pkt.Header.Marker = epm.marker - pkt.Header.SequenceNumber = epm.targetSeqNo - pkt.Header.Timestamp = epm.timestamp - pkt.Header.SSRC = d.ssrc - pkt.Header.PayloadType = d.getTranslatedPayloadType(pkt.Header.PayloadType) + hdr := &rtp.Header{ + Version: pkt.Header.Version, + Padding: pkt.Header.Padding, + Marker: epm.marker, + PayloadType: d.getTranslatedPayloadType(pkt.Header.PayloadType), + SequenceNumber: epm.targetSeqNo, + Timestamp: epm.timestamp, + SSRC: d.ssrc, + } poolEntity := PacketFactory.Get().(*[]byte) payload := *poolEntity @@ -1899,26 +1903,28 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { } else { ddBytes = 
epm.ddBytes[:epm.ddBytesSize] } - pkt.Header.SetExtension(uint8(d.dependencyDescriptorExtID), ddBytes) + if len(ddBytes) != 0 { + hdr.SetExtension(uint8(d.dependencyDescriptorExtID), ddBytes) + } } if d.absCaptureTimeExtID != 0 && len(epm.actBytes) != 0 { - pkt.Header.SetExtension(uint8(d.absCaptureTimeExtID), epm.actBytes) + hdr.SetExtension(uint8(d.absCaptureTimeExtID), epm.actBytes) } - d.addDummyExtensions(&pkt.Header) + d.addDummyExtensions(hdr) - headerSize := pkt.Header.MarshalSize() + headerSize := hdr.MarshalSize() d.rtpStats.Update( mono.UnixNano(), epm.extSequenceNumber, epm.extTimestamp, - pkt.Header.Marker, + hdr.Marker, headerSize, len(payload), 0, true, ) d.pacer.Enqueue(&pacer.Packet{ - Header: &pkt.Header, + Header: hdr, HeaderSize: headerSize, Payload: payload, ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()), diff --git a/pkg/sfu/pacer/base.go b/pkg/sfu/pacer/base.go index 07b7d9d0a..b7097786f 100644 --- a/pkg/sfu/pacer/base.go +++ b/pkg/sfu/pacer/base.go @@ -23,6 +23,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" "github.com/pion/rtp" + "go.uber.org/atomic" ) type Base struct { @@ -30,6 +31,8 @@ type Base struct { bwe bwe.BWE + lastPacketSentAt atomic.Int64 + *ProbeObserver } @@ -47,6 +50,10 @@ func (b *Base) SetInterval(_interval time.Duration) { func (b *Base) SetBitrate(_bitrate int) { } +func (b *Base) TimeSinceLastSentPacket() time.Duration { + return time.Duration(mono.UnixNano() - b.lastPacketSentAt.Load()) +} + func (b *Base) SendPacket(p *Packet) (int, error) { defer func() { if p.Pool != nil && p.PoolEntity != nil { @@ -56,7 +63,7 @@ func (b *Base) SendPacket(p *Packet) (int, error) { err := b.patchRTPHeaderExtensions(p) if err != nil { - b.logger.Errorw("writing rtp header extensions err", err) + b.logger.Errorw("patching rtp header extensions err", err) return 0, err } @@ -76,15 +83,17 @@ func (b *Base) SendPacket(p *Packet) (int, error) { func (b *Base) 
patchRTPHeaderExtensions(p *Packet) error { sendingAt := mono.Now() if p.AbsSendTimeExtID != 0 { - sendTime := rtp.NewAbsSendTimeExtension(sendingAt) - b, err := sendTime.Marshal() + absSendTime := rtp.NewAbsSendTimeExtension(sendingAt) + absSendTimeBytes, err := absSendTime.Marshal() if err != nil { return err } - if err = p.Header.SetExtension(p.AbsSendTimeExtID, b); err != nil { + if err = p.Header.SetExtension(p.AbsSendTimeExtID, absSendTimeBytes); err != nil { return err } + + b.lastPacketSentAt.Store(sendingAt.UnixNano()) } packetSize := p.HeaderSize + len(p.Payload) @@ -99,14 +108,16 @@ func (b *Base) patchRTPHeaderExtensions(p *Packet) error { twccExt := rtp.TransportCCExtension{ TransportSequence: twccSN, } - b, err := twccExt.Marshal() + twccExtBytes, err := twccExt.Marshal() if err != nil { return err } - if err = p.Header.SetExtension(p.TransportWideExtID, b); err != nil { + if err = p.Header.SetExtension(p.TransportWideExtID, twccExtBytes); err != nil { return err } + + b.lastPacketSentAt.Store(sendingAt.UnixNano()) } b.ProbeObserver.RecordPacket(packetSize, p.IsRTX, p.ProbeClusterId, p.IsProbe) diff --git a/pkg/sfu/pacer/no_queue.go b/pkg/sfu/pacer/no_queue.go index fb18d0517..70d138b20 100644 --- a/pkg/sfu/pacer/no_queue.go +++ b/pkg/sfu/pacer/no_queue.go @@ -48,6 +48,11 @@ func NewNoQueue(logger logger.Logger, bwe bwe.BWE) *NoQueue { func (n *NoQueue) Stop() { n.stop.Break() + + select { + case n.wake <- struct{}{}: + default: + } } func (n *NoQueue) Enqueue(p *Packet) { diff --git a/pkg/sfu/pacer/pacer.go b/pkg/sfu/pacer/pacer.go index f72886df3..8919dd63e 100644 --- a/pkg/sfu/pacer/pacer.go +++ b/pkg/sfu/pacer/pacer.go @@ -44,6 +44,8 @@ type Pacer interface { SetInterval(interval time.Duration) SetBitrate(bitrate int) + TimeSinceLastSentPacket() time.Duration + SetPacerProbeObserverListener(listener PacerProbeObserverListener) StartProbeCluster(pci ccutils.ProbeClusterInfo) EndProbeCluster(probeClusterId ccutils.ProbeClusterId) 
ccutils.ProbeClusterInfo diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 4afa9c6e6..bc92744df 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -149,20 +149,26 @@ const ( ) type StreamAllocatorConfig struct { - ProbeMode ProbeMode `yaml:"probe_mode,omitempty"` - MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` - DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"` + MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` + DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"` - ProbeOveragePct int64 `yaml:"probe_overage_pct,omitempty"` - ProbeMinBps int64 `yaml:"probe_min_bps,omitempty"` + ProbeMode ProbeMode `yaml:"probe_mode,omitempty"` + ProbeOveragePct int64 `yaml:"probe_overage_pct,omitempty"` + ProbeMinBps int64 `yaml:"probe_min_bps,omitempty"` + + PausedMinWait time.Duration `yaml:"paused_min_wait,omitempty"` } var ( DefaultStreamAllocatorConfig = StreamAllocatorConfig{ - ProbeMode: ProbeModePadding, + MinChannelCapacity: 0, + DisableEstimationUnmanagedTracks: false, + ProbeMode: ProbeModePadding, ProbeOveragePct: 120, ProbeMinBps: 200_000, + + PausedMinWait: 5 * time.Second, } ) @@ -655,6 +661,13 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) { } func (s *StreamAllocator) handleSignalPeriodicPing(Event) { + // if pause is allowed, there may be no packets sent and BWE could be in a congested state, + // reset BWE if that persists for a while + if s.state == streamAllocatorStateDeficient && s.params.Pacer.TimeSinceLastSentPacket() > s.params.Config.PausedMinWait { + s.params.Logger.Infow("stream allocator: resetting bwe to enable probing") + s.params.BWE.Reset() + } + // finalize any probe that may have finished/aborted if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid { if probeSignal, channelCapacity, isFinalized := 
s.params.BWE.ProbeClusterFinalize(); isFinalized {