Make OpsQueueParams to make it easier to understand args. (#2578)

This commit is contained in:
Raja Subramanian
2024-03-14 10:27:24 +05:30
committed by GitHub
parent e376625a13
commit 14321f21bf
6 changed files with 49 additions and 20 deletions
+6 -1
View File
@@ -59,7 +59,12 @@ func NewDynacastManager(params DynacastManagerParams) *DynacastManager {
dynacastQuality: make(map[string]*DynacastQuality),
maxSubscribedQuality: make(map[string]livekit.VideoQuality),
committedMaxSubscribedQuality: make(map[string]livekit.VideoQuality),
qualityNotifyOpQueue: utils.NewOpsQueue("quality-notify", 64, true),
qualityNotifyOpQueue: utils.NewOpsQueue(utils.OpsQueueParams{
Name: "quality-notify",
MinSize: 64,
FlushOnStop: true,
Logger: params.Logger,
}),
}
if params.DynacastPauseDelay > 0 {
d.maxSubscribedQualityDebounce = debounce.New(params.DynacastPauseDelay)
+6 -2
View File
@@ -241,8 +241,12 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return nil, ErrMissingGrants
}
p := &ParticipantImpl{
params: params,
pubRTCPQueue: sutils.NewOpsQueue("pub-rtcp", 64, false),
params: params,
pubRTCPQueue: sutils.NewOpsQueue(sutils.OpsQueueParams{
Name: "pub-rtcp",
MinSize: 64,
Logger: params.Logger,
}),
pendingTracks: make(map[string]*pendingTrackInfo),
pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo),
connectedAt: time.Now(),
+9 -4
View File
@@ -44,6 +44,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/pkg/utils"
sutils "github.com/livekit/livekit-server/pkg/utils"
lkinterceptor "github.com/livekit/mediatransportutil/pkg/interceptor"
lktwcc "github.com/livekit/mediatransportutil/pkg/twcc"
@@ -384,10 +385,14 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
params.Logger = logger.GetLogger()
}
t := &PCTransport{
params: params,
debouncedNegotiate: debounce.New(negotiationFrequency),
negotiationState: transport.NegotiationStateNone,
eventsQueue: sutils.NewOpsQueue("transport", 64, false),
params: params,
debouncedNegotiate: debounce.New(negotiationFrequency),
negotiationState: transport.NegotiationStateNone,
eventsQueue: sutils.NewOpsQueue(utils.OpsQueueParams{
Name: "transport",
MinSize: 64,
Logger: params.Logger,
}),
previousTrackDescription: make(map[string]*trackDescription),
canReuseTransceiver: true,
connectionDetails: types.NewICEConnectionDetails(params.Transport, params.Logger),
+5 -1
View File
@@ -180,7 +180,11 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
}),
rateMonitor: NewRateMonitor(),
videoTracks: make(map[livekit.TrackID]*Track),
eventsQueue: utils.NewOpsQueue("stream-allocator", 64, false),
eventsQueue: utils.NewOpsQueue(utils.OpsQueueParams{
Name: "stream-allocator",
MinSize: 64,
Logger: params.Logger,
}),
}
s.probeController = NewProbeController(ProbeControllerParams{
+8 -3
View File
@@ -104,9 +104,14 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer
t := &telemetryService{
AnalyticsService: analytics,
notifier: notifier,
jobsQueue: utils.NewOpsQueue("telemetry", jobsQueueMinSize, true),
workers: make(map[livekit.ParticipantID]*StatsWorker),
notifier: notifier,
jobsQueue: utils.NewOpsQueue(utils.OpsQueueParams{
Name: "telemetry",
MinSize: jobsQueueMinSize,
FlushOnStop: true,
Logger: logger.GetLogger(),
}),
workers: make(map[livekit.ParticipantID]*StatsWorker),
}
t.jobsQueue.Start()
+15 -9
View File
@@ -19,12 +19,19 @@ import (
"sync"
"github.com/gammazero/deque"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
type OpsQueueParams struct {
Name string
MinSize uint
FlushOnStop bool
Logger logger.Logger
}
type OpsQueue struct {
name string
flushOnStop bool
params OpsQueueParams
lock sync.Mutex
ops deque.Deque[func()]
@@ -34,14 +41,13 @@ type OpsQueue struct {
isStopped bool
}
func NewOpsQueue(name string, minSize uint, flushOnStop bool) *OpsQueue {
func NewOpsQueue(params OpsQueueParams) *OpsQueue {
oq := &OpsQueue{
name: name,
flushOnStop: flushOnStop,
wake: make(chan struct{}, 1),
doneChan: make(chan struct{}),
params: params,
wake: make(chan struct{}, 1),
doneChan: make(chan struct{}),
}
oq.ops.SetMinCapacity(uint(utils.Min(bits.Len64(uint64(minSize-1)), 7)))
oq.ops.SetMinCapacity(uint(utils.Min(bits.Len64(uint64(oq.params.MinSize-1)), 7)))
return oq
}
@@ -95,7 +101,7 @@ func (oq *OpsQueue) process() {
<-oq.wake
for {
oq.lock.Lock()
if oq.isStopped && (!oq.flushOnStop || oq.ops.Len() == 0) {
if oq.isStopped && (!oq.params.FlushOnStop || oq.ops.Len() == 0) {
oq.lock.Unlock()
return
}