mirror of
https://github.com/livekit/livekit.git
synced 2026-04-04 21:15:44 +00:00
* Log write count atomic. * Return write count from WriteRTP. Apologies for the frequent changes on this. With relays, the down track could write to several targets. So, use the count to have an accurate indication of how many subscribers were written to.
1044 lines
27 KiB
Go
1044 lines
27 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package sfu
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/webrtc/v4"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/livekit/mediatransportutil/pkg/bucket"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/utils"
|
|
"github.com/livekit/protocol/utils/mono"
|
|
|
|
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
|
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
|
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
|
"github.com/livekit/livekit-server/pkg/sfu/mime"
|
|
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
|
|
"github.com/livekit/livekit-server/pkg/sfu/streamtracker"
|
|
)
|
|
|
|
// Sentinel errors used by receivers in this package.
var (
	// ErrReceiverClosed is returned when an up track or down track is added
	// to a receiver that has already been closed.
	ErrReceiverClosed = errors.New("receiver closed")

	// ErrDownTrackAlreadyExist reports that a down track is already present.
	// NOTE(review): not returned by any code visible in this file.
	ErrDownTrackAlreadyExist = errors.New("DownTrack already exist")

	// ErrBufferNotFound is returned when no buffer exists for the requested layer.
	ErrBufferNotFound = errors.New("buffer not found")

	// ErrDuplicateLayer is returned when an up track is added for a layer
	// that already has one.
	ErrDuplicateLayer = errors.New("duplicate layer")

	// ErrInvalidLayer is returned when the layer of an up track cannot be resolved.
	ErrInvalidLayer = errors.New("invalid layer")
)
|
|
|
|
// --------------------------------------
|
|
|
|
// PLIThrottleConfig holds the minimum interval between PLIs, one value per
// quality level. AddUpTrack picks LowQuality for layer 0, MidQuality for
// layer 1 and HighQuality for layer 2.
type PLIThrottleConfig struct {
	LowQuality  time.Duration `yaml:"low_quality,omitempty"`
	MidQuality  time.Duration `yaml:"mid_quality,omitempty"`
	HighQuality time.Duration `yaml:"high_quality,omitempty"`
}

