diff --git a/pkg/rtc/dynacastmanager.go b/pkg/rtc/dynacastmanager.go index 20b35cb81..1de8d4d2a 100644 --- a/pkg/rtc/dynacastmanager.go +++ b/pkg/rtc/dynacastmanager.go @@ -59,7 +59,12 @@ func NewDynacastManager(params DynacastManagerParams) *DynacastManager { dynacastQuality: make(map[string]*DynacastQuality), maxSubscribedQuality: make(map[string]livekit.VideoQuality), committedMaxSubscribedQuality: make(map[string]livekit.VideoQuality), - qualityNotifyOpQueue: utils.NewOpsQueue("quality-notify", 64, true), + qualityNotifyOpQueue: utils.NewOpsQueue(utils.OpsQueueParams{ + Name: "quality-notify", + MinSize: 64, + FlushOnStop: true, + Logger: params.Logger, + }), } if params.DynacastPauseDelay > 0 { d.maxSubscribedQualityDebounce = debounce.New(params.DynacastPauseDelay) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 1146495de..3c45151aa 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -241,8 +241,12 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { return nil, ErrMissingGrants } p := &ParticipantImpl{ - params: params, - pubRTCPQueue: sutils.NewOpsQueue("pub-rtcp", 64, false), + params: params, + pubRTCPQueue: sutils.NewOpsQueue(sutils.OpsQueueParams{ + Name: "pub-rtcp", + MinSize: 64, + Logger: params.Logger, + }), pendingTracks: make(map[string]*pendingTrackInfo), pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo), connectedAt: time.Now(), diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 09e124bae..29558ae47 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -44,6 +44,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/streamallocator" sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/livekit-server/pkg/utils" sutils "github.com/livekit/livekit-server/pkg/utils" lkinterceptor "github.com/livekit/mediatransportutil/pkg/interceptor" lktwcc "github.com/livekit/mediatransportutil/pkg/twcc" @@ -384,10 +385,14 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { params.Logger = logger.GetLogger() } t := &PCTransport{ - params: params, - debouncedNegotiate: debounce.New(negotiationFrequency), - negotiationState: transport.NegotiationStateNone, - eventsQueue: sutils.NewOpsQueue("transport", 64, false), + params: params, + debouncedNegotiate: debounce.New(negotiationFrequency), + negotiationState: transport.NegotiationStateNone, + eventsQueue: sutils.NewOpsQueue(utils.OpsQueueParams{ + Name: "transport", + MinSize: 64, + Logger: params.Logger, + }), previousTrackDescription: make(map[string]*trackDescription), canReuseTransceiver: true, connectionDetails: types.NewICEConnectionDetails(params.Transport, params.Logger), diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index beda5f10a..352b50c2e 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -180,7 +180,11 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { }), rateMonitor: NewRateMonitor(), videoTracks: make(map[livekit.TrackID]*Track), - eventsQueue: utils.NewOpsQueue("stream-allocator", 64, false), + eventsQueue: utils.NewOpsQueue(utils.OpsQueueParams{ + Name: "stream-allocator", + MinSize: 64, + Logger: params.Logger, + }), } s.probeController = NewProbeController(ProbeControllerParams{ diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 7bf544167..9695476ab 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -104,9 +104,14 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer t := &telemetryService{ AnalyticsService: analytics, - notifier: notifier, - jobsQueue: utils.NewOpsQueue("telemetry", jobsQueueMinSize, true), - workers: make(map[livekit.ParticipantID]*StatsWorker), + notifier: notifier, + jobsQueue: utils.NewOpsQueue(utils.OpsQueueParams{ + Name: "telemetry", + MinSize: jobsQueueMinSize, + FlushOnStop: true, + Logger: logger.GetLogger(), + }), + workers: make(map[livekit.ParticipantID]*StatsWorker), } t.jobsQueue.Start() diff --git a/pkg/utils/opsqueue.go b/pkg/utils/opsqueue.go index 19f8d7ecc..bda4ceea1 100644 --- a/pkg/utils/opsqueue.go +++ b/pkg/utils/opsqueue.go @@ -19,12 +19,19 @@ import ( "sync" "github.com/gammazero/deque" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" ) +type OpsQueueParams struct { + Name string + MinSize uint + FlushOnStop bool + Logger logger.Logger +} + type OpsQueue struct { - name string - flushOnStop bool + params OpsQueueParams lock sync.Mutex ops deque.Deque[func()] @@ -34,14 +41,13 @@ type OpsQueue struct { isStopped bool } -func NewOpsQueue(name string, minSize uint, flushOnStop bool) *OpsQueue { +func NewOpsQueue(params OpsQueueParams) *OpsQueue { oq := &OpsQueue{ - name: name, - flushOnStop: flushOnStop, - wake: make(chan struct{}, 1), - doneChan: make(chan struct{}), + params: params, + wake: make(chan struct{}, 1), + doneChan: make(chan struct{}), } - oq.ops.SetMinCapacity(uint(utils.Min(bits.Len64(uint64(minSize-1)), 7))) + oq.ops.SetMinCapacity(uint(utils.Min(bits.Len64(uint64(oq.params.MinSize-1)), 7))) return oq } @@ -95,7 +101,7 @@ func (oq *OpsQueue) process() { <-oq.wake for { oq.lock.Lock() - if oq.isStopped && (!oq.flushOnStop || oq.ops.Len() == 0) { + if oq.isStopped && (!oq.params.FlushOnStop || oq.ops.Len() == 0) { oq.lock.Unlock() return }