Send side BWE - fixes (#3244)

* WIP

* no worker

* fixes

* use congested packet groups

* oldest group

* markers

* WIP

* WIP

* WIP

* WIP

* WIP

* clean up

* fmt

* consolidate

* store last packet only for bwe extension cases
This commit is contained in:
Raja Subramanian
2024-12-11 21:31:26 +05:30
committed by GitHub
parent d0f7eaeadb
commit 4b16017d09
14 changed files with 403 additions and 349 deletions
-3
View File
@@ -1018,9 +1018,6 @@ func (t *PCTransport) Close() {
if t.pacer != nil {
t.pacer.Stop()
}
if t.bwe != nil {
t.bwe.Stop()
}
_ = t.pc.Close()
-2
View File
@@ -65,8 +65,6 @@ type BWE interface {
Reset()
Stop()
HandleREMB(
receivedEstimate int64,
expectedBandwidthUsage int64,
-2
View File
@@ -28,8 +28,6 @@ func (n *NullBWE) SetBWEListener(_bweListener BWEListener) {}
func (n *NullBWE) Reset() {}
func (n *NullBWE) Stop() {}
func (n *NullBWE) RecordPacketSendAndGetSequenceNumber(
_atMicro int64,
_size int,
+310 -299
View File
@@ -18,8 +18,6 @@ import (
"sync"
"time"
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
@@ -40,22 +38,22 @@ func (c CongestionSignalConfig) IsTriggered(numGroups int, duration int64) bool
}
var (
DefaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{
defaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{
MinNumberOfGroups: 1,
MinDuration: 100 * time.Millisecond,
}
DefaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{
defaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{
MinNumberOfGroups: 2,
MinDuration: 200 * time.Millisecond,
}
DefaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{
defaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{
MinNumberOfGroups: 3,
MinDuration: 300 * time.Millisecond,
}
DefaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{
defaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{
MinNumberOfGroups: 5,
MinDuration: 600 * time.Millisecond,
}
@@ -101,7 +99,7 @@ func (p ProbeSignalConfig) ProbeSignal(ppg *probePacketGroup) (ccutils.ProbeSign
}
var (
DefaultProbeSignalConfig = ProbeSignalConfig{
defaultProbeSignalConfig = ProbeSignalConfig{
MinBytesRatio: 0.5,
MinDurationRatio: 0.5,
@@ -121,10 +119,11 @@ type qdMeasurement struct {
jqrMin int64
dqrMax int64
numGroups int
minSendTime int64
maxSendTime int64
isSealed bool
numGroups int
smallestGroupIdx int
minSendTime int64
maxSendTime int64
isSealed bool
}
func newQdMeasurement(
@@ -141,11 +140,15 @@ func newQdMeasurement(
}
}
func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup) {
func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
if q.isSealed {
return
}
if q.smallestGroupIdx == 0 || q.smallestGroupIdx > groupIdx {
q.smallestGroupIdx = groupIdx
}
pqd, pqdOk := pg.FinalizedPropagatedQueuingDelay()
if !pqdOk {
return
@@ -178,6 +181,10 @@ func (q *qdMeasurement) IsSealed() bool {
return q.isSealed
}
// SmallestGroupIdx returns the lowest group index that ProcessPacketGroup has
// recorded for this measurement (it stops recording once the measurement is sealed).
// NOTE(review): ProcessPacketGroup treats 0 as "not yet set", so a return of 0 can mean
// either "no group recorded" or "group 0 recorded" - confirm callers are fine with that.
func (q *qdMeasurement) SmallestGroupIdx() int {
return q.smallestGroupIdx
}
// IsEarlyWarningTriggered reports whether the early warning signal config is satisfied
// by the number of groups counted so far and the send time span (maxSendTime - minSendTime)
// covered by this measurement.
func (q *qdMeasurement) IsEarlyWarningTriggered() bool {
return q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime)
}
@@ -192,6 +199,7 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
}
e.AddInt("numGroups", q.numGroups)
e.AddInt("smallestGroupIdx", q.smallestGroupIdx)
e.AddInt64("minSendTime", q.minSendTime)
e.AddInt64("maxSendTime", q.maxSendTime)
e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000))
@@ -208,11 +216,13 @@ type lossMeasurement struct {
congestedConfig CongestionSignalConfig
congestionMinLoss float64
numGroups int
ts *trafficStats
numGroups int
smallestGroupIdx int
ts *trafficStats
earlyWarningWeightedLoss float64
congestedWeightedLoss float64
earlyWarningWeightedLoss float64
earlyWarningWeightedLossDone bool
congestedWeightedLoss float64
isSealed bool
}
@@ -235,17 +245,22 @@ func newLossMeasurement(
}
}
func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup) {
if l.isSealed {
func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
if l.isSealed || !pg.IsFinalized() {
return
}
if l.smallestGroupIdx == 0 || l.smallestGroupIdx > groupIdx {
l.smallestGroupIdx = groupIdx
}
l.numGroups++
l.ts.Merge(pg.Traffic())
duration := l.ts.Duration()
if l.earlyWarningConfig.IsTriggered(l.numGroups, duration) {
if l.earlyWarningConfig.IsTriggered(l.numGroups, duration) && !l.earlyWarningWeightedLossDone {
l.earlyWarningWeightedLoss = l.ts.WeightedLoss()
l.earlyWarningWeightedLossDone = true
}
if l.congestedConfig.IsTriggered(l.numGroups, duration) {
l.congestedWeightedLoss = l.ts.WeightedLoss()
@@ -257,6 +272,10 @@ func (l *lossMeasurement) IsSealed() bool {
return l.isSealed
}
// SmallestGroupIdx returns the lowest group index that ProcessPacketGroup has
// recorded for this measurement. Only finalized groups processed before the
// measurement is sealed are recorded.
// NOTE(review): ProcessPacketGroup treats 0 as "not yet set", so a return of 0 can mean
// either "no group recorded" or "group 0 recorded" - confirm callers are fine with that.
func (l *lossMeasurement) SmallestGroupIdx() int {
return l.smallestGroupIdx
}
// IsEarlyWarningTriggered reports whether the weighted loss captured for the
// early warning window is strictly greater than congestionMinLoss.
// earlyWarningWeightedLoss is filled in by ProcessPacketGroup and keeps its
// initial value until the early warning signal config has been satisfied.
func (l *lossMeasurement) IsEarlyWarningTriggered() bool {
return l.earlyWarningWeightedLoss > l.congestionMinLoss
}
@@ -271,6 +290,7 @@ func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
}
e.AddInt("numGroups", l.numGroups)
e.AddInt("smallestGroupIdx", l.smallestGroupIdx)
e.AddObject("ts", l.ts)
e.AddFloat64("earlyWarningWeightedLoss", l.earlyWarningWeightedLoss)
e.AddFloat64("congestedWeightedLoss", l.congestedWeightedLoss)
@@ -304,32 +324,33 @@ type CongestionDetectorConfig struct {
LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"`
CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"`
RateMeasurementWindowDurationMin time.Duration `yaml:"rate_measurement_window_duration_min,omitempty"`
RateMeasurementWindowDurationMax time.Duration `yaml:"rate_measurement_window_duration_max,omitempty"`
PeriodicCheckInterval time.Duration `yaml:"periodic_check_interval,omitempty"`
PeriodicCheckIntervalCongested time.Duration `yaml:"periodic_check_interval_congested,omitempty"`
CongestedCTRTrend ccutils.TrendDetectorConfig `yaml:"congested_ctr_trend,omitempty"`
CongestedCTREpsilon float64 `yaml:"congested_ctr_epsilon,omitempty"`
CongestedCTRTrend ccutils.TrendDetectorConfig `yaml:"congested_ctr_trend,omitempty"`
CongestedCTREpsilon float64 `yaml:"congested_ctr_epsilon,omitempty"`
CongestedPacketGroup PacketGroupConfig `yaml:"congested_packet_group,omitempty"`
}
var (
defaultTrendDetectorConfigCongestedCTR = ccutils.TrendDetectorConfig{
RequiredSamples: 5,
RequiredSamples: 4,
RequiredSamplesMin: 2,
DownwardTrendThreshold: -0.5,
DownwardTrendMaxWait: 5 * time.Second,
DownwardTrendMaxWait: 2 * time.Second,
CollapseThreshold: 500 * time.Millisecond,
ValidityWindow: 10 * time.Second,
}
DefaultCongestionDetectorConfig = CongestionDetectorConfig{
PacketGroup: DefaultPacketGroupConfig,
PacketGroupMaxAge: 15 * time.Second,
defaultCongestedPacketGroupConfig = PacketGroupConfig{
MinPackets: 20,
MaxWindowDuration: 150 * time.Millisecond,
}
ProbePacketGroup: DefaultPacketGroupConfigProbe,
defaultCongestionDetectorConfig = CongestionDetectorConfig{
PacketGroup: defaultPacketGroupConfig,
PacketGroupMaxAge: 10 * time.Second,
ProbePacketGroup: defaultProbePacketGroupConfig,
ProbeRegulator: ccutils.DefaultProbeRegulatorConfig,
ProbeSignal: DefaultProbeSignalConfig,
ProbeSignal: defaultProbeSignalConfig,
JQRMinDelay: 15 * time.Millisecond,
DQRMaxDelay: 5 * time.Millisecond,
@@ -337,31 +358,22 @@ var (
WeightedLoss: defaultWeightedLossConfig,
CongestionMinLoss: 0.25,
QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig,
LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig,
QueuingDelayEarlyWarning: defaultQueuingDelayEarlyWarningCongestionSignalConfig,
LossEarlyWarning: defaultLossEarlyWarningCongestionSignalConfig,
EarlyWarningHangover: 500 * time.Millisecond,
QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig,
LossCongested: DefaultLossCongestedCongestionSignalConfig,
QueuingDelayCongested: defaultQueuingDelayCongestedCongestionSignalConfig,
LossCongested: defaultLossCongestedCongestionSignalConfig,
CongestedHangover: 3 * time.Second,
RateMeasurementWindowDurationMin: 800 * time.Millisecond,
RateMeasurementWindowDurationMax: 2 * time.Second,
PeriodicCheckInterval: 2 * time.Second,
PeriodicCheckIntervalCongested: 200 * time.Millisecond,
CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR,
CongestedCTREpsilon: 0.05,
CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR,
CongestedCTREpsilon: 0.05,
CongestedPacketGroup: defaultCongestedPacketGroupConfig,
}
)
// -------------------------------------------------------------------------------
type feedbackReport struct {
at time.Time
report *rtcp.TransportLayerCC
}
type congestionDetectorParams struct {
Config CongestionDetectorConfig
Logger logger.Logger
@@ -370,8 +382,7 @@ type congestionDetectorParams struct {
type congestionDetector struct {
params congestionDetectorParams
lock sync.RWMutex
feedbackReports deque.Deque[feedbackReport]
lock sync.Mutex
rtt float64
@@ -383,14 +394,13 @@ type congestionDetector struct {
probePacketGroup *probePacketGroup
probeRegulator *ccutils.ProbeRegulator
wake chan struct{}
stop core.Fuse
estimatedAvailableChannelCapacity int64
congestionState bwe.CongestionState
congestionStateSwitchedAt time.Time
congestedCTRTrend *ccutils.TrendDetector[float64]
congestedTrafficStats *trafficStats
congestedCTRTrend *ccutils.TrendDetector[float64]
congestedTrafficStats *trafficStats
congestedPacketGroup *packetGroup
bweListener bwe.BWEListener
}
@@ -398,27 +408,29 @@ type congestionDetector struct {
func newCongestionDetector(params congestionDetectorParams) *congestionDetector {
c := &congestionDetector{
params: params,
rtt: bwe.DefaultRTT,
packetTracker: newPacketTracker(packetTrackerParams{Logger: params.Logger}),
twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}),
probeRegulator: ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{
Config: params.Config.ProbeRegulator,
Logger: params.Logger,
}),
wake: make(chan struct{}, 1),
estimatedAvailableChannelCapacity: 100_000_000,
congestionState: bwe.CongestionStateNone,
congestionStateSwitchedAt: mono.Now(),
}
c.Reset()
c.feedbackReports.SetMinCapacity(3)
go c.worker()
return c
}
func (c *congestionDetector) Stop() {
c.stop.Break()
// Reset returns the detector to its initial estimation state while holding c.lock.
//
// It restores the default RTT, drops all packet groups and any probe packet group,
// replaces the probe regulator with a fresh one built from the configured
// ProbeRegulator settings, puts the capacity estimate back to its starting value,
// moves the congestion state to CongestionStateNone (stamping the switch time with
// mono.Now()) and clears the captured traffic ratio (CTR) trend tracking.
//
// packetTracker, twccFeedback and bweListener are not touched here.
func (c *congestionDetector) Reset() {
c.lock.Lock()
defer c.lock.Unlock()
c.rtt = bwe.DefaultRTT
c.packetGroups = nil
c.probePacketGroup = nil
c.probeRegulator = ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{
Config: c.params.Config.ProbeRegulator,
Logger: c.params.Logger,
})
// starting estimate; presumably bits per second - TODO confirm unit
c.estimatedAvailableChannelCapacity = 100_000_000
c.congestionState = bwe.CongestionStateNone
c.congestionStateSwitchedAt = mono.Now()
c.clearCTRTrend()
}
func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) {
@@ -429,25 +441,163 @@ func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) {
}
func (c *congestionDetector) getBWEListener() bwe.BWEListener {
c.lock.RLock()
defer c.lock.RUnlock()
c.lock.Lock()
defer c.lock.Unlock()
return c.bweListener
}
func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) {
c.lock.Lock()
c.feedbackReports.PushBack(feedbackReport{mono.Now(), report})
recvRefTime, isOutOfOrder := c.twccFeedback.ProcessReport(report, mono.Now())
if isOutOfOrder {
c.params.Logger.Infow("send side bwe: received out-of-order feedback report")
}
if len(c.packetGroups) == 0 {
c.packetGroups = append(
c.packetGroups,
newPacketGroup(
packetGroupParams{
Config: c.params.Config.PacketGroup,
WeightedLoss: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
},
0,
),
)
}
pg := c.packetGroups[len(c.packetGroups)-1]
trackPacketGroup := func(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) {
if pi == nil {
return
}
c.updateCTRTrend(pi, sendDelta, recvDelta, isLost)
if c.probePacketGroup != nil {
c.probePacketGroup.Add(pi, sendDelta, recvDelta, isLost)
}
err := pg.Add(pi, sendDelta, recvDelta, isLost)
if err == nil {
return
}
if err == errGroupFinalized {
// previous group ended, start a new group
pg = newPacketGroup(
packetGroupParams{
Config: c.params.Config.PacketGroup,
WeightedLoss: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
},
pg.PropagatedQueuingDelay(),
)
c.packetGroups = append(c.packetGroups, pg)
if err = pg.Add(pi, sendDelta, recvDelta, isLost); err != nil {
c.params.Logger.Warnw("send side bwe: could not add packet to new packet group", err, "packetInfo", pi, "packetGroup", pg)
}
return
}
// try an older group
for idx := len(c.packetGroups) - 2; idx >= 0; idx-- {
opg := c.packetGroups[idx]
if err := opg.Add(pi, sendDelta, recvDelta, isLost); err == nil {
return
} else if err == errGroupFinalized {
c.params.Logger.Infow("send side bwe: unexpected finalized group", "packetInfo", pi, "packetGroup", opg)
}
}
}
// 1. go through the TWCC feedback report and record receive time as reported by remote
// 2. process acknowledged packet and group them
//
// losses are not recorded if a feedback report is completely lost.
// RFC recommends treating lost reports by ignoring packets that would have been in it.
// -----------------------------------------------------------------------------------
// | From a congestion control perspective, lost feedback messages are |
// | handled by ignoring packets which would have been reported as lost or |
// | received in the lost feedback messages. This behavior is similar to |
// | how a lost RTCP receiver report is handled. |
// -----------------------------------------------------------------------------------
// Reference: https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01#page-4
sequenceNumber := report.BaseSequenceNumber
endSequenceNumberExclusive := sequenceNumber + report.PacketStatusCount
deltaIdx := 0
for _, chunk := range report.PacketChunks {
if sequenceNumber == endSequenceNumberExclusive {
break
}
switch chunk := chunk.(type) {
case *rtcp.RunLengthChunk:
for i := uint16(0); i < chunk.RunLength; i++ {
if sequenceNumber == endSequenceNumberExclusive {
break
}
recvTime := int64(0)
isLost := false
if chunk.PacketStatusSymbol != rtcp.TypeTCCPacketNotReceived {
recvRefTime += report.RecvDeltas[deltaIdx].Delta
deltaIdx++
recvTime = recvRefTime
} else {
isLost = true
}
pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime)
if pi.sendTime != 0 {
trackPacketGroup(&pi, sendDelta, recvDelta, isLost)
}
sequenceNumber++
}
case *rtcp.StatusVectorChunk:
for _, symbol := range chunk.SymbolList {
if sequenceNumber == endSequenceNumberExclusive {
break
}
recvTime := int64(0)
isLost := false
if symbol != rtcp.TypeTCCPacketNotReceived {
recvRefTime += report.RecvDeltas[deltaIdx].Delta
deltaIdx++
recvTime = recvRefTime
} else {
isLost = true
}
pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime)
if pi.sendTime != 0 {
trackPacketGroup(&pi, sendDelta, recvDelta, isLost)
}
sequenceNumber++
}
}
}
c.prunePacketGroups()
shouldNotify, state, committedChannelCapacity := c.congestionDetectionStateMachine()
c.lock.Unlock()
// notify worker of a new feedback
select {
case c.wake <- struct{}{}:
default:
if shouldNotify {
if bweListener := c.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(state, committedChannelCapacity)
}
}
}
func (c *congestionDetector) UpdateRTT(rtt float64) {
c.lock.Lock()
defer c.lock.Unlock()
if rtt == 0 {
c.rtt = bwe.DefaultRTT
} else {
@@ -566,23 +716,33 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool,
var idx int
for idx = len(c.packetGroups) - 1; idx >= 0; idx-- {
pg := c.packetGroups[idx]
qdMeasurement.ProcessPacketGroup(pg)
lossMeasurement.ProcessPacketGroup(pg)
qdMeasurement.ProcessPacketGroup(pg, idx)
lossMeasurement.ProcessPacketGroup(pg, idx)
// if both measurements have enough data to make a decision, stop processing groups
if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() {
break
}
// if "congested" triggered, can stop as that is the longer duration check and also
// the worst case check, i.e. if "congested" is triggered due to any condition,
// there can be nothing else that can trigger
if qdMeasurement.IsCongestedTriggered() || lossMeasurement.IsCongestedTriggered() {
break
}
}
oldestContributingGroup := max(0, idx)
earlyWarningReason := ""
earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "queuing-delay"
oldestContributingGroup = qdMeasurement.SmallestGroupIdx()
} else {
earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "loss"
oldestContributingGroup = lossMeasurement.SmallestGroupIdx()
}
}
@@ -590,17 +750,19 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool,
congestedTriggered := qdMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "queuing-delay"
oldestContributingGroup = qdMeasurement.SmallestGroupIdx()
} else {
congestedTriggered = lossMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "loss"
oldestContributingGroup = lossMeasurement.SmallestGroupIdx()
}
}
return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, max(0, idx)
return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup
}
func (c *congestionDetector) congestionDetectionStateMachine() {
func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) {
state := c.congestionState
newState := c.congestionState
reason := ""
@@ -653,12 +815,39 @@ func (c *congestionDetector) congestionDetectionStateMachine() {
}
}
c.estimateAvailableChannelCapacity()
c.estimateAvailableChannelCapacity(oldestContributingGroup)
// update after running the above estimate as state change callback includes the estimated available channel capacity
shouldNotify := false
if newState != state {
c.updateCongestionState(newState, reason, oldestContributingGroup)
shouldNotify = true
}
if c.congestedCTRTrend != nil && c.congestedCTRTrend.GetDirection() == ccutils.TrendDirectionDownward {
shouldNotify = true
congestedAckedBitrate := c.congestedTrafficStats.AcknowledgedBitrate()
if congestedAckedBitrate < c.estimatedAvailableChannelCapacity {
c.estimatedAvailableChannelCapacity = congestedAckedBitrate
}
c.params.Logger.Infow(
"send side bwe: captured traffic ratio is trending downward",
"channel", c.congestedCTRTrend,
"trafficStats", c.congestedTrafficStats,
"estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity,
)
// reset to get new set of samples for next trend
c.resetCTRTrend()
}
return shouldNotify, c.congestionState, c.estimatedAvailableChannelCapacity
}
// createCTRTrend starts captured traffic ratio (CTR) tracking from scratch:
// it (re)initialises the trend state via resetCTRTrend and discards any
// in-progress congested packet group so the next sample starts with a new group.
func (c *congestionDetector) createCTRTrend() {
c.resetCTRTrend()
c.congestedPacketGroup = nil
}
func (c *congestionDetector) resetCTRTrend() {
@@ -676,42 +865,53 @@ func (c *congestionDetector) resetCTRTrend() {
// clearCTRTrend turns off captured traffic ratio (CTR) tracking by dropping the
// trend detector, the accumulated congested traffic stats and the in-progress
// congested packet group. updateCTRTrend returns immediately while the trend
// detector is nil.
func (c *congestionDetector) clearCTRTrend() {
c.congestedCTRTrend = nil
c.congestedTrafficStats = nil
c.congestedPacketGroup = nil
}
func (c *congestionDetector) updateCTRTrend(pg *packetGroup) {
func (c *congestionDetector) updateCTRTrend(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) {
if c.congestedCTRTrend == nil {
return
}
// progressively keep increasing the window and make measurements over longer windows,
// if congestion is not relieving, CTR will trend down
c.congestedTrafficStats.Merge(pg.Traffic())
ctr := c.congestedTrafficStats.CapturedTrafficRatio()
// quantise CTR to filter out small changes
c.congestedCTRTrend.AddValue(float64(int((ctr+(c.params.Config.CongestedCTREpsilon/2))/c.params.Config.CongestedCTREpsilon)) * c.params.Config.CongestedCTREpsilon)
if c.congestedCTRTrend.GetDirection() != ccutils.TrendDirectionDownward {
return
if c.congestedPacketGroup == nil {
c.congestedPacketGroup = newPacketGroup(
packetGroupParams{
Config: c.params.Config.CongestedPacketGroup,
WeightedLoss: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
},
0,
)
}
c.params.Logger.Infow("send side bwe: captured traffic ratio is trending downward", "channel", c.congestedCTRTrend)
if err := c.congestedPacketGroup.Add(pi, sendDelta, recvDelta, isLost); err == errGroupFinalized {
// progressively keep increasing the window and make measurements over longer windows,
// if congestion is not relieving, CTR will trend down
c.congestedTrafficStats.Merge(c.congestedPacketGroup.Traffic())
if bweListener := c.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(c.congestionState, c.estimatedAvailableChannelCapacity)
ts := newTrafficStats(trafficStatsParams{
Config: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
})
ts.Merge(c.congestedPacketGroup.Traffic())
ctr := ts.CapturedTrafficRatio()
// quantise CTR to filter out small changes
c.congestedCTRTrend.AddValue(float64(int((ctr+(c.params.Config.CongestedCTREpsilon/2))/c.params.Config.CongestedCTREpsilon)) * c.params.Config.CongestedCTREpsilon)
c.congestedPacketGroup = newPacketGroup(
packetGroupParams{
Config: c.params.Config.CongestedPacketGroup,
WeightedLoss: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
},
c.congestedPacketGroup.PropagatedQueuingDelay(),
)
}
// reset to get new set of samples for next trend
c.resetCTRTrend()
}
func (c *congestionDetector) estimateAvailableChannelCapacity() {
if len(c.packetGroups) == 0 || c.probePacketGroup != nil {
return
}
threshold, ok := c.packetTracker.BaseSendTimeThreshold(c.params.Config.RateMeasurementWindowDurationMax.Microseconds())
if !ok {
func (c *congestionDetector) estimateAvailableChannelCapacity(oldestContributingGroup int) {
if len(c.packetGroups) == 0 || c.congestedCTRTrend != nil || c.probePacketGroup != nil {
return
}
@@ -719,26 +919,15 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
Config: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
})
var idx int
for idx = len(c.packetGroups) - 1; idx >= 0; idx-- {
for idx := len(c.packetGroups) - 1; idx >= oldestContributingGroup; idx-- {
pg := c.packetGroups[idx]
if mst := pg.MinSendTime(); mst != 0 && mst < threshold {
break
if !pg.IsFinalized() {
continue
}
agg.Merge(pg.Traffic())
}
if agg.Duration() < c.params.Config.RateMeasurementWindowDurationMin.Microseconds() {
c.params.Logger.Infow(
"send side bwe: not enough data to estimate available channel capacity",
"duration", agg.Duration(),
"numGroups", len(c.packetGroups),
"oldestUsed", max(0, idx),
)
return
}
c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate()
}
@@ -761,10 +950,6 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, re
prevState := c.congestionState
c.congestionState = state
if bweListener := c.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(state, c.estimatedAvailableChannelCapacity)
}
// when in congested state, monitor changes in captured traffic ratio (CTR)
// to ensure allocations are in line with latest estimates, it is possible that
// the estimate is incorrect when congestion starts and the allocation may be
@@ -772,182 +957,8 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, re
// on a continuous basis allocations can be adjusted in the direction of
// reducing/relieving congestion
if state == bwe.CongestionStateCongested && prevState != bwe.CongestionStateCongested {
c.resetCTRTrend()
c.createCTRTrend()
} else if state != bwe.CongestionStateCongested {
c.clearCTRTrend()
}
}
func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
recvRefTime, isOutOfOrder := c.twccFeedback.ProcessReport(fbr.report, fbr.at)
if isOutOfOrder {
c.params.Logger.Infow("send side bwe: received out-of-order feedback report")
}
if len(c.packetGroups) == 0 {
c.packetGroups = append(
c.packetGroups,
newPacketGroup(
packetGroupParams{
Config: c.params.Config.PacketGroup,
WeightedLoss: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
},
0,
),
)
}
pg := c.packetGroups[len(c.packetGroups)-1]
trackPacketGroup := func(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) {
if pi == nil {
return
}
if c.probePacketGroup != nil {
c.probePacketGroup.Add(pi, sendDelta, recvDelta, isLost)
}
err := pg.Add(pi, sendDelta, recvDelta, isLost)
if err == nil {
return
}
if err == errGroupFinalized {
// previous group ended, start a new group
c.updateCTRTrend(pg)
pg = newPacketGroup(
packetGroupParams{
Config: c.params.Config.PacketGroup,
WeightedLoss: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
},
pg.PropagatedQueuingDelay(),
)
c.packetGroups = append(c.packetGroups, pg)
if err = pg.Add(pi, sendDelta, recvDelta, isLost); err != nil {
c.params.Logger.Warnw("send side bwe: could not add packet to new packet group", err, "packetInfo", pi, "packetGroup", pg)
}
return
}
// try an older group
for idx := len(c.packetGroups) - 2; idx >= 0; idx-- {
opg := c.packetGroups[idx]
if err := opg.Add(pi, sendDelta, recvDelta, isLost); err == nil {
return
} else if err == errGroupFinalized {
c.params.Logger.Infow("send side bwe: unexpected finalized group", "packetInfo", pi, "packetGroup", opg)
}
}
}
// 1. go through the TWCC feedback report and record receive time as reported by remote
// 2. process acknowledged packet and group them
//
// losses are not recorded if a feedback report is completely lost.
// RFC recommends treating lost reports by ignoring packets that would have been in it.
// -----------------------------------------------------------------------------------
// | From a congestion control perspective, lost feedback messages are |
// | handled by ignoring packets which would have been reported as lost or |
// | received in the lost feedback messages. This behavior is similar to |
// | how a lost RTCP receiver report is handled. |
// -----------------------------------------------------------------------------------
// Reference: https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01#page-4
sequenceNumber := fbr.report.BaseSequenceNumber
endSequenceNumberExclusive := sequenceNumber + fbr.report.PacketStatusCount
deltaIdx := 0
for _, chunk := range fbr.report.PacketChunks {
if sequenceNumber == endSequenceNumberExclusive {
break
}
switch chunk := chunk.(type) {
case *rtcp.RunLengthChunk:
for i := uint16(0); i < chunk.RunLength; i++ {
if sequenceNumber == endSequenceNumberExclusive {
break
}
recvTime := int64(0)
isLost := false
if chunk.PacketStatusSymbol != rtcp.TypeTCCPacketNotReceived {
recvRefTime += fbr.report.RecvDeltas[deltaIdx].Delta
deltaIdx++
recvTime = recvRefTime
} else {
isLost = true
}
pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime)
if pi.sendTime != 0 {
trackPacketGroup(&pi, sendDelta, recvDelta, isLost)
}
sequenceNumber++
}
case *rtcp.StatusVectorChunk:
for _, symbol := range chunk.SymbolList {
if sequenceNumber == endSequenceNumberExclusive {
break
}
recvTime := int64(0)
isLost := false
if symbol != rtcp.TypeTCCPacketNotReceived {
recvRefTime += fbr.report.RecvDeltas[deltaIdx].Delta
deltaIdx++
recvTime = recvRefTime
} else {
isLost = true
}
pi, sendDelta, recvDelta := c.packetTracker.RecordPacketIndicationFromRemote(sequenceNumber, recvTime)
if pi.sendTime != 0 {
trackPacketGroup(&pi, sendDelta, recvDelta, isLost)
}
sequenceNumber++
}
}
}
c.prunePacketGroups()
c.congestionDetectionStateMachine()
}
func (c *congestionDetector) worker() {
ticker := time.NewTicker(c.params.Config.PeriodicCheckInterval)
defer ticker.Stop()
for {
select {
case <-c.wake:
for {
c.lock.Lock()
if c.feedbackReports.Len() == 0 {
c.lock.Unlock()
break
}
fbReport := c.feedbackReports.PopFront()
c.lock.Unlock()
c.processFeedbackReport(fbReport)
}
if c.congestionState == bwe.CongestionStateCongested {
ticker.Reset(c.params.Config.PeriodicCheckIntervalCongested)
} else {
ticker.Reset(c.params.Config.PeriodicCheckInterval)
}
case <-ticker.C:
c.prunePacketGroups()
c.congestionDetectionStateMachine()
case <-c.stop.Watch():
return
}
}
}
+5 -1
View File
@@ -38,7 +38,7 @@ type PacketGroupConfig struct {
}
var (
DefaultPacketGroupConfig = PacketGroupConfig{
defaultPacketGroupConfig = PacketGroupConfig{
MinPackets: 20,
MaxWindowDuration: 500 * time.Millisecond,
}
@@ -246,6 +246,10 @@ func (p *packetGroup) FinalizedPropagatedQueuingDelay() (int64, bool) {
return p.PropagatedQueuingDelay(), true
}
// IsFinalized reports whether this packet group has been marked as finalized.
func (p *packetGroup) IsFinalized() bool {
return p.isFinalized
}
func (p *packetGroup) Traffic() *trafficStats {
return &trafficStats{
minSendTime: p.minSendTime,
@@ -35,7 +35,7 @@ type ProbePacketGroupConfig struct {
var (
// large numbers to treat a probe packet group as one
DefaultPacketGroupConfigProbe = ProbePacketGroupConfig{
defaultProbePacketGroupConfig = ProbePacketGroupConfig{
PacketGroup: PacketGroupConfig{
MinPackets: 16384,
MaxWindowDuration: time.Minute,
+2 -9
View File
@@ -59,7 +59,7 @@ type SendSideBWEConfig struct {
var (
DefaultSendSideBWEConfig = SendSideBWEConfig{
CongestionDetector: DefaultCongestionDetectorConfig,
CongestionDetector: defaultCongestionDetectorConfig,
}
)
@@ -93,14 +93,7 @@ func (s *SendSideBWE) SetBWEListener(bweListener bwe.BWEListener) {
}
func (s *SendSideBWE) Reset() {
s.congestionDetector = newCongestionDetector(congestionDetectorParams{
Config: s.params.Config.CongestionDetector,
Logger: s.params.Logger,
})
}
func (s *SendSideBWE) Stop() {
s.congestionDetector.Stop()
s.congestionDetector.Reset()
}
func (s *SendSideBWE) RecordPacketSendAndGetSequenceNumber(
+19 -8
View File
@@ -25,14 +25,16 @@ import (
// -----------------------------------------------------------
type WeightedLossConfig struct {
MinPacketsForLossValidity int `yaml:"min_packets_for_loss_validity,omitempty"`
LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"`
MinDurationForLossValidity time.Duration `yaml:"min_duration_for_loss_validity,omitempty"`
MinPPSForLossValidity int `yaml:"min_pps_for_loss_validity,omitempty"`
LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"`
}
var (
defaultWeightedLossConfig = WeightedLossConfig{
MinPacketsForLossValidity: 20,
LossPenaltyFactor: 0.25,
MinDurationForLossValidity: 200 * time.Millisecond,
MinPPSForLossValidity: 20,
LossPenaltyFactor: 0.25,
}
)
@@ -101,7 +103,7 @@ func (ts *trafficStats) CapturedTrafficRatio() float64 {
}
// apply a penalty for lost packets,
// tha rationale being packet dropping is a strategy to relieve congestion
// the rationale being packet dropping is a strategy to relieve congestion
// and if they were not dropped, they would have increased queuing delay,
// as it is not possible to know the reason for the losses,
// apply a small penalty to receive delta aggregate to simulate those packets
@@ -110,8 +112,14 @@ func (ts *trafficStats) CapturedTrafficRatio() float64 {
}
func (ts *trafficStats) WeightedLoss() float64 {
durationMicro := ts.Duration()
if time.Duration(durationMicro*1000) < ts.params.Config.MinDurationForLossValidity {
return 0.0
}
totalPackets := float64(ts.lostPackets + ts.ackedPackets)
if int(totalPackets) < ts.params.Config.MinPacketsForLossValidity {
pps := totalPackets * 1e6 / float64(durationMicro)
if int(pps) < ts.params.Config.MinPPSForLossValidity {
return 0.0
}
@@ -120,8 +128,6 @@ func (ts *trafficStats) WeightedLoss() float64 {
lossRatio = float64(ts.lostPackets) / totalPackets
}
pps := totalPackets * 1e6 / float64(ts.Duration())
// Log10 is used to give higher weight for the same loss ratio at higher packet rates,
// for e.g.
// - 10% loss at 20 pps = 0.1 * log10(20) = 0.130
@@ -144,6 +150,11 @@ func (ts *trafficStats) MarshalLogObject(e zapcore.ObjectEncoder) error {
duration := time.Duration(ts.Duration() * 1000)
e.AddDuration("duration", duration)
e.AddInt("ackedPackets", ts.ackedPackets)
e.AddInt("ackedBytes", ts.ackedBytes)
e.AddInt("lostPackets", ts.lostPackets)
e.AddInt("lostBytes", ts.lostBytes)
bitrate := float64(0)
if duration != 0 {
bitrate = float64(ts.ackedBytes*8) / duration.Seconds()
+6 -1
View File
@@ -353,7 +353,12 @@ func (p ProbeClusterResult) Duration() time.Duration {
}
func (p ProbeClusterResult) Bitrate() float64 {
return float64(p.Bytes()*8) / p.Duration().Seconds()
duration := p.Duration().Seconds()
if duration != 0 {
return float64(p.Bytes()*8) / duration
}
return 0
}
func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error {
+17 -11
View File
@@ -1874,11 +1874,15 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
d.params.Logger.Errorw("could not unmarshal rtp packet in retransmit", err)
continue
}
pkt.Header.Marker = epm.marker
pkt.Header.SequenceNumber = epm.targetSeqNo
pkt.Header.Timestamp = epm.timestamp
pkt.Header.SSRC = d.ssrc
pkt.Header.PayloadType = d.getTranslatedPayloadType(pkt.Header.PayloadType)
hdr := &rtp.Header{
Version: pkt.Header.Version,
Padding: pkt.Header.Padding,
Marker: epm.marker,
PayloadType: d.getTranslatedPayloadType(pkt.Header.PayloadType),
SequenceNumber: epm.targetSeqNo,
Timestamp: epm.timestamp,
SSRC: d.ssrc,
}
poolEntity := PacketFactory.Get().(*[]byte)
payload := *poolEntity
@@ -1899,26 +1903,28 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
} else {
ddBytes = epm.ddBytes[:epm.ddBytesSize]
}
pkt.Header.SetExtension(uint8(d.dependencyDescriptorExtID), ddBytes)
if len(ddBytes) != 0 {
hdr.SetExtension(uint8(d.dependencyDescriptorExtID), ddBytes)
}
}
if d.absCaptureTimeExtID != 0 && len(epm.actBytes) != 0 {
pkt.Header.SetExtension(uint8(d.absCaptureTimeExtID), epm.actBytes)
hdr.SetExtension(uint8(d.absCaptureTimeExtID), epm.actBytes)
}
d.addDummyExtensions(&pkt.Header)
d.addDummyExtensions(hdr)
headerSize := pkt.Header.MarshalSize()
headerSize := hdr.MarshalSize()
d.rtpStats.Update(
mono.UnixNano(),
epm.extSequenceNumber,
epm.extTimestamp,
pkt.Header.Marker,
hdr.Marker,
headerSize,
len(payload),
0,
true,
)
d.pacer.Enqueue(&pacer.Packet{
Header: &pkt.Header,
Header: hdr,
HeaderSize: headerSize,
Payload: payload,
ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()),
+17 -6
View File
@@ -23,6 +23,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
"github.com/pion/rtp"
"go.uber.org/atomic"
)
type Base struct {
@@ -30,6 +31,8 @@ type Base struct {
bwe bwe.BWE
lastPacketSentAt atomic.Int64
*ProbeObserver
}
@@ -47,6 +50,10 @@ func (b *Base) SetInterval(_interval time.Duration) {
// SetBitrate is a no-op on Base: the body is empty and the argument is ignored.
func (b *Base) SetBitrate(_bitrate int) {
}
// TimeSinceLastSentPacket returns the difference between the current
// mono.UnixNano() reading and the value held in lastPacketSentAt,
// expressed as a time.Duration.
// NOTE(review): if nothing has been stored in lastPacketSentAt yet, the
// result is presumably the full mono.UnixNano() value — confirm callers
// tolerate that.
func (b *Base) TimeSinceLastSentPacket() time.Duration {
	nowNano := mono.UnixNano()
	lastSentNano := b.lastPacketSentAt.Load()
	return time.Duration(nowNano - lastSentNano)
}
func (b *Base) SendPacket(p *Packet) (int, error) {
defer func() {
if p.Pool != nil && p.PoolEntity != nil {
@@ -56,7 +63,7 @@ func (b *Base) SendPacket(p *Packet) (int, error) {
err := b.patchRTPHeaderExtensions(p)
if err != nil {
b.logger.Errorw("writing rtp header extensions err", err)
b.logger.Errorw("patching rtp header extensions err", err)
return 0, err
}
@@ -76,15 +83,17 @@ func (b *Base) SendPacket(p *Packet) (int, error) {
func (b *Base) patchRTPHeaderExtensions(p *Packet) error {
sendingAt := mono.Now()
if p.AbsSendTimeExtID != 0 {
sendTime := rtp.NewAbsSendTimeExtension(sendingAt)
b, err := sendTime.Marshal()
absSendTime := rtp.NewAbsSendTimeExtension(sendingAt)
absSendTimeBytes, err := absSendTime.Marshal()
if err != nil {
return err
}
if err = p.Header.SetExtension(p.AbsSendTimeExtID, b); err != nil {
if err = p.Header.SetExtension(p.AbsSendTimeExtID, absSendTimeBytes); err != nil {
return err
}
b.lastPacketSentAt.Store(sendingAt.UnixNano())
}
packetSize := p.HeaderSize + len(p.Payload)
@@ -99,14 +108,16 @@ func (b *Base) patchRTPHeaderExtensions(p *Packet) error {
twccExt := rtp.TransportCCExtension{
TransportSequence: twccSN,
}
b, err := twccExt.Marshal()
twccExtBytes, err := twccExt.Marshal()
if err != nil {
return err
}
if err = p.Header.SetExtension(p.TransportWideExtID, b); err != nil {
if err = p.Header.SetExtension(p.TransportWideExtID, twccExtBytes); err != nil {
return err
}
b.lastPacketSentAt.Store(sendingAt.UnixNano())
}
b.ProbeObserver.RecordPacket(packetSize, p.IsRTX, p.ProbeClusterId, p.IsProbe)
+5
View File
@@ -48,6 +48,11 @@ func NewNoQueue(logger logger.Logger, bwe bwe.BWE) *NoQueue {
// Stop calls Break on n.stop and then attempts a non-blocking send on
// n.wake; when the send cannot proceed immediately the default case
// skips it, so Stop does not block on that channel.
// NOTE(review): presumably the wake-up lets a waiting send loop observe
// that stop was broken and exit — confirm against the worker.
func (n *NoQueue) Stop() {
n.stop.Break()
select {
case n.wake <- struct{}{}:
default:
}
}
func (n *NoQueue) Enqueue(p *Packet) {
+2
View File
@@ -44,6 +44,8 @@ type Pacer interface {
SetInterval(interval time.Duration)
SetBitrate(bitrate int)
TimeSinceLastSentPacket() time.Duration
SetPacerProbeObserverListener(listener PacerProbeObserverListener)
StartProbeCluster(pci ccutils.ProbeClusterInfo)
EndProbeCluster(probeClusterId ccutils.ProbeClusterId) ccutils.ProbeClusterInfo
+19 -6
View File
@@ -149,20 +149,26 @@ const (
)
type StreamAllocatorConfig struct {
ProbeMode ProbeMode `yaml:"probe_mode,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
ProbeOveragePct int64 `yaml:"probe_overage_pct,omitempty"`
ProbeMinBps int64 `yaml:"probe_min_bps,omitempty"`
ProbeMode ProbeMode `yaml:"probe_mode,omitempty"`
ProbeOveragePct int64 `yaml:"probe_overage_pct,omitempty"`
ProbeMinBps int64 `yaml:"probe_min_bps,omitempty"`
PausedMinWait time.Duration `yaml:"paused_min_wait,omitempty"`
}
var (
DefaultStreamAllocatorConfig = StreamAllocatorConfig{
ProbeMode: ProbeModePadding,
MinChannelCapacity: 0,
DisableEstimationUnmanagedTracks: false,
ProbeMode: ProbeModePadding,
ProbeOveragePct: 120,
ProbeMinBps: 200_000,
PausedMinWait: 5 * time.Second,
}
)
@@ -655,6 +661,13 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) {
}
func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
// if pause is allowed, there may be no packets sent and BWE could be in congested state,
// reset BWE if that persists for a while
if s.state == streamAllocatorStateDeficient && s.params.Pacer.TimeSinceLastSentPacket() > s.params.Config.PausedMinWait {
s.params.Logger.Infow("stream allocator: resetting bwe to enable probing")
s.params.BWE.Reset()
}
// finalize any probe that may have finished/aborted
if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid {
if probeSignal, channelCapacity, isFinalized := s.params.BWE.ProbeClusterFinalize(); isFinalized {