Stream allocator fixes (#212)

* Stream allocator fixes

- Treat simple track like simulcast track with one layer to make
it stream allocator friendly.

* Address David's comments
This commit is contained in:
Raja Subramanian
2021-11-27 09:22:39 +05:30
committed by GitHub
parent dc60e27413
commit 092789a08f
8 changed files with 268 additions and 214 deletions

View File

@@ -707,6 +707,7 @@ func (p *ParticipantImpl) GetConnectionQuality() livekit.ConnectionQuality {
if subTrack.IsMuted() {
continue
}
// LK-TODO: maybe this should check CurrentSpatialLayer as target may not have been achieved
if subTrack.DownTrack().TargetSpatialLayer() < subTrack.DownTrack().MaxSpatialLayer() {
reducedQualitySub = true
}

View File

@@ -1,7 +1,8 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//+build !wireinject
//go:build !wireinject
// +build !wireinject
package service

View File

@@ -22,7 +22,7 @@ import (
// TrackSender defines an interface to send media to a remote peer
type TrackSender interface {
UptrackLayersChange(availableLayers []uint16, layerAdded bool) (int32, error)
UptrackLayersChange(availableLayers []uint16, layerAdded bool)
WriteRTP(p *buffer.ExtPacket, layer int32) error
Close()
// ID is the globally unique identifier for this Track.
@@ -43,6 +43,9 @@ const (
RTPPaddingMaxPayloadSize = 255
RTPPaddingEstimatedHeaderSize = 20
RTPBlankFramesMax = 6
InvalidSpatialLayer = -1
InvalidTemporalLayer = -1
)
type SequenceNumberOrdering int
@@ -103,7 +106,9 @@ type DownTrack struct {
currentSpatialLayer atomicInt32
targetSpatialLayer atomicInt32
temporalLayer atomicInt32
currentTemporalLayer atomicInt32
targetTemporalLayer atomicInt32
enabled atomicBool
reSync atomicBool
@@ -133,16 +138,13 @@ type DownTrack struct {
lossFraction atomicUint8
// Debug info
lastPli atomicInt64
lastRTP atomicInt64
pktsMuted atomicUint32
pktsDropped atomicUint32
pktsBandwidthConstrainedDropped atomicUint32
lastPli atomicInt64
lastRTP atomicInt64
pktsMuted atomicUint32
pktsDropped atomicUint32
maxPacketTs uint32
bandwidthConstrainedMuted atomicBool
// RTCP callbacks
onRTCP func([]rtcp.Packet)
onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)
@@ -182,10 +184,18 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Facto
d.maxSpatialLayer.set(2)
d.maxTemporalLayer.set(2)
} else {
d.maxSpatialLayer.set(-1)
d.maxTemporalLayer.set(-1)
d.maxSpatialLayer.set(InvalidSpatialLayer)
d.maxTemporalLayer.set(InvalidTemporalLayer)
}
// start off with nothing, let streamallocator set things
d.currentSpatialLayer.set(InvalidSpatialLayer)
d.targetSpatialLayer.set(InvalidSpatialLayer)
// start off with nothing, let streamallocator set things
d.currentTemporalLayer.set(InvalidTemporalLayer)
d.targetTemporalLayer.set(InvalidTemporalLayer)
return d, nil
}
@@ -209,7 +219,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.mime = strings.ToLower(codec.MimeType)
d.reSync.set(true)
d.enabled.set(true)
d.bandwidthConstrainedMute(false)
if rr := d.bufferFactory.GetOrNew(packetio.RTCPBufferPacket, uint32(t.SSRC())).(*buffer.RTCPReader); rr != nil {
rr.OnPacket(func(pkt []byte) {
d.handleRTCP(pkt)
@@ -340,11 +349,6 @@ func (d *DownTrack) WriteRTP(p *buffer.ExtPacket, layer int32) error {
return nil
}
if d.bandwidthConstrainedMuted.get() {
d.pktsBandwidthConstrainedDropped.add(1)
return nil
}
switch d.trackType {
case SimpleDownTrack:
return d.writeSimpleRTP(p)
@@ -388,7 +392,12 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
size = RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize
}
sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(false)
// padding is used for probing. Padding packets should be
// at frame boundaries only to ensure decoder sequencer does
// not get out-of-sync. But, when a stream is paused,
// force a frame marker as a restart of the stream will
// start with a key frame which will reset the decoder.
sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(d.TargetSpatialLayer() == InvalidSpatialLayer)
if err != nil {
return bytesSent
}
@@ -513,7 +522,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) error {
d.maxSpatialLayer.set(spatialLayer)
if d.onSubscribedLayersChanged != nil {
if d.enabled.get() && d.onSubscribedLayersChanged != nil {
d.onSubscribedLayersChanged(d, spatialLayer, d.MaxTemporalLayer())
}
@@ -540,82 +549,27 @@ func (d *DownTrack) MaxTemporalLayer() int32 {
return d.maxTemporalLayer.get()
}
// switchSpatialLayer switches the current layer
func (d *DownTrack) switchSpatialLayer(targetLayer int32) error {
if d.trackType != SimulcastDownTrack {
return ErrSpatialNotSupported
}
// already set
if d.CurrentSpatialLayer() == targetLayer {
return nil
}
// switchSpatialLayer switches the target layer
func (d *DownTrack) switchSpatialLayer(targetLayer int32) {
d.targetSpatialLayer.set(targetLayer)
return nil
}
func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) (int32, error) {
if d.trackType == SimulcastDownTrack {
currentLayer := uint16(d.CurrentSpatialLayer())
maxLayer := uint16(d.maxSpatialLayer.get())
var maxFound uint16 = 0
layerFound := false
var minFound uint16 = 0
for _, target := range availableLayers {
if target <= maxLayer {
if target > maxFound {
maxFound = target
layerFound = true
}
} else {
if minFound > target {
minFound = target
}
}
}
var targetLayer uint16
if layerFound {
targetLayer = maxFound
} else {
targetLayer = minFound
}
if currentLayer != targetLayer {
// LK-TODO-START
// This layer switch should be removed when StreamAllocator is used.
// Available layers change should be signalled to StreamAllocator
// and StreamAllocator will take care of adjusting allocations.
// LK-TODO-END
if err := d.switchSpatialLayer(int32(targetLayer)); err != nil {
return int32(targetLayer), err
}
}
if d.onAvailableLayersChanged != nil {
d.onAvailableLayersChanged(d, layerAdded)
}
return int32(targetLayer), nil
func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) {
if d.onAvailableLayersChanged != nil {
d.onAvailableLayersChanged(d, layerAdded)
}
return -1, fmt.Errorf("downtrack %s does not support simulcast", d.id)
}
func (d *DownTrack) switchTemporalLayer(targetLayer int32) {
if d.trackType != SimulcastDownTrack {
return
}
d.targetTemporalLayer.set(targetLayer)
}
layer := d.temporalLayer.get()
currentLayer := uint16(layer)
currentTargetLayer := uint16(layer >> 16)
func (d *DownTrack) disableSend() {
d.currentSpatialLayer.set(InvalidSpatialLayer)
d.targetSpatialLayer.set(InvalidSpatialLayer)
// Don't switch until previous switch is done or canceled
if currentLayer != currentTargetLayer {
return
}
d.temporalLayer.set((targetLayer << 16) | int32(currentLayer))
d.currentTemporalLayer.set(InvalidTemporalLayer)
d.targetTemporalLayer.set(InvalidTemporalLayer)
}
// OnCloseHandler method to be called on remote track removed
@@ -683,10 +637,9 @@ func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (isPausing
optimalBandwidthNeeded = brs[i][j]
}
if brs[i][j] < availableChannelCapacity {
isResuming = d.bandwidthConstrainedMuted.get()
isResuming = d.TargetSpatialLayer() == InvalidSpatialLayer
bandwidthRequested = brs[i][j]
d.bandwidthConstrainedMute(false) // just in case it was muted
d.switchSpatialLayer(int32(i))
d.switchTemporalLayer(int32(j))
@@ -695,9 +648,13 @@ func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (isPausing
}
}
// no layer fits in the available channel capacity, disable the track
isPausing = !d.bandwidthConstrainedMuted.get()
d.bandwidthConstrainedMute(true)
if optimalBandwidthNeeded != 0 {
// no layer fits in the available channel capacity, disable the track
isPausing = d.TargetSpatialLayer() != InvalidSpatialLayer
d.disableSend()
d.reSync.set(true) // re-sync required on next layer switch
}
return
}
@@ -718,13 +675,14 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) {
currentSpatialLayer := d.CurrentSpatialLayer()
targetSpatialLayer := d.TargetSpatialLayer()
temporalLayer := d.temporalLayer.get()
currentTemporalLayer := temporalLayer & 0x0f
targetTemporalLayer := temporalLayer >> 16
currentTemporalLayer := d.currentTemporalLayer.get()
targetTemporalLayer := d.targetTemporalLayer.get()
// if targets are still pending, don't increase
if targetSpatialLayer != currentSpatialLayer || targetTemporalLayer != currentTemporalLayer {
return false, 0, 0
if targetSpatialLayer != InvalidSpatialLayer {
if targetSpatialLayer != currentSpatialLayer || targetTemporalLayer != currentTemporalLayer {
return false, 0, 0
}
}
// move to the next available layer
@@ -746,7 +704,7 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) {
}
}
if d.bandwidthConstrainedMuted.get() {
if d.TargetSpatialLayer() == InvalidSpatialLayer {
// try the lowest spatial and temporal layer if available
// LK-TODO-START
// note that this will never be zero because we do not track
@@ -757,7 +715,6 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) {
return false, 0, 0
}
d.bandwidthConstrainedMute(false)
d.switchSpatialLayer(int32(0))
d.switchTemporalLayer(int32(0))
return true, brs[0][0], optimalBandwidthNeeded
@@ -772,7 +729,7 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) {
}
// try moving spatial layer up if already at max temporal layer of current spatial layer
// LK-TODO currentTemporalLayer may be outside available range because of initial value being out of range, fix it
// LK-TODO currentSpatialLayer may be outside available range because of initial value being out of range, fix it
nextSpatialLayer := currentSpatialLayer + 1
if nextSpatialLayer <= d.maxSpatialLayer.get() && brs[nextSpatialLayer][0] > 0 {
d.switchSpatialLayer(nextSpatialLayer)
@@ -809,7 +766,12 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
return nil
}
srRTP, srNTP := d.receiver.GetSenderReportTime(d.CurrentSpatialLayer())
currentSpatialLayer := d.CurrentSpatialLayer()
if currentSpatialLayer == InvalidSpatialLayer {
return nil
}
srRTP, srNTP := d.receiver.GetSenderReportTime(currentSpatialLayer)
if srRTP == 0 {
return nil
}
@@ -836,25 +798,23 @@ func (d *DownTrack) UpdateStats(packetLen uint32) {
d.packetCount.add(1)
}
// bandwidthConstrainedMute enables or disables media forwarding dictated by channel bandwidth constraints
func (d *DownTrack) bandwidthConstrainedMute(val bool) {
if d.bandwidthConstrainedMuted.get() == val {
return
}
d.bandwidthConstrainedMuted.set(val)
if val {
d.reSync.set(val)
}
}
func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
if d.reSync.get() {
if d.Kind() == webrtc.RTPCodecTypeVideo {
if d.TargetSpatialLayer() == InvalidSpatialLayer {
d.pktsDropped.add(1)
return nil
}
if !extPkt.KeyFrame {
d.lastPli.set(time.Now().UnixNano())
d.receiver.SendPLI(0)
d.pktsDropped.add(1)
return nil
} else {
// although one spatial layer, this is done so it
// works proper with stream allocator.
d.currentSpatialLayer.set(d.TargetSpatialLayer())
}
}
if d.packetCount.get() > 0 {
@@ -898,7 +858,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
// that, the sequence numbers should be updated to ensure that subsequent packet
// translations works fine and produce proper translated sequence numbers.
// LK-TODO-END
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16)
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.targetTemporalLayer.get())
if err != nil {
if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss {
if err == ErrFilteredVP8TemporalLayer {
@@ -959,6 +919,11 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) error {
tsl := d.TargetSpatialLayer()
if tsl == InvalidSpatialLayer {
d.pktsDropped.add(1)
return nil
}
csl := d.CurrentSpatialLayer()
if tsl == layer && csl != tsl {
if extPkt.KeyFrame {
@@ -1084,7 +1049,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err
// that, the sequence numbers should be updated to ensure that subsequent packet
// translations works fine and produce proper translated sequence numbers.
// LK-TODO-END
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16)
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.targetTemporalLayer.get())
if err != nil {
if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss {
if err == ErrFilteredVP8TemporalLayer {
@@ -1282,7 +1247,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
sendPliOnce := func() {
if pliOnce {
if pliOnce && d.TargetSpatialLayer() != InvalidSpatialLayer {
d.lastPli.set(time.Now().UnixNano())
d.receiver.SendPLI(d.TargetSpatialLayer())
pliOnce = false
@@ -1379,13 +1344,12 @@ func (d *DownTrack) getSRStats() (octets, packets uint32) {
func (d *DownTrack) translateVP8Packet(pkt *rtp.Packet, incomingVP8 *buffer.VP8, translatedVP8 *buffer.VP8, adjustTemporal bool) (buf []byte, err error) {
if adjustTemporal {
temporalLayer := d.temporalLayer.get()
currentLayer := uint16(temporalLayer)
currentTargetLayer := uint16(temporalLayer >> 16)
currentTemporalLayer := d.currentTemporalLayer.get()
targetTemporalLayer := d.targetTemporalLayer.get()
// catch up temporal layer if necessary
if currentTargetLayer != currentLayer {
if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(currentTargetLayer) {
d.temporalLayer.set(int32(currentTargetLayer)<<16 | int32(currentTargetLayer))
if currentTemporalLayer != targetTemporalLayer {
if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(targetTemporalLayer) {
d.currentTemporalLayer.set(targetTemporalLayer)
}
}
}

View File

@@ -372,9 +372,9 @@ func (c *Cluster) Process(p *Prober) bool {
}
func (c *Cluster) String() string {
activeTimeMs := time.Duration(0)
activeTimeMs := int64(0)
if !c.startTime.IsZero() {
activeTimeMs = time.Since(c.startTime) * time.Millisecond
activeTimeMs = time.Since(c.startTime).Milliseconds()
}
return fmt.Sprintf("bytes: desired %d / probe %d / non-probe %d / remaining: %d, time(ms): active %d / min %d / max %d",
@@ -383,6 +383,6 @@ func (c *Cluster) String() string {
c.bytesSentNonProbe,
c.desiredBytes-c.bytesSentProbe-c.bytesSentNonProbe,
activeTimeMs,
c.minDuration*time.Millisecond,
c.maxDuration*time.Millisecond)
c.minDuration.Milliseconds(),
c.maxDuration.Milliseconds())
}

View File

@@ -204,16 +204,23 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
w.buffers[layer] = buff
w.bufferMu.Unlock()
if w.isSimulcast && w.useTrackers {
tracker := NewStreamTracker()
if w.Kind() == webrtc.RTPCodecTypeVideo && w.useTrackers {
samplesRequired := uint32(5)
cyclesRequired := uint64(60) // 30s of continuous stream
if layer == 0 {
// be very forgiving for base layer
samplesRequired = 1
cyclesRequired = 4 // 2s of continuous stream
}
tracker := NewStreamTracker(samplesRequired, cyclesRequired, 500*time.Millisecond)
w.trackers[layer] = tracker
tracker.OnStatusChanged = func(status StreamStatus) {
tracker.OnStatusChanged(func(status StreamStatus) {
if status == StreamStatusStopped {
w.removeAvailableLayer(uint16(layer))
} else {
w.addAvailableLayer(uint16(layer))
}
}
})
tracker.Start()
}
go w.forwardRTP(layer)
@@ -223,9 +230,6 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
// this will reflect the "muted" status and will pause streamtracker to ensure we don't turn off
// the layer
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) {
if !w.isSimulcast {
return
}
w.upTrackMu.Lock()
defer w.upTrackMu.Unlock()
for _, tracker := range w.trackers {
@@ -247,18 +251,15 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) {
return
}
if w.isSimulcast {
track.SetTrackType(true)
track.SetTrackType(w.isSimulcast)
if w.Kind() == webrtc.RTPCodecTypeVideo {
// notify added downtrack of available layers
w.upTrackMu.RLock()
layers, ok := w.availableLayers.Load().([]uint16)
w.upTrackMu.RUnlock()
if ok && len(layers) != 0 {
_, _ = track.UptrackLayersChange(layers, true)
track.UptrackLayersChange(layers, true)
}
} else {
track.SetTrackType(false)
}
w.storeDownTrack(track)
@@ -292,7 +293,7 @@ func (w *WebRTCReceiver) downtrackLayerChange(layers []uint16, layerAdded bool)
defer w.downTrackMu.RUnlock()
for _, dt := range w.downTracks {
if dt != nil {
_, _ = dt.UptrackLayersChange(layers, layerAdded)
dt.UptrackLayersChange(layers, layerAdded)
}
}
}

View File

@@ -123,8 +123,8 @@ const (
GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom is more than 1 Mbps, don't probe
GratuitousProbePct = 10
GratuitousProbeMaxBps = 300 * 1000 // 300 kbps
GratuitousProbeMinDurationMs = 500
GratuitousProbeMaxDurationMs = 600
GratuitousProbeMinDurationMs = 500 * time.Millisecond
GratuitousProbeMaxDurationMs = 600 * time.Millisecond
AudioLossWeight = 0.75
VideoLossWeight = 0.25
@@ -418,7 +418,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima
s.prevReceivedEstimate = s.receivedEstimate
s.receivedEstimate = uint64(remb.Bitrate)
if s.prevReceivedEstimate != s.receivedEstimate {
s.logger.Debugw("received new estimate", "pariticpant", s.participantID, "old(bps)", s.prevReceivedEstimate, "new(bps)", s.receivedEstimate)
s.logger.Debugw("received new estimate", "participant", s.participantID, "old(bps)", s.prevReceivedEstimate, "new(bps)", s.receivedEstimate)
}
signal := s.maybeCommitEstimate()
s.estimateMu.Unlock()
@@ -617,21 +617,52 @@ func (s *StreamAllocator) runStateMachine(event Event) {
}
}
// LK-TODO-START
// Signal_ADD_TRACK is not useful. Probably can get rid of it.
// AVAILABLE_LAYERS_ADD/REMOVE should be how track start should
// be getting an allocation.
// LK-TODO-END
// LK-TODO: ADD_TRACK should not reallocate if added track is audio
func (s *StreamAllocator) runStatePreCommit(event Event) {
switch event.Signal {
case Signal_ADD_TRACK:
// wait for layers add/remove signal
s.allocate()
case Signal_REMOVE_TRACK:
s.allocate()
case Signal_ESTIMATE_INCREASE:
s.allocate()
// should never happen as the initialized capacity is very high
s.setState(State_STABLE)
case Signal_ESTIMATE_DECREASE:
s.allocate()
// first estimate could be off as things are ramping up.
//
// Basically, an initial channel capacity of InitialChannelCapacity
// which is very high is used so that there is no throttling before
// getting an estimate. The first estimate happens 6 - 8 seconds
// after streaming starts. With screen share like stream, the traffic
// is sparse/spikey depending on the content. So, the estimate could
// be small and still ramping up. So, doing an allocation could result
// in stream being paused.
//
// This is one of the significant challenges here, i. e. time alignment.
// Estimate was potentially measured using data from a little while back.
// But, that estimate is used to allocate based on current bitrate.
// So, in the intervening period if there was movement on the screen,
// the bitrate could have spiked and the REMB value is probably stale.
//
// A few options to consider
// o what is implemented here (i. e. change to STABLE state and
// let further streaming drive the estimation and move the state
// machine in whichever direction it needs to go)
// o Forward padding packets also. But, that could have an adverse
// effect on the channel when a new stream comes on line and
// starts probing, i. e. it could affect existing flows if it
// congests the channel because of the spurt of padding packets.
// o Do probing downstream in the first few seconds in addition to
// forwarding any streams and get a better estimate of the channel.
// o Measure up stream bitrates over a much longer window to smooth
// out spikes and get a more steady-state rate and use it in
// allocations.
//
// A good challenge. Basically, the goal should be to avoid re-allocation
// of streams. Any re-allocation means potential for estimate and current
// state of up streams not being in sync because of time differences resulting
// in streams getting paused unnecessarily.
s.setState(State_STABLE)
case Signal_RECEIVER_REPORT:
case Signal_AVAILABLE_LAYERS_ADD:
s.allocate()
@@ -648,7 +679,7 @@ func (s *StreamAllocator) runStatePreCommit(event Event) {
func (s *StreamAllocator) runStateStable(event Event) {
switch event.Signal {
case Signal_ADD_TRACK:
// wait for layers add/remove signal
s.allocate()
case Signal_REMOVE_TRACK:
// LK-TODO - may want to re-calculate channel usage?
case Signal_ESTIMATE_INCREASE:
@@ -666,7 +697,9 @@ func (s *StreamAllocator) runStateStable(event Event) {
s.allocate()
case Signal_PERIODIC_PING:
// if bandwidth estimate has been stable for a while, maybe gratuitously probe
s.maybeGratuitousProbe()
if s.maybeGratuitousProbe() {
s.setState(State_GRATUITOUS_PROBING)
}
}
}
@@ -686,16 +719,13 @@ func (s *StreamAllocator) runStateStable(event Event) {
func (s *StreamAllocator) runStateDeficient(event Event) {
switch event.Signal {
case Signal_ADD_TRACK:
// wait for layers add/remove signal
s.allocate()
case Signal_REMOVE_TRACK:
s.allocate()
case Signal_ESTIMATE_INCREASE:
// as long as estimate is increasing, keep going.
// Switch to STABLE state if estimate exceeds optimal bandwidth needed.
if s.getChannelCapacity() > s.getOptimalBandwidthUsage() {
s.resetBoost()
s.setState(State_STABLE)
}
// try an allocation to check if state can move to STABLE
s.allocate()
case Signal_ESTIMATE_DECREASE:
// stop using the boosted estimate
s.resetBoost()
@@ -723,7 +753,7 @@ func (s *StreamAllocator) runStateGratuitousProbing(event Event) {
// to avoid any self-inflicted damage
switch event.Signal {
case Signal_ADD_TRACK:
// wait for layers add/remove signal
s.allocate()
case Signal_REMOVE_TRACK:
// LK-TODO - may want to re-calculate channel usage?
case Signal_ESTIMATE_INCREASE:
@@ -748,8 +778,12 @@ func (s *StreamAllocator) runStateGratuitousProbing(event Event) {
s.allocate()
case Signal_PERIODIC_PING:
// try for more
if !s.prober.IsRunning() && s.maybeGratuitousProbe() {
s.logger.Infow("trying more gratuitous probing", "participant", s.participantID)
if !s.prober.IsRunning() {
if s.maybeGratuitousProbe() {
s.logger.Debugw("trying more gratuitous probing", "participant", s.participantID)
} else {
s.setState(State_STABLE)
}
}
}
}
@@ -798,11 +832,11 @@ func (s *StreamAllocator) maybeCommitEstimate() Signal {
return signal
}
func (s *StreamAllocator) getChannelCapacity() uint64 {
func (s *StreamAllocator) getChannelCapacity() (uint64, uint64) {
s.estimateMu.RLock()
defer s.estimateMu.RUnlock()
return s.committedChannelCapacity
return s.committedChannelCapacity, s.receivedEstimate
}
func (s *StreamAllocator) allocate() {
@@ -892,12 +926,13 @@ func (s *StreamAllocator) allocate() {
// So, track total requested bandwidth and mark DEFICIENT state if the total is
// above the estimated channel capacity even if the optimal signal is true.
//
// LK-TODO make protocol friendly structures
var pausedTracks map[string][]string
var resumedTracks map[string][]string
isOptimal := true
totalBandwidthRequested := uint64(0)
committedChannelCapacity := s.getChannelCapacity()
committedChannelCapacity, _ := s.getChannelCapacity()
availableChannelCapacity := committedChannelCapacity
if availableChannelCapacity < s.boostedChannelCapacity {
availableChannelCapacity = s.boostedChannelCapacity
@@ -1079,7 +1114,7 @@ func (s *StreamAllocator) maybeBoostLayer() {
func (s *StreamAllocator) maybeBoostBandwidth() {
// temporarily boost estimate for probing.
// Boost either the committed channel capacity or previous boost point if there is one
baseBps := s.getChannelCapacity()
baseBps, _ := s.getChannelCapacity()
if baseBps < s.boostedChannelCapacity {
baseBps = s.boostedChannelCapacity
}
@@ -1116,25 +1151,32 @@ func (s *StreamAllocator) resetBoost() {
}
func (s *StreamAllocator) maybeGratuitousProbe() bool {
// LK-TODO: do not probe if there are no video tracks
if time.Since(s.lastEstimateDecreaseTime) < GratuitousProbeWaitMs {
return false
}
committedChannelCapacity := s.getChannelCapacity()
// use last received estimate for gratuitous probing base as
// more updates may have been received since the last commit
_, receivedEstimate := s.getChannelCapacity()
expectedRateBps := s.getExpectedBandwidthUsage()
headroomBps := committedChannelCapacity - expectedRateBps
headroomBps := receivedEstimate - expectedRateBps
if headroomBps > GratuitousProbeHeadroomBps {
return false
}
probeRateBps := (committedChannelCapacity * GratuitousProbePct) / 100
probeRateBps := (receivedEstimate * GratuitousProbePct) / 100
if probeRateBps > GratuitousProbeMaxBps {
probeRateBps = GratuitousProbeMaxBps
}
s.prober.AddCluster(int(committedChannelCapacity+probeRateBps), int(expectedRateBps), GratuitousProbeMinDurationMs, GratuitousProbeMaxDurationMs)
s.prober.AddCluster(
int(receivedEstimate+probeRateBps),
int(expectedRateBps),
GratuitousProbeMinDurationMs,
GratuitousProbeMaxDurationMs,
)
s.setState(State_GRATUITOUS_PROBING)
return true
}

View File

@@ -1,6 +1,7 @@
package sfu
import (
"sync"
"sync/atomic"
"time"
)
@@ -27,16 +28,22 @@ const (
// It runs its own goroutine for detection, and fires OnStatusChanged callback
type StreamTracker struct {
// number of samples needed per cycle
SamplesRequired uint32
samplesRequired uint32
// number of cycles needed to be active
CyclesRequired uint64
CycleDuration time.Duration
OnStatusChanged func(StreamStatus)
initialized atomicBool
paused atomicBool
status atomicInt32 // stores StreamStatus
countSinceLast uint32 // number of packets received since last check
running chan struct{}
cyclesRequired uint64
cycleDuration time.Duration
onStatusChanged func(status StreamStatus)
paused atomicBool
countSinceLast uint32 // number of packets received since last check
running chan struct{}
initMu sync.Mutex
initialized bool
statusMu sync.RWMutex
status StreamStatus
// only access within detectWorker
cycleCount uint64
@@ -45,30 +52,58 @@ type StreamTracker struct {
lastSN uint16
}
func NewStreamTracker() *StreamTracker {
func NewStreamTracker(samplesRequired uint32, cyclesRequired uint64, cycleDuration time.Duration) *StreamTracker {
s := &StreamTracker{
SamplesRequired: 5,
CyclesRequired: 60, // 30s of continuous stream
CycleDuration: 500 * time.Millisecond,
samplesRequired: samplesRequired,
cyclesRequired: cyclesRequired,
cycleDuration: cycleDuration,
status: StreamStatusStopped,
}
s.status.set(int32(StreamStatusStopped))
return s
}
func (s *StreamTracker) Status() StreamStatus {
return StreamStatus(s.status.get())
func (s *StreamTracker) OnStatusChanged(f func(status StreamStatus)) {
s.onStatusChanged = f
}
func (s *StreamTracker) setStatus(status StreamStatus) {
if status != s.Status() {
s.status.set(int32(status))
if s.OnStatusChanged != nil {
s.OnStatusChanged(status)
}
func (s *StreamTracker) Status() StreamStatus {
s.statusMu.RLock()
defer s.statusMu.RUnlock()
return s.status
}
func (s *StreamTracker) maybeSetActive() {
changed := false
s.statusMu.Lock()
if s.status != StreamStatusActive {
s.status = StreamStatusActive
changed = true
}
s.statusMu.Unlock()
if changed && s.onStatusChanged != nil {
s.onStatusChanged(StreamStatusActive)
}
}
func (s *StreamTracker) Start() {
func (s *StreamTracker) maybeSetStopped() {
changed := false
s.statusMu.Lock()
if s.status != StreamStatusStopped {
s.status = StreamStatusStopped
changed = true
}
s.statusMu.Unlock()
if changed && s.onStatusChanged != nil {
s.onStatusChanged(StreamStatusStopped)
}
}
func (s *StreamTracker) init() {
s.maybeSetActive()
if s.isRunning() {
return
}
@@ -76,6 +111,9 @@ func (s *StreamTracker) Start() {
go s.detectWorker()
}
func (s *StreamTracker) Start() {
}
func (s *StreamTracker) Stop() {
if s.running != nil {
close(s.running)
@@ -105,11 +143,19 @@ func (s *StreamTracker) Observe(sn uint16) {
return
}
if !s.initialized.get() {
s.initialized.set(true)
s.initMu.Lock()
if !s.initialized {
// first packet
go s.setStatus(StreamStatusActive)
s.lastSN = sn
s.initialized = true
s.initMu.Unlock()
// declare stream active and start the detect worker
go s.init()
return
}
s.initMu.Unlock()
// ignore out-of-order SNs
if (sn - s.lastSN) > uint16(1<<15) {
@@ -120,7 +166,7 @@ func (s *StreamTracker) Observe(sn uint16) {
}
func (s *StreamTracker) detectWorker() {
ticker := time.NewTicker(s.CycleDuration)
ticker := time.NewTicker(s.cycleDuration)
for s.isRunning() {
<-ticker.C
@@ -133,22 +179,22 @@ func (s *StreamTracker) detectWorker() {
}
func (s *StreamTracker) detectChanges() {
if s.paused.get() || !s.initialized.get() {
if s.paused.get() {
return
}
if atomic.LoadUint32(&s.countSinceLast) >= s.SamplesRequired {
if atomic.LoadUint32(&s.countSinceLast) >= s.samplesRequired {
s.cycleCount += 1
} else {
s.cycleCount = 0
}
if s.cycleCount == 0 && s.Status() == StreamStatusActive {
if s.cycleCount == 0 {
// flip to stopped
s.setStatus(StreamStatusStopped)
} else if s.cycleCount >= s.CyclesRequired && s.Status() == StreamStatusStopped {
s.maybeSetStopped()
} else if s.cycleCount >= s.cyclesRequired {
// flip to active
s.setStatus(StreamStatusActive)
s.maybeSetActive()
}
atomic.StoreUint32(&s.countSinceLast, 0)

View File

@@ -2,6 +2,7 @@ package sfu
import (
"testing"
"time"
"github.com/livekit/livekit-server/pkg/testutils"
@@ -11,10 +12,10 @@ import (
func TestStreamTracker(t *testing.T) {
t.Run("flips to active on first observe", func(t *testing.T) {
callbackCalled := atomicBool(0)
tracker := NewStreamTracker()
tracker.OnStatusChanged = func(status StreamStatus) {
tracker := NewStreamTracker(5, 60, 500*time.Millisecond)
tracker.OnStatusChanged(func(status StreamStatus) {
callbackCalled.set(true)
}
})
require.Equal(t, StreamStatusStopped, tracker.Status())
// observe first packet
@@ -29,7 +30,7 @@ func TestStreamTracker(t *testing.T) {
})
t.Run("flips to inactive immediately", func(t *testing.T) {
tracker := NewStreamTracker()
tracker := NewStreamTracker(5, 60, 500*time.Millisecond)
require.Equal(t, StreamStatusStopped, tracker.Status())
tracker.Observe(1)
@@ -38,9 +39,9 @@ func TestStreamTracker(t *testing.T) {
})
callbackCalled := atomicBool(0)
tracker.OnStatusChanged = func(status StreamStatus) {
tracker.OnStatusChanged(func(status StreamStatus) {
callbackCalled.set(true)
}
})
require.Equal(t, StreamStatusActive, tracker.Status())
// run a single iteration
@@ -50,7 +51,7 @@ func TestStreamTracker(t *testing.T) {
})
t.Run("flips back to active after iterations", func(t *testing.T) {
tracker := NewStreamTracker()
tracker := NewStreamTracker(1, 2, 500*time.Millisecond)
require.Equal(t, StreamStatusStopped, tracker.Status())
tracker.Observe(1)
@@ -58,9 +59,7 @@ func TestStreamTracker(t *testing.T) {
return tracker.Status() == StreamStatusActive
})
tracker.CyclesRequired = 2
tracker.SamplesRequired = 1
tracker.setStatus(StreamStatusStopped)
tracker.maybeSetStopped()
tracker.Observe(2)
tracker.detectChanges()
@@ -72,7 +71,7 @@ func TestStreamTracker(t *testing.T) {
})
t.Run("does not change to inactive when paused", func(t *testing.T) {
tracker := NewStreamTracker()
tracker := NewStreamTracker(5, 60, 500*time.Millisecond)
tracker.Observe(1)
testutils.WithTimeout(t, "first packet makes stream active", func() bool {
return tracker.Status() == StreamStatusActive