Files
livekit/pkg/sfu/receiver.go
Raja Subramanian af0b0c4734 Connection quality LOST only if RTCP is also not available. (#2670)
* Connection quality LOST only if RTCP is also not available.

It is possible that sender stops all layers of video due to some
constraint (CPU or bandwidth). Packet reception going dry due to
that should not trigger `LOST` quality.

Add last received RTCP time also to distinguish the case
of real `LOST` and sender stopping traffic.

Some bits to watch for
- With audio, RTCP reports could be more than 5 seconds apart (5 seconds
  is the default interval for connection quality scorer), but audio
  senders usually send silence packets even when there is no input.
  So audio completely stopping can be considered `LOST`.
- With video, have to observe if all clients continue to send RTCP even
  if all layers are stopped.
- RTCP bandwidth is not supposed to exceed the primary stream bandwidth.
  libwebrtc calculates that and spaces out RTCP reports accordingly.
  That is the reason why audio reports are that far apart. If a video
  stream is encoded at a very low bit rate, it could also be sending
  RTCP rarely. So, there is the case of LOST being indistinguishable
  from sender stopping all layers. But, this should be a rare case.

* typo
2024-04-21 23:35:24 +05:30

837 lines
21 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sfu
import (
"errors"
"io"
"strings"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/mediatransportutil/pkg/bucket"
"github.com/livekit/mediatransportutil/pkg/twcc"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/sfu/audio"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
)
// Sentinel errors declared by this file.
var (
// ErrReceiverClosed is returned by AddUpTrack and AddDownTrack once the receiver has been closed.
ErrReceiverClosed = errors.New("receiver closed")
// ErrDownTrackAlreadyExist is not referenced by the code visible in this file;
// presumably it is used elsewhere in the package — TODO confirm.
ErrDownTrackAlreadyExist = errors.New("DownTrack already exist")
// ErrBufferNotFound is returned by ReadRTP when no buffer exists for the requested layer.
ErrBufferNotFound = errors.New("buffer not found")
// ErrDuplicateLayer is returned by AddUpTrack when an up track is already registered for the layer.
ErrDuplicateLayer = errors.New("duplicate layer")
)
// AudioLevelHandle is a callback signature that receives an audio level and a duration.
// NOTE(review): the units of level and duration are not established in this file — confirm against callers.
type AudioLevelHandle func(level uint8, duration uint32)
// Bitrates holds one int64 per (spatial layer, temporal layer) pair, indexed as
// [spatial][temporal] and sized from buffer.DefaultMaxLayerSpatial and
// buffer.DefaultMaxLayerTemporal.
// Presumably the values are bitrates in bits per second — TODO confirm.
type Bitrates [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
// TrackReceiver defines the interface used to receive media from a remote peer.
// WebRTCReceiver in this file is one implementation.
type TrackReceiver interface {
TrackID() livekit.TrackID
StreamID() string
Codec() webrtc.RTPCodecParameters
HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
IsClosed() bool
// ReadRTP copies the packet with sequence number sn of the given layer into buf.
ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
GetLayeredBitrate() ([]int32, Bitrates)
GetAudioLevel() (float64, bool)
// SendPLI requests a PLI for the given layer; force is passed through to the buffer.
SendPLI(layer int32, force bool)
SetUpTrackPaused(paused bool)
SetMaxExpectedSpatialLayer(layer int32)
AddDownTrack(track TrackSender) error
DeleteDownTrack(participantID livekit.ParticipantID)
DebugInfo() map[string]interface{}
TrackInfo() *livekit.TrackInfo
UpdateTrackInfo(ti *livekit.TrackInfo)
// GetPrimaryReceiverForRed returns the primary receiver when this receiver represents a RED codec;
// otherwise it returns the receiver itself.
GetPrimaryReceiverForRed() TrackReceiver
// GetRedReceiver returns the RED receiver for a primary codec. It is used to forward RED
// encodings when the published codec is Opus only.
GetRedReceiver() TrackReceiver
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetTrackStats() *livekit.RTPStats
}
// WebRTCReceiver receives a media track from a publisher, keeps one buffer and one
// up track per spatial layer, and fans received packets out to the registered down tracks.
type WebRTCReceiver struct {
logger logger.Logger
// pliThrottleConfig supplies the per-layer PLI throttle durations applied in AddUpTrack.
pliThrottleConfig config.PLIThrottleConfig
// audioConfig supplies the audio level parameters handed to each buffer in AddUpTrack.
audioConfig config.AudioConfig
trackID livekit.TrackID
streamID string
kind webrtc.RTPCodecType
receiver *webrtc.RTPReceiver
codec webrtc.RTPCodecParameters
// isSVC is IsSvcCodec(codec mime type); isRED is IsRedCodec(codec mime type).
isSVC bool
isRED bool
// onCloseHandler, when set, is invoked at the end of closeTracks.
onCloseHandler func()
// closeOnce makes the shutdown sequence in forwardRTP run once across all layer goroutines.
closeOnce sync.Once
closed atomic.Bool
// useTrackers is set by WithStreamTrackers and gates AddTracker for video layers.
useTrackers bool
// trackInfo always holds a clone of the TrackInfo supplied by the caller.
trackInfo atomic.Pointer[livekit.TrackInfo]
// onRTCP, when set, receives RTCP packets from sendRTCP.
onRTCP func([]rtcp.Packet)
// twcc is not referenced by the code visible in this file.
twcc *twcc.Responder
// bufferMu guards buffers, upTracks, rtt, onMaxLayerChange and redPktWriter.
bufferMu sync.RWMutex
buffers [buffer.DefaultMaxLayerSpatial + 1]*buffer.Buffer
upTracks [buffer.DefaultMaxLayerSpatial + 1]*webrtc.TrackRemote
// rtt is the last value given to SetRTT; it is pushed to every buffer.
rtt uint32
// lbThreshold is passed as the Threshold of the down track spreaders.
lbThreshold int
streamTrackerManager *StreamTrackerManager
downTrackSpreader *DownTrackSpreader
connectionStats *connectionquality.ConnectionStats
onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)
onMaxLayerChange func(maxLayer int32)
// primaryReceiver and redReceiver are created lazily by GetPrimaryReceiverForRed and GetRedReceiver.
primaryReceiver atomic.Pointer[RedPrimaryReceiver]
redReceiver atomic.Pointer[RedReceiver]
// redPktWriter, when set, is called by forwardRTP for every packet read.
redPktWriter func(pkt *buffer.ExtPacket, spatialLayer int32)
}
// SVC-TODO: Have to use more conditions to differentiate between
// SVC-TODO: SVC and non-SVC (could be single layer or simulcast).
// SVC-TODO: May only need to differentiate between simulcast and non-simulcast
// SVC-TODO: i. e. may be possible to treat single layer as SVC to get proper/intended functionality.

// IsSvcCodec reports whether mime, compared case-insensitively, is one of the
// codecs treated as SVC here: "video/av1" or "video/vp9".
func IsSvcCodec(mime string) bool {
	switch strings.ToLower(mime) {
	case "video/av1", "video/vp9":
		return true
	default:
		return false
	}
}
// IsRedCodec reports whether mime, compared case-insensitively, ends in "red".
func IsRedCodec(mime string) bool {
	lowered := strings.ToLower(mime)
	return strings.HasSuffix(lowered, "red")
}
// ReceiverOpts is a functional option applied by NewWebRTCReceiver. Each option
// receives the receiver under construction and returns the receiver to keep using.
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
// WithPliThrottleConfig returns an option that records the PLI throttle
// configuration on the receiver. AddUpTrack later picks the duration matching
// each layer and applies it to that layer's buffer.
func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts {
	return func(r *WebRTCReceiver) *WebRTCReceiver {
		r.pliThrottleConfig = pliThrottleConfig
		return r
	}
}
// WithAudioConfig returns an option that records the audio configuration
// (the parameters for active speaker detection) on the receiver.
func WithAudioConfig(audioConfig config.AudioConfig) ReceiverOpts {
	return func(r *WebRTCReceiver) *WebRTCReceiver {
		r.audioConfig = audioConfig
		return r
	}
}
// WithStreamTrackers returns an option that turns on StreamTracker use for
// simulcast by setting the receiver's useTrackers flag.
func WithStreamTrackers() ReceiverOpts {
	return func(r *WebRTCReceiver) *WebRTCReceiver {
		r.useTrackers = true
		return r
	}
}
// WithLoadBalanceThreshold returns an option that enables parallelization of
// packet writes once the number of downTracks exceeds the given threshold.
// The value should be between 3 and 150.
// For a server handling a few large rooms, use a smaller value (required to
// handle very large (250+ participant) rooms).
// For a server handling many small rooms, use a larger value or disable.
// It is set to 0 (disabled) by default.
func WithLoadBalanceThreshold(downTracks int) ReceiverOpts {
	return func(r *WebRTCReceiver) *WebRTCReceiver {
		r.lbThreshold = downTracks
		return r
	}
}
// NewWebRTCReceiver creates a new WebRTC track receiver for the given remote track.
//
// Identity fields (track ID, stream ID, codec, kind) are taken from track, opts are
// applied next, and only then are the down track spreader, connection stats and
// stream tracker manager built, so that they see the values set by the options.
// Connection stats are started before the function returns.
func NewWebRTCReceiver(
receiver *webrtc.RTPReceiver,
track *webrtc.TrackRemote,
trackInfo *livekit.TrackInfo,
logger logger.Logger,
onRTCP func([]rtcp.Packet),
trackersConfig config.StreamTrackersConfig,
opts ...ReceiverOpts,
) *WebRTCReceiver {
w := &WebRTCReceiver{
logger: logger,
receiver: receiver,
trackID: livekit.TrackID(track.ID()),
streamID: track.StreamID(),
codec: track.Codec(),
kind: track.Kind(),
onRTCP: onRTCP,
isSVC: IsSvcCodec(track.Codec().MimeType),
isRED: IsRedCodec(track.Codec().MimeType),
}
// Each option may return a different receiver; keep whatever it returns.
for _, opt := range opts {
w = opt(w)
}
// Store a private clone so later changes to the caller's TrackInfo are not observed here.
w.trackInfo.Store(proto.Clone(trackInfo).(*livekit.TrackInfo))
w.downTrackSpreader = NewDownTrackSpreader(DownTrackSpreaderParams{
Threshold: w.lbThreshold,
Logger: logger,
})
w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
MimeType: w.codec.MimeType,
// FEC is flagged only for Opus whose SDP fmtp line mentions "fec" (case-insensitive).
IsFECEnabled: strings.EqualFold(w.codec.MimeType, webrtc.MimeTypeOpus) && strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "fec"),
ReceiverProvider: w,
Logger: w.logger.WithValues("direction", "up"),
})
// Relay stats updates to the handler registered through OnStatsUpdate, if any.
w.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) {
if w.onStatsUpdate != nil {
w.onStatsUpdate(w, stat)
}
})
w.connectionStats.Start(trackInfo)
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig)
w.streamTrackerManager.SetListener(w)
// SVC-TODO: Handle DD for non-SVC cases???
// For SVC codecs, add dependency descriptor trackers when the extension was negotiated.
if w.isSVC {
for _, ext := range receiver.GetParameters().HeaderExtensions {
if ext.URI == dd.ExtensionURI {
w.streamTrackerManager.AddDependencyDescriptorTrackers()
break
}
}
}
return w
}
// TrackInfo returns the currently stored TrackInfo. The returned pointer is the
// stored clone itself, not a fresh copy.
func (w *WebRTCReceiver) TrackInfo() *livekit.TrackInfo {
return w.trackInfo.Load()
}
// UpdateTrackInfo replaces the stored TrackInfo with a clone of ti and passes
// ti on to the stream tracker manager.
func (w *WebRTCReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) {
	cloned := proto.Clone(ti).(*livekit.TrackInfo)
	w.trackInfo.Store(cloned)

	w.streamTrackerManager.UpdateTrackInfo(ti)
}
// OnStatsUpdate registers the handler that receives connection stats updates for this receiver.
// NOTE(review): unlike OnMaxLayerChange, this assignment is not made under bufferMu;
// presumably it is expected to be called before stats start flowing — confirm.
func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)) {
w.onStatsUpdate = fn
}
// OnMaxLayerChange registers the callback invoked when the maximum available
// layer changes. The assignment is made while holding bufferMu.
func (w *WebRTCReceiver) OnMaxLayerChange(fn func(maxLayer int32)) {
	w.bufferMu.Lock()
	defer w.bufferMu.Unlock()

	w.onMaxLayerChange = fn
}
// getOnMaxLayerChange returns the registered max-layer callback (possibly nil),
// reading it under the bufferMu read lock.
func (w *WebRTCReceiver) getOnMaxLayerChange() func(maxLayer int32) {
	w.bufferMu.RLock()
	handler := w.onMaxLayerChange
	w.bufferMu.RUnlock()

	return handler
}
// GetConnectionScoreAndQuality returns the score and quality reported by the
// receiver's connection stats.
func (w *WebRTCReceiver) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) {
return w.connectionStats.GetScoreAndQuality()
}
// IsClosed reports whether the receiver has been closed. The flag is set when
// a forwardRTP goroutine exits.
func (w *WebRTCReceiver) IsClosed() bool {
return w.closed.Load()
}
// SetRTT records rtt and, when it differs from the previously stored value,
// pushes it to every existing buffer. The buffers are updated after bufferMu
// has been released, using a copy of the buffer array taken under the lock.
func (w *WebRTCReceiver) SetRTT(rtt uint32) {
	w.bufferMu.Lock()
	changed := w.rtt != rtt
	w.rtt = rtt
	snapshot := w.buffers // array copy, safe to walk without the lock
	w.bufferMu.Unlock()

	if !changed {
		return
	}

	for _, b := range snapshot {
		if b != nil {
			b.SetRTT(rtt)
		}
	}
}
// StreamID returns the stream ID captured from the remote track at construction.
func (w *WebRTCReceiver) StreamID() string {
return w.streamID
}
// TrackID returns the track ID captured from the remote track at construction.
func (w *WebRTCReceiver) TrackID() livekit.TrackID {
return w.trackID
}
// ssrc returns the SSRC of the up track registered for layer, or 0 when no
// track is registered. It does not take bufferMu itself; GetDeltaStats calls
// it while holding the read lock.
func (w *WebRTCReceiver) ssrc(layer int) uint32 {
	track := w.upTracks[layer]
	if track == nil {
		return 0
	}
	return uint32(track.SSRC())
}
// Codec returns the codec parameters captured from the remote track at construction.
func (w *WebRTCReceiver) Codec() webrtc.RTPCodecParameters {
return w.codec
}
// HeaderExtensions returns the RTP header extensions listed in the underlying
// RTP receiver's current parameters.
func (w *WebRTCReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter {
	params := w.receiver.GetParameters()
	return params.HeaderExtensions
}
// Kind returns the codec type (audio or video) captured from the remote track at construction.
func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType {
return w.kind
}
// AddUpTrack registers an incoming track and its buffer for one spatial layer and
// starts the goroutine that forwards that layer's packets.
//
// It returns ErrReceiverClosed if the receiver is already closed and
// ErrDuplicateLayer if a track is already registered for the resolved layer.
// Note that buff is configured (logger, audio parameters, callbacks, PLI throttle)
// before the duplicate check runs.
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer) error {
if w.closed.Load() {
return ErrReceiverClosed
}
// Audio and SVC video always use layer 0; only non-SVC video maps the RID to a layer.
layer := int32(0)
if w.Kind() == webrtc.RTPCodecTypeVideo && !w.isSVC {
layer = buffer.RidToSpatialLayer(track.RID(), w.trackInfo.Load())
}
buff.SetLogger(w.logger.WithValues("layer", layer))
buff.SetAudioLevelParams(audio.AudioLevelParams{
ActiveLevel: w.audioConfig.ActiveLevel,
MinPercentile: w.audioConfig.MinPercentile,
ObserveDuration: w.audioConfig.UpdateInterval,
SmoothIntervals: w.audioConfig.SmoothIntervals,
})
buff.SetAudioLossProxying(w.audioConfig.EnableLossProxying)
buff.OnRtcpFeedback(w.sendRTCP)
// On every RTCP sender report, hand the report data to all down tracks.
buff.OnRtcpSenderReport(func() {
srData := buff.GetSenderReportData()
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, w.isSVC, layer, srData)
})
})
// Pick the PLI throttle for this layer; unexpected layers use the mid-quality value.
var duration time.Duration
switch layer {
case 2:
duration = w.pliThrottleConfig.HighQuality
case 1:
duration = w.pliThrottleConfig.MidQuality
case 0:
duration = w.pliThrottleConfig.LowQuality
default:
duration = w.pliThrottleConfig.MidQuality
}
// A zero duration leaves the buffer's throttle setting untouched.
if duration != 0 {
buff.SetPLIThrottle(duration.Nanoseconds())
}
w.bufferMu.Lock()
if w.upTracks[layer] != nil {
w.bufferMu.Unlock()
return ErrDuplicateLayer
}
w.upTracks[layer] = track
w.buffers[layer] = buff
// Read rtt under the lock so the new buffer starts from the stored value.
rtt := w.rtt
w.bufferMu.Unlock()
buff.SetRTT(rtt)
buff.SetPaused(w.streamTrackerManager.IsPaused())
if w.Kind() == webrtc.RTPCodecTypeVideo && w.useTrackers {
w.streamTrackerManager.AddTracker(layer)
}
// The forwarding goroutine runs until the buffer's ReadExtended reports io.EOF.
go w.forwardRTP(layer)
return nil
}
// SetUpTrackPaused indicates that upstream will not be sending any data.
// This reflects the "muted" status: the stream tracker manager is paused so
// that the layer is not turned off, every existing buffer is paused, and the
// connection stats are told about the mute state.
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) {
	w.streamTrackerManager.SetPaused(paused)

	w.bufferMu.RLock()
	for idx := range w.buffers {
		if b := w.buffers[idx]; b != nil {
			b.SetPaused(paused)
		}
	}
	w.bufferMu.RUnlock()

	w.connectionStats.UpdateMute(paused)
}
// AddDownTrack registers track as a subscriber of this receiver. It returns
// ErrReceiverClosed when the receiver is closed. A down track already stored
// for the same subscriber is replaced; this is logged, not rejected. Before
// being stored, the track is told that track info is available and given the
// current max published layer and max temporal layer seen.
func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
	if w.closed.Load() {
		return ErrReceiverClosed
	}

	subscriberID := track.SubscriberID()
	if w.downTrackSpreader.HasDownTrack(subscriberID) {
		w.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", subscriberID)
	}

	track.TrackInfoAvailable()
	maxPublished := w.streamTrackerManager.GetMaxPublishedLayer()
	track.UpTrackMaxPublishedLayerChange(maxPublished)
	maxTemporalSeen := w.streamTrackerManager.GetMaxTemporalLayerSeen()
	track.UpTrackMaxTemporalLayerSeenChange(maxTemporalSeen)

	w.downTrackSpreader.Store(track)
	w.logger.Debugw("downtrack added", "subscriberID", subscriberID)
	return nil
}
// notifyMaxExpectedLayer reports to the connection stats the bitrate expected
// when layers up to and including layer are published: the sum of the
// configured bitrates of every TrackInfo layer whose spatial layer is <= layer.
// Nothing is reported when there is no TrackInfo, for audio, or for screen share.
func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) {
	ti := w.TrackInfo()
	switch {
	case ti == nil:
		return
	case w.Kind() == webrtc.RTPCodecTypeAudio, ti.Source == livekit.TrackSource_SCREEN_SHARE:
		// screen share tracks have highly variable bitrate, do not use bit rate based quality for those
		return
	}

	var expectedBitrate int64
	for _, videoLayer := range ti.Layers {
		if buffer.VideoQualityToSpatialLayer(videoLayer.Quality, ti) > layer {
			continue
		}
		expectedBitrate += int64(videoLayer.Bitrate)
	}
	w.connectionStats.AddBitrateTransition(expectedBitrate)
}
// SetMaxExpectedSpatialLayer passes the max expected spatial layer to the
// stream tracker manager and updates the connection stats. An invalid layer
// marks the layers as muted; any other value clears the layer mute and
// records a layer transition with the current distance to the desired layer.
func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) {
	w.streamTrackerManager.SetMaxExpectedSpatialLayer(layer)
	w.notifyMaxExpectedLayer(layer)

	layerMuted := layer == buffer.InvalidLayerSpatial
	w.connectionStats.UpdateLayerMute(layerMuted)
	if layerMuted {
		return
	}
	w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}
