mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
* Do not post close callback in ops queue if not started. Ops queue is started in `Bind()`. If `Close()` is called when bind did not happen (because the underlying peer connection closed before bind), the close callback does not run. Check if ops queue is running before posting close callback into the queue. Not pretty, but covers this case. Need to think about it more. * correct check
85 lines
1.2 KiB
Go
85 lines
1.2 KiB
Go
package utils
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/livekit/protocol/logger"
|
|
)
|
|
|
|
type OpsQueue struct {
|
|
logger logger.Logger
|
|
name string
|
|
size int
|
|
|
|
lock sync.RWMutex
|
|
ops chan func()
|
|
isStarted bool
|
|
isStopped bool
|
|
}
|
|
|
|
func NewOpsQueue(logger logger.Logger, name string, size int) *OpsQueue {
|
|
return &OpsQueue{
|
|
logger: logger,
|
|
name: name,
|
|
size: size,
|
|
ops: make(chan func(), size),
|
|
}
|
|
}
|
|
|
|
func (oq *OpsQueue) SetLogger(logger logger.Logger) {
|
|
oq.logger = logger
|
|
}
|
|
|
|
func (oq *OpsQueue) Start() {
|
|
oq.lock.Lock()
|
|
if oq.isStarted {
|
|
oq.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
oq.isStarted = true
|
|
oq.lock.Unlock()
|
|
|
|
go oq.process()
|
|
}
|
|
|
|
func (oq *OpsQueue) Stop() {
|
|
oq.lock.Lock()
|
|
if oq.isStopped {
|
|
oq.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
oq.isStopped = true
|
|
close(oq.ops)
|
|
oq.lock.Unlock()
|
|
}
|
|
|
|
func (oq *OpsQueue) IsStarted() bool {
|
|
oq.lock.RLock()
|
|
defer oq.lock.RUnlock()
|
|
|
|
return oq.isStarted
|
|
}
|
|
|
|
func (oq *OpsQueue) Enqueue(op func()) {
|
|
oq.lock.RLock()
|
|
if oq.isStopped {
|
|
oq.lock.RUnlock()
|
|
return
|
|
}
|
|
|
|
select {
|
|
case oq.ops <- op:
|
|
default:
|
|
oq.logger.Errorw("ops queue full", nil, "name", oq.name, "size", oq.size)
|
|
}
|
|
oq.lock.RUnlock()
|
|
}
|
|
|
|
func (oq *OpsQueue) process() {
|
|
for op := range oq.ops {
|
|
op()
|
|
}
|
|
}
|