diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 035472df3..f2cce209d 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "math" "strings" "sync" "time" @@ -625,21 +626,21 @@ func (d *DownTrack) keyFrameRequester() { return time.Duration(interval) * time.Millisecond } - interval := getInterval() - ticker := time.NewTicker(interval) - defer ticker.Stop() + timer := time.NewTimer(math.MaxInt64) + defer timer.Stop() - for { - if d.IsClosed() { - return + for !d.IsClosed() { + if !timer.Stop() { + <-timer.C } + timer.Reset(getInterval()) select { case _, more := <-d.keyFrameRequesterCh: if !more { return } - case <-ticker.C: + case <-timer.C: } locked, layer := d.forwarder.CheckSync() @@ -648,8 +649,6 @@ func (d *DownTrack) keyFrameRequester() { d.params.Receiver.SendPLI(layer, false) d.rtpStats.UpdateLayerLockPliAndTime(1) } - - ticker.Reset(getInterval()) } } diff --git a/pkg/utils/opsqueue.go b/pkg/utils/opsqueue.go index 9f6b2cb78..2513f0b8c 100644 --- a/pkg/utils/opsqueue.go +++ b/pkg/utils/opsqueue.go @@ -30,50 +30,46 @@ type OpsQueueParams struct { Logger logger.Logger } -type untypedOpsQueueOp func() +type UntypedQueueOp func() -func (it untypedOpsQueueOp) run() { +func (it UntypedQueueOp) run() { it() } type OpsQueue struct { - OpsQueueBase[untypedOpsQueueOp] + opsQueueBase[UntypedQueueOp] } func NewOpsQueue(params OpsQueueParams) *OpsQueue { - return &OpsQueue{ - OpsQueueBase: *newOpsQueueBase[untypedOpsQueueOp](params), - } + return &OpsQueue{*newOpsQueueBase[UntypedQueueOp](params)} } -type typedOpsQueueOp[T any] struct { +type typedQueueOp[T any] struct { fn func(T) arg T } -func (it typedOpsQueueOp[T]) run() { +func (it typedQueueOp[T]) run() { it.fn(it.arg) } type TypedOpsQueue[T any] struct { - OpsQueueBase[typedOpsQueueOp[T]] + opsQueueBase[typedQueueOp[T]] } func NewTypedOpsQueue[T any](params OpsQueueParams) *TypedOpsQueue[T] { - return &TypedOpsQueue[T]{ - OpsQueueBase: *newOpsQueueBase[typedOpsQueueOp[T]](params), - } + return &TypedOpsQueue[T]{*newOpsQueueBase[typedQueueOp[T]](params)} } func (oq *TypedOpsQueue[T]) Enqueue(fn func(T), arg T) { - oq.OpsQueueBase.Enqueue(typedOpsQueueOp[T]{fn, arg}) + oq.opsQueueBase.Enqueue(typedQueueOp[T]{fn, arg}) } type opsQueueItem interface { run() } -type OpsQueueBase[T opsQueueItem] struct { +type opsQueueBase[T opsQueueItem] struct { params OpsQueueParams lock sync.Mutex @@ -84,8 +80,8 @@ type OpsQueueBase[T opsQueueItem] struct { isStopped bool } -func newOpsQueueBase[T opsQueueItem](params OpsQueueParams) *OpsQueueBase[T] { - oq := &OpsQueueBase[T]{ +func newOpsQueueBase[T opsQueueItem](params OpsQueueParams) *opsQueueBase[T] { + oq := &opsQueueBase[T]{ params: params, wake: make(chan struct{}, 1), doneChan: make(chan struct{}), @@ -94,7 +90,7 @@ func newOpsQueueBase[T opsQueueItem](params OpsQueueParams) *OpsQueueBase[T] { return oq } -func (oq *OpsQueueBase[T]) Start() { +func (oq *opsQueueBase[T]) Start() { oq.lock.Lock() if oq.isStarted { oq.lock.Unlock() @@ -107,7 +103,7 @@ func (oq *OpsQueueBase[T]) Start() { go oq.process() } -func (oq *OpsQueueBase[T]) Stop() <-chan struct{} { +func (oq *opsQueueBase[T]) Stop() <-chan struct{} { oq.lock.Lock() if oq.isStopped { oq.lock.Unlock() @@ -120,7 +116,7 @@ func (oq *OpsQueueBase[T]) Stop() <-chan struct{} { return oq.doneChan } -func (oq *OpsQueueBase[T]) Enqueue(op T) { +func (oq *opsQueueBase[T]) Enqueue(op T) { oq.lock.Lock() defer oq.lock.Unlock() @@ -137,7 +133,7 @@ func (oq *OpsQueueBase[T]) Enqueue(op T) { } } -func (oq *OpsQueueBase[T]) process() { +func (oq *opsQueueBase[T]) process() { defer close(oq.doneChan) for {