diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go index f32c38b14..edb0dfefe 100644 --- a/pkg/sfu/streamallocator/probe_controller.go +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -39,7 +39,7 @@ type ProbeController struct { probeTrendObserved bool probeEndTime time.Time - onProbeSuccess func() + onProbeDone func(isSuccessful bool) } func NewProbeController(params ProbeControllerParams) *ProbeController { @@ -51,11 +51,11 @@ func NewProbeController(params ProbeControllerParams) *ProbeController { return p } -func (p *ProbeController) OnProbeSuccess(f func()) { +func (p *ProbeController) OnProbeDone(f func(isSuccessful bool)) { p.lock.Lock() defer p.lock.Unlock() - p.onProbeSuccess = f + p.onProbeDone = f } func (p *ProbeController) Reset() { @@ -79,15 +79,11 @@ func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo, lowestEstimate if p.abortedProbeClusterId == ProbeClusterIdInvalid { // successful probe, finalize isSuccessful := p.finalizeProbeLocked() - - var onProbeSuccess func() - if isSuccessful { - onProbeSuccess = p.onProbeSuccess - } + onProbeDone := p.onProbeDone p.lock.Unlock() - if onProbeSuccess != nil { - onProbeSuccess() + if onProbeDone != nil { + onProbeDone(isSuccessful) } return } @@ -145,19 +141,16 @@ func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) func (p *ProbeController) MaybeFinalizeProbe() { p.lock.Lock() + var onProbeDone func(bool) isSuccessful := false if p.isInProbeLocked() && !p.probeEndTime.IsZero() && time.Now().After(p.probeEndTime) { isSuccessful = p.finalizeProbeLocked() - } - - var onProbeSuccess func() - if isSuccessful { - onProbeSuccess = p.onProbeSuccess + onProbeDone = p.onProbeDone } p.lock.Unlock() - if onProbeSuccess != nil { - onProbeSuccess() + if onProbeDone != nil { + onProbeDone(isSuccessful) } } diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 7cc35f84a..edea5051c 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -200,7 +200,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { Prober: s.prober, Logger: params.Logger, }) - s.probeController.OnProbeSuccess(s.onProbeSuccess) + s.probeController.OnProbeDone(s.onProbeDone) s.resetState() @@ -916,7 +916,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { s.adjustState() } -func (s *StreamAllocator) onProbeSuccess() { +func (s *StreamAllocator) onProbeDone(isSuccessful bool) { highestEstimateInProbe := s.channelObserver.GetHighestEstimate() // @@ -931,6 +931,9 @@ func (s *StreamAllocator) onProbeSuccess() { // the send side is in full control of bandwidth estimation. // s.channelObserver = s.newChannelObserverNonProbe() + if !isSuccessful { + return + } // probe estimate is same or higher, commit it and try to allocate deficient tracks s.params.Logger.Infow(