Files
livekit/pkg/sfu/ccutils/prober.go
2024-12-20 07:16:14 +05:30

597 lines
17 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Design of Prober
//
// Probing is used to check for existence of excess channel capacity.
// This is especially useful in the downstream direction of SFU.
// SFU forwards audio/video streams from one or more publishers to
// all the subscribers. But, the downstream channel of a subscriber
// may not be big enough to carry all the streams. It is also a time
// varying quantity.
//
// When there is not enough capacity, some streams will be paused.
// To resume a stream, SFU would need to know that the channel has
// enough capacity. That's where probing comes in. When conditions
// are favorable, SFU can send probe packets so that the bandwidth
// estimator has more data to estimate available channel capacity
// better.
// NOTE: What defines `favorable conditions` is implementation dependent.
//
// There are two options for probing
// - Use padding only RTP packets: This one is preferable as
// probe rate can be controlled more tightly.
// - Resume a paused stream or forward a higher spatial layer:
// Have to find a stream at probing rate. Also, a stream could
// get a key frame unexpectedly boosting rate in the probing
// window.
//
// The strategy used depends on stream allocator implementation.
// This module can be used if the stream allocator decides to use
// padding only RTP packets for probing purposes.
//
// Implementation:
// There are a couple of options
// - Check prober in the forwarding path (pull from prober).
// This is preferred for scalability reasons. But, this
// suffers from not being able to probe when all streams
// are paused (could be due to downstream bandwidth
// constraints or the corresponding upstream tracks may
// have paused due to upstream bandwidth constraints).
// Another issue is not being able to have tight control on
// probing window boundary as the packet forwarding path
// may not have a packet to forward. But, it should not
// be a major concern as long as some stream(s) is/are
// forwarded as there should be a packet at least every
// 60 ms or so (forwarding only one stream at 15 fps).
// Usually, it will be serviced much more frequently when
// there are multiple streams getting forwarded.
// - Run it in a go routine. But, that would have to wake up
// very often to prevent bunching up of probe
// packets. So, a scalability concern as there is one prober
// per subscriber peer connection. But, probe windows
// should be very short (of the order of 100s of ms).
// So, this approach might be fine.
//
// The implementation here follows the second approach of using a
// go routine.
//
// Pacing:
// ------
// Ideally, the subscriber peer connection should have a pacer which
// trickles data out at the estimated channel capacity rate (and
// estimated channel capacity + probing rate when actively probing).
//
// But, there are a few significant challenges
// 1. Pacer will require buffering of forwarded packets. That means
// more memory, more CPU (have to make copy of packets) and
// more latency in the media stream.
// 2. Scalability concern as SFU may be handling hundreds of
// subscriber peer connections and each one processing the pacing
// loop at 5ms interval will add up.
//
// So, this module assumes that pacing is inherently provided by the
// publishers for media streams. That is a reasonable assumption given
// that publishing clients will run their own pacer and pace data out
// at a steady rate.
//
// A further assumption is that if there are multiple publishers for
// a subscriber peer connection, all the publishers are not pacing
// in sync, i.e. each publisher's pacer is completely independent
// and SFU will be receiving the media packets with a good spread and
// not clumped together.
//
// Given those assumptions, this module monitors media send rate and
// adjusts probing packet sends accordingly. Although the probing may
// have a high enough wake up frequency, it is for short windows.
// For example, probing at 5 Mbps for 1/2 second and sending 1000 byte
// probe per iteration will wake up every 1.6 ms. That is very high,
// but should last for 1/2 second or so.
//
// 5 Mbps over 1/2 second = 2.5 Mb
// 2.5 Mb = 312500 bytes = 313 probes at 1000 byte probes
// 313 probes over 1/2 second = 1.6 ms between probes
//
// A few things to note
// 1. When a probe cluster is added, the expected media rate is provided.
// So, the wake-up interval takes that into account. For example,
// if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected
// to be provided by media traffic, the wake-up interval becomes 8 ms.
// 2. The amount of probing should actually be capped at some value to
// avoid too much self-induced congestion. It may be something like 500 kbps.
// That will increase the wake-up interval to 16 ms in the above example.
// 3. In practice, the probing interval may also be shorter. Typically,
// it can be run for 2 - 3 RTTs to get a good measurement. For
// the longest hauls, RTT could be 250 ms or so leading to the probing
// window being long(ish). But, RTT should be much shorter especially if
// the subscriber peer connection of the client is able to connect to
// the nearest data center.
package ccutils
import (
"fmt"
"math"
"sync"
"time"
"github.com/gammazero/deque"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
// ProberListener receives callbacks from the prober's clusters.
type ProberListener interface {
// OnProbeClusterSwitch is invoked from Cluster.Start with the info of the
// cluster that is being started.
OnProbeClusterSwitch(info ProbeClusterInfo)
// OnSendProbe is invoked from Cluster.Process with the number of probe
// bytes that should be sent.
OnSendProbe(bytesToSend int)
}
// ProberParams holds the dependencies supplied to NewProber.
type ProberParams struct {
// Listener is handed to every cluster created by AddCluster.
Listener ProberListener
// Logger is used for the prober's debug logging.
Logger logger.Logger
}
// Prober maintains a queue of probe clusters and tracks the one that is
// currently active.
type Prober struct {
params ProberParams
// clusterId is incremented for every added cluster to produce its id.
clusterId atomic.Uint32
// clustersMu guards clusters and activeCluster.
clustersMu sync.RWMutex
// clusters is the queue of pending clusters; the front entry is the next
// (or current) one to be processed.
clusters deque.Deque[*Cluster]
// activeCluster is the cluster taken from the front of the queue and
// started; nil when no cluster is active.
activeCluster *Cluster
}
// NewProber creates a Prober that uses the given params. The cluster
// queue is configured with a base capacity of 2.
func NewProber(params ProberParams) *Prober {
	prober := new(Prober)
	prober.params = params
	prober.clusters.SetBaseCap(2)
	return prober
}
// IsRunning reports whether at least one cluster is queued.
func (p *Prober) IsRunning() bool {
	p.clustersMu.RLock()
	numClusters := p.clusters.Len()
	p.clustersMu.RUnlock()

	return numClusters > 0
}
// Reset drops every queued cluster and clears the active cluster.
// If the active cluster's id matches info.Id, it is first marked
// completed with info.Result and the reset is logged.
func (p *Prober) Reset(info ProbeClusterInfo) {
	p.clustersMu.Lock()
	defer p.clustersMu.Unlock()

	if active := p.activeCluster; active != nil && active.Id() == info.Id {
		active.MarkCompleted(info.Result)
		p.params.Logger.Debugw("prober: resetting active cluster", "cluster", active)
	}

	p.activeCluster = nil
	p.clusters.Clear()
}
// AddCluster creates a cluster for the given mode and goal, appends it
// to the queue and returns its info. A goal whose DesiredBps is not
// positive is rejected and ProbeClusterInfoInvalid is returned.
func (p *Prober) AddCluster(mode ProbeClusterMode, pcg ProbeClusterGoal) ProbeClusterInfo {
	if pcg.DesiredBps <= 0 {
		return ProbeClusterInfoInvalid
	}

	id := ProbeClusterId(p.clusterId.Inc())
	added := newCluster(id, mode, pcg, p.params.Listener)
	p.params.Logger.Debugw("cluster added", "cluster", added)

	p.pushBackClusterAndMaybeStart(added)
	return added.Info()
}
// ProbesSent credits bytesSent to the front cluster, if there is one.
// Fetching the front cluster activates (starts) it when no cluster is
// active yet.
func (p *Prober) ProbesSent(bytesSent int) {
	if front := p.getFrontCluster(); front != nil {
		front.ProbesSent(bytesSent)
	}
}
// ClusterDone marks the front cluster completed with info.Result and
// removes it from the queue, but only when its id matches info.Id.
// It is a no-op when there is no front cluster or the ids differ.
func (p *Prober) ClusterDone(info ProbeClusterInfo) {
	front := p.getFrontCluster()
	if front == nil || front.Id() != info.Id {
		return
	}

	front.MarkCompleted(info.Result)
	p.params.Logger.Debugw("cluster done", "cluster", front)
	p.popFrontCluster(front)
}
// GetActiveClusterId returns the id of the active cluster, or
// ProbeClusterIdInvalid when no cluster is active.
func (p *Prober) GetActiveClusterId() ProbeClusterId {
	p.clustersMu.RLock()
	defer p.clustersMu.RUnlock()

	if p.activeCluster == nil {
		return ProbeClusterIdInvalid
	}
	return p.activeCluster.Id()
}
// getFrontCluster returns the active cluster. When none is active and
// the queue is not empty, the front of the queue is promoted to active
// and started first. Returns nil when there is nothing to process.
//
// The cluster's Start (and therefore the listener's
// OnProbeClusterSwitch callback) runs while clustersMu is held, so a
// listener that calls back into a Prober method taking clustersMu
// would block.
func (p *Prober) getFrontCluster() *Cluster {
	p.clustersMu.Lock()
	defer p.clustersMu.Unlock()

	if p.activeCluster == nil && p.clusters.Len() > 0 {
		p.activeCluster = p.clusters.Front()
		p.activeCluster.Start()
	}
	return p.activeCluster
}
// popFrontCluster removes cluster from the front of the queue if it is
// still there and clears the active cluster if it is the given one.
// With an empty queue, the active cluster is cleared unconditionally.
func (p *Prober) popFrontCluster(cluster *Cluster) {
	p.clustersMu.Lock()
	defer p.clustersMu.Unlock()

	if p.clusters.Len() == 0 {
		p.activeCluster = nil
		return
	}

	if p.clusters.Front() == cluster {
		p.clusters.PopFront()
	}
	if p.activeCluster == cluster {
		p.activeCluster = nil
	}
}
// pushBackClusterAndMaybeStart appends cluster to the queue and, when
// it is the only queued entry, launches the run loop in a new goroutine.
//
// NOTE(review): a run goroutine started earlier may still be alive
// (e.g. sleeping between wake-ups) when the queue goes from empty to
// one entry again, in which case more than one run loop could coexist
// until the queue drains - confirm this is acceptable.
func (p *Prober) pushBackClusterAndMaybeStart(cluster *Cluster) {
	p.clustersMu.Lock()
	defer p.clustersMu.Unlock()

	p.clusters.PushBack(cluster)
	if p.clusters.Len() != 1 {
		return
	}
	go p.run()
}
// run is the probing loop. It repeatedly fetches the front cluster and
// processes it, sleeping for the duration that Process returns. A zero
// duration means the cluster is finished: it is popped and the next
// one is fetched immediately. The loop exits when no cluster is left.
func (p *Prober) run() {
	wakeup := time.NewTicker(100 * time.Millisecond)
	defer wakeup.Stop()

	for cluster := p.getFrontCluster(); cluster != nil; cluster = p.getFrontCluster() {
		wait := cluster.Process()
		if wait == 0 {
			p.popFrontCluster(cluster)
			continue
		}

		// re-arm the ticker with the requested interval and wait for it
		wakeup.Reset(wait)
		<-wakeup.C
	}
}
// ---------------------------------
// ProbeClusterId identifies a probe cluster.
type ProbeClusterId uint32
const (
// ProbeClusterIdInvalid is the id used to signal that there is no valid cluster.
ProbeClusterIdInvalid ProbeClusterId = 0
// padding only packets are 255 bytes max + 20 byte header = 4 packets per probe,
// when not using padding only packets, this is a min and actual sent could be higher
cBytesPerProbe = 1100
// cSleepDuration is the nominal bucket length used to derive the number
// of buckets of a cluster from its goal duration.
cSleepDuration = 20 * time.Millisecond
// cSleepDurationMin is the lower bound for a cluster's base sleep duration
// and the wake-up interval returned by Cluster.Process for a running cluster.
cSleepDurationMin = 10 * time.Millisecond
)
// -----------------------------------
// ProbeClusterMode selects how a cluster lays out its probe buckets.
type ProbeClusterMode int

const (
	// ProbeClusterModeUniform uses buckets of equal duration.
	ProbeClusterModeUniform ProbeClusterMode = iota
	// ProbeClusterModeLinearChirp uses buckets of decreasing duration.
	ProbeClusterModeLinearChirp
)

// String returns the symbolic name of the mode, or its decimal value
// for a mode that is not one of the declared constants.
func (p ProbeClusterMode) String() string {
	if p == ProbeClusterModeUniform {
		return "UNIFORM"
	}
	if p == ProbeClusterModeLinearChirp {
		return "LINEAR_CHIRP"
	}
	return fmt.Sprintf("%d", int(p))
}
// ---------------------------------------------------------------------------
// ProbeClusterGoal describes what a probe cluster should achieve.
type ProbeClusterGoal struct {
// AvailableBandwidthBps is not used for scheduling in this file, only logged.
// NOTE(review): presumably the bandwidth estimate at the time the probe was requested - confirm with the caller.
AvailableBandwidthBps int
// ExpectedUsageBps is subtracted from DesiredBps when sizing the probe
// bytes of each interval (see Cluster.initProbes).
ExpectedUsageBps int
// DesiredBps is the target bitrate; AddCluster rejects goals where it is not positive.
DesiredBps int
// Duration is the length of the probing window.
Duration time.Duration
// DesiredBytes is computed from DesiredBps and Duration in Cluster.initProbes,
// overwriting any value supplied by the caller.
DesiredBytes int
}
// MarshalLogObject writes every field of the goal to the encoder under
// its field name. It always returns nil.
func (p ProbeClusterGoal) MarshalLogObject(e zapcore.ObjectEncoder) error {
	// rates first, in declaration order
	available, expected, desired := p.AvailableBandwidthBps, p.ExpectedUsageBps, p.DesiredBps
	e.AddInt("AvailableBandwidthBps", available)
	e.AddInt("ExpectedUsageBps", expected)
	e.AddInt("DesiredBps", desired)

	// then the probing window and the byte budget
	e.AddDuration("Duration", p.Duration)
	e.AddInt("DesiredBytes", p.DesiredBytes)

	return nil
}
// ProbeClusterResult holds the counters collected for a probe cluster.
type ProbeClusterResult struct {
	// StartTime and EndTime bound the measurement; their difference is
	// interpreted as nanoseconds by Duration.
	StartTime int64
	EndTime   int64

	PacketsProbe           int
	BytesProbe             int
	PacketsNonProbePrimary int
	BytesNonProbePrimary   int
	PacketsNonProbeRTX     int
	BytesNonProbeRTX       int

	IsCompleted bool
}

// Bytes returns the total of probe, non-probe primary and non-probe
// RTX bytes.
func (p ProbeClusterResult) Bytes() int {
	nonProbe := p.BytesNonProbePrimary + p.BytesNonProbeRTX
	return p.BytesProbe + nonProbe
}

// Duration returns EndTime - StartTime as a time.Duration.
func (p ProbeClusterResult) Duration() time.Duration {
	elapsed := p.EndTime - p.StartTime
	return time.Duration(elapsed)
}

// Bitrate returns the total bytes expressed in bits per second over
// Duration, or 0 when the duration is zero.
func (p ProbeClusterResult) Bitrate() float64 {
	seconds := p.Duration().Seconds()
	if seconds == 0 {
		return 0
	}
	return float64(p.Bytes()*8) / seconds
}
// MarshalLogObject writes the result to the encoder: the start and end
// times (interpreted as nanoseconds since the Unix epoch), the derived
// duration, the raw packet/byte counters, the derived total bytes and
// bitrate, and the completion flag. It always returns nil.
func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error {
	startedAt := time.Unix(0, p.StartTime)
	endedAt := time.Unix(0, p.EndTime)
	e.AddTime("StartTime", startedAt)
	e.AddTime("EndTime", endedAt)
	e.AddDuration("Duration", p.Duration())

	// raw counters
	e.AddInt("PacketsProbe", p.PacketsProbe)
	e.AddInt("BytesProbe", p.BytesProbe)
	e.AddInt("PacketsNonProbePrimary", p.PacketsNonProbePrimary)
	e.AddInt("BytesNonProbePrimary", p.BytesNonProbePrimary)
	e.AddInt("PacketsNonProbeRTX", p.PacketsNonProbeRTX)
	e.AddInt("BytesNonProbeRTX", p.BytesNonProbeRTX)

	// derived totals
	totalBytes := p.Bytes()
	bitrate := p.Bitrate()
	e.AddInt("Bytes", totalBytes)
	e.AddFloat64("Bitrate", bitrate)

	e.AddBool("IsCompleted", p.IsCompleted)
	return nil
}
// ProbeClusterInfo bundles the identity, goal and result of a probe cluster.
type ProbeClusterInfo struct {
// Id is the cluster's identifier.
Id ProbeClusterId
// CreatedAt is set when the cluster is constructed (see newCluster).
CreatedAt time.Time
// Goal is the goal the cluster was created with; its DesiredBytes is
// filled in by Cluster.initProbes.
Goal ProbeClusterGoal
// Result is set by Cluster.MarkCompleted.
Result ProbeClusterResult
}
var (
// ProbeClusterInfoInvalid is returned by Prober.AddCluster when the goal is rejected.
ProbeClusterInfoInvalid = ProbeClusterInfo{Id: ProbeClusterIdInvalid}
)
// MarshalLogObject writes the cluster's id, creation time, goal and
// result to the encoder. An error reported by the encoder while adding
// one of the nested objects is returned to the caller instead of being
// dropped.
func (p ProbeClusterInfo) MarshalLogObject(e zapcore.ObjectEncoder) error {
	e.AddUint32("Id", uint32(p.Id))
	e.AddTime("CreatedAt", p.CreatedAt)
	if err := e.AddObject("Goal", p.Goal); err != nil {
		return err
	}
	return e.AddObject("Result", p.Result)
}
// ---------------------------------------------------------------------------
// bucket is one step of a cluster's probing schedule. Both fields are
// cumulative from the start of the cluster (see Cluster.initProbes).
type bucket struct {
// expectedElapsedDuration is the elapsed time since cluster start at which this bucket ends.
expectedElapsedDuration time.Duration
// expectedProbeBytesSent is the total number of probe bytes expected to have been sent by this bucket.
expectedProbeBytesSent int
}
// MarshalLogObject writes the bucket's elapsed-duration and probe-byte
// expectations to the encoder. It always returns nil.
func (b bucket) MarshalLogObject(e zapcore.ObjectEncoder) error {
	elapsed, probeBytes := b.expectedElapsedDuration, b.expectedProbeBytesSent
	e.AddDuration("expectedElapsedDuration", elapsed)
	e.AddInt("expectedProbesBytesSent", probeBytes)
	return nil
}
// ---------------------------------------------------------------------------
// Cluster is a single probe cluster: its goal/result info plus the
// bucket schedule and progress used to decide how many probe bytes to
// request on each call to Process.
type Cluster struct {
// lock is held by Info, ProbesSent, MarkCompleted and Process while
// they access the cluster's mutable state.
lock sync.RWMutex
info ProbeClusterInfo
mode ProbeClusterMode
// listener may be nil, in which case no callbacks are made.
listener ProberListener
// baseSleepDuration is the base interval the bucket durations are multiples of.
baseSleepDuration time.Duration
// buckets is the cumulative schedule built by initProbes; it always has at least one entry.
buckets []bucket
// bucketIdx is the index of the bucket currently being worked on.
bucketIdx int
// probeBytesSent accumulates the bytes reported via ProbesSent.
probeBytesSent int
// startTime is set on the first call to Process; zero until then.
startTime time.Time
// isComplete is set by MarkCompleted and makes Process return 0.
isComplete bool
}
// newCluster builds a cluster with the given id, mode, goal and
// listener, stamps its creation time and initializes its bucket
// schedule.
func newCluster(id ProbeClusterId, mode ProbeClusterMode, pcg ProbeClusterGoal, listener ProberListener) *Cluster {
	cluster := new(Cluster)
	cluster.mode = mode
	cluster.listener = listener

	cluster.info.Id = id
	cluster.info.CreatedAt = mono.Now()
	cluster.info.Goal = pcg

	cluster.initProbes()
	return cluster
}
// initProbes derives the goal's DesiredBytes and builds the cluster's
// bucket schedule. Each bucket records the cumulative elapsed time at
// which it ends and the cumulative number of probe bytes expected to
// have been sent by then.
//
// Several values below are computed as math.Round(x + 0.5). For
// non-negative x that evaluates to floor(x) + 1, i.e. it yields one more
// than x when x is already a whole number.
func (c *Cluster) initProbes() {
// total bytes the goal bitrate amounts to over the goal duration
c.info.Goal.DesiredBytes = int(math.Round(float64(c.info.Goal.DesiredBps)*c.info.Goal.Duration.Seconds()/8 + 0.5))
// start from buckets of nominal cSleepDuration length, at least one
numBuckets := int(math.Round(c.info.Goal.Duration.Seconds()/cSleepDuration.Seconds() + 0.5))
if numBuckets < 1 {
numBuckets = 1
}
numIntervals := numBuckets
// for linear chirp, group intervals with decreasing duration, i.e. increasing bitrate,
// by aiming to send the same number of bytes in each group; as groups get shorter, the bitrate gets higher
if c.mode == ProbeClusterModeLinearChirp {
// find the smallest i such that 1 + 2 + ... + i covers the initial bucket count;
// i becomes the number of buckets and the sum the number of base intervals
sum := 0
i := 1
for {
sum += i
if sum >= numBuckets {
break
}
i++
}
numBuckets = i
numIntervals = sum
}
// length of one base interval, clamped from below to cSleepDurationMin
c.baseSleepDuration = c.info.Goal.Duration / time.Duration(numIntervals)
if c.baseSleepDuration < cSleepDurationMin {
c.baseSleepDuration = cSleepDurationMin
}
// recompute the interval count, as the clamp above may have changed the base interval
numIntervals = int(math.Round(c.info.Goal.Duration.Seconds()/c.baseSleepDuration.Seconds() + 0.5))
// probe bytes are what the goal bitrate needs beyond the expected usage,
// spread over the intervals
desiredProbeBytesPerInterval := int(math.Round(((c.info.Goal.Duration.Seconds()*float64(c.info.Goal.DesiredBps-c.info.Goal.ExpectedUsageBps)/8)+float64(numIntervals)-1)/float64(numIntervals) + 0.5))
c.buckets = make([]bucket, numBuckets)
for i := 0; i < numBuckets; i++ {
switch c.mode {
case ProbeClusterModeUniform:
// every bucket spans one base interval
c.buckets[i] = bucket{
expectedElapsedDuration: c.baseSleepDuration,
}
case ProbeClusterModeLinearChirp:
// bucket i spans (numBuckets - i) base intervals, so later buckets are shorter
c.buckets[i] = bucket{
expectedElapsedDuration: time.Duration(numBuckets-i) * c.baseSleepDuration,
}
}
// make the elapsed duration cumulative from the start of the cluster
if i > 0 {
c.buckets[i].expectedElapsedDuration += c.buckets[i-1].expectedElapsedDuration
}
// the cumulative byte target grows by the same amount for every bucket
c.buckets[i].expectedProbeBytesSent = (i + 1) * desiredProbeBytesPerInterval
}
}
// Start notifies the listener, if any, that this cluster is being
// switched to, passing the cluster's info.
func (c *Cluster) Start() {
	if c.listener == nil {
		return
	}
	c.listener.OnProbeClusterSwitch(c.info)
}
// Id returns the cluster's identifier. It is read without taking the
// cluster lock.
func (c *Cluster) Id() ProbeClusterId {
	id := c.info.Id
	return id
}
// Info returns a copy of the cluster's info taken under the read lock.
func (c *Cluster) Info() ProbeClusterInfo {
	c.lock.RLock()
	snapshot := c.info
	c.lock.RUnlock()

	return snapshot
}
// ProbesSent adds bytesSent to the running total of probe bytes sent
// for this cluster.
func (c *Cluster) ProbesSent(bytesSent int) {
	c.lock.Lock()
	c.probeBytesSent += bytesSent
	c.lock.Unlock()
}
// MarkCompleted records result as the cluster's result and flags the
// cluster as complete, which makes subsequent Process calls return 0.
func (c *Cluster) MarkCompleted(result ProbeClusterResult) {
	c.lock.Lock()
	c.info.Result = result
	c.isComplete = true
	c.lock.Unlock()
}
// Process advances the cluster's schedule and, when probe bytes are
// due, asks the listener to send them. It returns 0 once the cluster
// has been marked complete, otherwise cSleepDurationMin as the time to
// wait before the next call. The listener is invoked after the cluster
// lock has been released.
func (c *Cluster) Process() time.Duration {
	probeBytes, finished := c.nextProbeSize()
	if finished {
		return 0
	}

	if probeBytes != 0 && c.listener != nil {
		c.listener.OnSendProbe(probeBytes)
	}
	return cSleepDurationMin
}

// nextProbeSize computes, under the cluster lock, how many probe bytes
// should be sent now. finished is true when the cluster is complete.
func (c *Cluster) nextProbeSize() (probeBytes int, finished bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.isComplete {
		return 0, true
	}

	// first call: record the start time and send one probe right away
	if c.startTime.IsZero() {
		c.startTime = mono.Now()
		return cBytesPerProbe, false
	}

	// nothing to do until the current bucket's time has passed
	if time.Since(c.startTime) <= c.buckets[c.bucketIdx].expectedElapsedDuration {
		return 0, false
	}

	c.bucketIdx++
	pastEnd := c.bucketIdx >= len(c.buckets)
	if pastEnd {
		// when overflowing, repeat the last bucket
		c.bucketIdx = len(c.buckets) - 1
	}

	target := c.buckets[c.bucketIdx].expectedProbeBytesSent
	if pastEnd || target > c.probeBytesSent {
		return max(cBytesPerProbe, target-c.probeBytesSent), false
	}
	return 0, false
}
// MarshalLogObject writes the cluster's mode, info, schedule
// parameters and progress to the encoder. A nil cluster writes nothing.
//
// The fields are read under the cluster's read lock because Process,
// ProbesSent and MarkCompleted modify them under the write lock,
// possibly from another goroutine. Callers must therefore not log a
// cluster while holding its lock.
//
// "elapsed" is reported as 0 for a cluster that has not started yet;
// time.Since on the zero start time would otherwise saturate to the
// maximum duration. An error from encoding the nested info object is
// returned to the caller.
func (c *Cluster) MarshalLogObject(e zapcore.ObjectEncoder) error {
	if c == nil {
		return nil
	}

	c.lock.RLock()
	defer c.lock.RUnlock()

	e.AddString("mode", c.mode.String())
	if err := e.AddObject("info", c.info); err != nil {
		return err
	}
	e.AddDuration("baseSleepDuration", c.baseSleepDuration)
	e.AddInt("numBuckets", len(c.buckets))
	e.AddInt("bucketIdx", c.bucketIdx)
	e.AddInt("probeBytesSent", c.probeBytesSent)
	e.AddTime("startTime", c.startTime)

	var elapsed time.Duration
	if !c.startTime.IsZero() {
		elapsed = time.Since(c.startTime)
	}
	e.AddDuration("elapsed", elapsed)

	e.AddBool("isComplete", c.isComplete)
	return nil
}
// ----------------------------------------------------------------------