diff --git a/pkg/service/docker_test.go b/pkg/service/docker_test.go index 48937f3d4..a3da60484 100644 --- a/pkg/service/docker_test.go +++ b/pkg/service/docker_test.go @@ -19,9 +19,10 @@ import ( "log" "net" "os" - "sync/atomic" "testing" + "go.uber.org/atomic" + "github.com/ory/dockertest/v3" ) @@ -58,11 +59,11 @@ func waitTCPPort(t testing.TB, addr string) { } } -var redisLast uint32 +var redisLast atomic.Uint32 func runRedis(t testing.TB) string { c, err := Docker.RunWithOptions(&dockertest.RunOptions{ - Name: fmt.Sprintf("lktest-redis-%d", atomic.AddUint32(&redisLast, 1)), + Name: fmt.Sprintf("lktest-redis-%d", redisLast.Inc()), Repository: "redis", Tag: "latest", }) if err != nil { diff --git a/pkg/sfu/bwe/remotebwe/channel_observer.go b/pkg/sfu/bwe/remotebwe/channel_observer.go index 856d65537..584abae20 100644 --- a/pkg/sfu/bwe/remotebwe/channel_observer.go +++ b/pkg/sfu/bwe/remotebwe/channel_observer.go @@ -18,6 +18,8 @@ import ( "fmt" "time" + "go.uber.org/zap/zapcore" + "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" ) @@ -175,11 +177,9 @@ func (c *channelObserver) GetTrend() (channelTrend, channelCongestionReason) { switch { case estimateDirection == ccutils.TrendDirectionDownward: - c.logger.Debugw("remote bwe: channel observer: estimate is trending downward", "channel", c) return channelTrendCongesting, channelCongestionReasonEstimate case c.nackTracker.IsTriggered(): - c.logger.Debugw("remote bwe: channel observer: high rate of repeated NACKs", "channel", c) return channelTrendCongesting, channelCongestionReasonLoss case estimateDirection == ccutils.TrendDirectionUpward: @@ -189,8 +189,19 @@ func (c *channelObserver) GetTrend() (channelTrend, channelCongestionReason) { return channelTrendNeutral, channelCongestionReasonNone } -func (c *channelObserver) String() string { - return fmt.Sprintf("name: %s, estimate: {%v}, nack {%v}", c.params.Name, c.estimateTrend, c.nackTracker) +func (c *channelObserver) MarshalLogObject(e zapcore.ObjectEncoder) error { + if c == nil { + return nil + } + + e.AddString("name", c.params.Name) + e.AddString("estimate", c.estimateTrend.String()) + e.AddObject("nack", c.nackTracker) + + channelTrend, channelCongestionReason := c.GetTrend() + e.AddString("channelTrend", channelTrend.String()) + e.AddString("channelCongestionReason", channelCongestionReason.String()) + return nil } // ------------------------------------------------ diff --git a/pkg/sfu/bwe/remotebwe/nack_tracker.go b/pkg/sfu/bwe/remotebwe/nack_tracker.go index c536ad7e3..d2d5b8d6d 100644 --- a/pkg/sfu/bwe/remotebwe/nack_tracker.go +++ b/pkg/sfu/bwe/remotebwe/nack_tracker.go @@ -15,9 +15,10 @@ package remotebwe import ( - "fmt" "time" + "go.uber.org/zap/zapcore" + "github.com/livekit/protocol/logger" ) @@ -116,14 +117,22 @@ func (n *nackTracker) IsTriggered() bool { return false } -func (n *nackTracker) String() string { - window := "" - if !n.windowStartTime.IsZero() { - now := time.Now() - elapsed := now.Sub(n.windowStartTime).Seconds() - window = fmt.Sprintf("t: %+v|%+v|%.2fs", n.windowStartTime.Format(time.UnixDate), now.Format(time.UnixDate), elapsed) +func (n *nackTracker) MarshalLogObject(e zapcore.ObjectEncoder) error { + if n == nil { + return nil } - return fmt.Sprintf("n: %s, %s, p: %d, rn: %d, rn/p: %.2f", n.params.Name, window, n.packets, n.repeatedNacks, n.GetRatio()) + + e.AddString("name", n.params.Name) + if n.windowStartTime.IsZero() { + e.AddString("window", "inactive") + } else { + e.AddTime("windowStartTime", n.windowStartTime) + e.AddDuration("windowDuration", time.Since(n.windowStartTime)) + e.AddUint32("packets", n.packets) + e.AddUint32("repeatedNacks", n.repeatedNacks) + e.AddFloat64("nackRatio", n.GetRatio()) + } + return nil } /* REMOTE-BWE-DATA diff --git a/pkg/sfu/bwe/remotebwe/remote_bwe.go b/pkg/sfu/bwe/remotebwe/remote_bwe.go index 4721f5f4b..33bad45a4 100644 --- a/pkg/sfu/bwe/remotebwe/remote_bwe.go +++ b/pkg/sfu/bwe/remotebwe/remote_bwe.go @@ -140,6 +140,10 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState newState := r.congestionState update := false trend, reason := r.channelObserver.GetTrend() + if trend == channelTrendCongesting { + r.params.Logger.Debugw("remote bwe, channel congesting", "channel", r.channelObserver) + } + switch r.congestionState { case bwe.CongestionStateNone: if trend == channelTrendCongesting { @@ -151,7 +155,7 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState case bwe.CongestionStateCongested: if trend == channelTrendCongesting { if r.estimateAvailableChannelCapacity(reason) { - // update state sa this needs to reset switch time to wait for congestion min duration again + // update state as this needs to reset switch time to wait for congestion min duration again update = true } } else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedMinDuration { @@ -273,13 +277,24 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64 pco := r.channelObserver r.channelObserver = r.newChannelObserverNonProbe() + r.params.Logger.Debugw( + "remote bwe: probe done", + "lastReceived", r.lastReceivedEstimate, + "expectedBandwidthUsage", r.lastExpectedBandwidthUsage, + "channel", pco, + ) + if !pco.HasEnoughEstimateSamples() { // cannot decide success/failure without enough data - return false, pco.GetHighestEstimate() + return false, r.committedChannelCapacity } trend, _ := pco.GetTrend() - return trend == channelTrendClearing, pco.GetHighestEstimate() + highestEstimate := pco.GetHighestEstimate() + if trend == channelTrendClearing && highestEstimate > r.committedChannelCapacity { + r.committedChannelCapacity = highestEstimate + } + return trend == channelTrendClearing, r.committedChannelCapacity } func (r *RemoteBWE) worker() { diff --git a/pkg/sfu/ccutils/prober.go b/pkg/sfu/ccutils/prober.go index 88e127922..c3068354b 100644 --- a/pkg/sfu/ccutils/prober.go +++ b/pkg/sfu/ccutils/prober.go @@ -352,6 +352,10 @@ func (p ProbeClusterResult) Duration() time.Duration { return time.Duration(p.EndTime - p.StartTime) } +func (p ProbeClusterResult) Bitrate() float64 { + return float64(p.Bytes()*8) / p.Duration().Seconds() +} + func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddTime("StartTime", time.Unix(0, p.StartTime)) e.AddTime("EndTime", time.Unix(0, p.EndTime)) @@ -360,6 +364,7 @@ func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt("BytesNonProbePrimary", p.BytesNonProbePrimary) e.AddInt("BytesNonProbeRTX", p.BytesNonProbeRTX) e.AddInt("Bytes", p.Bytes()) + e.AddFloat64("Bitrate", p.Bitrate()) e.AddBool("IsCompleted", p.IsCompleted) return nil } @@ -470,11 +475,11 @@ func (c *Cluster) Process() time.Duration { return 0 } - sleepDuration := c.probeSleeps[c.probeIdx] + sleepDuration := c.probeSleeps[c.probeIdx%len(c.probeSleeps)] c.probeIdx++ if c.probeIdx >= len(c.probeSleeps) { - // stay in the last bucket till desired number of bytes are sent - c.probeIdx = len(c.probeSleeps) - 1 + // when overflowing, back off to ensure probe finishes, but not overshoot too much + sleepDuration *= time.Duration(c.probeIdx/len(c.probeSleeps) + 1) } c.lock.Unlock() diff --git a/pkg/sfu/forwardstats.go b/pkg/sfu/forwardstats.go index 5de0df3f4..f594b4154 100644 --- a/pkg/sfu/forwardstats.go +++ b/pkg/sfu/forwardstats.go @@ -3,9 +3,10 @@ package sfu import ( "fmt" "sync" - "sync/atomic" "time" + "go.uber.org/atomic" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" diff --git a/pkg/sfu/pacer/probe_observer.go b/pkg/sfu/pacer/probe_observer.go index 4fb9a568f..fd370d758 100644 --- a/pkg/sfu/pacer/probe_observer.go +++ b/pkg/sfu/pacer/probe_observer.go @@ -16,7 +16,9 @@ package pacer import ( "sync" - "sync/atomic" + "time" + + "go.uber.org/atomic" "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" @@ -121,7 +123,7 @@ func (po *ProbeObserver) RecordPacket(size int, isRTX bool, probeClusterId ccuti notify := false var clusterId ccutils.ProbeClusterId - if po.pci.Result.EndTime == 0 && po.pci.Result.Bytes() >= po.pci.Goal.DesiredBytes { + if po.pci.Result.EndTime == 0 && ((po.pci.Result.Bytes() >= po.pci.Goal.DesiredBytes) && time.Duration(mono.UnixNano()-po.pci.Result.StartTime) >= po.pci.Goal.Duration) { po.pci.Result.EndTime = mono.UnixNano() po.pci.Result.IsCompleted = true diff --git a/pkg/sfu/playoutdelay.go b/pkg/sfu/playoutdelay.go index 6f3ab3aad..5fb70b001 100644 --- a/pkg/sfu/playoutdelay.go +++ b/pkg/sfu/playoutdelay.go @@ -16,12 +16,12 @@ package sfu import ( "sync" - "sync/atomic" "time" pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/protocol/logger" + "go.uber.org/atomic" "go.uber.org/zap/zapcore" ) diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 67adcf59d..7b8396c76 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -228,6 +228,7 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b MinSize: 64, Logger: params.Logger, }), + lastRTTTime: time.Now().Add(-cRTTPullInterval), } s.prober = ccutils.NewProber(ccutils.ProberParams{ @@ -700,6 +701,11 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) { // finalize any probe that may have finished/aborted if pci, ok := s.probeController.MaybeFinalizeProbe(); ok { isCongestionClearing, channelCapacity := s.params.BWE.ProbeClusterDone(pci) + s.params.Logger.Debugw( + "stream allocator: probe result", + "isCongestionClearing", isCongestionClearing, + "channelCapacity", channelCapacity, + ) if isCongestionClearing { if channelCapacity > s.committedChannelCapacity { s.committedChannelCapacity = channelCapacity @@ -765,7 +771,6 @@ func (s *StreamAllocator) handleSignalPacerProbeObserverClusterComplete(event Ev probeClusterId, _ := event.Data.(ccutils.ProbeClusterId) pci := s.params.Pacer.EndProbeCluster(probeClusterId) s.probeController.ProbeClusterDone(pci) - s.params.BWE.ProbeClusterDone(pci) } func (s *StreamAllocator) handleSignalResume(event Event) { @@ -849,7 +854,7 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) { } if cscd.congestionState == bwe.CongestionStateCongested { - if s.probeController.GetActiveProbeClusterId() == ccutils.ProbeClusterIdInvalid { + if s.probeController.GetActiveProbeClusterId() != ccutils.ProbeClusterIdInvalid { s.params.Logger.Infow( "stream allocator: channel congestion detected, not updating channel capacity in active probe", "old(bps)", s.committedChannelCapacity, diff --git a/pkg/utils/incrementaldispatcher_test.go b/pkg/utils/incrementaldispatcher_test.go index 828019559..3e2d40edf 100644 --- a/pkg/utils/incrementaldispatcher_test.go +++ b/pkg/utils/incrementaldispatcher_test.go @@ -19,10 +19,11 @@ package utils_test import ( "fmt" "sync" - "sync/atomic" "testing" "time" + "go.uber.org/atomic" + "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/testutils"