Files
livekit/pkg/utils/opsqueue.go
Raja Subramanian d38566850a Do not post close callback in ops queue if not started. (#649)
* Do not post close callback in ops queue if not started.

Ops queue is started in `Bind()`. If `Close()` is called
when bind did not happen (because the underlying peer connection
closed before bind), the close callback does not run.

Check if ops queue is running before posting close callback
into the queue.

Not pretty, but covers this case. Need to think about it more.

* correct check
2022-04-24 11:40:22 +05:30

85 lines
1.2 KiB
Go

package utils
import (
"sync"
"github.com/livekit/protocol/logger"
)
type OpsQueue struct {
logger logger.Logger
name string
size int
lock sync.RWMutex
ops chan func()
isStarted bool
isStopped bool
}
func NewOpsQueue(logger logger.Logger, name string, size int) *OpsQueue {
return &OpsQueue{
logger: logger,
name: name,
size: size,
ops: make(chan func(), size),
}
}
func (oq *OpsQueue) SetLogger(logger logger.Logger) {
oq.logger = logger
}
func (oq *OpsQueue) Start() {
oq.lock.Lock()
if oq.isStarted {
oq.lock.Unlock()
return
}
oq.isStarted = true
oq.lock.Unlock()
go oq.process()
}
func (oq *OpsQueue) Stop() {
oq.lock.Lock()
if oq.isStopped {
oq.lock.Unlock()
return
}
oq.isStopped = true
close(oq.ops)
oq.lock.Unlock()
}
func (oq *OpsQueue) IsStarted() bool {
oq.lock.RLock()
defer oq.lock.RUnlock()
return oq.isStarted
}
func (oq *OpsQueue) Enqueue(op func()) {
oq.lock.RLock()
if oq.isStopped {
oq.lock.RUnlock()
return
}
select {
case oq.ops <- op:
default:
oq.logger.Errorw("ops queue full", nil, "name", oq.name, "size", oq.size)
}
oq.lock.RUnlock()
}
func (oq *OpsQueue) process() {
for op := range oq.ops {
op()
}
}