// DefaultPLIThrottleConfig is the stock PLI throttle configuration:
// 500ms for low quality and one second for mid and high quality.
var DefaultPLIThrottleConfig = PLIThrottleConfig{
	LowQuality:  500 * time.Millisecond,
	MidQuality:  time.Second,
	HighQuality: time.Second,
}
|
|
|
|
// --------------------------------------
|
|
|
|
type AudioConfig struct {
|
|
audio.AudioLevelConfig `yaml:",inline"`
|
|
|
|
// enable red encoding downtrack for opus only audio up track
|
|
ActiveREDEncoding bool `yaml:"active_red_encoding,omitempty"`
|
|
// enable proxying weakest subscriber loss to publisher in RTCP Receiver Report
|
|
EnableLossProxying bool `yaml:"enable_loss_proxying,omitempty"`
|
|
}
|
|
|
|
var (
|
|
DefaultAudioConfig = AudioConfig{
|
|
AudioLevelConfig: audio.DefaultAudioLevelConfig,
|
|
}
|
|
)
|
|
|
|
// --------------------------------------
|
|
|
|
// AudioLevelHandle is a callback that receives an audio level and a duration.
// NOTE(review): units of both values are not visible in this file — confirm with callers.
type AudioLevelHandle func(level uint8, duration uint32)
|
|
|
|
// Bitrates is a fixed-size table of bitrates indexed by spatial layer first
// and temporal layer second.
type Bitrates [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
|
|
|
|
// ReceiverCodecState describes the state of the codec carried by a receiver.
type ReceiverCodecState int

const (
	// ReceiverCodecStateNormal is the initial state of a receiver.
	ReceiverCodecStateNormal ReceiverCodecState = iota
	// ReceiverCodecStateSuspended marks the codec as suspended.
	ReceiverCodecStateSuspended
	// ReceiverCodecStateInvalid marks the codec as invalid; SetCodecState
	// ignores any further change once this state has been reached.
	ReceiverCodecStateInvalid
)
|
|
|
|
// TrackReceiver defines an interface receive media from remote peer
|
|
type TrackReceiver interface {
|
|
TrackID() livekit.TrackID
|
|
StreamID() string
|
|
|
|
// returns the initial codec of the receiver, it is determined by the track's codec
|
|
// and will not change if the codec changes during the session (publisher changes codec)
|
|
Codec() webrtc.RTPCodecParameters
|
|
Mime() mime.MimeType
|
|
VideoLayerMode() livekit.VideoLayer_Mode
|
|
HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
|
|
IsClosed() bool
|
|
|
|
ReadRTP(buf []byte, layer uint8, esn uint64) (int, error)
|
|
GetLayeredBitrate() ([]int32, Bitrates)
|
|
|
|
GetAudioLevel() (float64, bool)
|
|
|
|
SendPLI(layer int32, force bool)
|
|
|
|
SetUpTrackPaused(paused bool)
|
|
SetMaxExpectedSpatialLayer(layer int32)
|
|
|
|
AddDownTrack(track TrackSender) error
|
|
DeleteDownTrack(participantID livekit.ParticipantID)
|
|
GetDownTracks() []TrackSender
|
|
|
|
DebugInfo() map[string]interface{}
|
|
|
|
TrackInfo() *livekit.TrackInfo
|
|
UpdateTrackInfo(ti *livekit.TrackInfo)
|
|
|
|
// Get primary receiver if this receiver represents a RED codec; otherwise it will return itself
|
|
GetPrimaryReceiverForRed() TrackReceiver
|
|
|
|
// Get red receiver for primary codec, used by forward red encodings for opus only codec
|
|
GetRedReceiver() TrackReceiver
|
|
|
|
GetTemporalLayerFpsForSpatial(layer int32) []float32
|
|
|
|
GetTrackStats() *livekit.RTPStats
|
|
|
|
// AddOnReady adds a function to be called when the receiver is ready, the callback
|
|
// could be called immediately if the receiver is ready when the callback is added
|
|
AddOnReady(func())
|
|
|
|
AddOnCodecStateChange(func(webrtc.RTPCodecParameters, ReceiverCodecState))
|
|
CodecState() ReceiverCodecState
|
|
|
|
// VideoSizes returns the video size parsed from rtp packet for each spatial layer.
|
|
VideoSizes() []buffer.VideoSize
|
|
}
|
|
|
|
type REDTransformer interface {
|
|
ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int32
|
|
ForwardRTCPSenderReport(
|
|
payloadType webrtc.PayloadType,
|
|
layer int32,
|
|
publisherSRData *livekit.RTCPSenderReportState,
|
|
)
|
|
ResyncDownTracks()
|
|
CanClose() bool
|
|
Close()
|
|
}
|
|
|
|
var _ TrackReceiver = (*WebRTCReceiver)(nil)
|
|
|
|
// WebRTCReceiver receives a media track
|
|
type WebRTCReceiver struct {
|
|
logger logger.Logger
|
|
|
|
pliThrottleConfig PLIThrottleConfig
|
|
audioConfig AudioConfig
|
|
|
|
trackID livekit.TrackID
|
|
streamID string
|
|
kind webrtc.RTPCodecType
|
|
receiver *webrtc.RTPReceiver
|
|
codec webrtc.RTPCodecParameters
|
|
codecState ReceiverCodecState
|
|
codecStateLock sync.Mutex
|
|
onCodecStateChange []func(webrtc.RTPCodecParameters, ReceiverCodecState)
|
|
isRED bool
|
|
onCloseHandler func()
|
|
closeOnce sync.Once
|
|
closed atomic.Bool
|
|
trackInfo atomic.Pointer[livekit.TrackInfo]
|
|
videoLayerMode livekit.VideoLayer_Mode
|
|
|
|
onRTCP func([]rtcp.Packet)
|
|
|
|
bufferMu sync.RWMutex
|
|
buffers [buffer.DefaultMaxLayerSpatial + 1]*buffer.Buffer
|
|
upTracks [buffer.DefaultMaxLayerSpatial + 1]TrackRemote
|
|
videoSizeMu sync.RWMutex
|
|
videoSizes [buffer.DefaultMaxLayerSpatial + 1]buffer.VideoSize
|
|
onVideoSizeChanged func()
|
|
rtt uint32
|
|
|
|
lbThreshold int
|
|
|
|
streamTrackerManager *StreamTrackerManager
|
|
|
|
downTrackSpreader *DownTrackSpreader
|
|
|
|
connectionStats *connectionquality.ConnectionStats
|
|
|
|
onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)
|
|
onMaxLayerChange func(mimeType mime.MimeType, maxLayer int32)
|
|
|
|
redTransformer atomic.Value // redTransformer interface
|
|
|
|
forwardStats *ForwardStats
|
|
}
|
|
|
|
// ReceiverOpts is a functional option for NewWebRTCReceiver. It receives the
// receiver under construction and returns the receiver to continue with.
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
|
|
|
|
// WithPliThrottleConfig indicates minimum time(ms) between sending PLIs
|
|
func WithPliThrottleConfig(pliThrottleConfig PLIThrottleConfig) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.pliThrottleConfig = pliThrottleConfig
|
|
return w
|
|
}
|
|
}
|
|
|
|
// WithAudioConfig sets up parameters for active speaker detection
|
|
func WithAudioConfig(audioConfig AudioConfig) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.audioConfig = audioConfig
|
|
return w
|
|
}
|
|
}
|
|
|
|
// WithLoadBalanceThreshold enables parallelization of packet writes when downTracks exceeds threshold
|
|
// Value should be between 3 and 150.
|
|
// For a server handling a few large rooms, use a smaller value (required to handle very large (250+ participant) rooms).
|
|
// For a server handling many small rooms, use a larger value or disable.
|
|
// Set to 0 (disabled) by default.
|
|
func WithLoadBalanceThreshold(downTracks int) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.lbThreshold = downTracks
|
|
return w
|
|
}
|
|
}
|
|
|
|
func WithForwardStats(forwardStats *ForwardStats) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.forwardStats = forwardStats
|
|
return w
|
|
}
|
|
}
|
|
|
|
// NewWebRTCReceiver creates a new webrtc track receiver
|
|
func NewWebRTCReceiver(
|
|
receiver *webrtc.RTPReceiver,
|
|
track TrackRemote,
|
|
trackInfo *livekit.TrackInfo,
|
|
logger logger.Logger,
|
|
onRTCP func([]rtcp.Packet),
|
|
streamTrackerManagerConfig StreamTrackerManagerConfig,
|
|
opts ...ReceiverOpts,
|
|
) *WebRTCReceiver {
|
|
w := &WebRTCReceiver{
|
|
logger: logger,
|
|
receiver: receiver,
|
|
trackID: livekit.TrackID(track.ID()),
|
|
streamID: track.StreamID(),
|
|
codec: track.Codec(),
|
|
codecState: ReceiverCodecStateNormal,
|
|
kind: track.Kind(),
|
|
onRTCP: onRTCP,
|
|
isRED: mime.IsMimeTypeStringRED(track.Codec().MimeType),
|
|
videoLayerMode: buffer.GetVideoLayerModeForMimeType(mime.NormalizeMimeType(track.Codec().MimeType), trackInfo),
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
w = opt(w)
|
|
}
|
|
w.trackInfo.Store(utils.CloneProto(trackInfo))
|
|
|
|
w.downTrackSpreader = NewDownTrackSpreader(DownTrackSpreaderParams{
|
|
Threshold: w.lbThreshold,
|
|
Logger: logger,
|
|
})
|
|
|
|
w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
|
|
ReceiverProvider: w,
|
|
Logger: w.logger.WithValues("direction", "up"),
|
|
})
|
|
w.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) {
|
|
if w.onStatsUpdate != nil {
|
|
w.onStatsUpdate(w, stat)
|
|
}
|
|
})
|
|
w.connectionStats.Start(
|
|
mime.NormalizeMimeType(w.codec.MimeType),
|
|
// TODO: technically not correct to declare FEC on when RED. Need the primary codec's fmtp line to check.
|
|
mime.IsMimeTypeStringRED(w.codec.MimeType) || strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "useinbandfec=1"),
|
|
)
|
|
|
|
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.Mime(), w.codec.ClockRate, streamTrackerManagerConfig)
|
|
w.streamTrackerManager.SetListener(w)
|
|
|
|
return w
|
|
}
|
|
|
|
func (w *WebRTCReceiver) TrackInfo() *livekit.TrackInfo {
|
|
return w.trackInfo.Load()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) {
|
|
w.trackInfo.Store(utils.CloneProto(ti))
|
|
w.streamTrackerManager.UpdateTrackInfo(ti)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)) {
|
|
w.onStatsUpdate = fn
|
|
}
|
|
|
|
func (w *WebRTCReceiver) OnMaxLayerChange(fn func(mimeType mime.MimeType, maxLayer int32)) {
|
|
w.bufferMu.Lock()
|
|
w.onMaxLayerChange = fn
|
|
w.bufferMu.Unlock()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) getOnMaxLayerChange() func(mimeType mime.MimeType, maxLayer int32) {
|
|
w.bufferMu.RLock()
|
|
defer w.bufferMu.RUnlock()
|
|
|
|
return w.onMaxLayerChange
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) {
|
|
return w.connectionStats.GetScoreAndQuality()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) IsClosed() bool {
|
|
return w.closed.Load()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) SetRTT(rtt uint32) {
|
|
w.bufferMu.Lock()
|
|
if w.rtt == rtt {
|
|
w.bufferMu.Unlock()
|
|
return
|
|
}
|
|
|
|
w.rtt = rtt
|
|
buffers := w.buffers
|
|
w.bufferMu.Unlock()
|
|
|
|
for _, buff := range buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
buff.SetRTT(rtt)
|
|
}
|
|
}
|
|
|
|
func (w *WebRTCReceiver) StreamID() string {
|
|
return w.streamID
|
|
}
|
|
|
|
func (w *WebRTCReceiver) TrackID() livekit.TrackID {
|
|
return w.trackID
|
|
}
|
|
|
|
func (w *WebRTCReceiver) ssrc(layer int) uint32 {
|
|
if track := w.upTracks[layer]; track != nil {
|
|
return uint32(track.SSRC())
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (w *WebRTCReceiver) Codec() webrtc.RTPCodecParameters {
|
|
return w.codec
|
|
}
|
|
|
|
func (w *WebRTCReceiver) Mime() mime.MimeType {
|
|
return mime.NormalizeMimeType(w.codec.MimeType)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) VideoLayerMode() livekit.VideoLayer_Mode {
|
|
return w.videoLayerMode
|
|
}
|
|
|
|
func (w *WebRTCReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter {
|
|
return w.receiver.GetParameters().HeaderExtensions
|
|
}
|
|
|
|
func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType {
|
|
return w.kind
|
|
}
|
|
|
|
func (w *WebRTCReceiver) AddUpTrack(track TrackRemote, buff *buffer.Buffer) error {
|
|
if w.closed.Load() {
|
|
return ErrReceiverClosed
|
|
}
|
|
|
|
layer := int32(0)
|
|
if w.Kind() == webrtc.RTPCodecTypeVideo && w.videoLayerMode != livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
|
layer = buffer.GetSpatialLayerForRid(w.Mime(), track.RID(), w.trackInfo.Load())
|
|
}
|
|
if layer < 0 {
|
|
w.logger.Warnw(
|
|
"invalid layer", nil,
|
|
"rid", track.RID(),
|
|
"trackInfo", logger.Proto(w.trackInfo.Load()),
|
|
)
|
|
return ErrInvalidLayer
|
|
}
|
|
buff.SetLogger(w.logger.WithValues("layer", layer))
|
|
buff.SetAudioLevelParams(audio.AudioLevelParams{
|
|
Config: w.audioConfig.AudioLevelConfig,
|
|
})
|
|
buff.SetAudioLossProxying(w.audioConfig.EnableLossProxying)
|
|
buff.OnRtcpFeedback(w.sendRTCP)
|
|
buff.OnRtcpSenderReport(func() {
|
|
srData := buff.GetSenderReportData()
|
|
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
|
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData)
|
|
})
|
|
|
|
if rt := w.redTransformer.Load(); rt != nil {
|
|
rt.(REDTransformer).ForwardRTCPSenderReport(w.codec.PayloadType, layer, srData)
|
|
}
|
|
})
|
|
buff.OnVideoSizeChanged(func(videoSize []buffer.VideoSize) {
|
|
w.videoSizeMu.Lock()
|
|
if w.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
|
copy(w.videoSizes[:], videoSize)
|
|
} else {
|
|
w.videoSizes[layer] = videoSize[0]
|
|
}
|
|
w.logger.Debugw("video size changed", "size", w.videoSizes)
|
|
cb := w.onVideoSizeChanged
|
|
w.videoSizeMu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb()
|
|
}
|
|
})
|
|
|
|
if w.Kind() == webrtc.RTPCodecTypeVideo && layer == 0 {
|
|
buff.OnCodecChange(w.handleCodecChange)
|
|
}
|
|
|
|
var duration time.Duration
|
|
switch layer {
|
|
case 2:
|
|
duration = w.pliThrottleConfig.HighQuality
|
|
case 1:
|
|
duration = w.pliThrottleConfig.MidQuality
|
|
case 0:
|
|
duration = w.pliThrottleConfig.LowQuality
|
|
default:
|
|
duration = w.pliThrottleConfig.MidQuality
|
|
}
|
|
if duration != 0 {
|
|
buff.SetPLIThrottle(duration.Nanoseconds())
|
|
}
|
|
|
|
w.bufferMu.Lock()
|
|
if w.upTracks[layer] != nil {
|
|
w.bufferMu.Unlock()
|
|
return ErrDuplicateLayer
|
|
}
|
|
w.upTracks[layer] = track
|
|
w.buffers[layer] = buff
|
|
rtt := w.rtt
|
|
w.bufferMu.Unlock()
|
|
|
|
buff.SetRTT(rtt)
|
|
buff.SetPaused(w.streamTrackerManager.IsPaused())
|
|
|
|
go w.forwardRTP(layer, buff)
|
|
return nil
|
|
}
|
|
|
|
// SetUpTrackPaused indicates upstream will not be sending any data.
|
|
// this will reflect the "muted" status and will pause streamtracker to ensure we don't turn off
|
|
// the layer
|
|
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) {
|
|
w.streamTrackerManager.SetPaused(paused)
|
|
|
|
w.bufferMu.RLock()
|
|
for _, buff := range w.buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
buff.SetPaused(paused)
|
|
}
|
|
w.bufferMu.RUnlock()
|
|
|
|
w.connectionStats.UpdateMute(paused)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
|
|
if w.closed.Load() {
|
|
return ErrReceiverClosed
|
|
}
|
|
|
|
if w.downTrackSpreader.HasDownTrack(track.SubscriberID()) {
|
|
w.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID())
|
|
}
|
|
|
|
track.UpTrackMaxPublishedLayerChange(w.streamTrackerManager.GetMaxPublishedLayer())
|
|
track.UpTrackMaxTemporalLayerSeenChange(w.streamTrackerManager.GetMaxTemporalLayerSeen())
|
|
|
|
w.downTrackSpreader.Store(track)
|
|
w.logger.Debugw("downtrack added", "subscriberID", track.SubscriberID())
|
|
return nil
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetDownTracks() []TrackSender {
|
|
return w.downTrackSpreader.GetDownTracks()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) {
|
|
ti := w.TrackInfo()
|
|
if ti == nil {
|
|
return
|
|
}
|
|
|
|
if w.Kind() == webrtc.RTPCodecTypeAudio || ti.Source == livekit.TrackSource_SCREEN_SHARE {
|
|
// screen share tracks have highly variable bitrate, do not use bit rate based quality for those
|
|
return
|
|
}
|
|
|
|
expectedBitrate := int64(0)
|
|
for _, vl := range buffer.GetVideoLayersForMimeType(w.Mime(), ti) {
|
|
if vl.SpatialLayer <= layer {
|
|
expectedBitrate += int64(vl.Bitrate)
|
|
}
|
|
}
|
|
|
|
w.connectionStats.AddBitrateTransition(expectedBitrate)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) {
|
|
w.streamTrackerManager.SetMaxExpectedSpatialLayer(layer)
|
|
w.notifyMaxExpectedLayer(layer)
|
|
|
|
if layer == buffer.InvalidLayerSpatial {
|
|
w.connectionStats.UpdateLayerMute(true)
|
|
} else {
|
|
w.connectionStats.UpdateLayerMute(false)
|
|
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
|
|
}
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnAvailableLayersChanged
|
|
func (w *WebRTCReceiver) OnAvailableLayersChanged() {
|
|
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
|
dt.UpTrackLayersChange()
|
|
})
|
|
|
|
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnBitrateAvailabilityChanged
|
|
func (w *WebRTCReceiver) OnBitrateAvailabilityChanged() {
|
|
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
|
dt.UpTrackBitrateAvailabilityChange()
|
|
})
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnMaxPublishedLayerChanged
|
|
func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) {
|
|
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
|
dt.UpTrackMaxPublishedLayerChange(maxPublishedLayer)
|
|
})
|
|
|
|
w.notifyMaxExpectedLayer(maxPublishedLayer)
|
|
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnMaxTemporalLayerSeenChanged
|
|
func (w *WebRTCReceiver) OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int32) {
|
|
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
|
dt.UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen)
|
|
})
|
|
|
|
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnMaxAvailableLayerChanged
|
|
func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) {
|
|
if onMaxLayerChange := w.getOnMaxLayerChange(); onMaxLayerChange != nil {
|
|
onMaxLayerChange(w.Mime(), maxAvailableLayer)
|
|
}
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnBitrateReport
|
|
func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) {
|
|
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
|
dt.UpTrackBitrateReport(availableLayers, bitrates)
|
|
})
|
|
|
|
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetLayeredBitrate() ([]int32, Bitrates) {
|
|
return w.streamTrackerManager.GetLayeredBitrate()
|
|
}
|
|
|
|
// OnCloseHandler method to be called on remote track removed
|
|
func (w *WebRTCReceiver) OnCloseHandler(fn func()) {
|
|
w.onCloseHandler = fn
|
|
}
|
|
|
|
// DeleteDownTrack removes a DownTrack from a Receiver
|
|
func (w *WebRTCReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID) {
|
|
if w.closed.Load() {
|
|
return
|
|
}
|
|
|
|
w.downTrackSpreader.Free(subscriberID)
|
|
w.logger.Debugw("downtrack deleted", "subscriberID", subscriberID)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) sendRTCP(packets []rtcp.Packet) {
|
|
if packets == nil || w.closed.Load() {
|
|
return
|
|
}
|
|
|
|
if w.onRTCP != nil {
|
|
w.onRTCP(packets)
|
|
}
|
|
}
|
|
|
|
func (w *WebRTCReceiver) SendPLI(layer int32, force bool) {
|
|
// SVC-TODO : should send LRR (Layer Refresh Request) instead of PLI
|
|
buff := w.getBuffer(layer)
|
|
if buff == nil {
|
|
return
|
|
}
|
|
|
|
buff.SendPLI(force)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) getBuffer(layer int32) *buffer.Buffer {
|
|
w.bufferMu.RLock()
|
|
defer w.bufferMu.RUnlock()
|
|
|
|
return w.getBufferLocked(layer)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) getBufferLocked(layer int32) *buffer.Buffer {
|
|
// for svc codecs, use layer = 0 always.
|
|
// spatial layers are in-built and handled by single buffer
|
|
if w.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
|
layer = 0
|
|
}
|
|
|
|
if layer < 0 || int(layer) >= len(w.buffers) {
|
|
return nil
|
|
}
|
|
|
|
return w.buffers[layer]
|
|
}
|
|
|
|
func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
|
|
b := w.getBuffer(int32(layer))
|
|
if b == nil {
|
|
return 0, ErrBufferNotFound
|
|
}
|
|
|
|
return b.GetPacket(buf, esn)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
|
|
w.bufferMu.RLock()
|
|
defer w.bufferMu.RUnlock()
|
|
|
|
stats := make([]*livekit.RTPStats, 0, len(w.buffers))
|
|
for _, buff := range w.buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
sswl := buff.GetStats()
|
|
if sswl == nil {
|
|
continue
|
|
}
|
|
|
|
stats = append(stats, sswl)
|
|
}
|
|
|
|
return rtpstats.AggregateRTPStats(stats)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetAudioLevel() (float64, bool) {
|
|
if w.Kind() == webrtc.RTPCodecTypeVideo {
|
|
return 0, false
|
|
}
|
|
|
|
w.bufferMu.RLock()
|
|
defer w.bufferMu.RUnlock()
|
|
|
|
for _, buff := range w.buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
return buff.GetAudioLevel()
|
|
}
|
|
|
|
return 0, false
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetDeltaStats() map[uint32]*buffer.StreamStatsWithLayers {
|
|
w.bufferMu.RLock()
|
|
defer w.bufferMu.RUnlock()
|
|
|
|
deltaStats := make(map[uint32]*buffer.StreamStatsWithLayers, len(w.buffers))
|
|
|
|
for layer, buff := range w.buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
sswl := buff.GetDeltaStats()
|
|
if sswl == nil {
|
|
continue
|
|
}
|
|
|
|
// patch buffer stats with correct layer
|
|
patched := make(map[int32]*rtpstats.RTPDeltaInfo, 1)
|
|
patched[int32(layer)] = sswl.Layers[0]
|
|
sswl.Layers = patched
|
|
|
|
deltaStats[w.ssrc(layer)] = sswl
|
|
}
|
|
|
|
return deltaStats
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetLastSenderReportTime() time.Time {
|
|
w.bufferMu.RLock()
|
|
defer w.bufferMu.RUnlock()
|
|
|
|
latestSRTime := time.Time{}
|
|
for _, buff := range w.buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
srAt := buff.GetLastSenderReportTime()
|
|
if srAt.After(latestSRTime) {
|
|
latestSRTime = srAt
|
|
}
|
|
}
|
|
|
|
return latestSRTime
|
|
}
|
|
|
|
func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {
|
|
numPacketsForwarded := 0
|
|
numPacketsDropped := 0
|
|
defer func() {
|
|
w.closeOnce.Do(func() {
|
|
w.closed.Store(true)
|
|
w.closeTracks()
|
|
if rt := w.redTransformer.Load(); rt != nil {
|
|
rt.(REDTransformer).Close()
|
|
}
|
|
})
|
|
|
|
w.streamTrackerManager.RemoveTracker(layer)
|
|
if w.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
|
w.streamTrackerManager.RemoveAllTrackers()
|
|
}
|
|
|
|
w.logger.Debugw(
|
|
"closing forwarder",
|
|
"layer", layer,
|
|
"numPacketsForwarded", numPacketsForwarded,
|
|
"numPacketsDropped", numPacketsDropped,
|
|
)
|
|
}()
|
|
|
|
var spatialTrackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker
|
|
if layer < 0 || int(layer) >= len(spatialTrackers) {
|
|
w.logger.Errorw("invalid layer", nil, "layer", layer)
|
|
return
|
|
}
|
|
|
|
pktBuf := make([]byte, bucket.MaxPktSize)
|
|
for {
|
|
pkt, err := buff.ReadExtended(pktBuf)
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
dequeuedAt := mono.UnixNano()
|
|
|
|
if pkt.Packet.PayloadType != uint8(w.codec.PayloadType) {
|
|
// drop packets as we don't support codec fallback directly
|
|
w.logger.Debugw(
|
|
"dropping packet - payload mismatch",
|
|
"packetPayloadType", pkt.Packet.PayloadType,
|
|
"payloadType", w.codec.PayloadType,
|
|
)
|
|
numPacketsDropped++
|
|
continue
|
|
}
|
|
|
|
spatialLayer := layer
|
|
if pkt.Spatial >= 0 {
|
|
// svc packet, take spatial layer info from packet
|
|
spatialLayer = pkt.Spatial
|
|
}
|
|
if int(spatialLayer) >= len(spatialTrackers) {
|
|
w.logger.Errorw(
|
|
"unexpected spatial layer", nil,
|
|
"spatialLayer", spatialLayer,
|
|
"pktSpatialLayer", pkt.Spatial,
|
|
)
|
|
numPacketsDropped++
|
|
continue
|
|
}
|
|
|
|
var writeCount atomic.Int32
|
|
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
|
writeCount.Add(dt.WriteRTP(pkt, spatialLayer))
|
|
})
|
|
|
|
if rt := w.redTransformer.Load(); rt != nil {
|
|
writeCount.Add(rt.(REDTransformer).ForwardRTP(pkt, spatialLayer))
|
|
}
|
|
|
|
// track delay/jitter
|
|
if writeCount.Load() > 0 && w.forwardStats != nil {
|
|
if latency, isHigh := w.forwardStats.Update(pkt.Arrival, mono.UnixNano()); isHigh {
|
|
w.logger.Infow(
|
|
"high forwarding latency",
|
|
"latency", latency,
|
|
"writeCount", writeCount.Load(),
|
|
"queuingLatency", dequeuedAt-pkt.Arrival,
|
|
"isOutOfOrder", pkt.IsOutOfOrder,
|
|
)
|
|
}
|
|
}
|
|
|
|
// track video layers
|
|
if w.Kind() == webrtc.RTPCodecTypeVideo {
|
|
if spatialTrackers[spatialLayer] == nil {
|
|
spatialTrackers[spatialLayer] = w.streamTrackerManager.GetTracker(spatialLayer)
|
|
if spatialTrackers[spatialLayer] == nil {
|
|
if w.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM && pkt.DependencyDescriptor != nil {
|
|
w.streamTrackerManager.AddDependencyDescriptorTrackers()
|
|
}
|
|
spatialTrackers[spatialLayer] = w.streamTrackerManager.AddTracker(spatialLayer)
|
|
}
|
|
}
|
|
if spatialTrackers[spatialLayer] != nil {
|
|
spatialTrackers[spatialLayer].Observe(
|
|
pkt.Temporal,
|
|
len(pkt.RawPacket),
|
|
len(pkt.Packet.Payload),
|
|
pkt.Packet.Marker,
|
|
pkt.Packet.Timestamp,
|
|
pkt.DependencyDescriptor,
|
|
)
|
|
}
|
|
}
|
|
|
|
numPacketsForwarded++
|
|
}
|
|
}
|
|
|
|
// closeTracks close all tracks from Receiver
|
|
func (w *WebRTCReceiver) closeTracks() {
|
|
w.connectionStats.Close()
|
|
w.streamTrackerManager.Close()
|
|
|
|
closeTrackSenders(w.downTrackSpreader.ResetAndGetDownTracks())
|
|
|
|
if w.onCloseHandler != nil {
|
|
w.onCloseHandler()
|
|
}
|
|
}
|
|
|
|
func (w *WebRTCReceiver) DebugInfo() map[string]interface{} {
|
|
var videoLayerMode livekit.VideoLayer_Mode
|
|
if ti := w.trackInfo.Load(); ti != nil {
|
|
videoLayerMode = buffer.GetVideoLayerModeForMimeType(w.Mime(), ti)
|
|
}
|
|
info := map[string]interface{}{
|
|
"VideoLayerMode": videoLayerMode.String(),
|
|
}
|
|
|
|
w.bufferMu.RLock()
|
|
upTrackInfo := make([]map[string]interface{}, 0, len(w.upTracks))
|
|
for layer, ut := range w.upTracks {
|
|
if ut != nil {
|
|
upTrackInfo = append(upTrackInfo, map[string]interface{}{
|
|
"Layer": layer,
|
|
"SSRC": ut.SSRC(),
|
|
"Msid": ut.Msid(),
|
|
"RID": ut.RID(),
|
|
})
|
|
}
|
|
}
|
|
w.bufferMu.RUnlock()
|
|
info["UpTracks"] = upTrackInfo
|
|
|
|
return info
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetPrimaryReceiverForRed() TrackReceiver {
|
|
w.bufferMu.Lock()
|
|
defer w.bufferMu.Unlock()
|
|
|
|
if !w.isRED || w.closed.Load() {
|
|
return w
|
|
}
|
|
|
|
rt := w.redTransformer.Load()
|
|
if rt == nil {
|
|
pr := NewRedPrimaryReceiver(w, DownTrackSpreaderParams{
|
|
Threshold: w.lbThreshold,
|
|
Logger: w.logger,
|
|
})
|
|
w.redTransformer.Store(pr)
|
|
return pr
|
|
} else {
|
|
if pr, ok := rt.(*RedPrimaryReceiver); ok {
|
|
return pr
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetRedReceiver() TrackReceiver {
|
|
w.bufferMu.Lock()
|
|
defer w.bufferMu.Unlock()
|
|
|
|
if w.isRED || w.closed.Load() {
|
|
return w
|
|
}
|
|
|
|
rt := w.redTransformer.Load()
|
|
if rt == nil {
|
|
pr := NewRedReceiver(w, DownTrackSpreaderParams{
|
|
Threshold: w.lbThreshold,
|
|
Logger: w.logger,
|
|
})
|
|
w.redTransformer.Store(pr)
|
|
return pr
|
|
} else {
|
|
if pr, ok := rt.(*RedReceiver); ok {
|
|
return pr
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
|
|
b := w.getBuffer(layer)
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
if w.videoLayerMode != livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
|
return b.GetTemporalLayerFpsForSpatial(0)
|
|
}
|
|
|
|
return b.GetTemporalLayerFpsForSpatial(layer)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) AddOnReady(fn func()) {
|
|
// webRTCReceiver is always ready after created
|
|
fn()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) handleCodecChange(newCodec webrtc.RTPCodecParameters) {
|
|
// we don't support the codec fallback directly, set the codec state to invalid once it happens
|
|
w.SetCodecState(ReceiverCodecStateInvalid)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) AddOnCodecStateChange(f func(webrtc.RTPCodecParameters, ReceiverCodecState)) {
|
|
w.codecStateLock.Lock()
|
|
w.onCodecStateChange = append(w.onCodecStateChange, f)
|
|
w.codecStateLock.Unlock()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) CodecState() ReceiverCodecState {
|
|
w.codecStateLock.Lock()
|
|
defer w.codecStateLock.Unlock()
|
|
|
|
return w.codecState
|
|
}
|
|
|
|
func (w *WebRTCReceiver) SetCodecState(state ReceiverCodecState) {
|
|
w.codecStateLock.Lock()
|
|
if w.codecState == state || w.codecState == ReceiverCodecStateInvalid {
|
|
w.codecStateLock.Unlock()
|
|
return
|
|
}
|
|
|
|
w.codecState = state
|
|
fns := w.onCodecStateChange
|
|
w.codecStateLock.Unlock()
|
|
|
|
for _, f := range fns {
|
|
f(w.codec, state)
|
|
}
|
|
}
|
|
|
|
func (w *WebRTCReceiver) VideoSizes() []buffer.VideoSize {
|
|
var sizes []buffer.VideoSize
|
|
w.videoSizeMu.RLock()
|
|
defer w.videoSizeMu.RUnlock()
|
|
for _, v := range w.videoSizes {
|
|
if v.Width == 0 || v.Height == 0 {
|
|
break
|
|
}
|
|
sizes = append(sizes, v)
|
|
}
|
|
|
|
return sizes
|
|
}
|
|
|
|
func (w *WebRTCReceiver) OnVideoSizeChanged(f func()) {
|
|
w.videoSizeMu.Lock()
|
|
w.onVideoSizeChanged = f
|
|
w.videoSizeMu.Unlock()
|
|
}
|
|
|
|
// -----------------------------------------------------------
|
|
|
|
// closes all track senders in parallel, returns when all are closed
|
|
func closeTrackSenders(senders []TrackSender) {
|
|
wg := sync.WaitGroup{}
|
|
for _, dt := range senders {
|
|
dt := dt
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
dt.Close()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|