diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index d28ddaa5d..69fa1b8cc 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -707,6 +707,7 @@ func (p *ParticipantImpl) GetConnectionQuality() livekit.ConnectionQuality { if subTrack.IsMuted() { continue } + // LK-TODO: maybe this should check CurrentSpatialLayer as target may not have been achieved if subTrack.DownTrack().TargetSpatialLayer() < subTrack.DownTrack().MaxSpatialLayer() { reducedQualitySub = true } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index fafb4ac46..29265178c 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,7 +1,8 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//+build !wireinject +//go:build !wireinject +// +build !wireinject package service diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 2c136d5c3..23aff14e1 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -22,7 +22,7 @@ import ( // TrackSender defines a interface send media to remote peer type TrackSender interface { - UptrackLayersChange(availableLayers []uint16, layerAdded bool) (int32, error) + UptrackLayersChange(availableLayers []uint16, layerAdded bool) WriteRTP(p *buffer.ExtPacket, layer int32) error Close() // ID is the globally unique identifier for this Track. @@ -43,6 +43,9 @@ const ( RTPPaddingMaxPayloadSize = 255 RTPPaddingEstimatedHeaderSize = 20 RTPBlankFramesMax = 6 + + InvalidSpatialLayer = -1 + InvalidTemporalLayer = -1 ) type SequenceNumberOrdering int @@ -103,7 +106,9 @@ type DownTrack struct { currentSpatialLayer atomicInt32 targetSpatialLayer atomicInt32 - temporalLayer atomicInt32 + + currentTemporalLayer atomicInt32 + targetTemporalLayer atomicInt32 enabled atomicBool reSync atomicBool @@ -133,16 +138,13 @@ type DownTrack struct { lossFraction atomicUint8 // Debug info - lastPli atomicInt64 - lastRTP atomicInt64 - pktsMuted atomicUint32 - pktsDropped atomicUint32 - pktsBandwidthConstrainedDropped atomicUint32 + lastPli atomicInt64 + lastRTP atomicInt64 + pktsMuted atomicUint32 + pktsDropped atomicUint32 maxPacketTs uint32 - bandwidthConstrainedMuted atomicBool - // RTCP callbacks onRTCP func([]rtcp.Packet) onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) @@ -182,10 +184,18 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Facto d.maxSpatialLayer.set(2) d.maxTemporalLayer.set(2) } else { - d.maxSpatialLayer.set(-1) - d.maxTemporalLayer.set(-1) + d.maxSpatialLayer.set(InvalidSpatialLayer) + d.maxTemporalLayer.set(InvalidTemporalLayer) } + // start off with nothing, let streamallocator set things + d.currentSpatialLayer.set(InvalidSpatialLayer) + d.targetSpatialLayer.set(InvalidSpatialLayer) + + // start off with nothing, let streamallocator set things + d.currentTemporalLayer.set(InvalidTemporalLayer) + d.targetTemporalLayer.set(InvalidTemporalLayer) + return d, nil } @@ -209,7 +219,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.mime = strings.ToLower(codec.MimeType) d.reSync.set(true) d.enabled.set(true) - d.bandwidthConstrainedMute(false) if rr := d.bufferFactory.GetOrNew(packetio.RTCPBufferPacket, uint32(t.SSRC())).(*buffer.RTCPReader); rr != nil { rr.OnPacket(func(pkt []byte) { d.handleRTCP(pkt) @@ -340,11 +349,6 @@ func (d *DownTrack) WriteRTP(p *buffer.ExtPacket, layer int32) error { return nil } - if d.bandwidthConstrainedMuted.get() { - d.pktsBandwidthConstrainedDropped.add(1) - return nil - } - switch d.trackType { case SimpleDownTrack: return d.writeSimpleRTP(p) @@ -388,7 +392,12 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { size = RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize } - sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(false) + // padding is used for probing. Padding packets should be + // at frame boundaries only to ensure decoder sequencer does + // not get out-of-sync. But, when a stream is paused, + // force a frame marker as a restart of the stream will + // start with a key frame which will reset the decoder. + sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(d.TargetSpatialLayer() == InvalidSpatialLayer) if err != nil { return bytesSent } @@ -513,7 +522,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) error { d.maxSpatialLayer.set(spatialLayer) - if d.onSubscribedLayersChanged != nil { + if d.enabled.get() && d.onSubscribedLayersChanged != nil { d.onSubscribedLayersChanged(d, spatialLayer, d.MaxTemporalLayer()) } @@ -540,82 +549,27 @@ func (d *DownTrack) MaxTemporalLayer() int32 { return d.maxTemporalLayer.get() } -// switchSpatialLayer switches the current layer -func (d *DownTrack) switchSpatialLayer(targetLayer int32) error { - if d.trackType != SimulcastDownTrack { - return ErrSpatialNotSupported - } - - // already set - if d.CurrentSpatialLayer() == targetLayer { - return nil - } - +// switchSpatialLayer switches the target layer +func (d *DownTrack) switchSpatialLayer(targetLayer int32) { d.targetSpatialLayer.set(targetLayer) - return nil } -func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) (int32, error) { - if d.trackType == SimulcastDownTrack { - currentLayer := uint16(d.CurrentSpatialLayer()) - maxLayer := uint16(d.maxSpatialLayer.get()) - - var maxFound uint16 = 0 - layerFound := false - var minFound uint16 = 0 - for _, target := range availableLayers { - if target <= maxLayer { - if target > maxFound { - maxFound = target - layerFound = true - } - } else { - if minFound > target { - minFound = target - } - } - } - var targetLayer uint16 - if layerFound { - targetLayer = maxFound - } else { - targetLayer = minFound - } - if currentLayer != targetLayer { - // LK-TODO-START - // This layer switch should be removed when StreamAllocator is used. - // Available layers change should be signalled to StreamAllocator - // and StreamAllocator will take care of adjusting allocations. - // LK-TODO-END - if err := d.switchSpatialLayer(int32(targetLayer)); err != nil { - return int32(targetLayer), err - } - } - - if d.onAvailableLayersChanged != nil { - d.onAvailableLayersChanged(d, layerAdded) - } - - return int32(targetLayer), nil +func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) { + if d.onAvailableLayersChanged != nil { + d.onAvailableLayersChanged(d, layerAdded) } - return -1, fmt.Errorf("downtrack %s does not support simulcast", d.id) } func (d *DownTrack) switchTemporalLayer(targetLayer int32) { - if d.trackType != SimulcastDownTrack { - return - } + d.targetTemporalLayer.set(targetLayer) +} - layer := d.temporalLayer.get() - currentLayer := uint16(layer) - currentTargetLayer := uint16(layer >> 16) +func (d *DownTrack) disableSend() { + d.currentSpatialLayer.set(InvalidSpatialLayer) + d.targetSpatialLayer.set(InvalidSpatialLayer) - // Don't switch until previous switch is done or canceled - if currentLayer != currentTargetLayer { - return - } - - d.temporalLayer.set((targetLayer << 16) | int32(currentLayer)) + d.currentTemporalLayer.set(InvalidTemporalLayer) + d.targetTemporalLayer.set(InvalidTemporalLayer) } // OnCloseHandler method to be called on remote tracked removed @@ -683,10 +637,9 @@ func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (isPausing optimalBandwidthNeeded = brs[i][j] } if brs[i][j] < availableChannelCapacity { - isResuming = d.bandwidthConstrainedMuted.get() + isResuming = d.TargetSpatialLayer() == InvalidSpatialLayer bandwidthRequested = brs[i][j] - d.bandwidthConstrainedMute(false) // just in case it was muted d.switchSpatialLayer(int32(i)) d.switchTemporalLayer(int32(j)) @@ -695,9 +648,13 @@ func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (isPausing } } - // no layer fits in the available channel capacity, disable the track - isPausing = !d.bandwidthConstrainedMuted.get() - d.bandwidthConstrainedMute(true) + if optimalBandwidthNeeded != 0 { + // no layer fits in the available channel capacity, disable the track + isPausing = d.TargetSpatialLayer() != InvalidSpatialLayer + d.disableSend() + + d.reSync.set(true) // re-sync required on next layer switch + } return } @@ -718,13 +675,14 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { currentSpatialLayer := d.CurrentSpatialLayer() targetSpatialLayer := d.TargetSpatialLayer() - temporalLayer := d.temporalLayer.get() - currentTemporalLayer := temporalLayer & 0x0f - targetTemporalLayer := temporalLayer >> 16 + currentTemporalLayer := d.currentTemporalLayer.get() + targetTemporalLayer := d.targetTemporalLayer.get() // if targets are still pending, don't increase - if targetSpatialLayer != currentSpatialLayer || targetTemporalLayer != currentTemporalLayer { - return false, 0, 0 + if targetSpatialLayer != InvalidSpatialLayer { + if targetSpatialLayer != currentSpatialLayer || targetTemporalLayer != currentTemporalLayer { + return false, 0, 0 + } } // move to the next available layer @@ -746,7 +704,7 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { } } - if d.bandwidthConstrainedMuted.get() { + if d.TargetSpatialLayer() == InvalidSpatialLayer { // try the lowest spatial and temporal layer if available // LK-TODO-START // note that this will never be zero because we do not track @@ -757,7 +715,6 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { return false, 0, 0 } - d.bandwidthConstrainedMute(false) d.switchSpatialLayer(int32(0)) d.switchTemporalLayer(int32(0)) return true, brs[0][0], optimalBandwidthNeeded @@ -772,7 +729,7 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { } // try moving spatial layer up if already at max temporal layer of current spatial layer - // LK-TODO currentTemporalLayer may be outside available range because of inital value being out of range, fix it + // LK-TODO currentSpatialLayer may be outside available range because of inital value being out of range, fix it nextSpatialLayer := currentSpatialLayer + 1 if nextSpatialLayer <= d.maxSpatialLayer.get() && brs[nextSpatialLayer][0] > 0 { d.switchSpatialLayer(nextSpatialLayer) @@ -809,7 +766,12 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - srRTP, srNTP := d.receiver.GetSenderReportTime(d.CurrentSpatialLayer()) + currentSpatialLayer := d.CurrentSpatialLayer() + if currentSpatialLayer == InvalidSpatialLayer { + return nil + } + + srRTP, srNTP := d.receiver.GetSenderReportTime(currentSpatialLayer) if srRTP == 0 { return nil } @@ -836,25 +798,23 @@ func (d *DownTrack) UpdateStats(packetLen uint32) { d.packetCount.add(1) } -// bandwidthConstrainedMute enables or disables media forwarding dictated by channel bandwidth constraints -func (d *DownTrack) bandwidthConstrainedMute(val bool) { - if d.bandwidthConstrainedMuted.get() == val { - return - } - d.bandwidthConstrainedMuted.set(val) - if val { - d.reSync.set(val) - } -} - func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { if d.reSync.get() { if d.Kind() == webrtc.RTPCodecTypeVideo { + if d.TargetSpatialLayer() == InvalidSpatialLayer { + d.pktsDropped.add(1) + return nil + } + if !extPkt.KeyFrame { d.lastPli.set(time.Now().UnixNano()) d.receiver.SendPLI(0) d.pktsDropped.add(1) return nil + } else { + // although one spatial layer, this is done so it + // works proper with stream allocator. + d.currentSpatialLayer.set(d.TargetSpatialLayer()) } } if d.packetCount.get() > 0 { @@ -898,7 +858,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { // that, the sequence numbers should be updated to ensure that subsequent packet // translations works fine and produce proper translated sequence numbers. // LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16) + translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.targetTemporalLayer.get()) if err != nil { if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { if err == ErrFilteredVP8TemporalLayer { @@ -959,6 +919,11 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) error { tsl := d.TargetSpatialLayer() + if tsl == InvalidSpatialLayer { + d.pktsDropped.add(1) + return nil + } + csl := d.CurrentSpatialLayer() if tsl == layer && csl != tsl { if extPkt.KeyFrame { @@ -1084,7 +1049,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err // that, the sequence numbers should be updated to ensure that subsequent packet // translations works fine and produce proper translated sequence numbers. // LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16) + translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.targetTemporalLayer.get()) if err != nil { if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { if err == ErrFilteredVP8TemporalLayer { @@ -1282,7 +1247,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } sendPliOnce := func() { - if pliOnce { + if pliOnce && d.TargetSpatialLayer() != InvalidSpatialLayer { d.lastPli.set(time.Now().UnixNano()) d.receiver.SendPLI(d.TargetSpatialLayer()) pliOnce = false @@ -1379,13 +1344,12 @@ func (d *DownTrack) getSRStats() (octets, packets uint32) { func (d *DownTrack) translateVP8Packet(pkt *rtp.Packet, incomingVP8 *buffer.VP8, translatedVP8 *buffer.VP8, adjustTemporal bool) (buf []byte, err error) { if adjustTemporal { - temporalLayer := d.temporalLayer.get() - currentLayer := uint16(temporalLayer) - currentTargetLayer := uint16(temporalLayer >> 16) + currentTemporalLayer := d.currentTemporalLayer.get() + targetTemporalLayer := d.targetTemporalLayer.get() // catch up temporal layer if necessary - if currentTargetLayer != currentLayer { - if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(currentTargetLayer) { - d.temporalLayer.set(int32(currentTargetLayer)<<16 | int32(currentTargetLayer)) + if currentTemporalLayer != targetTemporalLayer { + if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(targetTemporalLayer) { + d.currentTemporalLayer.set(targetTemporalLayer) } } } diff --git a/pkg/sfu/prober.go b/pkg/sfu/prober.go index aadb921f8..cb11c44b6 100644 --- a/pkg/sfu/prober.go +++ b/pkg/sfu/prober.go @@ -372,9 +372,9 @@ func (c *Cluster) Process(p *Prober) bool { } func (c *Cluster) String() string { - activeTimeMs := time.Duration(0) + activeTimeMs := int64(0) if !c.startTime.IsZero() { - activeTimeMs = time.Since(c.startTime) * time.Millisecond + activeTimeMs = time.Since(c.startTime).Milliseconds() } return fmt.Sprintf("bytes: desired %d / probe %d / non-probe %d / remaining: %d, time(ms): active %d / min %d / max %d", @@ -383,6 +383,6 @@ func (c *Cluster) String() string { c.bytesSentNonProbe, c.desiredBytes-c.bytesSentProbe-c.bytesSentNonProbe, activeTimeMs, - c.minDuration*time.Millisecond, - c.maxDuration*time.Millisecond) + c.minDuration.Milliseconds(), + c.maxDuration.Milliseconds()) } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 49d679354..b081c8fb9 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -204,16 +204,23 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff w.buffers[layer] = buff w.bufferMu.Unlock() - if w.isSimulcast && w.useTrackers { - tracker := NewStreamTracker() + if w.Kind() == webrtc.RTPCodecTypeVideo && w.useTrackers { + samplesRequired := uint32(5) + cyclesRequired := uint64(60) // 30s of continuous stream + if layer == 0 { + // be very forgiving for base layer + samplesRequired = 1 + cyclesRequired = 4 // 2s of continuous stream + } + tracker := NewStreamTracker(samplesRequired, cyclesRequired, 500*time.Millisecond) w.trackers[layer] = tracker - tracker.OnStatusChanged = func(status StreamStatus) { + tracker.OnStatusChanged(func(status StreamStatus) { if status == StreamStatusStopped { w.removeAvailableLayer(uint16(layer)) } else { w.addAvailableLayer(uint16(layer)) } - } + }) tracker.Start() } go w.forwardRTP(layer) @@ -223,9 +230,6 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff // this will reflect the "muted" status and will pause streamtracker to ensure we don't turn off // the layer func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) { - if !w.isSimulcast { - return - } w.upTrackMu.Lock() defer w.upTrackMu.Unlock() for _, tracker := range w.trackers { @@ -247,18 +251,15 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) { return } - if w.isSimulcast { - track.SetTrackType(true) - + track.SetTrackType(w.isSimulcast) + if w.Kind() == webrtc.RTPCodecTypeVideo { // notify added downtrack of available layers w.upTrackMu.RLock() layers, ok := w.availableLayers.Load().([]uint16) w.upTrackMu.RUnlock() if ok && len(layers) != 0 { - _, _ = track.UptrackLayersChange(layers, true) + track.UptrackLayersChange(layers, true) } - } else { - track.SetTrackType(false) } w.storeDownTrack(track) @@ -292,7 +293,7 @@ func (w *WebRTCReceiver) downtrackLayerChange(layers []uint16, layerAdded bool) defer w.downTrackMu.RUnlock() for _, dt := range w.downTracks { if dt != nil { - _, _ = dt.UptrackLayersChange(layers, layerAdded) + dt.UptrackLayersChange(layers, layerAdded) } } } diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index bb93c2969..f51ed2926 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -123,8 +123,8 @@ const ( GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom is more than 1 Mbps, don't probe GratuitousProbePct = 10 GratuitousProbeMaxBps = 300 * 1000 // 300 kbps - GratuitousProbeMinDurationMs = 500 - GratuitousProbeMaxDurationMs = 600 + GratuitousProbeMinDurationMs = 500 * time.Millisecond + GratuitousProbeMaxDurationMs = 600 * time.Millisecond AudioLossWeight = 0.75 VideoLossWeight = 0.25 @@ -418,7 +418,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima s.prevReceivedEstimate = s.receivedEstimate s.receivedEstimate = uint64(remb.Bitrate) if s.prevReceivedEstimate != s.receivedEstimate { - s.logger.Debugw("received new estimate", "pariticpant", s.participantID, "old(bps)", s.prevReceivedEstimate, "new(bps)", s.receivedEstimate) + s.logger.Debugw("received new estimate", "participant", s.participantID, "old(bps)", s.prevReceivedEstimate, "new(bps)", s.receivedEstimate) } signal := s.maybeCommitEstimate() s.estimateMu.Unlock() @@ -617,21 +617,52 @@ func (s *StreamAllocator) runStateMachine(event Event) { } } -// LK-TODO-START -// Signal_ADD_TRACK is not useful. Probably can get rid of it. -// AVAILABLE_LAYERS_ADD/REMOVE should be how track start should -// be getting an allocation. -// LK-TODO-END +// LK-TODO: ADD_TRACK should not reallocate if added track is audio func (s *StreamAllocator) runStatePreCommit(event Event) { switch event.Signal { case Signal_ADD_TRACK: - // wait for layers add/remove signal + s.allocate() case Signal_REMOVE_TRACK: s.allocate() case Signal_ESTIMATE_INCREASE: - s.allocate() + // should never happen as the intialized capacity is very high + s.setState(State_STABLE) case Signal_ESTIMATE_DECREASE: - s.allocate() + // first estimate could be off as things are ramping up. + // + // Basically, an initial channel capacity of InitialChannelCapacity + // which is very high is used so that there is no throttling before + // getting an estimate. The first estimate happens 6 - 8 seconds + // after streaming starts. With screen share like stream, the traffic + // is sparse/spikey depending on the content. So, the estimate could + // be small and still ramping up. So, doing an allocation could result + // in stream being paused. + // + // This is one of the significant challenges here, i. e. time alignment. + // Estimate was potentially measured using data from a little while back. + // But, that estimate is used to allocate based on current bitrate. + // So, in the intervening period if there was movement on the screen, + // the bitrate could have spiked and the REMB value is probably stale. + // + // A few options to consider + // o what is implemented here (i. e. change to STABLE state and + // let further streaming drive the esimation and move the state + // machine in whichever direction it needs to go) + // o Forward padding packets also. But, that could have an adverse + // effect on the channel when a new stream comes on line and + // starts probing, i. e. it could affect existing flows if it + // congests the channel because of the spurt of padding packets. + // o Do probing downstream in the first few seconds in addition to + // forwarding any streams and get a better estimate of the channel. + // o Measure up stream bitrates over a much longer window to smooth + // out spikes and get a more steady-state rate and use it in + // allocations. + // + // A good challenge. Basically, the goal should be to aviod re-allocation + // of streams. Any re-allocation means potential for estimate and current + // state of up streams not being in sync because of time differences resulting + // in streams getting paused unnecessarily. + s.setState(State_STABLE) case Signal_RECEIVER_REPORT: case Signal_AVAILABLE_LAYERS_ADD: s.allocate() @@ -648,7 +679,7 @@ func (s *StreamAllocator) runStatePreCommit(event Event) { func (s *StreamAllocator) runStateStable(event Event) { switch event.Signal { case Signal_ADD_TRACK: - // wait for layers add/remove signal + s.allocate() case Signal_REMOVE_TRACK: // LK-TODO - may want to re-calculate channel usage? case Signal_ESTIMATE_INCREASE: @@ -666,7 +697,9 @@ func (s *StreamAllocator) runStateStable(event Event) { s.allocate() case Signal_PERIODIC_PING: // if bandwidth estimate has been stable for a while, maybe gratuitously probe - s.maybeGratuitousProbe() + if s.maybeGratuitousProbe() { + s.setState(State_GRATUITOUS_PROBING) + } } } @@ -686,16 +719,13 @@ func (s *StreamAllocator) runStateStable(event Event) { func (s *StreamAllocator) runStateDeficient(event Event) { switch event.Signal { case Signal_ADD_TRACK: - // wait for layers add/remove signal + s.allocate() case Signal_REMOVE_TRACK: s.allocate() case Signal_ESTIMATE_INCREASE: // as long as estimate is increasing, keep going. - // Switch to STABLE state if estimate exceeds optimal bandwidth needed. - if s.getChannelCapacity() > s.getOptimalBandwidthUsage() { - s.resetBoost() - s.setState(State_STABLE) - } + // try an allocation to check if state can move to STABLE + s.allocate() case Signal_ESTIMATE_DECREASE: // stop using the boosted estimate s.resetBoost() @@ -723,7 +753,7 @@ func (s *StreamAllocator) runStateGratuitousProbing(event Event) { // to avoid any self-inflicted damaage switch event.Signal { case Signal_ADD_TRACK: - // wait for layers add/remove signal + s.allocate() case Signal_REMOVE_TRACK: // LK-TODO - may want to re-calculate channel usage? case Signal_ESTIMATE_INCREASE: @@ -748,8 +778,12 @@ func (s *StreamAllocator) runStateGratuitousProbing(event Event) { s.allocate() case Signal_PERIODIC_PING: // try for more - if !s.prober.IsRunning() && s.maybeGratuitousProbe() { - s.logger.Infow("trying more gratuitous probing", "participant", s.participantID) + if !s.prober.IsRunning() { + if s.maybeGratuitousProbe() { + s.logger.Debugw("trying more gratuitous probing", "participant", s.participantID) + } else { + s.setState(State_STABLE) + } } } } @@ -798,11 +832,11 @@ func (s *StreamAllocator) maybeCommitEstimate() Signal { return signal } -func (s *StreamAllocator) getChannelCapacity() uint64 { +func (s *StreamAllocator) getChannelCapacity() (uint64, uint64) { s.estimateMu.RLock() defer s.estimateMu.RUnlock() - return s.committedChannelCapacity + return s.committedChannelCapacity, s.receivedEstimate } func (s *StreamAllocator) allocate() { @@ -892,12 +926,13 @@ func (s *StreamAllocator) allocate() { // So, track total requested bandwidth and mark DEFICIENT state if the total is // above the estimated channel capacity even if the optimal signal is true. // + // LK-TODO make protocol friendly structures var pausedTracks map[string][]string var resumedTracks map[string][]string isOptimal := true totalBandwidthRequested := uint64(0) - committedChannelCapacity := s.getChannelCapacity() + committedChannelCapacity, _ := s.getChannelCapacity() availableChannelCapacity := committedChannelCapacity if availableChannelCapacity < s.boostedChannelCapacity { availableChannelCapacity = s.boostedChannelCapacity @@ -1079,7 +1114,7 @@ func (s *StreamAllocator) maybeBoostLayer() { func (s *StreamAllocator) maybeBoostBandwidth() { // temporarily boost estimate for probing. // Boost either the committed channel capacity or previous boost point if there is one - baseBps := s.getChannelCapacity() + baseBps, _ := s.getChannelCapacity() if baseBps < s.boostedChannelCapacity { baseBps = s.boostedChannelCapacity } @@ -1116,25 +1151,32 @@ func (s *StreamAllocator) resetBoost() { } func (s *StreamAllocator) maybeGratuitousProbe() bool { + // LK-TODO: do not probe if there are no video tracks if time.Since(s.lastEstimateDecreaseTime) < GratuitousProbeWaitMs { return false } - committedChannelCapacity := s.getChannelCapacity() + // use last received estimate for gratuitous probing base as + // more updates may have been received since the last commit + _, receivedEstimate := s.getChannelCapacity() expectedRateBps := s.getExpectedBandwidthUsage() - headroomBps := committedChannelCapacity - expectedRateBps + headroomBps := receivedEstimate - expectedRateBps if headroomBps > GratuitousProbeHeadroomBps { return false } - probeRateBps := (committedChannelCapacity * GratuitousProbePct) / 100 + probeRateBps := (receivedEstimate * GratuitousProbePct) / 100 if probeRateBps > GratuitousProbeMaxBps { probeRateBps = GratuitousProbeMaxBps } - s.prober.AddCluster(int(committedChannelCapacity+probeRateBps), int(expectedRateBps), GratuitousProbeMinDurationMs, GratuitousProbeMaxDurationMs) + s.prober.AddCluster( + int(receivedEstimate+probeRateBps), + int(expectedRateBps), + GratuitousProbeMinDurationMs, + GratuitousProbeMaxDurationMs, + ) - s.setState(State_GRATUITOUS_PROBING) return true } diff --git a/pkg/sfu/streamtracker.go b/pkg/sfu/streamtracker.go index dfd4c1eac..d2462cba1 100644 --- a/pkg/sfu/streamtracker.go +++ b/pkg/sfu/streamtracker.go @@ -1,6 +1,7 @@ package sfu import ( + "sync" "sync/atomic" "time" ) @@ -27,16 +28,22 @@ const ( // It runs its own goroutine for detection, and fires OnStatusChanged callback type StreamTracker struct { // number of samples needed per cycle - SamplesRequired uint32 + samplesRequired uint32 // number of cycles needed to be active - CyclesRequired uint64 - CycleDuration time.Duration - OnStatusChanged func(StreamStatus) - initialized atomicBool - paused atomicBool - status atomicInt32 // stores StreamStatus - countSinceLast uint32 // number of packets received since last check - running chan struct{} + cyclesRequired uint64 + cycleDuration time.Duration + + onStatusChanged func(status StreamStatus) + + paused atomicBool + countSinceLast uint32 // number of packets received since last check + running chan struct{} + + initMu sync.Mutex + initialized bool + + statusMu sync.RWMutex + status StreamStatus // only access within detectWorker cycleCount uint64 @@ -45,30 +52,58 @@ type StreamTracker struct { lastSN uint16 } -func NewStreamTracker() *StreamTracker { +func NewStreamTracker(samplesRequired uint32, cyclesRequired uint64, cycleDuration time.Duration) *StreamTracker { s := &StreamTracker{ - SamplesRequired: 5, - CyclesRequired: 60, // 30s of continuous stream - CycleDuration: 500 * time.Millisecond, + samplesRequired: samplesRequired, + cyclesRequired: cyclesRequired, + cycleDuration: cycleDuration, + status: StreamStatusStopped, } - s.status.set(int32(StreamStatusStopped)) return s } -func (s *StreamTracker) Status() StreamStatus { - return StreamStatus(s.status.get()) +func (s *StreamTracker) OnStatusChanged(f func(status StreamStatus)) { + s.onStatusChanged = f } -func (s *StreamTracker) setStatus(status StreamStatus) { - if status != s.Status() { - s.status.set(int32(status)) - if s.OnStatusChanged != nil { - s.OnStatusChanged(status) - } +func (s *StreamTracker) Status() StreamStatus { + s.statusMu.RLock() + defer s.statusMu.RUnlock() + + return s.status +} + +func (s *StreamTracker) maybeSetActive() { + changed := false + s.statusMu.Lock() + if s.status != StreamStatusActive { + s.status = StreamStatusActive + changed = true + } + s.statusMu.Unlock() + + if changed && s.onStatusChanged != nil { + s.onStatusChanged(StreamStatusActive) } } -func (s *StreamTracker) Start() { +func (s *StreamTracker) maybeSetStopped() { + changed := false + s.statusMu.Lock() + if s.status != StreamStatusStopped { + s.status = StreamStatusStopped + changed = true + } + s.statusMu.Unlock() + + if changed && s.onStatusChanged != nil { + s.onStatusChanged(StreamStatusStopped) + } +} + +func (s *StreamTracker) init() { + s.maybeSetActive() + if s.isRunning() { return } @@ -76,6 +111,9 @@ func (s *StreamTracker) Start() { go s.detectWorker() } +func (s *StreamTracker) Start() { +} + func (s *StreamTracker) Stop() { if s.running != nil { close(s.running) @@ -105,11 +143,19 @@ func (s *StreamTracker) Observe(sn uint16) { return } - if !s.initialized.get() { - s.initialized.set(true) + s.initMu.Lock() + if !s.initialized { // first packet - go s.setStatus(StreamStatusActive) + s.lastSN = sn + s.initialized = true + s.initMu.Unlock() + + // declare stream active and start the detect worker + go s.init() + + return } + s.initMu.Unlock() // ignore out-of-order SNs if (sn - s.lastSN) > uint16(1<<15) { @@ -120,7 +166,7 @@ func (s *StreamTracker) Observe(sn uint16) { } func (s *StreamTracker) detectWorker() { - ticker := time.NewTicker(s.CycleDuration) + ticker := time.NewTicker(s.cycleDuration) for s.isRunning() { <-ticker.C @@ -133,22 +179,22 @@ func (s *StreamTracker) detectWorker() { } func (s *StreamTracker) detectChanges() { - if s.paused.get() || !s.initialized.get() { + if s.paused.get() { return } - if atomic.LoadUint32(&s.countSinceLast) >= s.SamplesRequired { + if atomic.LoadUint32(&s.countSinceLast) >= s.samplesRequired { s.cycleCount += 1 } else { s.cycleCount = 0 } - if s.cycleCount == 0 && s.Status() == StreamStatusActive { + if s.cycleCount == 0 { // flip to stopped - s.setStatus(StreamStatusStopped) - } else if s.cycleCount >= s.CyclesRequired && s.Status() == StreamStatusStopped { + s.maybeSetStopped() + } else if s.cycleCount >= s.cyclesRequired { // flip to active - s.setStatus(StreamStatusActive) + s.maybeSetActive() } atomic.StoreUint32(&s.countSinceLast, 0) diff --git a/pkg/sfu/streamtracker_test.go b/pkg/sfu/streamtracker_test.go index 85c6a9ad1..d96ff6e7f 100644 --- a/pkg/sfu/streamtracker_test.go +++ b/pkg/sfu/streamtracker_test.go @@ -2,6 +2,7 @@ package sfu import ( "testing" + "time" "github.com/livekit/livekit-server/pkg/testutils" @@ -11,10 +12,10 @@ import ( func TestStreamTracker(t *testing.T) { t.Run("flips to active on first observe", func(t *testing.T) { callbackCalled := atomicBool(0) - tracker := NewStreamTracker() - tracker.OnStatusChanged = func(status StreamStatus) { + tracker := NewStreamTracker(5, 60, 500*time.Millisecond) + tracker.OnStatusChanged(func(status StreamStatus) { callbackCalled.set(true) - } + }) require.Equal(t, StreamStatusStopped, tracker.Status()) // observe first packet @@ -29,7 +30,7 @@ func TestStreamTracker(t *testing.T) { }) t.Run("flips to inactive immediately", func(t *testing.T) { - tracker := NewStreamTracker() + tracker := NewStreamTracker(5, 60, 500*time.Millisecond) require.Equal(t, StreamStatusStopped, tracker.Status()) tracker.Observe(1) @@ -38,9 +39,9 @@ func TestStreamTracker(t *testing.T) { }) callbackCalled := atomicBool(0) - tracker.OnStatusChanged = func(status StreamStatus) { + tracker.OnStatusChanged(func(status StreamStatus) { callbackCalled.set(true) - } + }) require.Equal(t, StreamStatusActive, tracker.Status()) // run a single interation @@ -50,7 +51,7 @@ func TestStreamTracker(t *testing.T) { }) t.Run("flips back to active after iterations", func(t *testing.T) { - tracker := NewStreamTracker() + tracker := NewStreamTracker(1, 2, 500*time.Millisecond) require.Equal(t, StreamStatusStopped, tracker.Status()) tracker.Observe(1) @@ -58,9 +59,7 @@ func TestStreamTracker(t *testing.T) { return tracker.Status() == StreamStatusActive }) - tracker.CyclesRequired = 2 - tracker.SamplesRequired = 1 - tracker.setStatus(StreamStatusStopped) + tracker.maybeSetStopped() tracker.Observe(2) tracker.detectChanges() @@ -72,7 +71,7 @@ func TestStreamTracker(t *testing.T) { }) t.Run("does not change to inactive when paused", func(t *testing.T) { - tracker := NewStreamTracker() + tracker := NewStreamTracker(5, 60, 500*time.Millisecond) tracker.Observe(1) testutils.WithTimeout(t, "first packet makes stream active", func() bool { return tracker.Status() == StreamStatusActive