mirror of
https://github.com/livekit/livekit.git
synced 2026-05-26 14:06:51 +00:00
Keep congestion state only in BWE. (#3224)
This commit is contained in:
@@ -71,6 +71,8 @@ type BWE interface {
|
||||
|
||||
HandleTWCCFeedback(report *rtcp.TransportLayerCC)
|
||||
|
||||
CongestionState() CongestionState
|
||||
|
||||
ProbeClusterStarting(pci ccutils.ProbeClusterInfo)
|
||||
ProbeClusterDone(pci ccutils.ProbeClusterInfo) (bool, int64)
|
||||
}
|
||||
|
||||
@@ -42,6 +42,10 @@ func (n *NullBWE) HandleREMB(
|
||||
|
||||
func (n *NullBWE) HandleTWCCFeedback(_report *rtcp.TransportLayerCC) {}
|
||||
|
||||
func (n *NullBWE) CongestionState() CongestionState {
|
||||
return CongestionStateNone
|
||||
}
|
||||
|
||||
func (n *NullBWE) ProbeClusterStarting(_pci ccutils.ProbeClusterInfo) {}
|
||||
|
||||
func (n *NullBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64) {
|
||||
|
||||
@@ -136,6 +136,13 @@ func (r *RemoteBWE) HandleREMB(
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RemoteBWE) CongestionState() bwe.CongestionState {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
|
||||
return r.congestionState
|
||||
}
|
||||
|
||||
func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) {
|
||||
newState := r.congestionState
|
||||
update := false
|
||||
|
||||
@@ -331,6 +331,8 @@ func newCongestionDetector(params congestionDetectorParams) *congestionDetector
|
||||
twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}),
|
||||
wake: make(chan struct{}, 1),
|
||||
estimatedAvailableChannelCapacity: 100_000_000,
|
||||
congestionState: bwe.CongestionStateNone,
|
||||
congestionStateSwitchedAt: mono.Now(),
|
||||
}
|
||||
|
||||
c.feedbackReports.SetMinCapacity(3)
|
||||
@@ -339,14 +341,6 @@ func newCongestionDetector(params congestionDetectorParams) *congestionDetector
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *congestionDetector) Reset() {
|
||||
// SSBWE-TODO
|
||||
// 1. may be clear all packet groups?
|
||||
// 2. reset congestion state to none
|
||||
// 3. reset estimate to 100 Mbps
|
||||
// 4. reset packet_tracker?? maybe only the probe state??
|
||||
}
|
||||
|
||||
func (c *congestionDetector) Stop() {
|
||||
c.stop.Break()
|
||||
}
|
||||
@@ -377,6 +371,13 @@ func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *congestionDetector) CongestionState() bwe.CongestionState {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
|
||||
return c.congestionState
|
||||
}
|
||||
|
||||
func (c *congestionDetector) prunePacketGroups() {
|
||||
if len(c.packetGroups) == 0 {
|
||||
return
|
||||
|
||||
@@ -90,7 +90,10 @@ func (s *SendSideBWE) SetBWEListener(bweListener bwe.BWEListener) {
|
||||
}
|
||||
|
||||
func (s *SendSideBWE) Reset() {
|
||||
s.congestionDetector.Reset()
|
||||
s.congestionDetector = newCongestionDetector(congestionDetectorParams{
|
||||
Config: s.params.Config.CongestionDetector,
|
||||
Logger: s.params.Logger,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *SendSideBWE) Stop() {
|
||||
@@ -101,4 +104,8 @@ func (s *SendSideBWE) HandleTWCCFeedback(report *rtcp.TransportLayerCC) {
|
||||
s.congestionDetector.HandleTWCCFeedback(report)
|
||||
}
|
||||
|
||||
func (s *SendSideBWE) CongestionState() bwe.CongestionState {
|
||||
return s.congestionDetector.CongestionState()
|
||||
}
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
@@ -203,9 +203,8 @@ type StreamAllocator struct {
|
||||
isAllocateAllPending bool
|
||||
rembTrackingSSRC uint32
|
||||
|
||||
state streamAllocatorState
|
||||
congestionState bwe.CongestionState
|
||||
isHolding bool
|
||||
state streamAllocatorState
|
||||
isHolding bool
|
||||
|
||||
eventsQueue *utils.TypedOpsQueue[Event]
|
||||
|
||||
@@ -220,9 +219,8 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
|
||||
enabled: enabled,
|
||||
allowPause: allowPause,
|
||||
// STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(),
|
||||
videoTracks: make(map[livekit.TrackID]*Track),
|
||||
state: streamAllocatorStateStable,
|
||||
congestionState: bwe.CongestionStateNone,
|
||||
videoTracks: make(map[livekit.TrackID]*Track),
|
||||
state: streamAllocatorStateStable,
|
||||
eventsQueue: utils.NewTypedOpsQueue[Event](utils.OpsQueueParams{
|
||||
Name: "stream-allocator",
|
||||
MinSize: 64,
|
||||
@@ -881,8 +879,6 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) {
|
||||
s.allocateAllTracks()
|
||||
}
|
||||
}
|
||||
|
||||
s.congestionState = cscd.congestionState
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) setState(state streamAllocatorState) {
|
||||
@@ -1302,7 +1298,7 @@ func (s *StreamAllocator) maybeProbe() {
|
||||
return
|
||||
}
|
||||
|
||||
if s.congestionState != bwe.CongestionStateNone || !s.probeController.CanProbe() {
|
||||
if s.params.BWE.CongestionState() != bwe.CongestionStateNone || !s.probeController.CanProbe() {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user