diff --git a/pkg/utils/opsqueue.go b/pkg/utils/opsqueue.go index 5ecb9ef0b..65e0dafe2 100644 --- a/pkg/utils/opsqueue.go +++ b/pkg/utils/opsqueue.go @@ -17,6 +17,8 @@ package utils import ( "sync" + "github.com/gammazero/deque" + "github.com/livekit/protocol/logger" ) @@ -27,16 +29,12 @@ type OpsQueueParams struct { Logger logger.Logger } -// ---------------------------- - type UntypedQueueOp func() func (op UntypedQueueOp) run() { op() } -// ---------------------------- - type OpsQueue struct { opsQueueBase[UntypedQueueOp] } @@ -45,8 +43,6 @@ func NewOpsQueue(params OpsQueueParams) *OpsQueue { return &OpsQueue{*newOpsQueueBase[UntypedQueueOp](params)} } -// ---------------------------- - type typedQueueOp[T any] struct { fn func(T) arg T @@ -56,8 +52,6 @@ func (op typedQueueOp[T]) run() { op.fn(op.arg) } -// ---------------------------- - type TypedOpsQueue[T any] struct { opsQueueBase[typedQueueOp[T]] } @@ -70,25 +64,15 @@ func (oq *TypedOpsQueue[T]) Enqueue(fn func(T), arg T) { oq.opsQueueBase.Enqueue(typedQueueOp[T]{fn, arg}) } -// ---------------------------- - type opsQueueItem interface { run() } -type element struct { - opsQueueItem - next *element -} - -// ---------------------------- - type opsQueueBase[T opsQueueItem] struct { params OpsQueueParams lock sync.Mutex - opsHead *element - opsTail *element + ops deque.Deque[T] wake chan struct{} isStarted bool doneChan chan struct{} @@ -101,6 +85,7 @@ func newOpsQueueBase[T opsQueueItem](params OpsQueueParams) *opsQueueBase[T] { wake: make(chan struct{}, 1), doneChan: make(chan struct{}), } + o.ops.SetBaseCap(int(min(params.MinSize, 128))) return o } @@ -138,19 +123,12 @@ func (oq *opsQueueBase[T]) Enqueue(op T) { return } - if oq.opsHead == nil { - oq.opsHead = &element{op, nil} - oq.opsTail = oq.opsHead - - // wake up on first entry in queue + oq.ops.PushBack(op) + if oq.ops.Len() == 1 { select { case oq.wake <- struct{}{}: default: } - } else { - elem := &element{op, nil} - oq.opsTail.next = elem - oq.opsTail = elem } } @@ -161,24 +139,19 @@ func (oq *opsQueueBase[T]) process() { <-oq.wake for { oq.lock.Lock() - if oq.isStopped && (!oq.params.FlushOnStop || oq.opsHead == nil) { + if oq.isStopped && (!oq.params.FlushOnStop || oq.ops.Len() == 0) { oq.lock.Unlock() return } - elem := oq.opsHead - if elem == nil { + if oq.ops.Len() == 0 { oq.lock.Unlock() break } - - oq.opsHead = elem.next - if oq.opsHead == nil { - oq.opsTail = nil - } + op := oq.ops.PopFront() oq.lock.Unlock() - elem.opsQueueItem.run() + op.run() } } }