// OnAvailableLayersChanged implements StreamTrackerManagerListener. It tells
// every down track that the up track layers changed and records a layer
// transition in the connection stats.
func (w *WebRTCReceiver) OnAvailableLayersChanged() {
	notify := func(sender TrackSender) {
		sender.UpTrackLayersChange()
	}
	w.downTrackSpreader.Broadcast(notify)

	distance := w.streamTrackerManager.DistanceToDesired()
	w.connectionStats.AddLayerTransition(distance)
}
// OnBitrateAvailabilityChanged implements StreamTrackerManagerListener. It
// tells every down track that bitrate availability changed.
func (w *WebRTCReceiver) OnBitrateAvailabilityChanged() {
	notify := func(sender TrackSender) {
		sender.UpTrackBitrateAvailabilityChange()
	}
	w.downTrackSpreader.Broadcast(notify)
}
// OnMaxPublishedLayerChanged implements StreamTrackerManagerListener. It
// passes the new max published layer to every down track, refreshes the
// expected bitrate for that layer, and records a layer transition.
func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) {
	notify := func(sender TrackSender) {
		sender.UpTrackMaxPublishedLayerChange(maxPublishedLayer)
	}
	w.downTrackSpreader.Broadcast(notify)

	w.notifyMaxExpectedLayer(maxPublishedLayer)

	distance := w.streamTrackerManager.DistanceToDesired()
	w.connectionStats.AddLayerTransition(distance)
}
// OnMaxTemporalLayerSeenChanged implements StreamTrackerManagerListener. It
// passes the new max temporal layer seen to every down track and records a
// layer transition.
func (w *WebRTCReceiver) OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int32) {
	notify := func(sender TrackSender) {
		sender.UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen)
	}
	w.downTrackSpreader.Broadcast(notify)

	distance := w.streamTrackerManager.DistanceToDesired()
	w.connectionStats.AddLayerTransition(distance)
}
// OnMaxAvailableLayerChanged implements StreamTrackerManagerListener. It
// invokes the callback registered through OnMaxLayerChange, if there is one.
func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) {
	handler := w.getOnMaxLayerChange()
	if handler == nil {
		return
	}
	handler(maxAvailableLayer)
}
// OnBitrateReport implements StreamTrackerManagerListener. It passes the
// bitrate report to every down track and records a layer transition.
func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) {
	notify := func(sender TrackSender) {
		sender.UpTrackBitrateReport(availableLayers, bitrates)
	}
	w.downTrackSpreader.Broadcast(notify)

	distance := w.streamTrackerManager.DistanceToDesired()
	w.connectionStats.AddLayerTransition(distance)
}
// GetLayeredBitrate returns the layer list and per-layer bitrates reported by
// the stream tracker manager.
func (w *WebRTCReceiver) GetLayeredBitrate() ([]int32, Bitrates) {
return w.streamTrackerManager.GetLayeredBitrate()
}
// OnCloseHandler registers the function called when the remote track is removed;
// closeTracks invokes it after the down tracks have been closed.
// NOTE(review): the assignment is not synchronized with the read in closeTracks;
// presumably it is expected to be set before any up track is added — confirm.
func (w *WebRTCReceiver) OnCloseHandler(fn func()) {
w.onCloseHandler = fn
}
// DeleteDownTrack removes the down track belonging to subscriberID from the receiver.
// It does nothing once the receiver is closed.
func (w *WebRTCReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID) {
if w.closed.Load() {
return
}
w.downTrackSpreader.Free(subscriberID)
w.logger.Debugw("downtrack deleted", "subscriberID", subscriberID)
}
// sendRTCP hands packets to the onRTCP callback supplied at construction.
// It is a no-op when there is nothing to send (nil or empty slice), when the
// receiver is closed, or when no callback was supplied.
func (w *WebRTCReceiver) sendRTCP(packets []rtcp.Packet) {
	// len() covers both nil and empty-but-non-nil slices, so an empty batch is
	// never passed on to the callback.
	if len(packets) == 0 || w.closed.Load() || w.onRTCP == nil {
		return
	}
	w.onRTCP(packets)
}
// SendPLI asks the buffer of the given layer to send a PLI, passing force
// through. It does nothing when the layer has no buffer.
func (w *WebRTCReceiver) SendPLI(layer int32, force bool) {
	// SVC-TODO : should send LRR (Layer Refresh Request) instead of PLI
	if buff := w.getBuffer(layer); buff != nil {
		buff.SendPLI(force)
	}
}
// getBuffer returns the buffer for layer, or nil if there is none, taking the
// bufferMu read lock around the lookup.
func (w *WebRTCReceiver) getBuffer(layer int32) *buffer.Buffer {
	w.bufferMu.RLock()
	buff := w.getBufferLocked(layer)
	w.bufferMu.RUnlock()

	return buff
}
// getBufferLocked returns the buffer for layer, or nil when layer is out of
// range or has no buffer. It takes no lock itself; getBuffer calls it with
// bufferMu held.
func (w *WebRTCReceiver) getBufferLocked(layer int32) *buffer.Buffer {
	idx := layer
	if w.isSVC {
		// for svc codecs, use layer = 0 always.
		// spatial layers are in-built and handled by single buffer
		idx = 0
	}

	if idx >= 0 && int(idx) < len(w.buffers) {
		return w.buffers[idx]
	}
	return nil
}
// ReadRTP copies the packet with sequence number sn from the buffer of the
// given layer into buf and returns whatever the buffer's GetPacket returns.
// It returns ErrBufferNotFound when the layer has no buffer.
func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
	if b := w.getBuffer(int32(layer)); b != nil {
		return b.GetPacket(buf, sn)
	}
	return 0, ErrBufferNotFound
}
// GetTrackStats collects the RTP stats of every existing buffer, skipping
// buffers that report none, and returns their aggregate. bufferMu is held for
// reading for the whole call, including the aggregation.
func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
	w.bufferMu.RLock()
	defer w.bufferMu.RUnlock()

	perLayer := make([]*livekit.RTPStats, 0, len(w.buffers))
	for idx := range w.buffers {
		b := w.buffers[idx]
		if b == nil {
			continue
		}
		if layerStats := b.GetStats(); layerStats != nil {
			perLayer = append(perLayer, layerStats)
		}
	}
	return buffer.AggregateRTPStats(perLayer)
}
// GetAudioLevel returns the audio level reported by the first existing
// buffer. It returns (0, false) for video receivers and when no buffer exists.
func (w *WebRTCReceiver) GetAudioLevel() (float64, bool) {
	if w.Kind() == webrtc.RTPCodecTypeVideo {
		return 0, false
	}

	w.bufferMu.RLock()
	defer w.bufferMu.RUnlock()

	for idx := range w.buffers {
		if b := w.buffers[idx]; b != nil {
			return b.GetAudioLevel()
		}
	}
	return 0, false
}
// GetDeltaStats returns the delta stats of every existing buffer, keyed by
// the SSRC of that layer's up track. Buffers that report no stats are skipped.
func (w *WebRTCReceiver) GetDeltaStats() map[uint32]*buffer.StreamStatsWithLayers {
	w.bufferMu.RLock()
	defer w.bufferMu.RUnlock()

	result := make(map[uint32]*buffer.StreamStatsWithLayers, len(w.buffers))
	for layer, b := range w.buffers {
		if b == nil {
			continue
		}

		layerStats := b.GetDeltaStats()
		if layerStats == nil {
			continue
		}

		// patch buffer stats with correct layer: the entry the buffer stored
		// under key 0 is re-keyed under this receiver's layer index
		layerStats.Layers = map[int32]*buffer.RTPDeltaInfo{
			int32(layer): layerStats.Layers[0],
		}
		result[w.ssrc(layer)] = layerStats
	}
	return result
}
// GetLastSenderReportTime returns the most recent sender report time across
// all existing buffers, or the zero time.Time when no buffer reports a later one.
func (w *WebRTCReceiver) GetLastSenderReportTime() time.Time {
	w.bufferMu.RLock()
	defer w.bufferMu.RUnlock()

	var latest time.Time
	for _, b := range w.buffers {
		if b == nil {
			continue
		}
		if reportedAt := b.GetLastSenderReportTime(); reportedAt.After(latest) {
			latest = reportedAt
		}
	}
	return latest
}
// forwardRTP is the per-layer read loop started by AddUpTrack. It reads packets from
// the layer's buffer, writes each one to every down track (and to the RED packet
// writer when one is set), and feeds the matching stream tracker.
//
// The loop ends when the buffer's ReadExtended returns io.EOF. On exit the whole
// receiver is shut down exactly once (shared across all layer goroutines via
// closeOnce), and this layer's tracker is removed.
func (w *WebRTCReceiver) forwardRTP(layer int32) {
// Scratch buffer reused for every read.
pktBuf := make([]byte, bucket.MaxPktSize)
tracker := w.streamTrackerManager.GetTracker(layer)
defer func() {
w.closeOnce.Do(func() {
w.closed.Store(true)
w.closeTracks()
if pr := w.primaryReceiver.Load(); pr != nil {
pr.Close()
}
if pr := w.redReceiver.Load(); pr != nil {
pr.Close()
}
})
w.streamTrackerManager.RemoveTracker(layer)
// SVC carries all spatial layers on this one stream, so drop every tracker.
if w.isSVC {
w.streamTrackerManager.RemoveAllTrackers()
}
}()
for {
// Snapshot the buffer and the RED writer under the lock; the writer may be set later.
w.bufferMu.RLock()
buf := w.buffers[layer]
redPktWriter := w.redPktWriter
w.bufferMu.RUnlock()
pkt, err := buf.ReadExtended(pktBuf)
// NOTE(review): only io.EOF is handled; any other error falls through and pkt is
// dereferenced below. Confirm ReadExtended never returns a nil packet with a non-EOF error.
if err == io.EOF {
return
}
spatialTracker := tracker
spatialLayer := layer
if pkt.Spatial >= 0 {
// svc packet, dispatch to correct tracker
spatialLayer = pkt.Spatial
spatialTracker = w.streamTrackerManager.GetTracker(pkt.Spatial)
if spatialTracker == nil {
spatialTracker = w.streamTrackerManager.AddTracker(pkt.Spatial)
}
}
// Write errors from individual down tracks are deliberately ignored.
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(pkt, spatialLayer)
})
if redPktWriter != nil {
redPktWriter(pkt, spatialLayer)
}
if spatialTracker != nil {
spatialTracker.Observe(
pkt.Temporal,
len(pkt.RawPacket),
len(pkt.Packet.Payload),
pkt.Packet.Marker,
pkt.Packet.Timestamp,
pkt.DependencyDescriptor,
)
}
}
}
// closeTracks shuts down the receiver's helpers: it closes the connection
// stats and the stream tracker manager, closes all down tracks (waiting for
// them to finish), and finally calls the registered close handler, if any.
func (w *WebRTCReceiver) closeTracks() {
	w.connectionStats.Close()
	w.streamTrackerManager.Close()

	senders := w.downTrackSpreader.ResetAndGetDownTracks()
	closeTrackSenders(senders)

	if handler := w.onCloseHandler; handler != nil {
		handler()
	}
}
// DebugInfo returns a snapshot for diagnostics with three keys: "SVC",
// "Simulcast" and "UpTracks". "Simulcast" is true for a non-SVC codec unless
// the stored TrackInfo lists at most one layer. "UpTracks" has one entry
// (Layer, SSRC, Msid, RID) per registered up track.
func (w *WebRTCReceiver) DebugInfo() map[string]interface{} {
	simulcast := !w.isSVC
	if ti := w.trackInfo.Load(); ti != nil && len(ti.Layers) <= 1 {
		simulcast = false
	}

	w.bufferMu.RLock()
	upTracks := make([]map[string]interface{}, 0, len(w.upTracks))
	for layer, ut := range w.upTracks {
		if ut == nil {
			continue
		}
		upTracks = append(upTracks, map[string]interface{}{
			"Layer": layer,
			"SSRC":  ut.SSRC(),
			"Msid":  ut.Msid(),
			"RID":   ut.RID(),
		})
	}
	w.bufferMu.RUnlock()

	return map[string]interface{}{
		"SVC":       w.isSVC,
		"Simulcast": simulcast,
		"UpTracks":  upTracks,
	}
}
// GetPrimaryReceiverForRed returns the primary receiver paired with this RED
// receiver, creating it on first use. It returns the receiver itself when the
// codec is not RED or the receiver is closed. If several callers race on the
// first call, only the one whose compare-and-swap succeeds installs its
// ForwardRTP as the RED packet writer.
func (w *WebRTCReceiver) GetPrimaryReceiverForRed() TrackReceiver {
	if !w.isRED || w.closed.Load() {
		return w
	}

	if w.primaryReceiver.Load() == nil {
		spreaderParams := DownTrackSpreaderParams{
			Threshold: w.lbThreshold,
			Logger:    w.logger,
		}
		if created := NewRedPrimaryReceiver(w, spreaderParams); w.primaryReceiver.CompareAndSwap(nil, created) {
			w.bufferMu.Lock()
			w.redPktWriter = created.ForwardRTP
			w.bufferMu.Unlock()
		}
	}
	return w.primaryReceiver.Load()
}
// GetRedReceiver returns the RED receiver paired with this primary-codec
// receiver, creating it on first use. It returns the receiver itself when the
// codec is already RED or the receiver is closed. If several callers race on
// the first call, only the one whose compare-and-swap succeeds installs its
// ForwardRTP as the RED packet writer.
func (w *WebRTCReceiver) GetRedReceiver() TrackReceiver {
	if w.isRED || w.closed.Load() {
		return w
	}

	if w.redReceiver.Load() == nil {
		spreaderParams := DownTrackSpreaderParams{
			Threshold: w.lbThreshold,
			Logger:    w.logger,
		}
		if created := NewRedReceiver(w, spreaderParams); w.redReceiver.CompareAndSwap(nil, created) {
			w.bufferMu.Lock()
			w.redPktWriter = created.ForwardRTP
			w.bufferMu.Unlock()
		}
	}
	return w.redReceiver.Load()
}
// GetTemporalLayerFpsForSpatial returns the per-temporal-layer frame rates
// reported by the buffer serving the given spatial layer, or nil when there
// is no such buffer. For non-SVC codecs the buffer is always asked about
// spatial layer 0; for SVC codecs the requested layer is passed through.
func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
	b := w.getBuffer(layer)
	if b == nil {
		return nil
	}

	spatial := layer
	if !w.isSVC {
		spatial = 0
	}
	return b.GetTemporalLayerFpsForSpatial(spatial)
}
// closeTrackSenders closes every sender in its own goroutine and returns once
// all of them have been closed.
func closeTrackSenders(senders []TrackSender) {
	var wg sync.WaitGroup
	wg.Add(len(senders))
	for _, sender := range senders {
		go func(ts TrackSender) {
			defer wg.Done()
			ts.Close()
		}(sender)
	}
	wg.Wait()
}