Update protocol and deque

This commit is contained in:
Benjamin Pracht
2024-12-19 17:30:26 -08:00
parent 36338bab5c
commit 246fed2b0a
9 changed files with 25 additions and 20 deletions
+4 -4
View File
@@ -10,8 +10,8 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/elliotchance/orderedmap/v2 v2.4.0
github.com/florianl/go-tc v0.4.4
github.com/frostbyte73/core v0.0.13
github.com/gammazero/deque v0.2.1
github.com/frostbyte73/core v0.1.0
github.com/gammazero/deque v1.0.0
github.com/gammazero/workerpool v1.1.3
github.com/google/wire v0.6.0
github.com/gorilla/websocket v1.5.3
@@ -20,8 +20,8 @@ require (
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98
github.com/livekit/protocol v1.29.5-0.20241217013317-bc388341b9f2
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564
github.com/livekit/protocol v1.29.5-0.20241219224350-c87b1afc6161
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
+8 -8
View File
@@ -68,12 +68,12 @@ github.com/florianl/go-tc v0.4.4 h1:q6lhEWEfyhGffRzdl3eIcNqX/yVIw0IJwXqa9Rdcctw=
github.com/florianl/go-tc v0.4.4/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/frostbyte73/core v0.0.13 h1:W/NFPNiCkGTRzMWnCVptn6vX6Tr4a7LvN0RFc0xsC2k=
github.com/frostbyte73/core v0.0.13/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU=
github.com/frostbyte73/core v0.1.0 h1:KA4klxRjLbEHLv+judmlRtweyjcj1NWOJ+BQHQgNxfw=
github.com/frostbyte73/core v0.1.0/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q=
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
@@ -163,10 +163,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo=
github.com/livekit/protocol v1.29.5-0.20241217013317-bc388341b9f2 h1:knHtTlhR89ly9TZ2JiyfT1ibqziv/rDcfSf3voQw8rE=
github.com/livekit/protocol v1.29.5-0.20241217013317-bc388341b9f2/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY=
github.com/livekit/protocol v1.29.5-0.20241219224350-c87b1afc6161 h1:IT5WQA+96qmLOkffNKrp5qNuupzf/9O+xIdvGjv5JCI=
github.com/livekit/protocol v1.29.5-0.20241219224350-c87b1afc6161/go.mod h1:08wT2rI6GecTCwh9n8OA28Gb7ZQNtIR+hX/LccP1TaY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
+1 -1
View File
@@ -62,7 +62,7 @@ func NewPublicationMonitor(params PublicationMonitorParams) *PublicationMonitor
params: params,
isConnected: params.IsPeerConnectionConnected,
}
p.desiredPublishes.SetMinCapacity(2)
p.desiredPublishes.SetBaseCap(4)
return p
}
+1 -1
View File
@@ -153,7 +153,7 @@ func NewBuffer(ssrc uint32, maxVideoPkts, maxAudioPkts int) *Buffer {
logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU),
}
b.readCond = sync.NewCond(&b.RWMutex)
b.extPackets.SetMinCapacity(7)
b.extPackets.SetBaseCap(128)
return b
}
+1 -1
View File
@@ -157,7 +157,7 @@ func NewProber(params ProberParams) *Prober {
p := &Prober{
params: params,
}
p.clusters.SetMinCapacity(2)
p.clusters.SetBaseCap(16)
return p
}
+2 -1
View File
@@ -37,10 +37,11 @@ func NewBitrateCalculator(duration time.Duration, window time.Duration) *Bitrate
c := &BitrateCalculator{
duration: duration,
windowDuration: window,
windows: deque.New[bitrateWindow](windowCnt+1, windowCnt+1),
windows: &deque.Deque[bitrateWindow]{},
start: now,
active: bitrateWindow{start: now},
}
c.windows.SetBaseCap(windowCnt + 1)
return c
}
+1 -1
View File
@@ -47,7 +47,7 @@ func NewLeakyBucket(logger logger.Logger, bwe bwe.BWE, interval time.Duration, b
interval: interval,
bitrate: bitrate,
}
l.packets.SetMinCapacity(9)
l.packets.SetBaseCap(512)
go l.sendWorker()
return l
+1 -1
View File
@@ -40,7 +40,7 @@ func NewNoQueue(logger logger.Logger, bwe bwe.BWE) *NoQueue {
logger: logger,
wake: make(chan struct{}, 1),
}
n.packets.SetMinCapacity(9)
n.packets.SetBaseCap(512)
go n.sendWorker()
return n
+6 -2
View File
@@ -81,12 +81,16 @@ type opsQueueBase[T opsQueueItem] struct {
}
func newOpsQueueBase[T opsQueueItem](params OpsQueueParams) *opsQueueBase[T] {
return &opsQueueBase[T]{
oq := &opsQueueBase[T]{
params: params,
ops: *deque.New[T](min(bits.Len64(uint64(params.MinSize-1)), 7)),
ops: deque.Deque[T]{},
wake: make(chan struct{}, 1),
doneChan: make(chan struct{}),
}
oq.ops.SetBaseCap(min(bits.Len64(uint64(params.MinSize-1)), 7))
return oq
}
func (oq *opsQueueBase[T]) Start() {