Use an atomic flag to stop stream allocator (#395)

* Use an atomic flag to stop stream allocator

* use a mutex for event channel

* RLock while posting event

* lock isStopped flag to prevent posting to closed channel
This commit is contained in:
Raja Subramanian
2022-02-01 12:41:37 +05:30
committed by GitHub
parent 0574803be8
commit 247807b2cc
+17 -22
View File
@@ -9,6 +9,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
@@ -143,9 +144,10 @@ type StreamAllocator struct {
state State
chMu sync.RWMutex
eventChMu sync.RWMutex
eventCh chan Event
runningCh chan struct{}
isStopped utils.AtomicFlag
}
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
@@ -156,8 +158,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
prober: NewProber(ProberParams{
Logger: params.Logger,
}),
eventCh: make(chan Event, 20),
runningCh: make(chan struct{}),
eventCh: make(chan Event, 20),
}
s.initializeEstimate()
@@ -173,11 +174,14 @@ func (s *StreamAllocator) Start() {
}
func (s *StreamAllocator) Stop() {
s.chMu.Lock()
defer s.chMu.Unlock()
s.eventChMu.Lock()
if !s.isStopped.TrySet(true) {
s.eventChMu.Unlock()
return
}
close(s.runningCh)
close(s.eventCh)
s.eventChMu.Unlock()
}
func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error) {
@@ -302,14 +306,14 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) {
}
func (s *StreamAllocator) postEvent(event Event) {
s.chMu.RLock()
defer s.chMu.RUnlock()
if !s.isRunning() {
s.eventChMu.RLock()
if s.isStopped.Get() {
s.eventChMu.RUnlock()
return
}
s.eventCh <- event
s.eventChMu.RUnlock()
}
func (s *StreamAllocator) processEvents() {
@@ -318,21 +322,12 @@ func (s *StreamAllocator) processEvents() {
}
}
func (s *StreamAllocator) isRunning() bool {
select {
case <-s.runningCh:
return false
default:
return true
}
}
func (s *StreamAllocator) ping() {
ticker := time.NewTicker(time.Second)
for s.isRunning() {
for {
<-ticker.C
if !s.isRunning() {
if s.isStopped.Get() {
return
}