Fixes from probe controller refactor (#3222)

* Fixes from probe controller refactor

* fmt

* static check
This commit is contained in:
Raja Subramanian
2024-11-30 13:34:01 +05:30
committed by GitHub
parent c76fb0bcf4
commit 8bb29c3a7b
10 changed files with 78 additions and 28 deletions
+4 -3
View File
@@ -19,9 +19,10 @@ import (
"log"
"net"
"os"
"sync/atomic"
"testing"
"go.uber.org/atomic"
"github.com/ory/dockertest/v3"
)
@@ -58,11 +59,11 @@ func waitTCPPort(t testing.TB, addr string) {
}
}
var redisLast uint32
var redisLast atomic.Uint32
func runRedis(t testing.TB) string {
c, err := Docker.RunWithOptions(&dockertest.RunOptions{
Name: fmt.Sprintf("lktest-redis-%d", atomic.AddUint32(&redisLast, 1)),
Name: fmt.Sprintf("lktest-redis-%d", redisLast.Inc()),
Repository: "redis", Tag: "latest",
})
if err != nil {
+15 -4
View File
@@ -18,6 +18,8 @@ import (
"fmt"
"time"
"go.uber.org/zap/zapcore"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
)
@@ -175,11 +177,9 @@ func (c *channelObserver) GetTrend() (channelTrend, channelCongestionReason) {
switch {
case estimateDirection == ccutils.TrendDirectionDownward:
c.logger.Debugw("remote bwe: channel observer: estimate is trending downward", "channel", c)
return channelTrendCongesting, channelCongestionReasonEstimate
case c.nackTracker.IsTriggered():
c.logger.Debugw("remote bwe: channel observer: high rate of repeated NACKs", "channel", c)
return channelTrendCongesting, channelCongestionReasonLoss
case estimateDirection == ccutils.TrendDirectionUpward:
@@ -189,8 +189,19 @@ func (c *channelObserver) GetTrend() (channelTrend, channelCongestionReason) {
return channelTrendNeutral, channelCongestionReasonNone
}
func (c *channelObserver) String() string {
return fmt.Sprintf("name: %s, estimate: {%v}, nack {%v}", c.params.Name, c.estimateTrend, c.nackTracker)
func (c *channelObserver) MarshalLogObject(e zapcore.ObjectEncoder) error {
if c == nil {
return nil
}
e.AddString("name", c.params.Name)
e.AddString("estimate", c.estimateTrend.String())
e.AddObject("nack", c.nackTracker)
channelTrend, channelCongestionReason := c.GetTrend()
e.AddString("channelTrend", channelTrend.String())
e.AddString("channelCongestionReason", channelCongestionReason.String())
return nil
}
// ------------------------------------------------
+17 -8
View File
@@ -15,9 +15,10 @@
package remotebwe
import (
"fmt"
"time"
"go.uber.org/zap/zapcore"
"github.com/livekit/protocol/logger"
)
@@ -116,14 +117,22 @@ func (n *nackTracker) IsTriggered() bool {
return false
}
func (n *nackTracker) String() string {
window := ""
if !n.windowStartTime.IsZero() {
now := time.Now()
elapsed := now.Sub(n.windowStartTime).Seconds()
window = fmt.Sprintf("t: %+v|%+v|%.2fs", n.windowStartTime.Format(time.UnixDate), now.Format(time.UnixDate), elapsed)
func (n *nackTracker) MarshalLogObject(e zapcore.ObjectEncoder) error {
if n == nil {
return nil
}
return fmt.Sprintf("n: %s, %s, p: %d, rn: %d, rn/p: %.2f", n.params.Name, window, n.packets, n.repeatedNacks, n.GetRatio())
e.AddString("name", n.params.Name)
if n.windowStartTime.IsZero() {
e.AddString("window", "inactive")
} else {
e.AddTime("windowStartTime", n.windowStartTime)
e.AddDuration("windowDuration", time.Since(n.windowStartTime))
e.AddUint32("packets", n.packets)
e.AddUint32("repeatedNacks", n.repeatedNacks)
e.AddFloat64("nackRatio", n.GetRatio())
}
return nil
}
/* REMOTE-BWE-DATA
+18 -3
View File
@@ -140,6 +140,10 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState
newState := r.congestionState
update := false
trend, reason := r.channelObserver.GetTrend()
if trend == channelTrendCongesting {
r.params.Logger.Debugw("remote bwe, channel congesting", "channel", r.channelObserver)
}
switch r.congestionState {
case bwe.CongestionStateNone:
if trend == channelTrendCongesting {
@@ -151,7 +155,7 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState
case bwe.CongestionStateCongested:
if trend == channelTrendCongesting {
if r.estimateAvailableChannelCapacity(reason) {
// update state sa this needs to reset switch time to wait for congestion min duration again
// update state as this needs to reset switch time to wait for congestion min duration again
update = true
}
} else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedMinDuration {
@@ -273,13 +277,24 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64
pco := r.channelObserver
r.channelObserver = r.newChannelObserverNonProbe()
r.params.Logger.Debugw(
"remote bwe: probe done",
"lastReceived", r.lastReceivedEstimate,
"expectedBandwidthUsage", r.lastExpectedBandwidthUsage,
"channel", pco,
)
if !pco.HasEnoughEstimateSamples() {
// cannot decide success/failure without enough data
return false, pco.GetHighestEstimate()
return false, r.committedChannelCapacity
}
trend, _ := pco.GetTrend()
return trend == channelTrendClearing, pco.GetHighestEstimate()
highestEstimate := pco.GetHighestEstimate()
if trend == channelTrendClearing && highestEstimate > r.committedChannelCapacity {
r.committedChannelCapacity = highestEstimate
}
return trend == channelTrendClearing, r.committedChannelCapacity
}
func (r *RemoteBWE) worker() {
+8 -3
View File
@@ -352,6 +352,10 @@ func (p ProbeClusterResult) Duration() time.Duration {
return time.Duration(p.EndTime - p.StartTime)
}
func (p ProbeClusterResult) Bitrate() float64 {
return float64(p.Bytes()*8) / p.Duration().Seconds()
}
func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddTime("StartTime", time.Unix(0, p.StartTime))
e.AddTime("EndTime", time.Unix(0, p.EndTime))
@@ -360,6 +364,7 @@ func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddInt("BytesNonProbePrimary", p.BytesNonProbePrimary)
e.AddInt("BytesNonProbeRTX", p.BytesNonProbeRTX)
e.AddInt("Bytes", p.Bytes())
e.AddFloat64("Bitrate", p.Bitrate())
e.AddBool("IsCompleted", p.IsCompleted)
return nil
}
@@ -470,11 +475,11 @@ func (c *Cluster) Process() time.Duration {
return 0
}
sleepDuration := c.probeSleeps[c.probeIdx]
sleepDuration := c.probeSleeps[c.probeIdx%len(c.probeSleeps)]
c.probeIdx++
if c.probeIdx >= len(c.probeSleeps) {
// stay in the last bucket till desired number of bytes are sent
c.probeIdx = len(c.probeSleeps) - 1
// when overflowing, back off to ensure probe finishes, but not overshoot too much
sleepDuration *= time.Duration(c.probeIdx/len(c.probeSleeps) + 1)
}
c.lock.Unlock()
+2 -1
View File
@@ -3,9 +3,10 @@ package sfu
import (
"fmt"
"sync"
"sync/atomic"
"time"
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
+4 -2
View File
@@ -16,7 +16,9 @@ package pacer
import (
"sync"
"sync/atomic"
"time"
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
@@ -121,7 +123,7 @@ func (po *ProbeObserver) RecordPacket(size int, isRTX bool, probeClusterId ccuti
notify := false
var clusterId ccutils.ProbeClusterId
if po.pci.Result.EndTime == 0 && po.pci.Result.Bytes() >= po.pci.Goal.DesiredBytes {
if po.pci.Result.EndTime == 0 && ((po.pci.Result.Bytes() >= po.pci.Goal.DesiredBytes) && time.Duration(mono.UnixNano()-po.pci.Result.StartTime) >= po.pci.Goal.Duration) {
po.pci.Result.EndTime = mono.UnixNano()
po.pci.Result.IsCompleted = true
+1 -1
View File
@@ -16,12 +16,12 @@ package sfu
import (
"sync"
"sync/atomic"
"time"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/protocol/logger"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"
)
+7 -2
View File
@@ -228,6 +228,7 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
MinSize: 64,
Logger: params.Logger,
}),
lastRTTTime: time.Now().Add(-cRTTPullInterval),
}
s.prober = ccutils.NewProber(ccutils.ProberParams{
@@ -700,6 +701,11 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
// finalize any probe that may have finished/aborted
if pci, ok := s.probeController.MaybeFinalizeProbe(); ok {
isCongestionClearing, channelCapacity := s.params.BWE.ProbeClusterDone(pci)
s.params.Logger.Debugw(
"stream allocator: probe result",
"isCongestionClearing", isCongestionClearing,
"channelCapacity", channelCapacity,
)
if isCongestionClearing {
if channelCapacity > s.committedChannelCapacity {
s.committedChannelCapacity = channelCapacity
@@ -765,7 +771,6 @@ func (s *StreamAllocator) handleSignalPacerProbeObserverClusterComplete(event Ev
probeClusterId, _ := event.Data.(ccutils.ProbeClusterId)
pci := s.params.Pacer.EndProbeCluster(probeClusterId)
s.probeController.ProbeClusterDone(pci)
s.params.BWE.ProbeClusterDone(pci)
}
func (s *StreamAllocator) handleSignalResume(event Event) {
@@ -849,7 +854,7 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) {
}
if cscd.congestionState == bwe.CongestionStateCongested {
if s.probeController.GetActiveProbeClusterId() == ccutils.ProbeClusterIdInvalid {
if s.probeController.GetActiveProbeClusterId() != ccutils.ProbeClusterIdInvalid {
s.params.Logger.Infow(
"stream allocator: channel congestion detected, not updating channel capacity in active probe",
"old(bps)", s.committedChannelCapacity,
+2 -1
View File
@@ -19,10 +19,11 @@ package utils_test
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"go.uber.org/atomic"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/testutils"