Use weighted loss to detect loss based congesiton signal. (#3169)

* Use weighted loss to detect loss based congesiton signal.

- Increase JQR min loss to 0.25.
- Use weighted loss ratio so that more packet rate gets higher
  weightage. At default config, 10 packets in 1/2 second will form a
  valid packet group for loss based congestion signal consideration. Two
  packets lost in that group may not be bad. So, bumped up the
  JQR min loss to 0.25. However, 20% loss (or even much lesser loss)
  could be problematic if the packet rate is higher (potentially
  multiple streams affected and there could be a lot of NACKs as a result).
  So, weight it by packet rate so that higher packet rates enter JQR
  at lower losses.

* WIP

* use aggregated loss
This commit is contained in:
Raja Subramanian
2024-11-12 09:21:28 +05:30
committed by GitHub
parent 57b3dfdcf4
commit a825661aff
7 changed files with 488 additions and 206 deletions
+227 -101
View File
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (
@@ -43,36 +57,138 @@ var (
// -------------------------------------------------------------------------------
type congestionSignalCalculator[T int64 | float64] struct {
thresholdMin T
thresholdMax T
isSealed bool
numGroups int
duration int64
type qdMeasurement struct {
earlyWarningConfig CongestionSignalConfig
congestedConfig CongestionSignalConfig
jqrMin int64
dqrMax int64
numGroups int
minSendTime int64
maxSendTime int64
isSealed bool
}
func (c *congestionSignalCalculator[T]) processSample(val T, duration int64) {
if c.isSealed {
return
}
if val < c.thresholdMin {
// any DQR group breaks the continuity
c.isSealed = true
return
}
// INDETERMINATE group is treated as a no-op
// JQR group builds up congestion signal
if val > c.thresholdMax {
c.numGroups++
c.duration += duration
func newQdMeasurement(
earlyWarningConfig CongestionSignalConfig,
congestedConfig CongestionSignalConfig,
jqrMin int64,
dqrMax int64,
) *qdMeasurement {
return &qdMeasurement{
earlyWarningConfig: earlyWarningConfig,
congestedConfig: congestedConfig,
jqrMin: jqrMin,
dqrMax: dqrMax,
}
}
func (c *congestionSignalCalculator[T]) isTriggered(config CongestionSignalConfig) bool {
return c.numGroups >= config.MinNumberOfGroups && c.duration >= config.MinDuration.Microseconds()
func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup) {
if q.isSealed {
return
}
pqd, pqdOk := pg.PropagatedQueuingDelay()
if !pqdOk {
return
}
if pqd < q.dqrMax {
// a DQR breaks continuity
q.isSealed = true
return
}
if pqd > q.jqrMin {
q.numGroups++
minSendTime, maxSendTime := pg.SendWindow()
if q.minSendTime == 0 || minSendTime < q.minSendTime {
q.minSendTime = minSendTime
}
if maxSendTime > q.maxSendTime {
q.maxSendTime = maxSendTime
}
}
// can seal if congested config thresholds are met as they are longer
if q.numGroups >= q.congestedConfig.MinNumberOfGroups && (q.maxSendTime-q.minSendTime) >= q.congestedConfig.MinDuration.Microseconds() {
q.isSealed = true
}
}
func (q *qdMeasurement) IsSealed() bool {
return q.isSealed
}
func (q *qdMeasurement) IsEarlyWarningTriggered() bool {
return q.numGroups >= q.earlyWarningConfig.MinNumberOfGroups && (q.maxSendTime-q.minSendTime) >= q.earlyWarningConfig.MinDuration.Microseconds()
}
func (q *qdMeasurement) IsCongestedTriggered() bool {
return q.numGroups >= q.congestedConfig.MinNumberOfGroups && (q.maxSendTime-q.minSendTime) >= q.congestedConfig.MinDuration.Microseconds()
}
// -------------------------------------------------------------------------------
type lossMeasurement struct {
earlyWarningConfig CongestionSignalConfig
congestedConfig CongestionSignalConfig
congestionMinLoss float64
numGroups int
ts *trafficStats
earlyWarningWeightedLoss float64
congestedWeightedLoss float64
isSealed bool
}
func newLossMeasurement(
earlyWarningConfig CongestionSignalConfig,
congestedConfig CongestionSignalConfig,
weightedLossConfig WeightedLossConfig,
congestionMinLoss float64,
logger logger.Logger,
) *lossMeasurement {
return &lossMeasurement{
earlyWarningConfig: earlyWarningConfig,
congestedConfig: congestedConfig,
congestionMinLoss: congestionMinLoss,
ts: newTrafficStats(trafficStatsParams{
Config: weightedLossConfig,
Logger: logger,
}),
}
}
func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup) {
if l.isSealed {
return
}
l.ts.Merge(pg.Traffic())
duration := l.ts.Duration()
if l.numGroups >= l.earlyWarningConfig.MinNumberOfGroups && duration >= l.earlyWarningConfig.MinDuration.Microseconds() {
l.earlyWarningWeightedLoss = l.ts.WeightedLoss()
}
if l.numGroups >= l.congestedConfig.MinNumberOfGroups && duration >= l.congestedConfig.MinDuration.Microseconds() {
l.congestedWeightedLoss = l.ts.WeightedLoss()
l.isSealed = true // can seal if congested thresholds are satisfied as those should be higher
}
}
func (l *lossMeasurement) IsSealed() bool {
return l.isSealed
}
func (l *lossMeasurement) IsEarlyWarningTriggered() bool {
return l.earlyWarningWeightedLoss > l.congestionMinLoss
}
func (l *lossMeasurement) IsCongestedTriggered() bool {
return l.congestedWeightedLoss > l.congestionMinLoss
}
// -------------------------------------------------------------------------------
@@ -84,8 +200,8 @@ type CongestionDetectorConfig struct {
JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"`
DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"`
JQRMinLoss float64 `yaml:"jqr_min_loss,omitempty"`
DQRMaxLoss float64 `yaml:"dqr_max_loss,omitempty"`
WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"`
CongestionMinLoss float64 `yaml:"congestion_min_loss,omitempty"`
QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"`
LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"`
@@ -95,7 +211,6 @@ type CongestionDetectorConfig struct {
LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"`
CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"`
RateMeasurementWindowFullnessMin float64 `yaml:"rate_measurement_window_fullness_min,omitempty"`
RateMeasurementWindowDurationMin time.Duration `yaml:"rate_measurement_window_duration_min,omitempty"`
RateMeasurementWindowDurationMax time.Duration `yaml:"rate_measurement_window_duration_max,omitempty"`
@@ -109,7 +224,7 @@ var (
defaultTrendDetectorConfigCongestedCTR = ccutils.TrendDetectorConfig{
RequiredSamples: 5,
RequiredSamplesMin: 2,
DownwardTrendThreshold: -0.6,
DownwardTrendThreshold: -0.5,
DownwardTrendMaxWait: 5 * time.Second,
CollapseThreshold: 500 * time.Millisecond,
ValidityWindow: 10 * time.Second,
@@ -120,15 +235,14 @@ var (
PacketGroupMaxAge: 30 * time.Second,
JQRMinDelay: 15 * time.Millisecond,
DQRMaxDelay: 5 * time.Millisecond,
JQRMinLoss: 0.15,
DQRMaxLoss: 0.05,
WeightedLoss: defaultWeightedLossConfig,
CongestionMinLoss: 0.25,
QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig,
LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig,
EarlyWarningHangover: 500 * time.Millisecond,
QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig,
LossCongested: DefaultLossCongestedCongestionSignalConfig,
CongestedHangover: 3 * time.Second,
RateMeasurementWindowFullnessMin: 0.8,
RateMeasurementWindowDurationMin: 800 * time.Millisecond,
RateMeasurementWindowDurationMax: 2 * time.Second,
PeriodicCheckInterval: 2 * time.Second,
@@ -168,6 +282,7 @@ type congestionDetector struct {
congestionState CongestionState
congestionStateSwitchedAt time.Time
congestedCTRTrend *ccutils.TrendDetector[float64]
congestedTrafficStats *trafficStats
onCongestionStateChange func(congestionState CongestionState, estimatedAvailableChannelCapacity int64)
}
@@ -235,12 +350,17 @@ func (c *congestionDetector) updateCongestionState(state CongestionState, reason
// reducing/relieving congestion
if state == CongestionStateCongested && prevState != CongestionStateCongested {
c.congestedCTRTrend = ccutils.NewTrendDetector[float64](ccutils.TrendDetectorParams{
Name: "ssbwe-estimate",
Name: "ssbwe-ctr",
Logger: c.params.Logger,
Config: c.params.Config.CongestedCTRTrend,
})
c.congestedTrafficStats = newTrafficStats(trafficStatsParams{
Config: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
})
} else if state != CongestionStateCongested {
c.congestedCTRTrend = nil
c.congestedTrafficStats = nil
}
}
@@ -268,72 +388,67 @@ func (c *congestionDetector) prunePacketGroups() {
return
}
threshold := c.packetGroups[len(c.packetGroups)-1].MinSendTime() - c.params.Config.PacketGroupMaxAge.Microseconds()
bst := c.packetTracker.BaseSendTime()
if bst == 0 {
return
}
threshold := mono.UnixMicro() - bst - c.params.Config.PacketGroupMaxAge.Microseconds()
for idx, pg := range c.packetGroups {
if mst := pg.MinSendTime(); mst < threshold {
c.packetGroups = c.packetGroups[idx+1:]
if mst := pg.MinSendTime(); mst > threshold {
c.packetGroups = c.packetGroups[idx:]
return
}
}
}
func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, string, int) {
earlyWarningTriggered := false
earlyWarningReason := ""
congestedTriggered := false
congestedReason := ""
qd := &congestionSignalCalculator[int64]{
thresholdMin: c.params.Config.JQRMinDelay.Microseconds(),
thresholdMax: c.params.Config.DQRMaxDelay.Microseconds(),
}
loss := &congestionSignalCalculator[float64]{
thresholdMin: c.params.Config.JQRMinLoss,
thresholdMax: c.params.Config.DQRMaxLoss,
}
qdMeasurement := newQdMeasurement(
c.params.Config.QueuingDelayEarlyWarning,
c.params.Config.QueuingDelayCongested,
c.params.Config.JQRMinDelay.Microseconds(),
c.params.Config.DQRMaxDelay.Microseconds(),
)
lossMeasurement := newLossMeasurement(
c.params.Config.LossEarlyWarning,
c.params.Config.LossCongested,
c.params.Config.WeightedLoss,
c.params.Config.CongestionMinLoss,
c.params.Logger,
)
var idx int
for idx = len(c.packetGroups) - 1; idx >= 0; idx-- {
pg := c.packetGroups[idx]
pqd, pqdOk := pg.PropagatedQueuingDelay()
lr, lrOk := pg.LossRatio()
if !pqdOk && !lrOk {
continue
}
qdMeasurement.ProcessPacketGroup(pg)
lossMeasurement.ProcessPacketGroup(pg)
// `queueing delay` and `loss` based congestion signals are determined independently,
// i. e. one packet group triggering `queueing delay` and another group triggering
// `loss` will not combine to trigger the aggregate congestion signal
sendDuration := pg.SendDuration()
if pqdOk {
qd.processSample(pqd, sendDuration)
}
if lrOk {
loss.processSample(lr, sendDuration)
// if both measurements have enough data to make a decision, stop processing groups
if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() {
break
}
}
if !earlyWarningTriggered && qd.isTriggered(c.params.Config.QueuingDelayEarlyWarning) {
earlyWarningTriggered = true
earlyWarningReason = "queuing-delay"
}
if !earlyWarningTriggered && loss.isTriggered(c.params.Config.LossEarlyWarning) {
earlyWarningTriggered = true
earlyWarningReason := ""
earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "queuing-delay"
} else {
earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "loss"
}
}
if !congestedTriggered && qd.isTriggered(c.params.Config.QueuingDelayCongested) {
congestedTriggered = true
congestedReason = "queuing-delay"
}
if !congestedTriggered && loss.isTriggered(c.params.Config.LossCongested) {
congestedTriggered = true
congestedReason := ""
congestedTriggered := qdMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "queuing-delay"
} else {
congestedTriggered = lossMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "loss"
}
if earlyWarningTriggered && congestedTriggered {
break
}
}
return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, idx
@@ -401,12 +516,17 @@ func (c *congestionDetector) congestionDetectionStateMachine() {
}
}
func (c *congestionDetector) updateTrend(ctr float64) {
func (c *congestionDetector) updateTrend(pg *packetGroup) {
if c.congestedCTRTrend == nil {
return
}
// quantise the CTR to filter out small changes
// progressively keep increasing the window and make measurements over longer windows,
// if congestion is not relieving, CTR will trend down
c.congestedTrafficStats.Merge(pg.Traffic())
ctr := c.congestedTrafficStats.CapturedTrafficRatio()
// quantise CTR to filter out small changes
c.congestedCTRTrend.AddValue(float64(int((ctr+(c.params.Config.CongestedCTREpsilon/2))/c.params.Config.CongestedCTREpsilon)) * c.params.Config.CongestedCTREpsilon)
if c.congestedCTRTrend.GetDirection() == ccutils.TrendDirectionDownward {
@@ -424,10 +544,14 @@ func (c *congestionDetector) updateTrend(ctr float64) {
// reset to get new set of samples for next trend
c.congestedCTRTrend = ccutils.NewTrendDetector[float64](ccutils.TrendDetectorParams{
Name: "ssbwe-estimate",
Name: "ssbwe-ctr",
Logger: c.params.Logger,
Config: c.params.Config.CongestedCTRTrend,
})
c.congestedTrafficStats = newTrafficStats(trafficStatsParams{
Config: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
})
}
}
@@ -436,31 +560,33 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
return
}
totalDuration := int64(0)
totalBytes := 0
threshold := c.packetGroups[len(c.packetGroups)-1].MinSendTime() - c.params.Config.RateMeasurementWindowDurationMax.Microseconds()
bst := c.packetTracker.BaseSendTime()
if bst == 0 {
return
}
threshold := mono.UnixMicro() - bst - c.params.Config.RateMeasurementWindowDurationMax.Microseconds()
agg := newTrafficStats(trafficStatsParams{
Config: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
})
for idx := len(c.packetGroups) - 1; idx >= 0; idx-- {
pg := c.packetGroups[idx]
mst, dur, nbytes, fullness := pg.Traffic()
if mst < threshold {
if pg.MinSendTime() < threshold {
break
}
if fullness < c.params.Config.RateMeasurementWindowFullnessMin {
continue
}
totalDuration += dur
totalBytes += nbytes
agg.Merge(pg.Traffic())
}
if totalDuration >= c.params.Config.RateMeasurementWindowDurationMin.Microseconds() {
c.lock.Lock()
c.estimatedAvailableChannelCapacity = int64(totalBytes) * 8 * 1e6 / totalDuration
c.lock.Unlock()
} else {
c.params.Logger.Infow("not enough data to estimate available channel capacity", "totalDuration", totalDuration)
if agg.Duration() < c.params.Config.RateMeasurementWindowDurationMin.Microseconds() {
c.params.Logger.Infow("not enough data to estimate available channel capacity", "duration", agg.Duration())
return
}
c.lock.Lock()
c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate()
c.lock.Unlock()
}
func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
@@ -495,7 +621,7 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
if err == errGroupFinalized {
// previous group ended, start a new group
c.updateTrend(pg.CapturedTrafficRatio())
c.updateTrend(pg)
// SSBWE-REMOVE c.params.Logger.Infow("packet group done", "group", pg, "numGroups", len(c.packetGroups)) // SSBWE-REMOVE
pqd, _ := pg.PropagatedQueuingDelay()
+42 -105
View File
@@ -1,8 +1,21 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (
"errors"
"math"
"time"
"github.com/livekit/protocol/logger"
@@ -22,18 +35,12 @@ var (
type PacketGroupConfig struct {
MinPackets int `yaml:"min_packets,omitempty"`
MaxWindowDuration time.Duration `yaml:"max_window_duration,omitempty"`
// should have at least this fraction of `MinPackets` for loss penalty consideration
LossPenaltyMinPacketsRatio float64 `yaml:"loss_penalty_min_packet_ratio,omitempty"`
LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"`
}
var (
DefaultPacketGroupConfig = PacketGroupConfig{
MinPackets: 20,
MaxWindowDuration: 500 * time.Millisecond,
LossPenaltyMinPacketsRatio: 0.5,
LossPenaltyFactor: 0.25,
MinPackets: 20,
MaxWindowDuration: 500 * time.Millisecond,
}
)
@@ -108,8 +115,9 @@ func (c classStat) MarshalLogObject(e zapcore.ObjectEncoder) error {
// -------------------------------------------------------------
type packetGroupParams struct {
Config PacketGroupConfig
Logger logger.Logger
Config PacketGroupConfig
WeightedLoss WeightedLossConfig
Logger logger.Logger
}
type packetGroup struct {
@@ -212,6 +220,10 @@ func (p *packetGroup) MinSendTime() int64 {
return p.minSendTime
}
func (p *packetGroup) SendWindow() (int64, int64) {
return p.maxSendTime, p.minSendTime
}
func (p *packetGroup) PropagatedQueuingDelay() (int64, bool) {
if !p.isFinalized {
return 0, false
@@ -224,57 +236,16 @@ func (p *packetGroup) PropagatedQueuingDelay() (int64, bool) {
return max(0, p.aggregateRecvDelta-p.aggregateSendDelta), true
}
func (p *packetGroup) SendDuration() int64 {
if !p.isFinalized {
return 0
func (p *packetGroup) Traffic() *trafficStats {
return &trafficStats{
minSendTime: p.minSendTime,
maxSendTime: p.maxSendTime,
sendDelta: p.aggregateSendDelta,
recvDelta: p.aggregateRecvDelta,
ackedPackets: p.acked.numPackets(),
ackedBytes: p.acked.numBytes(),
lostPackets: p.lost.numPackets(),
}
return p.maxSendTime - p.minSendTime
}
func (p *packetGroup) CapturedTrafficRatio() float64 {
capturedTrafficRatio := float64(0.0)
if p.aggregateRecvDelta != 0 {
// apply a penalty for lost packets,
// tha rationale being packet dropping is a strategy to relieve congestion
// and if they were not dropped, they would have increased queuing delay,
// as it is not possible to know the reason for the losses,
// apply a small penalty to receive delta aggregate to simulate those packets
// build up queuing delay.
//
// note that it is applied only for determining rate and
// not while determining queuing region, adding synthetic delays
// like this could cause queuing region to be stuck in JQR
capturedTrafficRatio = float64(p.aggregateSendDelta) / float64(p.aggregateRecvDelta+p.getLossPenalty())
}
return min(1.0, capturedTrafficRatio)
}
func (p *packetGroup) Traffic() (int64, int64, int, float64) {
numBytes := int(float64(p.acked.numBytes()) * p.CapturedTrafficRatio())
fullness := max(
float64(p.acked.numPackets())/float64(p.params.Config.MinPackets),
float64(p.maxSendTime-p.minSendTime)/float64(p.params.Config.MaxWindowDuration.Microseconds()),
)
return p.minSendTime, p.maxSendTime - p.minSendTime, numBytes, fullness
}
func (p *packetGroup) LossRatio() (float64, bool) {
if !p.isFinalized {
return 0.0, false
}
return p.lossRatio()
}
func (p *packetGroup) lossRatio() (float64, bool) {
lostPackets := p.lost.numPackets()
totalPackets := float64(lostPackets + p.acked.numPackets())
lossRatio := float64(lostPackets) / totalPackets
// indeterminate if not enough packets, the second return value will be false if indeterminate
return lossRatio, totalPackets >= float64(p.params.Config.MinPackets)*p.params.Config.LossPenaltyMinPacketsRatio
}
func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error {
@@ -282,26 +253,15 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error {
return nil
}
e.AddInt64("minSendTime", p.minSendTime)
e.AddInt64("maxSendTime", p.maxSendTime)
sendDuration := time.Duration((p.maxSendTime - p.minSendTime) * 1000)
e.AddDuration("sendDuration", sendDuration)
e.AddInt64("minRecvTime", p.minRecvTime)
e.AddInt64("maxRecvTime", p.maxRecvTime)
recvDuration := time.Duration((p.maxRecvTime - p.minRecvTime) * 1000)
e.AddDuration("recvDuration", recvDuration)
e.AddUint64("minSequenceNumber", p.minSequenceNumber)
e.AddUint64("maxSequenceNumber", p.maxSequenceNumber)
e.AddObject("acked", p.acked)
e.AddObject("lost", p.lost)
sendBitrate := float64(0)
if sendDuration != 0 {
sendBitrate = float64(p.acked.numBytes()*8) / sendDuration.Seconds()
e.AddFloat64("sendBitrate", sendBitrate)
}
e.AddInt64("minRecvTime", p.minRecvTime)
e.AddInt64("maxRecvTime", p.maxRecvTime)
recvDuration := time.Duration((p.maxRecvTime - p.minRecvTime) * 1000)
e.AddDuration("recvDuration", recvDuration)
recvBitrate := float64(0)
if recvDuration != 0 {
@@ -309,20 +269,12 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddFloat64("recvBitrate", recvBitrate)
}
e.AddInt64("aggregateSendDelta", p.aggregateSendDelta)
e.AddInt64("aggregateRecvDelta", p.aggregateRecvDelta)
e.AddInt64("queuingDelay", p.queuingDelay)
e.AddInt64("groupDelay", p.aggregateRecvDelta-p.aggregateSendDelta)
lossRatio, lossRatioValid := p.lossRatio()
e.AddFloat64("lossRatio", lossRatio)
e.AddBool("lossRatioValid", lossRatioValid)
e.AddInt64("lossPenalty", p.getLossPenalty())
capturedTrafficRatio := p.CapturedTrafficRatio()
e.AddFloat64("capturedTrafficRatio", capturedTrafficRatio)
e.AddFloat64("estimatedAvailableChannelCapacity", sendBitrate*capturedTrafficRatio)
ts := newTrafficStats(trafficStatsParams{
Config: p.params.WeightedLoss,
Logger: p.params.Logger,
})
ts.Merge(p.Traffic())
e.AddObject("trafficStats", ts)
e.AddBool("isFinalized", p.isFinalized)
return nil
@@ -339,18 +291,3 @@ func (p *packetGroup) inGroup(sequenceNumber uint64) error {
return nil
}
func (p *packetGroup) getLossPenalty() int64 {
// Log10 is used to give higher weight for the same loss ratio at higher packet rates,
// for e.g. with a penalty factor of 0.25
// - 10% loss at 20 total packets = 0.1 * log10(20) * 0.25 = 0.032
// - 10% loss at 100 total packets = 0.1 * log10(100) * 0.25 = 0.05
// - 10% loss at 1000 total packets = 0.1 * log10(100) * 0.25 = 0.075
lossRatio, _ := p.lossRatio()
return int64(
float64(p.aggregateRecvDelta) *
lossRatio *
math.Log10(float64(p.acked.numPackets()+p.lost.numPackets())) *
p.params.Config.LossPenaltyFactor,
)
}
+14
View File
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (
+21
View File
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (
@@ -70,6 +84,13 @@ func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(at time.Time, size
return uint16(pi.sequenceNumber)
}
func (p *packetTracker) BaseSendTime() int64 {
p.lock.Lock()
defer p.lock.Unlock()
return p.baseSendTime
}
func (p *packetTracker) RecordPacketIndicationFromRemote(sn uint16, recvTime int64) (piRecv packetInfo, sendDelta, recvDelta int64) {
p.lock.Lock()
defer p.lock.Unlock()
+14
View File
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (
+156
View File
@@ -0,0 +1,156 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (
"math"
"time"
"github.com/livekit/protocol/logger"
"go.uber.org/zap/zapcore"
)
// -----------------------------------------------------------
type WeightedLossConfig struct {
MinPacketsForLossValidity int `yaml:"min_packets_for_loss_validity,omitempty"`
LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"`
}
var (
defaultWeightedLossConfig = WeightedLossConfig{
MinPacketsForLossValidity: 20,
LossPenaltyFactor: 0.25,
}
)
// -----------------------------------------------------------
type trafficStatsParams struct {
Config WeightedLossConfig
Logger logger.Logger
}
type trafficStats struct {
params trafficStatsParams
minSendTime int64
maxSendTime int64
sendDelta int64
recvDelta int64
ackedPackets int
ackedBytes int
lostPackets int
}
func newTrafficStats(params trafficStatsParams) *trafficStats {
return &trafficStats{
params: params,
}
}
func (ts *trafficStats) Merge(rhs *trafficStats) {
if rhs.minSendTime == 0 || rhs.minSendTime < ts.minSendTime {
ts.minSendTime = rhs.minSendTime
}
if rhs.maxSendTime > ts.maxSendTime {
ts.maxSendTime = rhs.maxSendTime
}
ts.sendDelta += rhs.sendDelta
ts.recvDelta += rhs.recvDelta
ts.ackedPackets += rhs.ackedPackets
ts.ackedBytes += rhs.ackedBytes
ts.lostPackets += rhs.lostPackets
}
func (ts *trafficStats) Duration() int64 {
return ts.maxSendTime - ts.minSendTime
}
func (ts *trafficStats) AcknowledgedBitrate() int64 {
ackedBitrate := int64(ts.ackedBytes) * 8 * 1e6 / ts.Duration()
return int64(float64(ackedBitrate) * ts.CapturedTrafficRatio())
}
func (ts *trafficStats) CapturedTrafficRatio() float64 {
if ts.recvDelta == 0 {
return 0.0
}
// apply a penalty for lost packets,
// tha rationale being packet dropping is a strategy to relieve congestion
// and if they were not dropped, they would have increased queuing delay,
// as it is not possible to know the reason for the losses,
// apply a small penalty to receive delta aggregate to simulate those packets
// building up queuing delay.
return min(1.0, float64(ts.sendDelta)/float64(ts.recvDelta+ts.lossPenalty()))
}
func (ts *trafficStats) WeightedLoss() float64 {
totalPackets := float64(ts.lostPackets + ts.ackedPackets)
if int(totalPackets) < ts.params.Config.MinPacketsForLossValidity {
return 0.0
}
lossRatio := float64(0.0)
if totalPackets != 0 {
lossRatio = float64(ts.lostPackets) / totalPackets
}
pps := totalPackets * 1e6 / float64(ts.Duration())
// Log10 is used to give higher weight for the same loss ratio at higher packet rates,
// for e.g. with a penalty factor of 0.25
// - 10% loss at 20 pps = 0.1 * log10(20) * 0.25 = 0.032
// - 10% loss at 100 pps = 0.1 * log10(100) * 0.25 = 0.05
// - 10% loss at 1000 pps = 0.1 * log10(1000) * 0.25 = 0.075
return lossRatio * math.Log10(pps)
}
func (ts *trafficStats) lossPenalty() int64 {
return int64(float64(ts.recvDelta) * ts.WeightedLoss() * ts.params.Config.LossPenaltyFactor)
}
func (ts *trafficStats) MarshalLogObject(e zapcore.ObjectEncoder) error {
if ts == nil {
return nil
}
e.AddInt64("minSendTime", ts.minSendTime)
e.AddInt64("maxSendTime", ts.maxSendTime)
duration := time.Duration(ts.Duration() * 1000)
e.AddDuration("duration", duration)
bitrate := float64(0)
if duration != 0 {
bitrate = float64(ts.ackedBytes*8) / duration.Seconds()
e.AddFloat64("bitrate", bitrate)
}
e.AddInt64("sendDelta", ts.sendDelta)
e.AddInt64("recvDelta", ts.recvDelta)
e.AddInt64("groupDelay", ts.recvDelta-ts.sendDelta)
e.AddFloat64("weightedLoss", ts.WeightedLoss())
if (ts.ackedPackets + ts.lostPackets) != 0 {
e.AddFloat64("rawLoss", float64(ts.lostPackets)/float64(ts.ackedPackets+ts.lostPackets))
}
e.AddInt64("lossPenalty", ts.lossPenalty())
capturedTrafficRatio := ts.CapturedTrafficRatio()
e.AddFloat64("capturedTrafficRatio", capturedTrafficRatio)
e.AddFloat64("estimatedAvailableChannelCapacity", bitrate*capturedTrafficRatio)
return nil
}
+14
View File
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (