Misc/minor clean up. (#3183)

Cosmetic. While thinking through how to structure probing better, I
noticed small things here and there. This cleans them up and makes some
small PRs along the way.
This commit is contained in:
Raja Subramanian
2024-11-17 12:14:46 +05:30
committed by GitHub
parent aa2ce22655
commit cd718c84f6
11 changed files with 85 additions and 79 deletions

View File

@@ -458,12 +458,11 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
if params.CongestionControlConfig.UseSendSideBWE || params.UseSendSideBWE {
params.Logger.Infow("using send side BWE")
ssbwe := sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{
t.bwe = sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{
Config: params.CongestionControlConfig.SendSideBWE,
Logger: params.Logger,
})
t.pacer = pacer.NewNoQueue(params.Logger, ssbwe)
t.bwe = ssbwe
t.pacer = pacer.NewNoQueue(params.Logger, t.bwe)
} else {
t.bwe = remotebwe.NewRemoteBWE(remotebwe.RemoteBWEParams{
Config: params.CongestionControlConfig.RemoteBWE,

View File

@@ -89,6 +89,9 @@ type BWE interface {
repeatedNacks uint32,
)
// TWCC sequence number
RecordPacketSendAndGetSequenceNumber(atMicro int64, size int, isRTX bool) uint16
HandleTWCCFeedback(report *rtcp.TransportLayerCC)
ProbingStart(expectedBandwidthUsage int64)

View File

@@ -14,7 +14,9 @@
package bwe
import "github.com/pion/rtcp"
import (
"github.com/pion/rtcp"
)
type NullBWE struct {
}
@@ -25,6 +27,10 @@ func (n *NullBWE) Reset() {}
func (n *NullBWE) Stop() {}
func (n *NullBWE) RecordPacketSendAndGetSequenceNumber(_atMicro int64, _size int, _isRTX bool) uint16 {
return 0
}
func (n *NullBWE) HandleREMB(
_receivedEstimate int64,
_isProbeFinalizing bool,

View File

@@ -17,7 +17,6 @@ package sendsidebwe
import (
"math/rand"
"sync"
"time"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
@@ -51,18 +50,17 @@ func newPacketTracker(params packetTrackerParams) *packetTracker {
}
// SSBWE-TODO: this potentially needs to take isProbe as an argument?
func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(at time.Time, size int, isRTX bool) uint16 {
func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(atMicro int64, size int, isRTX bool) uint16 {
p.lock.Lock()
defer p.lock.Unlock()
sendTime := at.UnixMicro()
if p.baseSendTime == 0 {
p.baseSendTime = sendTime
p.baseSendTime = atMicro
}
pi := p.getPacketInfo(uint16(p.sequenceNumber))
pi.sequenceNumber = p.sequenceNumber
pi.sendTime = sendTime - p.baseSendTime
pi.sendTime = atMicro - p.baseSendTime
pi.recvTime = 0
pi.size = uint16(size)
pi.isRTX = isRTX

View File

@@ -137,11 +137,12 @@ type ProberListener interface {
}
type ProberParams struct {
Logger logger.Logger
Listener ProberListener
Logger logger.Logger
}
type Prober struct {
logger logger.Logger
params ProberParams
clusterId atomic.Uint32
@@ -150,32 +151,16 @@ type Prober struct {
activeCluster *Cluster
activeStateQueue []bool
activeStateQueueInProcess atomic.Bool
listenerMu sync.RWMutex
listener ProberListener
}
func NewProber(params ProberParams) *Prober {
p := &Prober{
logger: params.Logger,
params: params,
}
p.clusters.SetMinCapacity(2)
return p
}
func (p *Prober) SetProberListener(listener ProberListener) {
p.listenerMu.Lock()
p.listener = listener
p.listenerMu.Unlock()
}
func (p *Prober) getProberListener() ProberListener {
p.listenerMu.RLock()
defer p.listenerMu.RUnlock()
return p.listener
}
func (p *Prober) IsRunning() bool {
p.clustersMu.RLock()
defer p.clustersMu.RUnlock()
@@ -189,7 +174,7 @@ func (p *Prober) Reset() {
p.clustersMu.Lock()
if p.activeCluster != nil {
p.logger.Debugw("prober: resetting active cluster", "cluster", p.activeCluster.String())
p.params.Logger.Debugw("prober: resetting active cluster", "cluster", p.activeCluster.String())
reset = true
info = p.activeCluster.GetInfo()
}
@@ -201,8 +186,8 @@ func (p *Prober) Reset() {
p.clustersMu.Unlock()
if reset {
if pl := p.getProberListener(); pl != nil {
pl.OnProbeClusterDone(info)
if p.params.Listener != nil {
p.params.Listener.OnProbeClusterDone(info)
}
}
@@ -215,8 +200,16 @@ func (p *Prober) AddCluster(mode ProbeClusterMode, desiredRateBps int, expectedR
}
clusterId := ProbeClusterId(p.clusterId.Inc())
cluster := NewCluster(clusterId, mode, desiredRateBps, expectedRateBps, minDuration, maxDuration)
p.logger.Debugw("cluster added", "cluster", cluster.String())
cluster := newCluster(
clusterId,
mode,
desiredRateBps,
expectedRateBps,
minDuration,
maxDuration,
p.params.Listener,
)
p.params.Logger.Debugw("cluster added", "cluster", cluster.String())
p.pushBackClusterAndMaybeStart(cluster)
@@ -314,8 +307,8 @@ func (p *Prober) processActiveStateQueue() {
p.activeStateQueue = p.activeStateQueue[1:]
p.clustersMu.Unlock()
if pl := p.getProberListener(); pl != nil {
pl.OnActiveChanged(isActive)
if p.params.Listener != nil {
p.params.Listener.OnActiveChanged(isActive)
}
}
@@ -339,13 +332,13 @@ func (p *Prober) run() {
return
}
cluster.Process(p.getProberListener())
cluster.Process()
if cluster.IsFinished() {
p.logger.Debugw("cluster finished", "cluster", cluster.String())
p.params.Logger.Debugw("cluster finished", "cluster", cluster.String())
if pl := p.getProberListener(); pl != nil {
pl.OnProbeClusterDone(cluster.GetInfo())
if p.params.Listener != nil {
p.params.Listener.OnProbeClusterDone(cluster.GetInfo())
}
p.popFrontCluster(cluster)
@@ -368,9 +361,9 @@ type ProbeClusterId uint32
const (
ProbeClusterIdInvalid ProbeClusterId = 0
bucketDuration = 100 * time.Millisecond
bytesPerProbe = 1000
minProbeRateBps = 10000
cBucketDuration = 100 * time.Millisecond
cBytesPerProbe = 1000
cMinProbeRateBps = 10000
)
// -----------------------------------
@@ -412,6 +405,7 @@ type Cluster struct {
id ProbeClusterId
mode ProbeClusterMode
listener ProberListener
desiredBytes int
minDuration time.Duration
maxDuration time.Duration
@@ -424,7 +418,15 @@ type Cluster struct {
startTime time.Time
}
func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) *Cluster {
func newCluster(
id ProbeClusterId,
mode ProbeClusterMode,
desiredRateBps int,
expectedRateBps int,
minDuration time.Duration,
maxDuration time.Duration,
listener ProberListener,
) *Cluster {
c := &Cluster{
id: id,
mode: mode,
@@ -439,7 +441,7 @@ func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, ex
func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDuration time.Duration) {
// split into granular buckets
// NOTE: splitting even if mode is uniform
numBuckets := int((minDuration.Milliseconds() + bucketDuration.Milliseconds() - 1) / bucketDuration.Milliseconds())
numBuckets := int((minDuration.Milliseconds() + cBucketDuration.Milliseconds() - 1) / cBucketDuration.Milliseconds())
if numBuckets < 1 {
numBuckets = 1
}
@@ -458,17 +460,17 @@ func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDurati
}
bucketProbeRateBps := baseProbeRateBps * multiplier
if bucketProbeRateBps < minProbeRateBps {
bucketProbeRateBps = minProbeRateBps
if bucketProbeRateBps < cMinProbeRateBps {
bucketProbeRateBps = cMinProbeRateBps
}
bucketProbeRateBytesPerSec := (bucketProbeRateBps + 7) / 8
// pace based on bytes per probe
numProbesPerSec := (bucketProbeRateBytesPerSec + bytesPerProbe - 1) / bytesPerProbe
numProbesPerSec := (bucketProbeRateBytesPerSec + cBytesPerProbe - 1) / cBytesPerProbe
sleepDurationMicroSeconds := int(float64(1_000_000)/float64(numProbesPerSec) + 0.5)
runningDesiredBytes += (((bucketProbeRateBytesPerSec + expectedRateBytesPerSec) * int(bucketDuration.Milliseconds())) + 999) / 1000
runningDesiredElapsedTime += bucketDuration
runningDesiredBytes += (((bucketProbeRateBytesPerSec + expectedRateBytesPerSec) * int(cBucketDuration.Milliseconds())) + 999) / 1000
runningDesiredElapsedTime += cBucketDuration
c.buckets = append(c.buckets, clusterBucket{
desiredBytes: runningDesiredBytes,
@@ -538,7 +540,7 @@ func (c *Cluster) GetInfo() ProbeClusterInfo {
}
}
func (c *Cluster) Process(pl ProberListener) {
func (c *Cluster) Process() {
c.lock.RLock()
timeElapsed := time.Since(c.startTime)
@@ -568,8 +570,8 @@ func (c *Cluster) Process(pl ProberListener) {
}
c.lock.RUnlock()
if bytesShortFall > 0 && pl != nil {
pl.OnSendProbe(bytesShortFall)
if bytesShortFall > 0 && c.listener != nil {
c.listener.OnSendProbe(bytesShortFall)
}
// STREAM-ALLOCATOR-TODO look at adapting sleep time based on how many bytes and how much time is left

View File

@@ -19,7 +19,7 @@ import (
"io"
"time"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
"github.com/pion/rtp"
@@ -28,13 +28,13 @@ import (
type Base struct {
logger logger.Logger
sendSideBWE *sendsidebwe.SendSideBWE
bwe bwe.BWE
}
func NewBase(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *Base {
func NewBase(logger logger.Logger, bwe bwe.BWE) *Base {
return &Base{
logger: logger,
sendSideBWE: sendSideBWE,
logger: logger,
bwe: bwe,
}
}
@@ -84,9 +84,9 @@ func (b *Base) patchRTPHeaderExtensions(p *Packet) error {
}
}
if p.TransportWideExtID != 0 && b.sendSideBWE != nil {
twccSN := b.sendSideBWE.RecordPacketSendAndGetSequenceNumber(
sendingAt,
if p.TransportWideExtID != 0 && b.bwe != nil {
twccSN := b.bwe.RecordPacketSendAndGetSequenceNumber(
sendingAt.UnixMicro(),
p.Header.MarshalSize()+len(p.Payload),
p.IsRTX,
)

View File

@@ -20,7 +20,7 @@ import (
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/protocol/logger"
)
@@ -40,9 +40,9 @@ type LeakyBucket struct {
stop core.Fuse
}
func NewLeakyBucket(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE, interval time.Duration, bitrate int) *LeakyBucket {
func NewLeakyBucket(logger logger.Logger, bwe bwe.BWE, interval time.Duration, bitrate int) *LeakyBucket {
l := &LeakyBucket{
Base: NewBase(logger, sendSideBWE),
Base: NewBase(logger, bwe),
logger: logger,
interval: interval,
bitrate: bitrate,

View File

@@ -19,7 +19,7 @@ import (
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/protocol/logger"
)
@@ -34,9 +34,9 @@ type NoQueue struct {
stop core.Fuse
}
func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQueue {
func NewNoQueue(logger logger.Logger, bwe bwe.BWE) *NoQueue {
n := &NoQueue{
Base: NewBase(logger, sendSideBWE),
Base: NewBase(logger, bwe),
logger: logger,
wake: make(chan struct{}, 1),
}

View File

@@ -15,7 +15,7 @@
package pacer
import (
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/protocol/logger"
)
@@ -23,9 +23,9 @@ type PassThrough struct {
*Base
}
func NewPassThrough(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *PassThrough {
func NewPassThrough(logger logger.Logger, bwe bwe.BWE) *PassThrough {
return &PassThrough{
Base: NewBase(logger, sendSideBWE),
Base: NewBase(logger, bwe),
}
}

View File

@@ -120,15 +120,13 @@ func (p *ProbeController) Reset() {
func (p *ProbeController) ProbeClusterDone(info ccutils.ProbeClusterInfo) {
p.lock.Lock()
defer p.lock.Unlock()
if p.probeClusterId != info.Id {
p.params.Logger.Debugw("not expected probe cluster", "probeClusterId", p.probeClusterId, "resetProbeClusterId", info.Id)
p.lock.Unlock()
return
} else {
p.doneProbeClusterInfo = info
}
p.doneProbeClusterInfo = info
p.lock.Unlock()
}
func (p *ProbeController) MaybeFinalizeProbe(

View File

@@ -210,9 +210,6 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
params: params,
enabled: enabled,
allowPause: allowPause,
prober: ccutils.NewProber(ccutils.ProberParams{
Logger: params.Logger,
}),
// STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(),
videoTracks: make(map[livekit.TrackID]*Track),
eventsQueue: utils.NewTypedOpsQueue[Event](utils.OpsQueueParams{
@@ -222,6 +219,11 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
}),
}
s.prober = ccutils.NewProber(ccutils.ProberParams{
Listener: s,
Logger: params.Logger,
})
s.probeController = NewProbeController(ProbeControllerParams{
Config: s.params.Config.ProbeController,
Prober: s.prober,
@@ -230,8 +232,6 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
s.resetState()
s.prober.SetProberListener(s)
return s
}