From 3c42ccbb64f3fe50de37fb5a568da2696c864c46 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 2 Dec 2024 09:42:51 +0530 Subject: [PATCH] Keep congestion state only in BWE. (#3224) --- pkg/sfu/bwe/bwe.go | 2 ++ pkg/sfu/bwe/null_bwe.go | 4 ++++ pkg/sfu/bwe/remotebwe/remote_bwe.go | 7 +++++++ pkg/sfu/bwe/sendsidebwe/congestion_detector.go | 17 +++++++++-------- pkg/sfu/bwe/sendsidebwe/send_side_bwe.go | 9 ++++++++- pkg/sfu/streamallocator/streamallocator.go | 14 +++++--------- 6 files changed, 35 insertions(+), 18 deletions(-) diff --git a/pkg/sfu/bwe/bwe.go b/pkg/sfu/bwe/bwe.go index c15eea60b..ec841df6e 100644 --- a/pkg/sfu/bwe/bwe.go +++ b/pkg/sfu/bwe/bwe.go @@ -71,6 +71,8 @@ type BWE interface { HandleTWCCFeedback(report *rtcp.TransportLayerCC) + CongestionState() CongestionState + ProbeClusterStarting(pci ccutils.ProbeClusterInfo) ProbeClusterDone(pci ccutils.ProbeClusterInfo) (bool, int64) } diff --git a/pkg/sfu/bwe/null_bwe.go b/pkg/sfu/bwe/null_bwe.go index 4f810c678..a5673e76d 100644 --- a/pkg/sfu/bwe/null_bwe.go +++ b/pkg/sfu/bwe/null_bwe.go @@ -42,6 +42,10 @@ func (n *NullBWE) HandleREMB( func (n *NullBWE) HandleTWCCFeedback(_report *rtcp.TransportLayerCC) {} +func (n *NullBWE) CongestionState() CongestionState { + return CongestionStateNone +} + func (n *NullBWE) ProbeClusterStarting(_pci ccutils.ProbeClusterInfo) {} func (n *NullBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64) { diff --git a/pkg/sfu/bwe/remotebwe/remote_bwe.go b/pkg/sfu/bwe/remotebwe/remote_bwe.go index 33bad45a4..c934db456 100644 --- a/pkg/sfu/bwe/remotebwe/remote_bwe.go +++ b/pkg/sfu/bwe/remotebwe/remote_bwe.go @@ -136,6 +136,13 @@ func (r *RemoteBWE) HandleREMB( } } +func (r *RemoteBWE) CongestionState() bwe.CongestionState { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.congestionState +} + func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) { newState := r.congestionState update := false diff --git a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go index c525c9588..23704448e 100644 --- a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go @@ -331,6 +331,8 @@ func newCongestionDetector(params congestionDetectorParams) *congestionDetector twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}), wake: make(chan struct{}, 1), estimatedAvailableChannelCapacity: 100_000_000, + congestionState: bwe.CongestionStateNone, + congestionStateSwitchedAt: mono.Now(), } c.feedbackReports.SetMinCapacity(3) @@ -339,14 +341,6 @@ func newCongestionDetector(params congestionDetectorParams) *congestionDetector return c } -func (c *congestionDetector) Reset() { - // SSBWE-TODO - // 1. may be clear all packet groups? - // 2. reset congestion state to none - // 3. reset estimate to 100 Mbps - // 4. reset packet_tracker?? maybe only the probe state?? -} - func (c *congestionDetector) Stop() { c.stop.Break() } @@ -377,6 +371,13 @@ func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) { } } +func (c *congestionDetector) CongestionState() bwe.CongestionState { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.congestionState +} + func (c *congestionDetector) prunePacketGroups() { if len(c.packetGroups) == 0 { return diff --git a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go index 653b2e3e5..e0ab0976a 100644 --- a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go +++ b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go @@ -90,7 +90,10 @@ func (s *SendSideBWE) SetBWEListener(bweListener bwe.BWEListener) { } func (s *SendSideBWE) Reset() { - s.congestionDetector.Reset() + s.congestionDetector = newCongestionDetector(congestionDetectorParams{ + Config: s.params.Config.CongestionDetector, + Logger: s.params.Logger, + }) } func (s *SendSideBWE) Stop() { @@ -101,4 +104,8 @@ func (s *SendSideBWE) HandleTWCCFeedback(report *rtcp.TransportLayerCC) { s.congestionDetector.HandleTWCCFeedback(report) } +func (s *SendSideBWE) CongestionState() bwe.CongestionState { + return s.congestionDetector.CongestionState() +} + // ------------------------------------------------ diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 7b8396c76..67959906e 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -203,9 +203,8 @@ type StreamAllocator struct { isAllocateAllPending bool rembTrackingSSRC uint32 - state streamAllocatorState - congestionState bwe.CongestionState - isHolding bool + state streamAllocatorState + isHolding bool eventsQueue *utils.TypedOpsQueue[Event] @@ -220,9 +219,8 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b enabled: enabled, allowPause: allowPause, // STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(), - videoTracks: make(map[livekit.TrackID]*Track), - state: streamAllocatorStateStable, - congestionState: bwe.CongestionStateNone, + videoTracks: make(map[livekit.TrackID]*Track), + state: streamAllocatorStateStable, eventsQueue: utils.NewTypedOpsQueue[Event](utils.OpsQueueParams{ Name: "stream-allocator", MinSize: 64, @@ -881,8 +879,6 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) { s.allocateAllTracks() } } - - s.congestionState = cscd.congestionState } func (s *StreamAllocator) setState(state streamAllocatorState) { @@ -1302,7 +1298,7 @@ func (s *StreamAllocator) maybeProbe() { return } - if s.congestionState != bwe.CongestionStateNone || !s.probeController.CanProbe() { + if s.params.BWE.CongestionState() != bwe.CongestionStateNone || !s.probeController.CanProbe() { return }