From 246fed2b0aca1771beea399a720a92ff4d881775 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 19 Dec 2024 17:30:26 -0800 Subject: [PATCH] Update protocol and deque --- go.mod | 8 ++++---- go.sum | 16 ++++++++-------- pkg/rtc/supervisor/publication_monitor.go | 2 +- pkg/sfu/buffer/buffer.go | 2 +- pkg/sfu/ccutils/prober.go | 2 +- pkg/sfu/datachannel/bitrate.go | 3 ++- pkg/sfu/pacer/leaky_bucket.go | 2 +- pkg/sfu/pacer/no_queue.go | 2 +- pkg/utils/opsqueue.go | 8 ++++++-- 9 files changed, 25 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index f5376a0b2..7ab477910 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,8 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/elliotchance/orderedmap/v2 v2.4.0 github.com/florianl/go-tc v0.4.4 - github.com/frostbyte73/core v0.0.13 - github.com/gammazero/deque v0.2.1 + github.com/frostbyte73/core v0.1.0 + github.com/gammazero/deque v1.0.0 github.com/gammazero/workerpool v1.1.3 github.com/google/wire v0.6.0 github.com/gorilla/websocket v1.5.3 @@ -20,8 +20,8 @@ require ( github.com/jellydator/ttlcache/v3 v3.3.0 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 - github.com/livekit/protocol v1.29.5-0.20241217013317-bc388341b9f2 + github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 + github.com/livekit/protocol v1.29.5-0.20241219224350-c87b1afc6161 github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index fe7cca267..ff96992c8 100644 --- a/go.sum +++ b/go.sum @@ -68,12 +68,12 @@ github.com/florianl/go-tc v0.4.4 h1:q6lhEWEfyhGffRzdl3eIcNqX/yVIw0IJwXqa9Rdcctw= github.com/florianl/go-tc v0.4.4/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= -github.com/frostbyte73/core v0.0.13 h1:W/NFPNiCkGTRzMWnCVptn6vX6Tr4a7LvN0RFc0xsC2k= -github.com/frostbyte73/core v0.0.13/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU= +github.com/frostbyte73/core v0.1.0 h1:KA4klxRjLbEHLv+judmlRtweyjcj1NWOJ+BQHQgNxfw= +github.com/frostbyte73/core v0.1.0/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= -github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= -github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= @@ -163,10 +163,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ= -github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo= -github.com/livekit/protocol v1.29.5-0.20241217013317-bc388341b9f2 h1:knHtTlhR89ly9TZ2JiyfT1ibqziv/rDcfSf3voQw8rE= -github.com/livekit/protocol v1.29.5-0.20241217013317-bc388341b9f2/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M= +github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4= +github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY= +github.com/livekit/protocol v1.29.5-0.20241219224350-c87b1afc6161 h1:IT5WQA+96qmLOkffNKrp5qNuupzf/9O+xIdvGjv5JCI= +github.com/livekit/protocol v1.29.5-0.20241219224350-c87b1afc6161/go.mod h1:08wT2rI6GecTCwh9n8OA28Gb7ZQNtIR+hX/LccP1TaY= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/rtc/supervisor/publication_monitor.go b/pkg/rtc/supervisor/publication_monitor.go index c5c61c557..199d2188f 100644 --- a/pkg/rtc/supervisor/publication_monitor.go +++ b/pkg/rtc/supervisor/publication_monitor.go @@ -62,7 +62,7 @@ func NewPublicationMonitor(params PublicationMonitorParams) *PublicationMonitor params: params, isConnected: params.IsPeerConnectionConnected, } - p.desiredPublishes.SetMinCapacity(2) + p.desiredPublishes.SetBaseCap(4) return p } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 2115861cc..8e79e0d31 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -153,7 +153,7 @@ func NewBuffer(ssrc uint32, maxVideoPkts, maxAudioPkts int) *Buffer { logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU), } b.readCond = sync.NewCond(&b.RWMutex) - b.extPackets.SetMinCapacity(7) + b.extPackets.SetBaseCap(128) return b } diff --git a/pkg/sfu/ccutils/prober.go b/pkg/sfu/ccutils/prober.go index e3428c112..c908e6053 100644 --- a/pkg/sfu/ccutils/prober.go +++ b/pkg/sfu/ccutils/prober.go @@ -157,7 +157,7 @@ func NewProber(params ProberParams) *Prober { p := &Prober{ params: params, } - p.clusters.SetMinCapacity(2) + p.clusters.SetBaseCap(16) return p } diff --git a/pkg/sfu/datachannel/bitrate.go b/pkg/sfu/datachannel/bitrate.go index 1e31a8c2b..cdce8434c 100644 --- a/pkg/sfu/datachannel/bitrate.go +++ b/pkg/sfu/datachannel/bitrate.go @@ -37,10 +37,11 @@ func NewBitrateCalculator(duration time.Duration, window time.Duration) *Bitrate c := &BitrateCalculator{ duration: duration, windowDuration: window, - windows: deque.New[bitrateWindow](windowCnt+1, windowCnt+1), + windows: &deque.Deque[bitrateWindow]{}, start: now, active: bitrateWindow{start: now}, } + c.windows.SetBaseCap(windowCnt + 1) return c } diff --git a/pkg/sfu/pacer/leaky_bucket.go b/pkg/sfu/pacer/leaky_bucket.go index 6565f786d..db5c89316 100644 --- a/pkg/sfu/pacer/leaky_bucket.go +++ b/pkg/sfu/pacer/leaky_bucket.go @@ -47,7 +47,7 @@ func NewLeakyBucket(logger logger.Logger, bwe bwe.BWE, interval time.Duration, b interval: interval, bitrate: bitrate, } - l.packets.SetMinCapacity(9) + l.packets.SetBaseCap(512) go l.sendWorker() return l diff --git a/pkg/sfu/pacer/no_queue.go b/pkg/sfu/pacer/no_queue.go index 70d138b20..7d7858990 100644 --- a/pkg/sfu/pacer/no_queue.go +++ b/pkg/sfu/pacer/no_queue.go @@ -40,7 +40,7 @@ func NewNoQueue(logger logger.Logger, bwe bwe.BWE) *NoQueue { logger: logger, wake: make(chan struct{}, 1), } - n.packets.SetMinCapacity(9) + n.packets.SetBaseCap(512) go n.sendWorker() return n diff --git a/pkg/utils/opsqueue.go b/pkg/utils/opsqueue.go index c4e00e4ec..36c901aeb 100644 --- a/pkg/utils/opsqueue.go +++ b/pkg/utils/opsqueue.go @@ -81,12 +81,16 @@ type opsQueueBase[T opsQueueItem] struct { } func newOpsQueueBase[T opsQueueItem](params OpsQueueParams) *opsQueueBase[T] { - return &opsQueueBase[T]{ + oq := &opsQueueBase[T]{ params: params, - ops: *deque.New[T](min(bits.Len64(uint64(params.MinSize-1)), 7)), + ops: deque.Deque[T]{}, wake: make(chan struct{}, 1), doneChan: make(chan struct{}), } + + oq.ops.SetBaseCap(min(bits.Len64(uint64(params.MinSize-1)), 7)) + + return oq } func (oq *opsQueueBase[T]) Start() {