//
// Design of Prober
//
// Probing is used to check for existence of excess channel capacity.
// This is especially useful in the downstream direction of SFU.
// SFU forwards audio/video streams from one or more publishers to
// all the subscribers. But, the downstream channel of a subscriber
// may not be big enough to carry all the streams. It is also a time
// varying quantity.
//
// When there is not enough capacity, some streams will be paused.
// To resume a stream, SFU would need to know that the channel has
// enough capacity. That's where probing comes in. When conditions
// are favorable, SFU can send probe packets so that the bandwidth
// estimator has more data to estimate available channel capacity
// better.
// NOTE: What defines `favorable conditions` is implementation dependent.
//
// There are two options for probing
//   - Use padding only RTP packets: This one is preferable as
//     the probe rate can be controlled more tightly.
//   - Resume a paused stream or forward a higher spatial layer:
//     Have to find a stream whose rate matches the probing rate.
//     Also, a stream could get a key frame unexpectedly, boosting
//     the rate in the probing window.
//
// The strategy used depends on stream allocator implementation.
// This module can be used if the stream allocator decides to use
// padding only RTP packets for probing purposes.
//
// Implementation:
// There are a couple of options
//   - Check prober in the forwarding path (pull from prober).
//     This is preferred for scalability reasons. But, this
//     suffers from not being able to probe when all streams
//     are paused (could be due to downstream bandwidth
//     constraints or because the corresponding upstream tracks
//     have paused due to upstream bandwidth constraints).
//     Another issue is not being able to have tight control on
//     the probing window boundary, as the packet forwarding path
//     may not have a packet to forward. But, it should not
//     be a major concern as long as some stream(s) is/are
//     forwarded, as there should be a packet at least every
//     60 ms or so (forwarding only one stream at 15 fps).
//     Usually, it will be serviced much more frequently when
//     there are multiple streams getting forwarded.
//   - Run it in a goroutine. But, that would have to wake up
//     very often to prevent bunching up of probe packets.
//     So, it is a scalability concern as there is one prober
//     per subscriber peer connection. But, probe windows
//     should be very short (on the order of 100s of ms),
//     so this approach might be fine.
//
// The implementation here follows the second approach of using a
// goroutine.
//
// Pacing:
// ------
// Ideally, the subscriber peer connection should have a pacer which
// trickles data out at the estimated channel capacity rate (and
// estimated channel capacity + probing rate when actively probing).
//
// But, there are a few significant challenges
//  1. A pacer will require buffering of forwarded packets. That means
//     more memory, more CPU (have to make a copy of packets) and
//     more latency in the media stream.
//  2. Scalability is a concern, as the SFU may be handling hundreds of
//     subscriber peer connections and each one running the pacing
//     loop at a 5 ms interval will add up.
//
// So, this module assumes that pacing is inherently provided by the
// publishers for media streams. That is a reasonable assumption given
// that publishing clients will run their own pacer and pace data out
// at a steady rate.
//
// A further assumption is that if there are multiple publishers for
// a subscriber peer connection, all the publishers are not pacing
// in sync, i.e. each publisher's pacer is completely independent
// and SFU will be receiving the media packets with a good spread and
// not clumped together.
//
// Given those assumptions, this module monitors the media send rate and
// adjusts probe packet sends accordingly. Although the probing may
// need a high wake-up frequency, it is only for short windows.
// For example, probing at 5 Mbps for 1/2 second and sending a 1000 byte
// probe per iteration will wake up every 1.6 ms. That is very frequent,
// but should last for only 1/2 second or so.
//   5 Mbps over 1/2 second = 2.5 Mb
//   2.5 Mb = 312500 bytes = 313 probes at 1000 bytes per probe
//   313 probes over 1/2 second = 1.6 ms between probes
//
// A few things to note
//  1. When a probe cluster is added, the expected media rate is provided.
//     So, the wake-up interval takes that into account. For example,
//     if probing at 5 Mbps for 1/2 second and 4 Mbps of it is expected
//     to be provided by media traffic, the wake-up interval becomes 8 ms.
//  2. The amount of probing should actually be capped at some value to
//     avoid too much self-induced congestion. It may be something like
//     500 kbps. That will increase the wake-up interval to 16 ms in the
//     above example.
//  3. In practice, the probing interval may also be shorter. Typically,
//     it can be run for 2 - 3 RTTs to get a good measurement. For
//     the longest hauls, RTT could be 250 ms or so, leading to the probing
//     window being long(ish). But, RTT should be much shorter, especially
//     if the subscriber peer connection of the client is able to connect
//     to the nearest data center.
//
package sfu

import (
	"fmt"
	"sync"
	"time"

	"github.com/gammazero/deque"
	"go.uber.org/atomic"

	"github.com/livekit/protocol/logger"
)
type ProberParams struct {
	Logger logger.Logger
}

type Prober struct {
	logger logger.Logger

	clusterId atomic.Uint32

	clustersMu    sync.RWMutex
	clusters      deque.Deque
	activeCluster *Cluster

	onSendProbe        func(bytesToSend int)
	onProbeClusterDone func(info ProbeClusterInfo)
}
func NewProber(params ProberParams) *Prober {
	p := &Prober{
		logger: params.Logger,
	}

	p.clusters.SetMinCapacity(2)
	return p
}
func (p *Prober) IsRunning() bool {
	p.clustersMu.RLock()
	defer p.clustersMu.RUnlock()

	return p.clusters.Len() > 0
}
func (p *Prober) Reset() {
	reset := false
	var info ProbeClusterInfo

	p.clustersMu.Lock()
	if p.activeCluster != nil {
		p.logger.Debugw("resetting active cluster", "cluster", p.activeCluster.String())
		reset = true
		info = p.activeCluster.GetInfo()
	}

	p.clusters.Clear()
	p.activeCluster = nil
	p.clustersMu.Unlock()

	if p.onProbeClusterDone != nil && reset {
		p.onProbeClusterDone(info)
	}
}
func (p *Prober) OnSendProbe(f func(bytesToSend int)) {
	p.onSendProbe = f
}

func (p *Prober) OnProbeClusterDone(f func(info ProbeClusterInfo)) {
	p.onProbeClusterDone = f
}
func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) ProbeClusterId {
	if desiredRateBps <= 0 {
		return ProbeClusterIdInvalid
	}

	clusterId := ProbeClusterId(p.clusterId.Inc())
	cluster := NewCluster(clusterId, desiredRateBps, expectedRateBps, minDuration, maxDuration)
	p.logger.Debugw("cluster added", "cluster", cluster.String())

	p.pushBackClusterAndMaybeStart(cluster)

	return clusterId
}
func (p *Prober) PacketSent(size int) {
	cluster := p.getFrontCluster()
	if cluster == nil {
		return
	}

	cluster.PacketSent(size)
}

func (p *Prober) ProbeSent(size int) {
	cluster := p.getFrontCluster()
	if cluster == nil {
		return
	}

	cluster.ProbeSent(size)
}
func (p *Prober) getFrontCluster() *Cluster {
	p.clustersMu.Lock()
	defer p.clustersMu.Unlock()

	if p.activeCluster != nil {
		return p.activeCluster
	}

	if p.clusters.Len() == 0 {
		p.activeCluster = nil
	} else {
		p.activeCluster = p.clusters.Front().(*Cluster)
		p.activeCluster.Start()
	}
	return p.activeCluster
}
func (p *Prober) popFrontCluster(cluster *Cluster) {
	p.clustersMu.Lock()
	defer p.clustersMu.Unlock()

	if p.clusters.Len() == 0 {
		p.activeCluster = nil
		return
	}

	if p.clusters.Front().(*Cluster) == cluster {
		p.clusters.PopFront()
	}

	if cluster == p.activeCluster {
		p.activeCluster = nil
	}
}
func (p *Prober) pushBackClusterAndMaybeStart(cluster *Cluster) {
	p.clustersMu.Lock()
	defer p.clustersMu.Unlock()

	p.clusters.PushBack(cluster)

	if p.clusters.Len() == 1 {
		go p.run()
	}
}
func (p *Prober) run() {
	for {
		// determine how long to sleep
		cluster := p.getFrontCluster()
		if cluster == nil {
			return
		}

		time.Sleep(cluster.GetSleepDuration())

		// wake up and check for probes to send
		cluster = p.getFrontCluster()
		if cluster == nil {
			return
		}

		cluster.Process(p.onSendProbe)

		if cluster.IsFinished() {
			p.logger.Debugw("cluster finished", "cluster", cluster.String())
			if p.onProbeClusterDone != nil {
				p.onProbeClusterDone(cluster.GetInfo())
			}

			p.popFrontCluster(cluster)
			continue
		}
	}
}
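
// Illustrative usage sketch (an assumption about how callers wire this up,
// not part of the original implementation): a stream allocator creates a
// Prober per subscriber peer connection, registers callbacks, and reports
// sent packets. The probe callback below only logs; a real caller would
// emit padding-only RTP packets of the requested size and report them back
// via ProbeSent.
func exampleProberUsage(log logger.Logger) {
	p := NewProber(ProberParams{Logger: log})

	p.OnSendProbe(func(bytesToSend int) {
		// send padding-only RTP packets totalling bytesToSend,
		// then report them with p.ProbeSent(bytesActuallySent)
		log.Debugw("would send probe padding", "bytes", bytesToSend)
	})
	p.OnProbeClusterDone(func(info ProbeClusterInfo) {
		log.Debugw("probe cluster done", "id", info.Id, "bytesSent", info.BytesSent, "duration", info.Duration)
	})

	// probe at 5 Mbps for at least 500 ms (and at most 1 s), expecting
	// about 4 Mbps of that to be carried by regular media traffic
	p.AddCluster(5000000, 4000000, 500*time.Millisecond, time.Second)

	// forwarded media packets are reported so the prober can count them
	// against the probe budget
	p.PacketSent(1200)
}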
// ---------------------------------

type ProbeClusterId uint32

const (
	ProbeClusterIdInvalid ProbeClusterId = 0
)

type ProbeClusterInfo struct {
	Id        ProbeClusterId
	BytesSent int
	Duration  time.Duration
}
type Cluster struct {
	// LK-TODO-START
	// Check if we can operate at cluster level without a lock.
	// The quantities that are updated in a different thread are
	//   bytesSentNonProbe - maybe make this an atomic value
	// Lock contention time should be very minimal though.
	// LK-TODO-END
	lock sync.RWMutex

	id            ProbeClusterId
	desiredBytes  int
	minDuration   time.Duration
	maxDuration   time.Duration
	sleepDuration time.Duration

	bytesSentProbe    int
	bytesSentNonProbe int
	startTime         time.Time
}
func NewCluster(id ProbeClusterId, desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) *Cluster {
	minDurationMs := minDuration.Milliseconds()
	desiredBytes := int((int64(desiredRateBps)*minDurationMs/time.Second.Milliseconds() + 7) / 8)
	expectedBytes := int((int64(expectedRateBps)*minDurationMs/time.Second.Milliseconds() + 7) / 8)

	// pace based on sending approximately 1000 bytes per probe
	numProbes := (desiredBytes - expectedBytes + 999) / 1000
	if numProbes < 1 {
		// guard against a non-positive probe count (expected rate at or
		// above the desired rate) to avoid a divide-by-zero below
		numProbes = 1
	}
	sleepDurationMicroSeconds := int(float64(minDurationMs*1000)/float64(numProbes) + 0.5)
	c := &Cluster{
		id:            id,
		desiredBytes:  desiredBytes,
		minDuration:   minDuration,
		maxDuration:   maxDuration,
		sleepDuration: time.Duration(sleepDurationMicroSeconds) * time.Microsecond,
	}
	return c
}
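
// For illustration, plugging in the numbers from the design note at the top
// of this file (probing at 5 Mbps with 4 Mbps expected from media over a
// 500 ms minimum window):
//   desiredBytes  = 5000000 * 0.5 / 8 = 312500
//   expectedBytes = 4000000 * 0.5 / 8 = 250000
//   numProbes     = ceil((312500 - 250000) / 1000) = 63
//   sleepDuration = 500 ms / 63 ≈ 8 ms between probe sends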
func (c *Cluster) Start() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.startTime.IsZero() {
		c.startTime = time.Now()
	}
}

func (c *Cluster) GetSleepDuration() time.Duration {
	c.lock.RLock()
	defer c.lock.RUnlock()

	return c.sleepDuration
}

func (c *Cluster) PacketSent(size int) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.bytesSentNonProbe += size
}

func (c *Cluster) ProbeSent(size int) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.bytesSentProbe += size
}
func (c *Cluster) IsFinished() bool {
	c.lock.RLock()
	defer c.lock.RUnlock()

	// if already past deadline, end the cluster
	timeElapsed := time.Since(c.startTime)
	if timeElapsed > c.maxDuration {
		return true
	}

	// do not end cluster until minDuration elapses even if rate is achieved.
	// Ensures that the next cluster (if any) does not start early.
	if (c.bytesSentProbe+c.bytesSentNonProbe) >= c.desiredBytes && timeElapsed >= c.minDuration {
		return true
	}

	return false
}
func (c *Cluster) GetInfo() ProbeClusterInfo {
	c.lock.RLock()
	defer c.lock.RUnlock()

	return ProbeClusterInfo{
		Id:        c.id,
		BytesSent: c.bytesSentProbe + c.bytesSentNonProbe,
		Duration:  time.Since(c.startTime),
	}
}
func (c *Cluster) Process(onSendProbe func(bytesToSend int)) {
	c.lock.RLock()

	timeElapsed := time.Since(c.startTime)

	// Calculate number of probe bytes that should have been sent since start.
	// Overall goal is to send desired number of probe bytes in minDuration.
	// However, it is possible that timeElapsed is more than minDuration due
	// to scheduling variance. When overshooting time budget, use a capped
	// short fall if there is a grace period given.
	windowDone := float64(timeElapsed) / float64(c.minDuration)
	if windowDone > 1.0 {
		// cluster has been running for longer than minDuration
		windowDone = 1.0
	}

	bytesShouldHaveBeenSent := int(windowDone * float64(c.desiredBytes))
	bytesShortFall := bytesShouldHaveBeenSent - c.bytesSentProbe - c.bytesSentNonProbe
	if bytesShortFall < 0 {
		bytesShortFall = 0
	}

	// cap short fall to limit to 8 packets in an iteration
	// 275 bytes per packet (255 max RTP padding payload + 20 bytes RTP header)
	if bytesShortFall > (275 * 8) {
		bytesShortFall = 275 * 8
	}

	// round up to packet size
	bytesShortFall = ((bytesShortFall + 274) / 275) * 275

	c.lock.RUnlock()

	if bytesShortFall > 0 && onSendProbe != nil {
		onSendProbe(bytesShortFall)
	}

	// LK-TODO look at adapting sleep time based on how many bytes and how much time is left
}
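
// For illustration, halfway through the 500 ms / 5 Mbps example cluster
// (desiredBytes = 312500), bytesShouldHaveBeenSent = 0.5 * 312500 = 156250.
// If 155000 bytes (probe + media) have gone out so far, the shortfall is
// 1250 bytes, which is rounded up to a whole number of 275-byte padding
// packets (5 packets, 1375 bytes). Shortfalls above 275 * 8 = 2200 bytes
// are capped so that at most 8 padding packets are requested per iteration.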
func (c *Cluster) String() string {
	activeTimeMs := int64(0)
	if !c.startTime.IsZero() {
		activeTimeMs = time.Since(c.startTime).Milliseconds()
	}

	return fmt.Sprintf("id: %d, bytes: desired %d / probe %d / non-probe %d / remaining: %d, time(ms): active %d / min %d / max %d",
		c.id,
		c.desiredBytes,
		c.bytesSentProbe,
		c.bytesSentNonProbe,
		c.desiredBytes-c.bytesSentProbe-c.bytesSentNonProbe,
		activeTimeMs,
		c.minDuration.Milliseconds(),
		c.maxDuration.Milliseconds())
}