From 247807b2cc092ed8b58aa39992a13fea705eda99 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 1 Feb 2022 12:41:37 +0530 Subject: [PATCH] Use an atomic flag to stop stream allocator (#395) * Use an atomic flag to stop stream allocator * use a mutex for event channel * RLock while posting event * lock isStopped flag to prevent posting to closed channel --- pkg/sfu/streamallocator.go | 39 +++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 4bf5f692a..a29c41c3a 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -9,6 +9,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/pion/interceptor/pkg/cc" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" @@ -143,9 +144,10 @@ type StreamAllocator struct { state State - chMu sync.RWMutex + eventChMu sync.RWMutex eventCh chan Event - runningCh chan struct{} + + isStopped utils.AtomicFlag } func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { @@ -156,8 +158,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { prober: NewProber(ProberParams{ Logger: params.Logger, }), - eventCh: make(chan Event, 20), - runningCh: make(chan struct{}), + eventCh: make(chan Event, 20), } s.initializeEstimate() @@ -173,11 +174,14 @@ func (s *StreamAllocator) Start() { } func (s *StreamAllocator) Stop() { - s.chMu.Lock() - defer s.chMu.Unlock() + s.eventChMu.Lock() + if !s.isStopped.TrySet(true) { + s.eventChMu.Unlock() + return + } - close(s.runningCh) close(s.eventCh) + s.eventChMu.Unlock() } func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error) { @@ -302,14 +306,14 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) { } func (s *StreamAllocator) postEvent(event Event) { - s.chMu.RLock() - defer s.chMu.RUnlock() - - if !s.isRunning() { + s.eventChMu.RLock() + if s.isStopped.Get() { + s.eventChMu.RUnlock() return } s.eventCh <- event + s.eventChMu.RUnlock() } func (s *StreamAllocator) processEvents() { @@ -318,21 +322,12 @@ func (s *StreamAllocator) processEvents() { } } -func (s *StreamAllocator) isRunning() bool { - select { - case <-s.runningCh: - return false - default: - return true - } -} - func (s *StreamAllocator) ping() { ticker := time.NewTicker(time.Second) - for s.isRunning() { + for { <-ticker.C - if !s.isRunning() { + if s.isStopped.Get() { return }