diff --git a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go index 23bcc36e5..bc838557f 100644 --- a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go @@ -15,6 +15,7 @@ package sendsidebwe import ( + "fmt" "sync" "time" @@ -45,7 +46,7 @@ var ( defaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 2, - MinDuration: 200 * time.Millisecond, + MinDuration: 300 * time.Millisecond, } defaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{ @@ -54,8 +55,8 @@ var ( } defaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{ - MinNumberOfGroups: 5, - MinDuration: 600 * time.Millisecond, + MinNumberOfGroups: 6, + MinDuration: 900 * time.Millisecond, } ) @@ -68,8 +69,9 @@ type ProbeSignalConfig struct { JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"` DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"` - WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"` - CongestionMinWeightedLoss float64 `yaml:"congestion_min_weighted_loss,omitempty"` + WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"` + JQRMinWeightedLoss float64 `yaml:"jqr_min_weighted_loss,omitempty"` + DQRMaxWeightedLoss float64 `yaml:"dqr_max_weighted_loss,omitempty"` } func (p ProbeSignalConfig) IsValid(pci ccutils.ProbeClusterInfo) bool { @@ -83,15 +85,11 @@ func (p ProbeSignalConfig) ProbeSignal(ppg *probePacketGroup) (ccutils.ProbeSign ts.Merge(ppg.Traffic()) pqd := ppg.PropagatedQueuingDelay() - if pqd > p.JQRMinDelay.Microseconds() { + if pqd > p.JQRMinDelay.Microseconds() || ts.WeightedLoss() > p.JQRMinWeightedLoss { return ccutils.ProbeSignalCongesting, ts.AcknowledgedBitrate() } - if ts.WeightedLoss() > p.CongestionMinWeightedLoss { - return ccutils.ProbeSignalCongesting, ts.AcknowledgedBitrate() - } - - if pqd < p.DQRMaxDelay.Microseconds() { + if pqd < p.DQRMaxDelay.Microseconds() && ts.WeightedLoss() < p.DQRMaxWeightedLoss { return ccutils.ProbeSignalNotCongesting, ts.AcknowledgedBitrate() } @@ -106,43 +104,58 @@ var ( JQRMinDelay: 15 * time.Millisecond, DQRMaxDelay: 5 * time.Millisecond, - WeightedLoss: defaultWeightedLossConfig, - CongestionMinWeightedLoss: 0.25, + WeightedLoss: defaultWeightedLossConfig, + JQRMinWeightedLoss: 0.25, + DQRMaxWeightedLoss: 0.1, } ) // ------------------------------------------------------------------------------- +type queuingRegion int + +const ( + queuingRegionDQR queuingRegion = iota + queuingRegionIndeterminate + queuingRegionJQR +) + +func (q queuingRegion) String() string { + switch q { + case queuingRegionDQR: + return "DQR" + case queuingRegionIndeterminate: + return "INDETERMINATE" + case queuingRegionJQR: + return "JQR" + default: + return fmt.Sprintf("%d", int(q)) + } +} + +// ------------------------------------------------------------------------------- + type qdMeasurement struct { - earlyWarningConfig CongestionSignalConfig - congestedConfig CongestionSignalConfig - jqrMin int64 - dqrMax int64 + config CongestionSignalConfig + jqrMin int64 + dqrMax int64 numGroups int minSendTime int64 maxSendTime int64 - isSealed bool + isSealed bool + sealedGroupIdx int - isEarlyWarningTriggered bool - earlyWarningGroupIdx int - - isCongestedTriggered bool - congestedGroupIdx int + queuingRegion queuingRegion } -func newQdMeasurement( - earlyWarningConfig CongestionSignalConfig, - congestedConfig CongestionSignalConfig, - jqrMin int64, - dqrMax int64, -) *qdMeasurement { +func newQdMeasurement(config CongestionSignalConfig, jqrMin int64, dqrMax int64) *qdMeasurement { return &qdMeasurement{ - earlyWarningConfig: earlyWarningConfig, - congestedConfig: congestedConfig, - jqrMin: jqrMin, - dqrMax: dqrMax, + config: config, + jqrMin: jqrMin, + dqrMax: dqrMax, + queuingRegion: queuingRegionIndeterminate, } } @@ -159,6 +172,8 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { if pqd < q.dqrMax { // a DQR breaks continuity q.isSealed = true + q.sealedGroupIdx = groupIdx + q.queuingRegion = queuingRegionDQR return } @@ -171,17 +186,10 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { q.maxSendTime = max(q.maxSendTime, maxSendTime) } - if !q.isEarlyWarningTriggered && q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) { - q.isEarlyWarningTriggered = true - q.earlyWarningGroupIdx = groupIdx - } - - if !q.isCongestedTriggered && q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) { - q.isCongestedTriggered = true - q.congestedGroupIdx = groupIdx - - // can seal if congested config thresholds are met as they are longer + if q.config.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) { q.isSealed = true + q.sealedGroupIdx = groupIdx + q.queuingRegion = queuingRegionJQR } } @@ -189,20 +197,8 @@ func (q *qdMeasurement) IsSealed() bool { return q.isSealed } -func (q *qdMeasurement) IsEarlyWarningTriggered() bool { - return q.isEarlyWarningTriggered -} - -func (q *qdMeasurement) EarlyWarningGroupIdx() int { - return q.earlyWarningGroupIdx -} - -func (q *qdMeasurement) IsCongestedTriggered() bool { - return q.isCongestedTriggered -} - -func (q *qdMeasurement) CongestedGroupIdx() int { - return q.congestedGroupIdx +func (q *qdMeasurement) Result() (queuingRegion, int) { + return q.queuingRegion, q.sealedGroupIdx } func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { @@ -215,49 +211,45 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt64("maxSendTime", q.maxSendTime) e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000)) e.AddBool("isSealed", q.isSealed) - e.AddBool("isEarlyWarningTriggered", q.isEarlyWarningTriggered) - e.AddInt("earlyWarningGroupIdx", q.earlyWarningGroupIdx) - e.AddBool("isCongestedTriggered", q.isCongestedTriggered) - e.AddInt("congestedGroupIdx", q.congestedGroupIdx) + e.AddInt("sealedGroupIdx", q.sealedGroupIdx) + e.AddString("queuingRegion", q.queuingRegion.String()) return nil } // ------------------------------------------------------------------------------- type lossMeasurement struct { - earlyWarningConfig CongestionSignalConfig - congestedConfig CongestionSignalConfig - congestionMinLoss float64 + config CongestionSignalConfig + jqrMinLoss float64 + dqrMaxLoss float64 numGroups int ts *trafficStats - isEarlyWarningGrouped bool - earlyWarningGroupIdx int - earlyWarningWeightedLoss float64 + isSealed bool + sealedGroupIdx int - isCongestedGrouped bool - congestedGroupIdx int - congestedWeightedLoss float64 + weightedLoss float64 - isSealed bool + queuingRegion queuingRegion } func newLossMeasurement( - earlyWarningConfig CongestionSignalConfig, - congestedConfig CongestionSignalConfig, + config CongestionSignalConfig, weightedLossConfig WeightedLossConfig, - congestionMinLoss float64, + jqrMinLoss float64, + dqrMaxLoss float64, logger logger.Logger, ) *lossMeasurement { return &lossMeasurement{ - earlyWarningConfig: earlyWarningConfig, - congestedConfig: congestedConfig, - congestionMinLoss: congestionMinLoss, + config: config, + jqrMinLoss: jqrMinLoss, + dqrMaxLoss: dqrMaxLoss, ts: newTrafficStats(trafficStatsParams{ Config: weightedLossConfig, Logger: logger, }), + queuingRegion: queuingRegionIndeterminate, } } @@ -270,16 +262,16 @@ func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { l.ts.Merge(pg.Traffic()) duration := l.ts.Duration() - if !l.isEarlyWarningGrouped && l.earlyWarningConfig.IsTriggered(l.numGroups, duration) { - l.isEarlyWarningGrouped = true - l.earlyWarningGroupIdx = groupIdx - l.earlyWarningWeightedLoss = l.ts.WeightedLoss() - } - if !l.isCongestedGrouped && l.congestedConfig.IsTriggered(l.numGroups, duration) { - l.isCongestedGrouped = true - l.congestedGroupIdx = groupIdx - l.congestedWeightedLoss = l.ts.WeightedLoss() - l.isSealed = true // can seal if congested thresholds are satisfied as those should be higher + if l.config.IsTriggered(l.numGroups, duration) { + l.isSealed = true + l.sealedGroupIdx = groupIdx + l.weightedLoss = l.ts.WeightedLoss() + + if l.weightedLoss < l.dqrMaxLoss { + l.queuingRegion = queuingRegionDQR + } else if l.weightedLoss > l.jqrMinLoss { + l.queuingRegion = queuingRegionJQR + } } } @@ -287,20 +279,8 @@ func (l *lossMeasurement) IsSealed() bool { return l.isSealed } -func (l *lossMeasurement) IsEarlyWarningTriggered() bool { - return l.earlyWarningWeightedLoss > l.congestionMinLoss -} - -func (l *lossMeasurement) EarlyWarningGroupIdx() int { - return l.earlyWarningGroupIdx -} - -func (l *lossMeasurement) IsCongestedTriggered() bool { - return l.congestedWeightedLoss > l.congestionMinLoss -} - -func (l *lossMeasurement) CongestedGroupIdx() int { - return l.congestedGroupIdx +func (l *lossMeasurement) Result() (queuingRegion, int) { + return l.queuingRegion, l.sealedGroupIdx } func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { @@ -310,15 +290,10 @@ func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt("numGroups", l.numGroups) e.AddObject("ts", l.ts) - e.AddBool("isEarlyWarningGrouped", l.isEarlyWarningGrouped) - e.AddInt("earlyWarningGroupIdx", l.earlyWarningGroupIdx) - e.AddFloat64("earlyWarningWeightedLoss", l.earlyWarningWeightedLoss) - e.AddBool("isCongestedGrouped", l.isCongestedGrouped) - e.AddInt("congestedGroupIdx", l.congestedGroupIdx) - e.AddFloat64("congestedWeightedLoss", l.congestedWeightedLoss) e.AddBool("isSealed", l.isSealed) - e.AddBool("isEarlyWarningTriggered", l.IsEarlyWarningTriggered()) - e.AddBool("isCongestedTriggered", l.IsCongestedTriggered()) + e.AddInt("sealedGroupIdx", l.sealedGroupIdx) + e.AddFloat64("weightedLoss", l.weightedLoss) + e.AddString("queuingRegion", l.queuingRegion.String()) return nil } @@ -335,8 +310,9 @@ type CongestionDetectorConfig struct { JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"` DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"` - WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"` - CongestionMinWeightedLoss float64 `yaml:"congestion_min_weighted_loss,omitempty"` + WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"` + JQRMinWeightedLoss float64 `yaml:"jqr_min_weighted_loss,omitempty"` + DQRMaxWeightedLoss float64 `yaml:"dqr_max_weighted_loss,omitempty"` QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"` LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"` @@ -368,7 +344,7 @@ var ( defaultCongestionDetectorConfig = CongestionDetectorConfig{ PacketGroup: defaultPacketGroupConfig, - PacketGroupMaxAge: 10 * time.Second, + PacketGroupMaxAge: 5 * time.Second, ProbePacketGroup: defaultProbePacketGroupConfig, ProbeRegulator: ccutils.DefaultProbeRegulatorConfig, @@ -377,8 +353,9 @@ var ( JQRMinDelay: 15 * time.Millisecond, DQRMaxDelay: 5 * time.Millisecond, - WeightedLoss: defaultWeightedLossConfig, - CongestionMinWeightedLoss: 0.25, + WeightedLoss: defaultWeightedLossConfig, + JQRMinWeightedLoss: 0.25, + DQRMaxWeightedLoss: 0.1, QueuingDelayEarlyWarning: defaultQueuingDelayEarlyWarningCongestionSignalConfig, LossEarlyWarning: defaultLossEarlyWarningCongestionSignalConfig, @@ -671,6 +648,23 @@ func (c *congestionDetector) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { } } +func (c *congestionDetector) ProbeClusterIsGoalReached() bool { + c.lock.Lock() + defer c.lock.Unlock() + + if c.probePacketGroup == nil || c.congestionState != bwe.CongestionStateNone { + return false + } + + pci := c.probePacketGroup.ProbeClusterInfo() + if !c.params.Config.ProbeSignal.IsValid(pci) { + return false + } + + probeSignal, estimatedAvailableChannelCapacity := c.params.Config.ProbeSignal.ProbeSignal(c.probePacketGroup) + return probeSignal != ccutils.ProbeSignalNotCongesting && estimatedAvailableChannelCapacity > int64(pci.Goal.DesiredBps) +} + func (c *congestionDetector) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) { c.lock.Lock() defer c.lock.Unlock() @@ -720,18 +714,21 @@ func (c *congestionDetector) prunePacketGroups() { } } -func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, string, int) { +func (c *congestionDetector) getCongestionSignal( + stage string, + qdConfig CongestionSignalConfig, + lossConfig CongestionSignalConfig, +) (queuingRegion, string, int) { qdMeasurement := newQdMeasurement( - c.params.Config.QueuingDelayEarlyWarning, - c.params.Config.QueuingDelayCongested, + qdConfig, c.params.Config.JQRMinDelay.Microseconds(), c.params.Config.DQRMaxDelay.Microseconds(), ) lossMeasurement := newLossMeasurement( - c.params.Config.LossEarlyWarning, - c.params.Config.LossCongested, + lossConfig, c.params.Config.WeightedLoss, - c.params.Config.CongestionMinWeightedLoss, + c.params.Config.JQRMinWeightedLoss, + c.params.Config.DQRMaxWeightedLoss, c.params.Logger, ) @@ -745,97 +742,88 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() { break } - - // if "congested" triggered, can stop as that is the longer duration check and also - // the worst case check, i. e. if "congested" is triggered due to any condition, - // there can be nothing else that can trigger - if qdMeasurement.IsCongestedTriggered() || lossMeasurement.IsCongestedTriggered() { - break - } } oldestContributingGroup := max(0, idx) - earlyWarningReason := "" - earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered() - if earlyWarningTriggered { - earlyWarningReason = "queuing-delay" - oldestContributingGroup = qdMeasurement.EarlyWarningGroupIdx() - c.params.Logger.Debugw("send side bwe: early warning queuing-delay", "qd", qdMeasurement) + reason := "" + qr, groupIdx := qdMeasurement.Result() + if qr == queuingRegionJQR { + reason = "queuing-delay" + oldestContributingGroup = groupIdx + c.params.Logger.Debugw("send side bwe: queuing-delay in JQR", "stage", stage, "qd", qdMeasurement) } else { - earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered() - if earlyWarningTriggered { - earlyWarningReason = "loss" - oldestContributingGroup = lossMeasurement.EarlyWarningGroupIdx() - c.params.Logger.Debugw("send side bwe: early warning loss", "loss", lossMeasurement) + qr, groupIdx = lossMeasurement.Result() + if qr == queuingRegionJQR { + reason = "loss" + oldestContributingGroup = groupIdx + c.params.Logger.Debugw("send side bwe: loss in JQR", "stage", stage, "loss", lossMeasurement) } } - congestedReason := "" - congestedTriggered := qdMeasurement.IsCongestedTriggered() - if congestedTriggered { - congestedReason = "queuing-delay" - oldestContributingGroup = qdMeasurement.CongestedGroupIdx() - c.params.Logger.Debugw("send side bwe: congested queuing-delay", "qd", qdMeasurement) - } else { - congestedTriggered = lossMeasurement.IsCongestedTriggered() - if congestedTriggered { - congestedReason = "loss" - oldestContributingGroup = lossMeasurement.CongestedGroupIdx() - c.params.Logger.Debugw("send side bwe: congested loss", "loss", lossMeasurement) - } - } + return qr, reason, oldestContributingGroup +} - return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup +func (c *congestionDetector) getEarlyWarningSignal() (queuingRegion, string, int) { + return c.getCongestionSignal( + "early-warning", + c.params.Config.QueuingDelayEarlyWarning, + c.params.Config.LossEarlyWarning, + ) +} + +func (c *congestionDetector) getCongestedSignal() (queuingRegion, string, int) { + return c.getCongestionSignal( + "congested", + c.params.Config.QueuingDelayCongested, + c.params.Config.LossCongested, + ) } func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) { state := c.congestionState newState := c.congestionState - reason := "" - - earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup := c.isCongestionSignalTriggered() + var ( + qr queuingRegion + reason string + oldestContributingGroup int + ) switch state { case bwe.CongestionStateNone: - if congestedTriggered && !earlyWarningTriggered { - c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason) - } - if earlyWarningTriggered { + qr, reason, oldestContributingGroup = c.getEarlyWarningSignal() + if qr == queuingRegionJQR { newState = bwe.CongestionStateEarlyWarning - reason = earlyWarningReason } case bwe.CongestionStateEarlyWarning: - if congestedTriggered { + qr, reason, oldestContributingGroup = c.getCongestedSignal() + if qr == queuingRegionJQR { newState = bwe.CongestionStateCongested - reason = congestedReason - } else if !earlyWarningTriggered { - newState = bwe.CongestionStateEarlyWarningHangover + } else { + qr, _, _ := c.getEarlyWarningSignal() + if qr == queuingRegionDQR { + newState = bwe.CongestionStateEarlyWarningHangover + } } case bwe.CongestionStateEarlyWarningHangover: - if congestedTriggered && !earlyWarningTriggered { - c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason) - } - if earlyWarningTriggered { + qr, reason, oldestContributingGroup = c.getEarlyWarningSignal() + if qr == queuingRegionJQR { newState = bwe.CongestionStateEarlyWarning - reason = earlyWarningReason } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover { newState = bwe.CongestionStateNone } case bwe.CongestionStateCongested: - if !congestedTriggered { + qr, _, _ = c.getCongestedSignal() + if qr == queuingRegionDQR { newState = bwe.CongestionStateCongestedHangover } case bwe.CongestionStateCongestedHangover: - if congestedTriggered && !earlyWarningTriggered { - c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason) - } - if earlyWarningTriggered { + qr, reason, oldestContributingGroup = c.getEarlyWarningSignal() + if qr == queuingRegionJQR { newState = bwe.CongestionStateEarlyWarning - reason = earlyWarningReason } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover { newState = bwe.CongestionStateNone } diff --git a/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go b/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go index 2d4201bcf..68a1b0c79 100644 --- a/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go +++ b/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go @@ -87,6 +87,10 @@ func (p *probePacketGroup) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { p.doneAt = mono.Now() } +func (p *probePacketGroup) ProbeClusterInfo() ccutils.ProbeClusterInfo { + return p.pci +} + func (p *probePacketGroup) MaybeFinalizeProbe(maxSequenceNumber uint64, rtt float64) (ccutils.ProbeClusterInfo, bool) { if p.doneAt.IsZero() { return ccutils.ProbeClusterInfoInvalid, false @@ -111,7 +115,7 @@ func (p *probePacketGroup) MaybeFinalizeProbe(maxSequenceNumber uint64, rtt floa } func (p *probePacketGroup) Add(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) error { - if !p.doneAt.IsZero() || pi.probeClusterId != p.pci.Id { + if pi.probeClusterId != p.pci.Id { return nil } diff --git a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go index a1cc20e44..59fdc0614 100644 --- a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go +++ b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go @@ -130,6 +130,10 @@ func (s *SendSideBWE) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { s.congestionDetector.ProbeClusterDone(pci) } +func (s *SendSideBWE) ProbeClusterIsGoalReached() bool { + return s.congestionDetector.ProbeClusterIsGoalReached() +} + func (s *SendSideBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) { return s.congestionDetector.ProbeClusterFinalize() } diff --git a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go index 63b18e364..ba2102759 100644 --- a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go +++ b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go @@ -165,10 +165,14 @@ func (ts *trafficStats) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt64("recvDelta", ts.recvDelta) e.AddInt64("groupDelay", ts.recvDelta-ts.sendDelta) - e.AddFloat64("weightedLoss", ts.WeightedLoss()) - if (ts.ackedPackets + ts.lostPackets) != 0 { - e.AddFloat64("rawLoss", float64(ts.lostPackets)/float64(ts.ackedPackets+ts.lostPackets)) + totalPackets := ts.lostPackets + ts.ackedPackets + if duration != 0 { + e.AddFloat64("pps", float64(totalPackets)/duration.Seconds()) } + if (totalPackets) != 0 { + e.AddFloat64("rawLoss", float64(ts.lostPackets)/float64(totalPackets)) + } + e.AddFloat64("weightedLoss", ts.WeightedLoss()) e.AddInt64("lossPenalty", ts.lossPenalty()) capturedTrafficRatio := ts.CapturedTrafficRatio() diff --git a/pkg/sfu/ccutils/prober.go b/pkg/sfu/ccutils/prober.go index b6c92662e..e3428c112 100644 --- a/pkg/sfu/ccutils/prober.go +++ b/pkg/sfu/ccutils/prober.go @@ -349,12 +349,15 @@ func (p ProbeClusterGoal) MarshalLogObject(e zapcore.ObjectEncoder) error { } type ProbeClusterResult struct { - StartTime int64 - EndTime int64 - BytesProbe int - BytesNonProbePrimary int - BytesNonProbeRTX int - IsCompleted bool + StartTime int64 + EndTime int64 + PacketsProbe int + BytesProbe int + PacketsNonProbePrimary int + BytesNonProbePrimary int + PacketsNonProbeRTX int + BytesNonProbeRTX int + IsCompleted bool } func (p ProbeClusterResult) Bytes() int { @@ -378,8 +381,11 @@ func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddTime("StartTime", time.Unix(0, p.StartTime)) e.AddTime("EndTime", time.Unix(0, p.EndTime)) e.AddDuration("Duration", p.Duration()) + e.AddInt("PacketsProbe", p.PacketsProbe) e.AddInt("BytesProbe", p.BytesProbe) + e.AddInt("PacketsNonProbePrimary", p.PacketsNonProbePrimary) e.AddInt("BytesNonProbePrimary", p.BytesNonProbePrimary) + e.AddInt("PacketsNonProbeRTX", p.PacketsNonProbeRTX) e.AddInt("BytesNonProbeRTX", p.BytesNonProbeRTX) e.AddInt("Bytes", p.Bytes()) e.AddFloat64("Bitrate", p.Bitrate()) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index f75d6502a..badfaaff4 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1817,7 +1817,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } case *rtcp.TransportLayerCC: - if p.MediaSSRC == d.ssrc || (d.ssrcRTX != 0 && p.MediaSSRC == d.ssrcRTX) { + if p.MediaSSRC == d.ssrc { if sal := d.getStreamAllocatorListener(); sal != nil { sal.OnTransportCCFeedback(d, p) } @@ -1885,6 +1885,13 @@ func (d *DownTrack) handleRTCPRTX(bytes []byte) { d.rtpStatsRTX.UpdateFromReceiverReport(r) } + + case *rtcp.TransportLayerCC: + if p.MediaSSRC == d.ssrcRTX { + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnTransportCCFeedback(d, p) + } + } } } } diff --git a/pkg/sfu/pacer/probe_observer.go b/pkg/sfu/pacer/probe_observer.go index fd370d758..ff51b8169 100644 --- a/pkg/sfu/pacer/probe_observer.go +++ b/pkg/sfu/pacer/probe_observer.go @@ -112,11 +112,14 @@ func (po *ProbeObserver) RecordPacket(size int, isRTX bool, probeClusterId ccuti } if isProbe { + po.pci.Result.PacketsProbe++ po.pci.Result.BytesProbe += size } else { if isRTX { + po.pci.Result.PacketsNonProbeRTX++ po.pci.Result.BytesNonProbeRTX += size } else { + po.pci.Result.PacketsNonProbePrimary++ po.pci.Result.BytesNonProbePrimary += size } } diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 062f13b9a..01bd53c3a 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -83,6 +83,7 @@ const ( streamAllocatorSignalAllocateAllTracks streamAllocatorSignalAdjustState streamAllocatorSignalEstimate + streamAllocatorSignalFeedback streamAllocatorSignalPeriodicPing streamAllocatorSignalProbeClusterSwitch streamAllocatorSignalSendProbe @@ -103,6 +104,8 @@ func (s streamAllocatorSignal) String() string { return "ADJUST_STATE" case streamAllocatorSignalEstimate: return "ESTIMATE" + case streamAllocatorSignalFeedback: + return "FEEDBACK" case streamAllocatorSignalPeriodicPing: return "PERIODIC_PING" case streamAllocatorSignalProbeClusterSwitch: @@ -435,11 +438,10 @@ func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEs // called when a new transport-cc feedback is received func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC) { - if s.sendSideBWEInterceptor != nil { - s.sendSideBWEInterceptor.WriteRTCP([]rtcp.Packet{fb}, nil) - } - - s.params.BWE.HandleTWCCFeedback(fb) + s.postEvent(Event{ + Signal: streamAllocatorSignalFeedback, + Data: fb, + }) } // called when target bitrate changes (send side bandwidth estimation) @@ -612,6 +614,8 @@ func (s *StreamAllocator) postEvent(event Event) { event.handleSignalAdjustState(event) case streamAllocatorSignalEstimate: event.handleSignalEstimate(event) + case streamAllocatorSignalFeedback: + event.handleSignalFeedback(event) case streamAllocatorSignalPeriodicPing: event.handleSignalPeriodicPing(event) case streamAllocatorSignalProbeClusterSwitch: @@ -660,7 +664,7 @@ func (s *StreamAllocator) handleSignalAdjustState(Event) { } func (s *StreamAllocator) handleSignalEstimate(event Event) { - receivedEstimate, _ := event.Data.(int64) + receivedEstimate := event.Data.(int64) // always update NACKs packetDelta, repeatedNackDelta := s.getNackDelta() @@ -673,6 +677,15 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) { ) } +func (s *StreamAllocator) handleSignalFeedback(event Event) { + fb := event.Data.(*rtcp.TransportLayerCC) + if s.sendSideBWEInterceptor != nil { + s.sendSideBWEInterceptor.WriteRTCP([]rtcp.Packet{fb}, nil) + } + + s.params.BWE.HandleTWCCFeedback(fb) +} + func (s *StreamAllocator) handleSignalPeriodicPing(Event) { // if pause is allowed, there may be no packets sent and BWE could be congested state, // reset BWE if that persists for a while