Dampen stream allocator (#551)

* WIP commit

* WIP commit

* WIP commit

* format

* NACK window

* Remove layer when it is expected to stop

* Remove debug
This commit is contained in:
Raja Subramanian
2022-03-22 22:23:22 +05:30
committed by GitHub
parent 779fe0f549
commit 076eb1c8ae
5 changed files with 210 additions and 120 deletions
+7 -2
View File
@@ -264,11 +264,16 @@ func (r *RTPStats) GetTotalPackets() uint32 {
return r.getNumPacketsSeen() + r.packetsDuplicate + r.packetsPadding
}
func (r *RTPStats) GetTotalPacketsSansDuplicate() uint32 {
func (r *RTPStats) GetTotalPacketsPrimary() uint32 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.getNumPacketsSeen() + r.packetsPadding
packetsSeen := r.getNumPacketsSeen()
if r.packetsPadding > packetsSeen {
return 0
}
return packetsSeen - r.packetsPadding
}
func (r *RTPStats) GetTotalBytes() uint64 {
+1 -5
View File
@@ -1056,10 +1056,6 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
nackMisses := uint32(0)
for _, meta := range d.sequencer.getPacketsMeta(filtered) {
if meta.layer == int8(InvalidLayerSpatial) {
if meta.nacked > 1 {
numRepeatedNACKs++
}
// padding packet, no RTX for those
continue
}
@@ -1279,7 +1275,7 @@ func (d *DownTrack) getQualityParams() *buffer.ConnectionQualityParams {
}
func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) {
totalPackets = d.rtpStats.GetTotalPacketsSansDuplicate()
totalPackets = d.rtpStats.GetTotalPacketsPrimary()
d.statsLock.RLock()
totalRepeatedNACKs = d.totalRepeatedNACKs
+198 -109
View File
@@ -23,7 +23,13 @@ const (
NumRequiredEstimatesNonProbe = 8
NumRequiredEstimatesProbe = 3
NackRatioThresholdNonProbe = 0.06
DownwardTrendThresholdNonProbe = -0.5
DownwardTrendThresholdProbe = 0.0
NackWindowDurationProbe = 0 * time.Second
NackWindowDurationNonProbe = 2 * time.Second
NackRatioThresholdNonProbe = 0.08
NackRatioThresholdProbe = 0.04
NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
@@ -145,7 +151,6 @@ type StreamAllocator struct {
abortedProbeClusterId ProbeClusterId
probeTrendObserved bool
probeEndTime time.Time
probeChannelObserver *ChannelObserver
prober *Prober
@@ -167,9 +172,8 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
prober: NewProber(ProberParams{
Logger: params.Logger,
}),
channelObserver: NewChannelObserver("non-probe", params.Logger, NumRequiredEstimatesNonProbe, NackRatioThresholdNonProbe),
videoTracks: make(map[livekit.TrackID]*Track),
eventCh: make(chan Event, 20),
videoTracks: make(map[livekit.TrackID]*Track),
eventCh: make(chan Event, 20),
}
s.resetState()
@@ -248,7 +252,7 @@ func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8)
}
func (s *StreamAllocator) resetState() {
s.channelObserver.Reset()
s.channelObserver = s.newChannelObserverNonProbe()
s.resetProbe()
s.state = StateStable
@@ -603,7 +607,7 @@ func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) {
// ensure probe queue is flushed
// LK-TODO: ProbeSettleWait should actually be a certain number of RTTs.
lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.probeChannelObserver.GetLowestEstimate())))
lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate())))
expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate)
queueTime := expectedDuration - float64(info.Duration.Milliseconds())
if queueTime < 0.0 {
@@ -648,18 +652,23 @@ func (s *StreamAllocator) handleNewEstimate(receivedEstimate int64) {
}
func (s *StreamAllocator) handleNewEstimateInProbe() {
s.probeChannelObserver.AddEstimate(s.lastReceivedEstimate)
// always update NACKs, even if aborted
packetDelta, repeatedNackDelta := s.getNackDelta()
s.probeChannelObserver.AddNack(packetDelta, repeatedNackDelta)
trend := s.probeChannelObserver.GetTrend()
if s.abortedProbeClusterId != ProbeClusterIdInvalid {
// waiting for aborted probe to finalize
return
}
s.channelObserver.AddEstimate(s.lastReceivedEstimate)
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
trend, _ := s.channelObserver.GetTrend()
if trend != ChannelTrendNeutral {
s.probeTrendObserved = true
}
switch {
case s.abortedProbeClusterId != ProbeClusterIdInvalid:
return
case !s.probeTrendObserved && time.Since(s.lastProbeStartTime) > ProbeTrendWait:
//
// More of a safety net.
@@ -672,13 +681,13 @@ func (s *StreamAllocator) handleNewEstimateInProbe() {
// stop immediately if the probe is congesting channel more
s.params.Logger.Debugw("probe: aborting, channel is congesting", "cluster", s.probeClusterId)
s.abortProbe()
case s.probeChannelObserver.GetHighestEstimate() > s.probeGoalBps:
case s.channelObserver.GetHighestEstimate() > s.probeGoalBps:
// reached goal, stop probing
s.params.Logger.Debugw(
"probe: stopping, goal reached",
"cluster", s.probeClusterId,
"goal", s.probeGoalBps,
"highest", s.probeChannelObserver.GetHighestEstimate(),
"highest", s.channelObserver.GetHighestEstimate(),
)
s.stopProbe()
}
@@ -690,28 +699,38 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
packetDelta, repeatedNackDelta := s.getNackDelta()
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
trend := s.channelObserver.GetTrend()
trend, reason := s.channelObserver.GetTrend()
if trend != ChannelTrendCongesting {
return
}
nackRatio := s.channelObserver.GetNackRatio()
lossAdjustedEstimate := s.lastReceivedEstimate
if nackRatio > NackRatioThresholdNonProbe {
lossAdjustedEstimate = int64(float64(lossAdjustedEstimate) * (1.0 - NackRatioAttenuator*nackRatio))
var estimateToCommit int64
var nackRatio float64
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
switch reason {
case ChannelCongestionReasonLoss:
estimateToCommit = expectedBandwidthUsage
nackRatio = s.channelObserver.GetNackRatio()
if nackRatio > NackRatioThresholdNonProbe {
estimateToCommit = int64(float64(estimateToCommit) * (1.0 - NackRatioAttenuator*nackRatio))
}
default:
estimateToCommit = s.lastReceivedEstimate
}
s.params.Logger.Infow(
"channel congestion detected, updating channel capacity",
"reason", reason,
"old(bps)", s.committedChannelCapacity,
"new(bps)", lossAdjustedEstimate,
"new(bps)", estimateToCommit,
"lastReceived(bps)", s.lastReceivedEstimate,
"expectedUsage(bps)", expectedBandwidthUsage,
"nackRatio", nackRatio,
)
s.committedChannelCapacity = lossAdjustedEstimate
s.committedChannelCapacity = estimateToCommit
// reset to get new set of samples for next trend
s.channelObserver.Reset()
s.channelObserver = s.newChannelObserverNonProbe()
// reset probe to ensure it does not start too soon after a downward trend
s.resetProbe()
@@ -812,7 +831,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
func (s *StreamAllocator) finalizeProbe() {
aborted := s.probeClusterId == s.abortedProbeClusterId
highestEstimateInProbe := s.probeChannelObserver.GetHighestEstimate()
highestEstimateInProbe := s.channelObserver.GetHighestEstimate()
s.clearProbe()
@@ -827,7 +846,7 @@ func (s *StreamAllocator) finalizeProbe() {
// NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as
// the send side is in full control of bandwidth estimation.
//
s.channelObserver.Reset()
s.channelObserver = s.newChannelObserverNonProbe()
if aborted {
// failed probe, backoff
@@ -1013,6 +1032,30 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
return aggPacketDelta, aggRepeatedNackDelta
}
// newChannelObserverProbe returns a fresh ChannelObserver named "probe",
// configured with the probe flavour of the required sample count, downward
// trend threshold, NACK window duration and NACK ratio threshold. Duplicate
// estimate values are not collapsed for this observer.
func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver {
	probeParams := ChannelObserverParams{
		Name:                           "probe",
		Logger:                         s.params.Logger,
		EstimateRequiredSamples:        NumRequiredEstimatesProbe,
		EstimateDownwardTrendThreshold: DownwardTrendThresholdProbe,
		EstimateCollapseValues:         false,
		NackWindowDuration:             NackWindowDurationProbe,
		NackRatioThreshold:             NackRatioThresholdProbe,
	}
	return NewChannelObserver(probeParams)
}
// newChannelObserverNonProbe returns a fresh ChannelObserver named
// "non-probe", configured with the non-probe flavour of the required sample
// count, downward trend threshold, NACK window duration and NACK ratio
// threshold. Duplicate estimate values are collapsed for this observer.
func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
	nonProbeParams := ChannelObserverParams{
		Name:                           "non-probe",
		Logger:                         s.params.Logger,
		EstimateRequiredSamples:        NumRequiredEstimatesNonProbe,
		EstimateDownwardTrendThreshold: DownwardTrendThresholdNonProbe,
		EstimateCollapseValues:         true,
		NackWindowDuration:             NackWindowDurationNonProbe,
		NackRatioThreshold:             NackRatioThresholdNonProbe,
	}
	return NewChannelObserver(nonProbeParams)
}
func (s *StreamAllocator) initProbe(probeRateBps int64) {
s.lastProbeStartTime = time.Now()
@@ -1025,8 +1068,8 @@ func (s *StreamAllocator) initProbe(probeRateBps int64) {
s.probeEndTime = time.Time{}
s.probeChannelObserver = NewChannelObserver("probe", s.params.Logger, NumRequiredEstimatesProbe, NackRatioThresholdProbe)
s.probeChannelObserver.SeedEstimate(s.lastReceivedEstimate)
s.channelObserver = s.newChannelObserverProbe()
s.channelObserver.SeedEstimate(s.lastReceivedEstimate)
desiredRateBps := int(probeRateBps) + int(math.Max(float64(s.committedChannelCapacity), float64(expectedBandwidthUsage)))
s.probeClusterId = s.prober.AddCluster(
@@ -1057,12 +1100,6 @@ func (s *StreamAllocator) resetProbe() {
func (s *StreamAllocator) clearProbe() {
s.probeClusterId = ProbeClusterIdInvalid
s.abortedProbeClusterId = ProbeClusterIdInvalid
s.probeTrendObserved = false
s.probeEndTime = time.Time{}
s.probeChannelObserver = nil
}
func (s *StreamAllocator) backoffProbeInterval() {
@@ -1468,36 +1505,59 @@ func (c ChannelTrend) String() string {
}
}
type ChannelObserver struct {
name string
logger logger.Logger
type ChannelCongestionReason int
estimateTrend *TrendDetector
const (
ChannelCongestionReasonNone ChannelCongestionReason = iota
ChannelCongestionReasonEstimate
ChannelCongestionReasonLoss
)
nackRatioThreshold float64
packets uint32
repeatedNacks uint32
}
func NewChannelObserver(
name string,
logger logger.Logger,
estimateRequiredSamples int,
nackRatioThreshold float64,
) *ChannelObserver {
return &ChannelObserver{
name: name,
logger: logger,
estimateTrend: NewTrendDetector(name+"-estimate", logger, estimateRequiredSamples),
nackRatioThreshold: nackRatioThreshold,
// String returns the upper-case label of a known congestion reason, or the
// decimal value of the reason when it is not one of the declared constants.
func (c ChannelCongestionReason) String() string {
	// labels indexed by the reason constant itself
	names := [...]string{
		ChannelCongestionReasonNone:     "NONE",
		ChannelCongestionReasonEstimate: "ESTIMATE",
		ChannelCongestionReasonLoss:     "LOSS",
	}
	if c >= 0 && int(c) < len(names) {
		return names[c]
	}
	return fmt.Sprintf("%d", int(c))
}
func (c *ChannelObserver) Reset() {
c.estimateTrend.Reset()
// ChannelObserverParams is the configuration handed to NewChannelObserver.
type ChannelObserverParams struct {
// Name identifies the observer in log output; NewChannelObserver also uses
// it (suffixed with "-estimate") to name the estimate trend detector.
Name string
// Logger receives the observer's debug messages.
Logger logger.Logger
// EstimateRequiredSamples is forwarded to the estimate trend detector as
// its RequiredSamples.
EstimateRequiredSamples int
// EstimateDownwardTrendThreshold is forwarded to the estimate trend
// detector as its DownwardTrendThreshold.
EstimateDownwardTrendThreshold float64
// EstimateCollapseValues is forwarded to the estimate trend detector as
// its CollapseValues.
EstimateCollapseValues bool
// NackWindowDuration is the length of the NACK accumulation window. When
// non-zero, AddNack clears the counters once the window has elapsed; zero
// disables the windowing so counters accumulate for the observer's lifetime.
NackWindowDuration time.Duration
// NackRatioThreshold is the NACK ratio above which GetTrend reports
// congestion with reason ChannelCongestionReasonLoss.
NackRatioThreshold float64
}
c.packets = 0
c.repeatedNacks = 0
// ChannelObserver pairs a trend detector over estimates with packet and
// repeated-NACK counters; GetTrend combines both to classify the channel.
type ChannelObserver struct {
// configuration captured at construction
params ChannelObserverParams
// trend detector fed with estimates
estimateTrend *TrendDetector
// start of the current NACK accumulation window; AddNack restarts it when
// a non-zero NackWindowDuration has elapsed
nackWindowStartTime time.Time
// packets accumulated through AddNack since the window started
packets uint32
// repeated NACKs accumulated through AddNack since the window started
repeatedNacks uint32
}
// NewChannelObserver creates a ChannelObserver from params. The estimate
// trend detector is named after params.Name with an "-estimate" suffix and
// inherits the estimate-related settings; the NACK window starts at the time
// of construction.
func NewChannelObserver(params ChannelObserverParams) *ChannelObserver {
	estimateTrend := NewTrendDetector(TrendDetectorParams{
		Name:                   params.Name + "-estimate",
		Logger:                 params.Logger,
		RequiredSamples:        params.EstimateRequiredSamples,
		DownwardTrendThreshold: params.EstimateDownwardTrendThreshold,
		CollapseValues:         params.EstimateCollapseValues,
	})

	observer := &ChannelObserver{
		params:              params,
		estimateTrend:       estimateTrend,
		nackWindowStartTime: time.Now(),
	}
	return observer
}
func (c *ChannelObserver) SeedEstimate(estimate int64) {
@@ -1514,6 +1574,12 @@ func (c *ChannelObserver) AddEstimate(estimate int64) {
}
// AddNack adds packets and repeatedNacks to the running counters. When a
// non-zero NackWindowDuration is configured and the current window has run
// longer than that duration, the window is restarted and both counters are
// cleared before the new deltas are added.
func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) {
	if window := c.params.NackWindowDuration; window != 0 {
		if time.Since(c.nackWindowStartTime) > window {
			// window expired: start a new one from scratch
			c.nackWindowStartTime = time.Now()
			c.packets, c.repeatedNacks = 0, 0
		}
	}

	c.packets += packets
	c.repeatedNacks += repeatedNacks
}
@@ -1538,27 +1604,30 @@ func (c *ChannelObserver) GetNackRatio() float64 {
return ratio
}
func (c *ChannelObserver) GetTrend() ChannelTrend {
func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
estimateDirection := c.estimateTrend.GetDirection()
nackRatio := c.GetNackRatio()
switch {
case estimateDirection == TrendDirectionDownward:
c.logger.Debugw(
c.params.Logger.Debugw(
"channel observer: estimate is trending downward",
"lowest", c.estimateTrend.GetLowest(),
"highest", c.estimateTrend.GetHighest(),
"estimates", c.estimateTrend.GetValues(),
"name", c.params.Name,
"estimate", c.estimateTrend.ToString(),
)
return ChannelTrendCongesting
case nackRatio > c.nackRatioThreshold:
c.logger.Debugw("channel observer: high rate of repeated NACKs", "ratio", nackRatio)
return ChannelTrendCongesting
return ChannelTrendCongesting, ChannelCongestionReasonEstimate
case nackRatio > c.params.NackRatioThreshold:
c.params.Logger.Debugw(
"channel observer: high rate of repeated NACKs",
"name", c.params.Name,
"ratio", nackRatio,
)
return ChannelTrendCongesting, ChannelCongestionReasonLoss
case estimateDirection == TrendDirectionUpward:
return ChannelTrendClearing
return ChannelTrendClearing, ChannelCongestionReasonNone
}
return ChannelTrendNeutral
return ChannelTrendNeutral, ChannelCongestionReasonNone
}
// ------------------------------------------------
@@ -1584,33 +1653,34 @@ func (t TrendDirection) String() string {
}
}
type TrendDetector struct {
name string
logger logger.Logger
requiredSamples int
// TrendDetectorParams is the configuration handed to NewTrendDetector.
type TrendDetectorParams struct {
// Name identifies the detector; it is included in the ToString output.
Name string
// Logger is the logger made available to the detector.
Logger logger.Logger
// RequiredSamples is the number of stored values needed before a direction
// other than neutral is reported; it is also the maximum number of values
// retained (the oldest is dropped when a new one arrives at capacity).
RequiredSamples int
// DownwardTrendThreshold is the Kendall's Tau value below which the trend
// is declared downward.
DownwardTrendThreshold float64
// CollapseValues, when true, makes AddValue skip storing a value equal to
// the most recently stored one.
CollapseValues bool
}
type TrendDetector struct {
params TrendDetectorParams
startTime time.Time
numSamples int
values []int64
lowestvalue int64
highestvalue int64
lowestValue int64
highestValue int64
direction TrendDirection
}
func NewTrendDetector(name string, logger logger.Logger, requiredSamples int) *TrendDetector {
func NewTrendDetector(params TrendDetectorParams) *TrendDetector {
return &TrendDetector{
name: name,
logger: logger,
requiredSamples: requiredSamples,
direction: TrendDirectionNeutral,
params: params,
startTime: time.Now(),
direction: TrendDirectionNeutral,
}
}
func (t *TrendDetector) Reset() {
t.values = nil
t.lowestvalue = int64(0)
t.highestvalue = int64(0)
}
func (t *TrendDetector) Seed(value int64) {
if len(t.values) != 0 {
return
@@ -1620,14 +1690,20 @@ func (t *TrendDetector) Seed(value int64) {
}
func (t *TrendDetector) AddValue(value int64) {
if t.lowestvalue == 0 || value < t.lowestvalue {
t.lowestvalue = value
t.numSamples++
if t.lowestValue == 0 || value < t.lowestValue {
t.lowestValue = value
}
if value > t.highestvalue {
t.highestvalue = value
if value > t.highestValue {
t.highestValue = value
}
if len(t.values) == t.requiredSamples {
// ignore duplicate values
if t.params.CollapseValues && len(t.values) != 0 && t.values[len(t.values)-1] == value {
return
}
if len(t.values) == t.params.RequiredSamples {
t.values = t.values[1:]
}
t.values = append(t.values, value)
@@ -1636,11 +1712,11 @@ func (t *TrendDetector) AddValue(value int64) {
}
func (t *TrendDetector) GetLowest() int64 {
return t.lowestvalue
return t.lowestValue
}
func (t *TrendDetector) GetHighest() int64 {
return t.highestvalue
return t.highestValue
}
func (t *TrendDetector) GetValues() []int64 {
@@ -1651,39 +1727,52 @@ func (t *TrendDetector) GetDirection() TrendDirection {
return t.direction
}
// ToString renders the detector state for logging: its name, the start time,
// the current time and the seconds elapsed between them, followed by the
// sample count, lowest and highest values, the stored values and the
// Kendall's Tau computed over those stored values.
func (t *TrendDetector) ToString() string {
	current := time.Now()
	seconds := current.Sub(t.startTime).Seconds()

	return fmt.Sprintf("n: %s", t.params.Name) +
		fmt.Sprintf(", t: %+v|%+v|%.2fs", t.startTime.Format(time.UnixDate), current.Format(time.UnixDate), seconds) +
		fmt.Sprintf(", v: %d|%d|%d|%+v|%.2f", t.numSamples, t.lowestValue, t.highestValue, t.values, kendallsTau(t.values))
}
func (t *TrendDetector) updateDirection() {
if len(t.values) < t.requiredSamples {
if len(t.values) < t.params.RequiredSamples {
t.direction = TrendDirectionNeutral
return
}
// using Kendall's Tau to find trend
kt := kendallsTau(t.values)
t.direction = TrendDirectionNeutral
switch {
case kt > 0:
t.direction = TrendDirectionUpward
case kt < t.params.DownwardTrendThreshold:
t.direction = TrendDirectionDownward
}
}
// ------------------------------------------------
func kendallsTau(values []int64) float64 {
concordantPairs := 0
discordantPairs := 0
for i := 0; i < len(t.values)-1; i++ {
for j := i + 1; j < len(t.values); j++ {
if t.values[i] < t.values[j] {
for i := 0; i < len(values)-1; i++ {
for j := i + 1; j < len(values); j++ {
if values[i] < values[j] {
concordantPairs++
} else if t.values[i] > t.values[j] {
} else if values[i] > values[j] {
discordantPairs++
}
}
}
if (concordantPairs + discordantPairs) == 0 {
t.direction = TrendDirectionNeutral
return
return 0.0
}
t.direction = TrendDirectionNeutral
kt := (float64(concordantPairs) - float64(discordantPairs)) / (float64(concordantPairs) + float64(discordantPairs))
switch {
case kt > 0:
t.direction = TrendDirectionUpward
case kt < 0:
t.direction = TrendDirectionDownward
}
return (float64(concordantPairs) - float64(discordantPairs)) / (float64(concordantPairs) + float64(discordantPairs))
}
// ------------------------------------------------
+1 -1
View File
@@ -109,7 +109,7 @@ func (s *StreamTrackerManager) AddTracker(layer int32) {
break
}
}
if !exempt {
if !exempt || layer > s.maxExpectedLayer {
s.removeAvailableLayer(layer)
} else {
s.logger.Debugw("not removing exempt layer", "layer", layer)
+3 -3
View File
@@ -22,9 +22,9 @@ var (
func initRoomStats(nodeID string) {
promRoomTotal = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: livekitNamespace,
Subsystem: "room",
Name: "total",
Namespace: livekitNamespace,
Subsystem: "room",
Name: "total",
ConstLabels: prometheus.Labels{"node_id": nodeID},
})
promRoomDuration = prometheus.NewHistogram(prometheus.HistogramOpts{