mirror of
https://github.com/livekit/livekit.git
synced 2026-04-14 04:25:44 +00:00
353 lines
9.8 KiB
Go
353 lines
9.8 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package sfu
|
|
|
|
import (
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/webrtc/v4"
|
|
|
|
"github.com/livekit/protocol/codecs/mime"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
|
|
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
|
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
|
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
|
|
)
|
|
|
|
var _ TrackReceiver = (*WebRTCReceiver)(nil)
|
|
|
|
// WebRTCReceiver receives a media track
|
|
type WebRTCReceiver struct {
|
|
*ReceiverBase
|
|
|
|
receiver *webrtc.RTPReceiver
|
|
onCloseHandler func()
|
|
|
|
onRTCP func([]rtcp.Packet)
|
|
|
|
upTracksMu sync.Mutex
|
|
upTracks [buffer.DefaultMaxLayerSpatial + 1]TrackRemote
|
|
|
|
connectionStats *connectionquality.ConnectionStats
|
|
onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)
|
|
}
|
|
|
|
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
|
|
|
|
// WithPliThrottleConfig indicates minimum time(ms) between sending PLIs
|
|
func WithPliThrottleConfig(pliThrottleConfig PLIThrottleConfig) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.ReceiverBase.SetPLIThrottleConfig(pliThrottleConfig)
|
|
return w
|
|
}
|
|
}
|
|
|
|
// WithAudioConfig sets up parameters for active speaker detection
|
|
func WithAudioConfig(audioConfig AudioConfig) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.ReceiverBase.SetAudioConfig(audioConfig)
|
|
return w
|
|
}
|
|
}
|
|
|
|
func WithEnableRTPStreamRestartDetection(enable bool) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.ReceiverBase.SetEnableRTPStreamRestartDetection(enable)
|
|
return w
|
|
}
|
|
}
|
|
|
|
// WithLoadBalanceThreshold enables parallelization of packet writes when downTracks exceeds threshold
|
|
// Value should be between 3 and 150.
|
|
// For a server handling a few large rooms, use a smaller value (required to handle very large (250+ participant) rooms).
|
|
// For a server handling many small rooms, use a larger value or disable.
|
|
// Set to 0 (disabled) by default.
|
|
func WithLoadBalanceThreshold(downTracks int) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.ReceiverBase.SetLBThreshold(downTracks)
|
|
return w
|
|
}
|
|
}
|
|
|
|
func WithForwardStats(forwardStats *ForwardStats) ReceiverOpts {
|
|
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
|
w.ReceiverBase.SetForwardStats(forwardStats)
|
|
return w
|
|
}
|
|
}
|
|
|
|
// NewWebRTCReceiver creates a new webrtc track receiver
|
|
func NewWebRTCReceiver(
|
|
receiver *webrtc.RTPReceiver,
|
|
track TrackRemote,
|
|
trackInfo *livekit.TrackInfo,
|
|
logger logger.Logger,
|
|
onRTCP func([]rtcp.Packet),
|
|
streamTrackerManagerConfig StreamTrackerManagerConfig,
|
|
opts ...ReceiverOpts,
|
|
) *WebRTCReceiver {
|
|
w := &WebRTCReceiver{
|
|
receiver: receiver,
|
|
onRTCP: onRTCP,
|
|
}
|
|
|
|
w.ReceiverBase = NewReceiverBase(
|
|
ReceiverBaseParams{
|
|
TrackID: livekit.TrackID(track.ID()),
|
|
StreamID: track.StreamID(),
|
|
Kind: track.Kind(),
|
|
Codec: track.Codec(),
|
|
HeaderExtensions: receiver.GetParameters().HeaderExtensions,
|
|
Logger: logger,
|
|
StreamTrackerManagerConfig: streamTrackerManagerConfig,
|
|
StreamTrackerManagerListener: w,
|
|
IsSelfClosing: true,
|
|
OnClosed: w.onClosed,
|
|
},
|
|
trackInfo,
|
|
ReceiverCodecStateNormal,
|
|
)
|
|
|
|
for _, opt := range opts {
|
|
w = opt(w)
|
|
}
|
|
|
|
w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
|
|
ReceiverProvider: w,
|
|
Logger: logger.WithValues("direction", "up"),
|
|
})
|
|
w.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) {
|
|
if w.onStatsUpdate != nil {
|
|
w.onStatsUpdate(w, stat)
|
|
}
|
|
})
|
|
codec := track.Codec()
|
|
w.connectionStats.Start(
|
|
mime.NormalizeMimeType(codec.MimeType),
|
|
// TODO: technically not correct to declare FEC on when RED. Need the primary codec's fmtp line to check.
|
|
mime.IsMimeTypeStringRED(codec.MimeType) || strings.Contains(strings.ToLower(codec.SDPFmtpLine), "useinbandfec=1"),
|
|
)
|
|
|
|
return w
|
|
}
|
|
|
|
func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)) {
|
|
w.onStatsUpdate = fn
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) {
|
|
return w.connectionStats.GetScoreAndQuality()
|
|
}
|
|
|
|
func (w *WebRTCReceiver) ssrc(layer int) uint32 {
|
|
if track := w.upTracks[layer]; track != nil {
|
|
return uint32(track.SSRC())
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (w *WebRTCReceiver) AddUpTrack(track TrackRemote, buff *buffer.Buffer) error {
|
|
if w.isClosed.Load() {
|
|
return ErrReceiverClosed
|
|
}
|
|
|
|
layer := int32(0)
|
|
if w.Kind() == webrtc.RTPCodecTypeVideo && w.videoLayerMode != livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
|
layer = buffer.GetSpatialLayerForRid(w.Mime(), track.RID(), w.ReceiverBase.TrackInfo())
|
|
}
|
|
if layer < 0 {
|
|
w.ReceiverBase.Logger().Warnw(
|
|
"invalid layer", nil,
|
|
"rid", track.RID(),
|
|
"trackInfo", logger.Proto(w.ReceiverBase.TrackInfo()),
|
|
)
|
|
return ErrInvalidLayer
|
|
}
|
|
|
|
w.upTracksMu.Lock()
|
|
if w.upTracks[layer] != nil {
|
|
w.upTracksMu.Unlock()
|
|
return ErrDuplicateLayer
|
|
}
|
|
w.upTracks[layer] = track
|
|
w.upTracksMu.Unlock()
|
|
|
|
w.ReceiverBase.AddBuffer(buff, layer)
|
|
buff.OnRtcpFeedback(w.sendRTCP)
|
|
w.ReceiverBase.StartBuffer(buff, layer)
|
|
return nil
|
|
}
|
|
|
|
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) {
|
|
w.ReceiverBase.SetUpTrackPaused(paused)
|
|
|
|
w.connectionStats.UpdateMute(paused)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) {
|
|
ti := w.TrackInfo()
|
|
if ti == nil {
|
|
return
|
|
}
|
|
|
|
if w.Kind() == webrtc.RTPCodecTypeAudio || ti.Source == livekit.TrackSource_SCREEN_SHARE {
|
|
// screen share tracks have highly variable bitrate, do not use bit rate based quality for those
|
|
return
|
|
}
|
|
|
|
expectedBitrate := int64(0)
|
|
for _, vl := range buffer.GetVideoLayersForMimeType(w.Mime(), ti) {
|
|
if vl.SpatialLayer <= layer {
|
|
expectedBitrate += int64(vl.Bitrate)
|
|
}
|
|
}
|
|
|
|
w.connectionStats.AddBitrateTransition(expectedBitrate)
|
|
}
|
|
|
|
func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) {
|
|
w.ReceiverBase.SetMaxExpectedSpatialLayer(layer)
|
|
|
|
w.notifyMaxExpectedLayer(layer)
|
|
|
|
if layer == buffer.InvalidLayerSpatial {
|
|
w.connectionStats.UpdateLayerMute(true)
|
|
} else {
|
|
w.connectionStats.UpdateLayerMute(false)
|
|
w.connectionStats.AddLayerTransition(w.ReceiverBase.StreamTrackerManager().DistanceToDesired())
|
|
}
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnAvailableLayersChanged
|
|
func (w *WebRTCReceiver) OnAvailableLayersChanged() {
|
|
w.connectionStats.AddLayerTransition(w.ReceiverBase.StreamTrackerManager().DistanceToDesired())
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnBitrateAvailabilityChanged
|
|
func (w *WebRTCReceiver) OnBitrateAvailabilityChanged() {
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnMaxPublishedLayerChanged
|
|
func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) {
|
|
w.notifyMaxExpectedLayer(maxPublishedLayer)
|
|
w.connectionStats.AddLayerTransition(w.ReceiverBase.StreamTrackerManager().DistanceToDesired())
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnMaxTemporalLayerSeenChanged
|
|
func (w *WebRTCReceiver) OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int32) {
|
|
w.connectionStats.AddLayerTransition(w.ReceiverBase.StreamTrackerManager().DistanceToDesired())
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnMaxAvailableLayerChanged
|
|
func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) {
|
|
}
|
|
|
|
// StreamTrackerManagerListener.OnBitrateReport
|
|
func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) {
|
|
w.connectionStats.AddLayerTransition(w.ReceiverBase.StreamTrackerManager().DistanceToDesired())
|
|
}
|
|
|
|
// OnCloseHandler method to be called on remote track removed
|
|
func (w *WebRTCReceiver) OnCloseHandler(fn func()) {
|
|
w.onCloseHandler = fn
|
|
}
|
|
|
|
func (w *WebRTCReceiver) sendRTCP(packets []rtcp.Packet) {
|
|
if packets == nil || w.isClosed.Load() {
|
|
return
|
|
}
|
|
|
|
if w.onRTCP != nil {
|
|
w.onRTCP(packets)
|
|
}
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetDeltaStats() map[uint32]*buffer.StreamStatsWithLayers {
|
|
buffers := w.ReceiverBase.GetAllBuffers()
|
|
deltaStats := make(map[uint32]*buffer.StreamStatsWithLayers, len(buffers))
|
|
for layer, buff := range buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
sswl := buff.GetDeltaStats()
|
|
if sswl == nil {
|
|
continue
|
|
}
|
|
|
|
// patch buffer stats with correct layer
|
|
patched := make(map[int32]*rtpstats.RTPDeltaInfo, 1)
|
|
patched[int32(layer)] = sswl.Layers[0]
|
|
sswl.Layers = patched
|
|
|
|
deltaStats[w.ssrc(layer)] = sswl
|
|
}
|
|
|
|
return deltaStats
|
|
}
|
|
|
|
func (w *WebRTCReceiver) GetLastSenderReportTime() time.Time {
|
|
buffers := w.ReceiverBase.GetAllBuffers()
|
|
latestSRTime := time.Time{}
|
|
for _, buff := range buffers {
|
|
if buff == nil {
|
|
continue
|
|
}
|
|
|
|
srAt := buff.GetLastSenderReportTime()
|
|
if srAt.After(latestSRTime) {
|
|
latestSRTime = srAt
|
|
}
|
|
}
|
|
|
|
return latestSRTime
|
|
}
|
|
|
|
func (w *WebRTCReceiver) onClosed() {
|
|
w.connectionStats.Close()
|
|
|
|
if w.onCloseHandler != nil {
|
|
w.onCloseHandler()
|
|
}
|
|
}
|
|
|
|
func (w *WebRTCReceiver) DebugInfo() map[string]any {
|
|
info := w.ReceiverBase.DebugInfo()
|
|
|
|
w.upTracksMu.Lock()
|
|
upTrackInfo := make([]map[string]any, 0, len(w.upTracks))
|
|
for layer, ut := range w.upTracks {
|
|
if ut != nil {
|
|
upTrackInfo = append(upTrackInfo, map[string]any{
|
|
"Layer": layer,
|
|
"SSRC": ut.SSRC(),
|
|
"Msid": ut.Msid(),
|
|
"RID": ut.RID(),
|
|
})
|
|
}
|
|
}
|
|
w.upTracksMu.Unlock()
|
|
info["UpTracks"] = upTrackInfo
|
|
|
|
return info
|
|
}
|
|
|
|
// -----------------------------------------------------------
|