From cd718c84f6b2f22bfc5333a4eb7091e53ae40ebc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 17 Nov 2024 12:14:46 +0530 Subject: [PATCH] Misc/minor clean up. (#3183) Cosmetic. While thinking through how to structure probing better, noticing small things here and there. Cleaning up and making some small PRs along the way. --- pkg/rtc/transport.go | 5 +- pkg/sfu/bwe/bwe.go | 3 + pkg/sfu/bwe/null_bwe.go | 8 +- pkg/sfu/bwe/sendsidebwe/packet_tracker.go | 8 +- pkg/sfu/ccutils/prober.go | 88 +++++++++++---------- pkg/sfu/pacer/base.go | 16 ++-- pkg/sfu/pacer/leaky_bucket.go | 6 +- pkg/sfu/pacer/no_queue.go | 6 +- pkg/sfu/pacer/pass_through.go | 6 +- pkg/sfu/streamallocator/probe_controller.go | 8 +- pkg/sfu/streamallocator/streamallocator.go | 10 +-- 11 files changed, 85 insertions(+), 79 deletions(-) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 8982dd4c1..803ebdfe2 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -458,12 +458,11 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { if params.CongestionControlConfig.UseSendSideBWE || params.UseSendSideBWE { params.Logger.Infow("using send side BWE") - ssbwe := sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{ + t.bwe = sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{ Config: params.CongestionControlConfig.SendSideBWE, Logger: params.Logger, }) - t.pacer = pacer.NewNoQueue(params.Logger, ssbwe) - t.bwe = ssbwe + t.pacer = pacer.NewNoQueue(params.Logger, t.bwe) } else { t.bwe = remotebwe.NewRemoteBWE(remotebwe.RemoteBWEParams{ Config: params.CongestionControlConfig.RemoteBWE, diff --git a/pkg/sfu/bwe/bwe.go b/pkg/sfu/bwe/bwe.go index 372455228..400861c49 100644 --- a/pkg/sfu/bwe/bwe.go +++ b/pkg/sfu/bwe/bwe.go @@ -89,6 +89,9 @@ type BWE interface { repeatedNacks uint32, ) + // TWCC sequence number + RecordPacketSendAndGetSequenceNumber(atMicro int64, size int, isRTX bool) uint16 + HandleTWCCFeedback(report *rtcp.TransportLayerCC) ProbingStart(expectedBandwidthUsage int64) diff --git a/pkg/sfu/bwe/null_bwe.go b/pkg/sfu/bwe/null_bwe.go index b89081dd6..436e2e361 100644 --- a/pkg/sfu/bwe/null_bwe.go +++ b/pkg/sfu/bwe/null_bwe.go @@ -14,7 +14,9 @@ package bwe -import "github.com/pion/rtcp" +import ( + "github.com/pion/rtcp" +) type NullBWE struct { } @@ -25,6 +27,10 @@ func (n *NullBWE) Reset() {} func (n *NullBWE) Stop() {} +func (n *NullBWE) RecordPacketSendAndGetSequenceNumber(_atMicro int64, _size int, _isRTX bool) uint16 { + return 0 +} + func (n *NullBWE) HandleREMB( _receivedEstimate int64, _isProbeFinalizing bool, diff --git a/pkg/sfu/bwe/sendsidebwe/packet_tracker.go b/pkg/sfu/bwe/sendsidebwe/packet_tracker.go index a4022768d..92eeec1e5 100644 --- a/pkg/sfu/bwe/sendsidebwe/packet_tracker.go +++ b/pkg/sfu/bwe/sendsidebwe/packet_tracker.go @@ -17,7 +17,6 @@ package sendsidebwe import ( "math/rand" "sync" - "time" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" @@ -51,18 +50,17 @@ func newPacketTracker(params packetTrackerParams) *packetTracker { } // SSBWE-TODO: this potentially needs to take isProbe as argument? -func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(at time.Time, size int, isRTX bool) uint16 { +func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(atMicro int64, size int, isRTX bool) uint16 { p.lock.Lock() defer p.lock.Unlock() - sendTime := at.UnixMicro() if p.baseSendTime == 0 { - p.baseSendTime = sendTime + p.baseSendTime = atMicro } pi := p.getPacketInfo(uint16(p.sequenceNumber)) pi.sequenceNumber = p.sequenceNumber - pi.sendTime = sendTime - p.baseSendTime + pi.sendTime = atMicro - p.baseSendTime pi.recvTime = 0 pi.size = uint16(size) pi.isRTX = isRTX diff --git a/pkg/sfu/ccutils/prober.go b/pkg/sfu/ccutils/prober.go index b0cf62345..b37fdfbb1 100644 --- a/pkg/sfu/ccutils/prober.go +++ b/pkg/sfu/ccutils/prober.go @@ -137,11 +137,12 @@ type ProberListener interface { } type ProberParams struct { - Logger logger.Logger + Listener ProberListener + Logger logger.Logger } type Prober struct { - logger logger.Logger + params ProberParams clusterId atomic.Uint32 @@ -150,32 +151,16 @@ type Prober struct { activeCluster *Cluster activeStateQueue []bool activeStateQueueInProcess atomic.Bool - - listenerMu sync.RWMutex - listener ProberListener } func NewProber(params ProberParams) *Prober { p := &Prober{ - logger: params.Logger, + params: params, } p.clusters.SetMinCapacity(2) return p } -func (p *Prober) SetProberListener(listener ProberListener) { - p.listenerMu.Lock() - p.listener = listener - p.listenerMu.Unlock() -} - -func (p *Prober) getProberListener() ProberListener { - p.listenerMu.RLock() - defer p.listenerMu.RUnlock() - - return p.listener -} - func (p *Prober) IsRunning() bool { p.clustersMu.RLock() defer p.clustersMu.RUnlock() @@ -189,7 +174,7 @@ func (p *Prober) Reset() { p.clustersMu.Lock() if p.activeCluster != nil { - p.logger.Debugw("prober: resetting active cluster", "cluster", p.activeCluster.String()) + p.params.Logger.Debugw("prober: resetting active cluster", "cluster", p.activeCluster.String()) reset = true info = p.activeCluster.GetInfo() } @@ -201,8 +186,8 @@ func (p *Prober) Reset() { p.clustersMu.Unlock() if reset { - if pl := p.getProberListener(); pl != nil { - pl.OnProbeClusterDone(info) + if p.params.Listener != nil { + p.params.Listener.OnProbeClusterDone(info) } } @@ -215,8 +200,16 @@ func (p *Prober) AddCluster(mode ProbeClusterMode, desiredRateBps int, expectedR } clusterId := ProbeClusterId(p.clusterId.Inc()) - cluster := NewCluster(clusterId, mode, desiredRateBps, expectedRateBps, minDuration, maxDuration) - p.logger.Debugw("cluster added", "cluster", cluster.String()) + cluster := newCluster( + clusterId, + mode, + desiredRateBps, + expectedRateBps, + minDuration, + maxDuration, + p.params.Listener, + ) + p.params.Logger.Debugw("cluster added", "cluster", cluster.String()) p.pushBackClusterAndMaybeStart(cluster) @@ -314,8 +307,8 @@ func (p *Prober) processActiveStateQueue() { p.activeStateQueue = p.activeStateQueue[1:] p.clustersMu.Unlock() - if pl := p.getProberListener(); pl != nil { - pl.OnActiveChanged(isActive) + if p.params.Listener != nil { + p.params.Listener.OnActiveChanged(isActive) } } @@ -339,13 +332,13 @@ func (p *Prober) run() { return } - cluster.Process(p.getProberListener()) + cluster.Process() if cluster.IsFinished() { - p.logger.Debugw("cluster finished", "cluster", cluster.String()) + p.params.Logger.Debugw("cluster finished", "cluster", cluster.String()) - if pl := p.getProberListener(); pl != nil { - pl.OnProbeClusterDone(cluster.GetInfo()) + if p.params.Listener != nil { + p.params.Listener.OnProbeClusterDone(cluster.GetInfo()) } p.popFrontCluster(cluster) @@ -368,9 +361,9 @@ type ProbeClusterId uint32 const ( ProbeClusterIdInvalid ProbeClusterId = 0 - bucketDuration = 100 * time.Millisecond - bytesPerProbe = 1000 - minProbeRateBps = 10000 + cBucketDuration = 100 * time.Millisecond + cBytesPerProbe = 1000 + cMinProbeRateBps = 10000 ) // ----------------------------------- @@ -412,6 +405,7 @@ type Cluster struct { id ProbeClusterId mode ProbeClusterMode + listener ProberListener desiredBytes int minDuration time.Duration maxDuration time.Duration @@ -424,7 +418,15 @@ type Cluster struct { startTime time.Time } -func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) *Cluster { +func newCluster( + id ProbeClusterId, + mode ProbeClusterMode, + desiredRateBps int, + expectedRateBps int, + minDuration time.Duration, + maxDuration time.Duration, + listener ProberListener, +) *Cluster { c := &Cluster{ id: id, mode: mode, @@ -439,7 +441,7 @@ func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, ex func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDuration time.Duration) { // split into granular buckets // NOTE: splitting even if mode is unitform - numBuckets := int((minDuration.Milliseconds() + bucketDuration.Milliseconds() - 1) / bucketDuration.Milliseconds()) + numBuckets := int((minDuration.Milliseconds() + cBucketDuration.Milliseconds() - 1) / cBucketDuration.Milliseconds()) if numBuckets < 1 { numBuckets = 1 } @@ -458,17 +460,17 @@ func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDurati } bucketProbeRateBps := baseProbeRateBps * multiplier - if bucketProbeRateBps < minProbeRateBps { - bucketProbeRateBps = minProbeRateBps + if bucketProbeRateBps < cMinProbeRateBps { + bucketProbeRateBps = cMinProbeRateBps } bucketProbeRateBytesPerSec := (bucketProbeRateBps + 7) / 8 // pace based on bytes per probe - numProbesPerSec := (bucketProbeRateBytesPerSec + bytesPerProbe - 1) / bytesPerProbe + numProbesPerSec := (bucketProbeRateBytesPerSec + cBytesPerProbe - 1) / cBytesPerProbe sleepDurationMicroSeconds := int(float64(1_000_000)/float64(numProbesPerSec) + 0.5) - runningDesiredBytes += (((bucketProbeRateBytesPerSec + expectedRateBytesPerSec) * int(bucketDuration.Milliseconds())) + 999) / 1000 - runningDesiredElapsedTime += bucketDuration + runningDesiredBytes += (((bucketProbeRateBytesPerSec + expectedRateBytesPerSec) * int(cBucketDuration.Milliseconds())) + 999) / 1000 + runningDesiredElapsedTime += cBucketDuration c.buckets = append(c.buckets, clusterBucket{ desiredBytes: runningDesiredBytes, @@ -538,7 +540,7 @@ func (c *Cluster) GetInfo() ProbeClusterInfo { } } -func (c *Cluster) Process(pl ProberListener) { +func (c *Cluster) Process() { c.lock.RLock() timeElapsed := time.Since(c.startTime) @@ -568,8 +570,8 @@ func (c *Cluster) Process(pl ProberListener) { } c.lock.RUnlock() - if bytesShortFall > 0 && pl != nil { - pl.OnSendProbe(bytesShortFall) + if bytesShortFall > 0 && c.listener != nil { + c.listener.OnSendProbe(bytesShortFall) } // STREAM-ALLOCATOR-TODO look at adapting sleep time based on how many bytes and how much time is left diff --git a/pkg/sfu/pacer/base.go b/pkg/sfu/pacer/base.go index 9d6344cd0..18125e68f 100644 --- a/pkg/sfu/pacer/base.go +++ b/pkg/sfu/pacer/base.go @@ -19,7 +19,7 @@ import ( "io" "time" - "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" "github.com/pion/rtp" @@ -28,13 +28,13 @@ import ( type Base struct { logger logger.Logger - sendSideBWE *sendsidebwe.SendSideBWE + bwe bwe.BWE } -func NewBase(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *Base { +func NewBase(logger logger.Logger, bwe bwe.BWE) *Base { return &Base{ - logger: logger, - sendSideBWE: sendSideBWE, + logger: logger, + bwe: bwe, } } @@ -84,9 +84,9 @@ func (b *Base) patchRTPHeaderExtensions(p *Packet) error { } } - if p.TransportWideExtID != 0 && b.sendSideBWE != nil { - twccSN := b.sendSideBWE.RecordPacketSendAndGetSequenceNumber( - sendingAt, + if p.TransportWideExtID != 0 && b.bwe != nil { + twccSN := b.bwe.RecordPacketSendAndGetSequenceNumber( + sendingAt.UnixMicro(), p.Header.MarshalSize()+len(p.Payload), p.IsRTX, ) diff --git a/pkg/sfu/pacer/leaky_bucket.go b/pkg/sfu/pacer/leaky_bucket.go index 325779f94..6565f786d 100644 --- a/pkg/sfu/pacer/leaky_bucket.go +++ b/pkg/sfu/pacer/leaky_bucket.go @@ -20,7 +20,7 @@ import ( "github.com/frostbyte73/core" "github.com/gammazero/deque" - "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/protocol/logger" ) @@ -40,9 +40,9 @@ type LeakyBucket struct { stop core.Fuse } -func NewLeakyBucket(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE, interval time.Duration, bitrate int) *LeakyBucket { +func NewLeakyBucket(logger logger.Logger, bwe bwe.BWE, interval time.Duration, bitrate int) *LeakyBucket { l := &LeakyBucket{ - Base: NewBase(logger, sendSideBWE), + Base: NewBase(logger, bwe), logger: logger, interval: interval, bitrate: bitrate, diff --git a/pkg/sfu/pacer/no_queue.go b/pkg/sfu/pacer/no_queue.go index 5e0037fa9..fb18d0517 100644 --- a/pkg/sfu/pacer/no_queue.go +++ b/pkg/sfu/pacer/no_queue.go @@ -19,7 +19,7 @@ import ( "github.com/frostbyte73/core" "github.com/gammazero/deque" - "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/protocol/logger" ) @@ -34,9 +34,9 @@ type NoQueue struct { stop core.Fuse } -func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQueue { +func NewNoQueue(logger logger.Logger, bwe bwe.BWE) *NoQueue { n := &NoQueue{ - Base: NewBase(logger, sendSideBWE), + Base: NewBase(logger, bwe), logger: logger, wake: make(chan struct{}, 1), } diff --git a/pkg/sfu/pacer/pass_through.go b/pkg/sfu/pacer/pass_through.go index 1bc33026d..21fd07e22 100644 --- a/pkg/sfu/pacer/pass_through.go +++ b/pkg/sfu/pacer/pass_through.go @@ -15,7 +15,7 @@ package pacer import ( - "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/protocol/logger" ) @@ -23,9 +23,9 @@ type PassThrough struct { *Base } -func NewPassThrough(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *PassThrough { +func NewPassThrough(logger logger.Logger, bwe bwe.BWE) *PassThrough { return &PassThrough{ - Base: NewBase(logger, sendSideBWE), + Base: NewBase(logger, bwe), } } diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go index 411e46d7b..6ee6196e7 100644 --- a/pkg/sfu/streamallocator/probe_controller.go +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -120,15 +120,13 @@ func (p *ProbeController) Reset() { func (p *ProbeController) ProbeClusterDone(info ccutils.ProbeClusterInfo) { p.lock.Lock() + defer p.lock.Unlock() if p.probeClusterId != info.Id { p.params.Logger.Debugw("not expected probe cluster", "probeClusterId", p.probeClusterId, "resetProbeClusterId", info.Id) - p.lock.Unlock() - return + } else { + p.doneProbeClusterInfo = info } - - p.doneProbeClusterInfo = info - p.lock.Unlock() } func (p *ProbeController) MaybeFinalizeProbe( diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 3c004b620..ff02a0185 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -210,9 +210,6 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b params: params, enabled: enabled, allowPause: allowPause, - prober: ccutils.NewProber(ccutils.ProberParams{ - Logger: params.Logger, - }), // STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(), videoTracks: make(map[livekit.TrackID]*Track), eventsQueue: utils.NewTypedOpsQueue[Event](utils.OpsQueueParams{ @@ -222,6 +219,11 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b }), } + s.prober = ccutils.NewProber(ccutils.ProberParams{ + Listener: s, + Logger: params.Logger, + }) + s.probeController = NewProbeController(ProbeControllerParams{ Config: s.params.Config.ProbeController, Prober: s.prober, @@ -230,8 +232,6 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b s.resetState() - s.prober.SetProberListener(s) - return s }