Files
livekit/pkg/sfu/buffer/buffer_base.go
Raja Subramanian 370e0a4d52 Set up audio config in audio level module when config is updated. (#4290)
* Set up audio config in audio level module when config is updated.

It is possible to get audio config after bind (bind is where the audio
level module is created) for remote tracks. So, split out setting audio
level config in audio level module and invoke it when config is updated.

* coderabbit review

* prevent divide-by-0

* not active before config
2026-02-05 09:35:26 +05:30

1556 lines
38 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/gammazero/deque"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v4"
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/sfu/audio"
act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/mediatransportutil/pkg/bucket"
"github.com/livekit/mediatransportutil/pkg/nack"
"github.com/livekit/protocol/codecs/mime"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
var (
ExtPacketFactory = &sync.Pool{
New: func() any {
return &ExtPacket{}
},
}
)
func ReleaseExtPacket(extPkt *ExtPacket) {
if extPkt == nil {
return
}
ReleaseExtDependencyDescriptor(extPkt.DependencyDescriptor)
*extPkt = ExtPacket{}
ExtPacketFactory.Put(extPkt)
}
// --------------------------------------
type ExtPacket struct {
VideoLayer
Arrival int64
ExtSequenceNumber uint64
ExtTimestamp uint64
Packet *rtp.Packet
Payload any
IsKeyFrame bool
RawPacket []byte
DependencyDescriptor *ExtDependencyDescriptor
AbsCaptureTimeExt *act.AbsCaptureTime
IsOutOfOrder bool
IsBuffered bool
}
// VideoSize represents video resolution
type VideoSize struct {
Width uint32
Height uint32
}
type BufferProvider interface {
SetLogger(lgr logger.Logger)
SetAudioLevelConfig(audioLevelConfig audio.AudioLevelConfig)
SetStreamRestartDetection(enable bool)
SetPLIThrottle(duration int64)
SetRTT(rtt uint32)
SetPaused(paused bool)
SendPLI(force bool)
ReadExtended(buf []byte) (*ExtPacket, error)
GetPacket(buf []byte, esn uint64) (int, error)
GetAudioLevel() (float64, bool)
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetStats() *livekit.RTPStats
GetDeltaStats() *StreamStatsWithLayers
GetDeltaStatsLite() *rtpstats.RTPDeltaInfoLite
GetLastSenderReportTime() time.Time
GetNACKPairs() []rtcp.NackPair
SetSenderReportData(srData *livekit.RTCPSenderReportState)
GetSenderReportData() *livekit.RTCPSenderReportState
OnRtcpSenderReport(fn func())
OnFpsChanged(fn func())
OnVideoSizeChanged(fn func([]VideoSize))
OnCodecChange(fn func(webrtc.RTPCodecParameters))
OnStreamRestart(fn func(string))
StartKeyFrameSeeder()
StopKeyFrameSeeder()
HandleIncomingPacket(
rawPkt []byte,
rtpPacket *rtp.Packet,
arrivalTime int64,
isBuffered bool,
isRTX bool,
skippedSeqs []uint16,
oobSequenceNumber uint16,
) (uint64, error)
RestartStream(reason string)
CloseWithReason(reason string) (*livekit.RTPStats, error)
}
const (
bucketCapCheckInterval = 1e9
)
type BufferBaseParams struct {
SSRC uint32
MaxVideoPkts int
MaxAudioPkts int
LoggerComponents []string
SendPLI func()
IsReportingEnabled bool
IsOOBSequenceNumber bool
IsDDRestartEnabled bool
}
type BufferBase struct {
sync.RWMutex
params BufferBaseParams
readCond *sync.Cond
bucket *bucket.Bucket[uint64, uint16]
lastBucketCapCheckAt int64
nacker *nack.NackQueue
rtpStatsLite *rtpstats.RTPStatsReceiverLite
liteStatsSnapshotId uint32
extPackets deque.Deque[*ExtPacket]
codecType webrtc.RTPCodecType
closeOnce sync.Once
clockRate uint32
mime mime.MimeType
rtpParameters webrtc.RTPParameters
payloadType uint8
rtxPayloadType uint8
snRangeMap *utils.RangeMap[uint64, uint64]
audioLevelConfig audio.AudioLevelConfig
audioLevel *audio.AudioLevel
audioLevelExtID uint8
enableStreamRestartDetection bool
pliThrottle int64
rtpStats *rtpstats.RTPStatsReceiver
ppsSnapshotId uint32
rrSnapshotId uint32
deltaStatsSnapshotId uint32
// callbacks
onRtcpSenderReport func()
onFpsChanged func()
onVideoSizeChanged func([]VideoSize)
onCodecChange func(webrtc.RTPCodecParameters)
onStreamRestart func(string)
// video size tracking for multiple spatial layers
currentVideoSize [DefaultMaxLayerSpatial + 1]VideoSize
logger logger.Logger
// dependency descriptor
ddExtID uint8
ddParser *DependencyDescriptorParser
isPaused bool
frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator
frameRateCalculated bool
packetNotFoundCount atomic.Uint32
packetTooOldCount atomic.Uint32
extPacketTooMuchCount atomic.Uint32
absCaptureTimeExtID uint8
keyFrameSeederGeneration atomic.Int32
isRestartPending bool
isClosed atomic.Bool
}
func NewBufferBase(params BufferBaseParams) *BufferBase {
l := logger.GetLogger() // will be reset with correct context via SetLogger
for _, component := range params.LoggerComponents {
l = l.WithComponent(component)
}
l = l.WithValues("ssrc", params.SSRC)
b := &BufferBase{
params: params,
lastBucketCapCheckAt: mono.UnixNano(),
snRangeMap: utils.NewRangeMap[uint64, uint64](100),
pliThrottle: int64(500 * time.Millisecond),
logger: l,
}
b.readCond = sync.NewCond(&b.RWMutex)
b.extPackets.SetBaseCap(128)
return b
}
func (b *BufferBase) SSRC() uint32 {
return b.params.SSRC
}
func (b *BufferBase) MaxVideoPkts() int {
return b.params.MaxVideoPkts
}
func (b *BufferBase) MaxAudioPkts() int {
return b.params.MaxAudioPkts
}
func (b *BufferBase) SetLogger(lgr logger.Logger) {
b.Lock()
defer b.Unlock()
for _, component := range b.params.LoggerComponents {
lgr = lgr.WithComponent(component)
}
lgr = lgr.WithValues("ssrc", b.params.SSRC)
b.logger = lgr
if b.rtpStats != nil {
b.rtpStats.SetLogger(b.logger)
}
if b.rtpStatsLite != nil {
b.rtpStatsLite.SetLogger(b.logger)
}
}
func (b *BufferBase) Bind(rtpParameters webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrate int) error {
b.Lock()
defer b.Unlock()
return b.BindLocked(rtpParameters, codec, bitrate)
}
func (b *BufferBase) BindLocked(rtpParameters webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrate int) error {
b.logger.Debugw("binding track")
if codec.ClockRate == 0 {
b.logger.Warnw("invalid codec", nil, "rtpParameters", rtpParameters, "codec", codec, "bitrate", bitrate)
return errInvalidCodec
}
b.setupRTPStats(codec.ClockRate)
b.clockRate = codec.ClockRate
b.mime = mime.NormalizeMimeType(codec.MimeType)
b.rtpParameters = rtpParameters
for _, codecParameter := range rtpParameters.Codecs {
if mime.IsMimeTypeStringEqual(codecParameter.MimeType, codec.MimeType) {
b.payloadType = uint8(codecParameter.PayloadType)
break
}
}
if b.payloadType == 0 && !mime.IsMimeTypeStringEqual(codec.MimeType, webrtc.MimeTypePCMU) {
b.logger.Warnw(
"could not find payload type for codec", nil,
"codec", codec.MimeType,
"rtpParameters", rtpParameters,
)
b.payloadType = uint8(rtpParameters.Codecs[0].PayloadType)
}
// find RTX payload type
for _, codec := range rtpParameters.Codecs {
if mime.IsMimeTypeStringRTX(codec.MimeType) && strings.Contains(codec.SDPFmtpLine, fmt.Sprintf("apt=%d", b.payloadType)) {
b.rtxPayloadType = uint8(codec.PayloadType)
break
}
}
for _, ext := range rtpParameters.HeaderExtensions {
switch ext.URI {
case dd.ExtensionURI:
if b.ddExtID != 0 {
b.logger.Warnw(
"multiple dependency descriptor extensions found", nil,
"id", ext.ID,
"previous", b.ddExtID,
)
continue
}
b.ddExtID = uint8(ext.ID)
b.createDDParserAndFrameRateCalculator()
case sdp.AudioLevelURI:
b.audioLevelExtID = uint8(ext.ID)
b.audioLevel = audio.NewAudioLevel(audio.AudioLevelParams{
ClockRate: b.clockRate,
})
b.audioLevel.SetConfig(b.audioLevelConfig)
case act.AbsCaptureTimeURI:
b.absCaptureTimeExtID = uint8(ext.ID)
}
}
switch {
case mime.IsMimeTypeAudio(b.mime):
b.codecType = webrtc.RTPCodecTypeAudio
b.bucket = bucket.NewBucket[uint64, uint16](
InitPacketBufferSizeAudio,
bucket.RTPMaxPktSize,
bucket.RTPSeqNumOffset,
)
case mime.IsMimeTypeVideo(b.mime):
b.codecType = webrtc.RTPCodecTypeVideo
b.bucket = bucket.NewBucket[uint64, uint16](
InitPacketBufferSizeVideo,
bucket.RTPMaxPktSize,
bucket.RTPSeqNumOffset,
)
if b.frameRateCalculator[0] == nil {
b.createFrameRateCalculator()
}
if bitrate > 0 {
pps := bitrate / 8 / 1200
for pps > b.bucket.Capacity() {
if b.bucket.Grow() >= b.params.MaxVideoPkts {
break
}
}
}
default:
b.codecType = webrtc.RTPCodecType(0)
}
for _, fb := range codec.RTCPFeedback {
switch fb.Type {
case webrtc.TypeRTCPFBGoogREMB:
b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBGoogREMB)
b.logger.Debugw("REMB not supported, RTCP feedback will not be generated")
case webrtc.TypeRTCPFBNACK:
// pion uses a single mediaengine to manage negotiated codecs of peerconnection, that means we can't have different
// codec settings at track level for same codec type, so enable nack for all audio receivers but don't create nack queue
// for red codec.
if b.mime == mime.MimeTypeRED {
break
}
b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBNACK)
b.nacker = nack.NewNACKQueue(nack.NackQueueParamsDefault)
}
}
if b.nacker == nil && b.params.IsOOBSequenceNumber {
b.nacker = nack.NewNACKQueue(nack.NackQueueParamsDefault)
}
b.StartKeyFrameSeeder()
return nil
}
func (b *BufferBase) CloseWithReason(reason string) (stats *livekit.RTPStats, err error) {
b.closeOnce.Do(func() {
b.isClosed.Store(true)
b.StopKeyFrameSeeder()
b.Lock()
stats, _ = b.stopRTPStats(reason)
b.readCond.Broadcast()
b.Unlock()
go b.flushExtPackets()
})
return
}
func (b *BufferBase) IsClosed() bool {
return b.isClosed.Load()
}
func (b *BufferBase) SetPaused(paused bool) {
b.Lock()
defer b.Unlock()
b.isPaused = paused
}
func (b *BufferBase) SetAudioLevelConfig(audioLevelConfig audio.AudioLevelConfig) {
b.Lock()
defer b.Unlock()
b.audioLevelConfig = audioLevelConfig
if b.audioLevel != nil {
b.audioLevel.SetConfig(b.audioLevelConfig)
}
}
func (b *BufferBase) SetStreamRestartDetection(enable bool) {
b.Lock()
defer b.Unlock()
b.enableStreamRestartDetection = enable
}
func (b *BufferBase) setupRTPStats(clockRate uint32) {
b.rtpStats = rtpstats.NewRTPStatsReceiver(rtpstats.RTPStatsParams{})
b.rtpStats.SetLogger(b.logger)
b.rtpStats.SetClockRate(clockRate)
b.ppsSnapshotId = b.rtpStats.NewSnapshotId()
if b.params.IsReportingEnabled {
b.rrSnapshotId = b.rtpStats.NewSnapshotId()
b.deltaStatsSnapshotId = b.rtpStats.NewSnapshotId()
}
if b.params.IsOOBSequenceNumber {
b.rtpStatsLite = rtpstats.NewRTPStatsReceiverLite(rtpstats.RTPStatsParams{})
b.rtpStatsLite.SetLogger(b.logger)
b.rtpStatsLite.SetClockRate(clockRate)
b.liteStatsSnapshotId = b.rtpStatsLite.NewSnapshotLiteId()
}
}
func (b *BufferBase) stopRTPStats(reason string) (stats *livekit.RTPStats, statsLite *livekit.RTPStats) {
if b.rtpStats != nil {
b.rtpStats.Stop()
stats = b.rtpStats.ToProto()
}
if b.rtpStatsLite != nil {
b.rtpStatsLite.Stop()
statsLite = b.rtpStatsLite.ToProto()
}
b.logger.Debugw(
"rtp stats",
"direction", "upstream",
"stats", b.rtpStats,
"statsLite", b.rtpStatsLite,
"reason", reason,
)
return
}
func (b *BufferBase) RestartStream(reason string) {
b.Lock()
defer b.Unlock()
b.restartStreamLocked(reason, false)
b.readCond.Broadcast()
}
func (b *BufferBase) restartStreamLocked(reason string, isDetected bool) {
b.logger.Infow("stream restart", "reason", reason)
// stop
b.StopKeyFrameSeeder()
b.stopRTPStats("stream-restart")
b.flushExtPacketsLocked()
// restart
b.snRangeMap = utils.NewRangeMap[uint64, uint64](100)
b.setupRTPStats(b.clockRate)
b.bucket.ResyncOnNextPacket()
b.lastBucketCapCheckAt = mono.UnixNano()
if b.nacker != nil {
b.nacker = nack.NewNACKQueue(nack.NackQueueParamsDefault)
}
if b.audioLevel != nil {
b.audioLevel = audio.NewAudioLevel(audio.AudioLevelParams{
ClockRate: b.clockRate,
})
b.audioLevel.SetConfig(b.audioLevelConfig)
}
if b.ddExtID != 0 {
b.createDDParserAndFrameRateCalculator()
}
b.frameRateCalculated = false
if b.frameRateCalculator[0] == nil {
b.createFrameRateCalculator()
}
b.StartKeyFrameSeeder()
b.isRestartPending = true
if f := b.onStreamRestart; f != nil && isDetected {
go f(reason)
}
}
func (b *BufferBase) createDDParserAndFrameRateCalculator() {
if mime.IsMimeTypeSVCCapable(b.mime) || b.mime == mime.MimeTypeVP8 {
frc := NewFrameRateCalculatorDD(b.clockRate, b.logger)
for i := range b.frameRateCalculator {
b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i))
}
b.ddParser = NewDependencyDescriptorParser(
b.ddExtID,
b.logger,
func(spatial, temporal int32) {
frc.SetMaxLayer(spatial, temporal)
},
b.params.IsDDRestartEnabled,
)
}
}
func (b *BufferBase) createFrameRateCalculator() {
switch b.mime {
case mime.MimeTypeVP8:
b.frameRateCalculator[0] = NewFrameRateCalculatorVP8(b.clockRate, b.logger)
case mime.MimeTypeVP9:
frc := NewFrameRateCalculatorVP9(b.clockRate, b.logger)
for i := range b.frameRateCalculator {
b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i))
}
case mime.MimeTypeH265:
b.frameRateCalculator[0] = NewFrameRateCalculatorH26x(b.clockRate, b.logger)
}
}
func (b *BufferBase) ReadExtended(buf []byte) (*ExtPacket, error) {
b.Lock()
for {
if b.isClosed.Load() {
b.Unlock()
return nil, io.EOF
}
if b.isRestartPending {
b.isRestartPending = false
b.Unlock()
return nil, nil
}
if b.extPackets.Len() > 0 {
ep := b.extPackets.PopFront()
patched := b.patchExtPacket(ep, buf)
if patched == nil {
ReleaseExtPacket(ep)
continue
}
b.Unlock()
return patched, nil
}
b.readCond.Wait()
}
}
func (b *BufferBase) SetPLIThrottle(duration int64) {
b.Lock()
defer b.Unlock()
b.pliThrottle = duration
}
func (b *BufferBase) SendPLI(force bool) {
b.RLock()
if b.codecType != webrtc.RTPCodecTypeVideo {
b.RUnlock()
return
}
rtpStats := b.rtpStats
pliThrottle := b.pliThrottle
b.RUnlock()
if (rtpStats == nil && !force) || !rtpStats.CheckAndUpdatePli(pliThrottle, force) {
return
}
if b.params.SendPLI != nil {
b.params.SendPLI()
}
}
func (b *BufferBase) SetRTT(rtt uint32) {
b.Lock()
defer b.Unlock()
if rtt == 0 {
return
}
if b.nacker != nil {
b.nacker.SetRTT(rtt)
}
if b.rtpStats != nil {
b.rtpStats.UpdateRtt(rtt)
}
}
func (b *BufferBase) WaitRead() {
b.readCond.Wait()
}
func (b *BufferBase) NotifyRead() {
b.readCond.Broadcast()
}
func (b *BufferBase) HandleIncomingPacket(
rawPkt []byte,
rtpPacket *rtp.Packet,
arrivalTime int64,
isBuffered bool,
isRTX bool,
skippedSeqs []uint16,
oobSequenceNumber uint16,
) (uint64, error) {
b.Lock()
defer b.Unlock()
if b.isClosed.Load() {
return 0, io.EOF
}
return b.HandleIncomingPacketLocked(
rawPkt,
rtpPacket,
arrivalTime,
isBuffered,
isRTX,
skippedSeqs,
oobSequenceNumber,
)
}
func (b *BufferBase) HandleIncomingPacketLocked(
rawPkt []byte,
rtpPacket *rtp.Packet,
arrivalTime int64,
isBuffered bool,
isRTX bool,
skippedSeqs []uint16,
oobSequenceNumber uint16,
) (uint64, error) {
if rtpPacket == nil {
rtpPacket = &rtp.Packet{}
if err := rtpPacket.Unmarshal(rawPkt); err != nil {
b.logger.Errorw("could not unmarshal RTP packet", err)
return 0, err
}
}
b.processAudioSsrcLevelHeaderExtension(rtpPacket, arrivalTime)
if len(skippedSeqs) > 0 {
skippedRtpPkt := rtp.Packet{
Header: rtpPacket.Header,
}
skippedRtpPkt.Marker = false
// Use the current highest timestamp to prevent the case of old sequence number and newer timestamp.
// It is possible that the skipped packet is older. An example sequence
// - Packet 10, skipped 6, 7, 9 -> Packet 8 is unknown at this point
// - Packet 11, skipped 8 -> this would cause sequence number be older, but using timestamp from Packet 11 will make time stamp diff +ve
skippedRtpPkt.Timestamp = b.rtpStats.HighestTimestamp()
for _, sn := range skippedSeqs {
skippedRtpPkt.SequenceNumber = sn
flowState := b.rtpStats.Update(
arrivalTime,
skippedRtpPkt.Header.SequenceNumber,
skippedRtpPkt.Header.Timestamp,
skippedRtpPkt.Header.Marker,
skippedRtpPkt.Header.MarshalSize(),
len(skippedRtpPkt.Payload),
int(skippedRtpPkt.PaddingSize),
)
if flowState.UnhandledReason == rtpstats.RTPFlowUnhandledReasonNone && !flowState.IsOutOfOrder {
if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil {
b.logger.Errorw(
"could not exclude range", err,
"sequenceNumber", sn,
"extSequenceNumber", flowState.ExtSequenceNumber,
"rtpStats", b.rtpStats,
"rtpStatsLite", b.rtpStatsLite,
"snRangeMap", b.snRangeMap,
"skipped", skippedSeqs,
)
}
}
}
}
// do not start on an RTX packet
if isRTX && !b.rtpStats.IsActive() {
return 0, errors.New("cannot start on rtx packet")
}
flowState := b.rtpStats.Update(
arrivalTime,
rtpPacket.Header.SequenceNumber,
rtpPacket.Header.Timestamp,
rtpPacket.Header.Marker,
rtpPacket.Header.MarshalSize(),
len(rtpPacket.Payload),
int(rtpPacket.PaddingSize),
)
switch flowState.UnhandledReason {
case rtpstats.RTPFlowUnhandledReasonNone:
case rtpstats.RTPFlowUnhandledReasonRestart:
if !b.enableStreamRestartDetection {
return 0, fmt.Errorf("unhandled reason: %s", flowState.UnhandledReason.String())
}
b.restartStreamLocked("discontinuity", true)
flowState = b.rtpStats.Update(
arrivalTime,
rtpPacket.Header.SequenceNumber,
rtpPacket.Header.Timestamp,
rtpPacket.Header.Marker,
rtpPacket.Header.MarshalSize(),
len(rtpPacket.Payload),
int(rtpPacket.PaddingSize),
)
default:
return 0, fmt.Errorf("unhandled reason: %s", flowState.UnhandledReason.String())
}
if b.params.IsOOBSequenceNumber {
b.updateOOBNACKState(oobSequenceNumber, arrivalTime, len(rawPkt))
} else {
b.updateNACKState(rtpPacket.SequenceNumber, flowState)
}
if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) {
// drop padding only in-order or duplicate packet
if !flowState.IsOutOfOrder {
// in-order packet - increment sequence number offset for subsequent packets
// Example:
// 40 - regular packet - pass through as sequence number 40
// 41 - missing packet - don't know what it is, could be padding or not
// 42 - padding only packet - in-order - drop - increment sequence number offset to 1 -
// range[0, 42] = 0 offset
// 41 - arrives out of order - get offset 0 from cache - passed through as sequence number 41
// 43 - regular packet - offset = 1 (running offset) - passes through as sequence number 42
// 44 - padding only - in order - drop - increment sequence number offset to 2
// range[0, 42] = 0 offset, range[43, 44] = 1 offset
// 43 - regular packet - out of order + duplicate - offset = 1 from cache -
// adjusted sequence number is 42, will be dropped by RTX buffer AddPacket method as duplicate
// 45 - regular packet - offset = 2 (running offset) - passed through with adjusted sequence number as 43
// 44 - padding only - out-of-order + duplicate - dropped as duplicate
//
if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil {
b.logger.Errorw(
"could not exclude range", err,
"sn", rtpPacket.SequenceNumber,
"esn", flowState.ExtSequenceNumber,
"rtpStats", b.rtpStats,
"snRangeMap", b.snRangeMap,
)
}
}
return 0, errors.New("padding only packet")
}
if !flowState.IsOutOfOrder && rtpPacket.PayloadType != b.payloadType && b.codecType == webrtc.RTPCodecTypeVideo {
b.logger.Infow("possible codec change", "oldPT", b.payloadType, "receivedPT", rtpPacket.PayloadType)
b.handleCodecChange(rtpPacket.PayloadType)
}
// add to RTX buffer using sequence number after accounting for dropped padding only packets
snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber)
if err != nil {
b.logger.Errorw(
"could not get sequence number adjustment", err,
"sequenceNumber", rtpPacket.SequenceNumber,
"extSequenceNumber", flowState.ExtSequenceNumber,
"timestamp", rtpPacket.Timestamp,
"extTimestamp", flowState.ExtTimestamp,
"payloadSize", len(rtpPacket.Payload),
"paddingSize", rtpPacket.PaddingSize,
"rtpStats", b.rtpStats,
"rtpStatsLite", b.rtpStatsLite,
"snRangeMap", b.snRangeMap,
)
return 0, err
}
flowState.ExtSequenceNumber -= snAdjustment
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber)
if _, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, flowState.ExtSequenceNumber); err != nil {
if !flowState.IsDuplicate {
if errors.Is(err, bucket.ErrPacketTooOld) {
packetTooOldCount := b.packetTooOldCount.Inc()
if (packetTooOldCount-1)%100 == 0 {
b.logger.Warnw(
"could not add packet to bucket", err,
"count", packetTooOldCount,
"flowState", &flowState,
"snAdjustment", snAdjustment,
"incomingSequenceNumber", flowState.ExtSequenceNumber+snAdjustment,
"rtpStats", b.rtpStats,
"rtpStatsLite", b.rtpStatsLite,
"snRangeMap", b.snRangeMap,
"skipped", skippedSeqs,
)
}
} else if err != bucket.ErrRTXPacket {
b.logger.Warnw(
"could not add packet to bucket", err,
"flowState", &flowState,
"snAdjustment", snAdjustment,
"incomingSequenceNumber", flowState.ExtSequenceNumber+snAdjustment,
"rtpStats", b.rtpStats,
"rtpStatsLite", b.rtpStatsLite,
"snRangeMap", b.snRangeMap,
"skipped", skippedSeqs,
)
}
}
return 0, err
}
ep := b.getExtPacket(rtpPacket, arrivalTime, isBuffered, flowState)
if ep == nil {
return 0, errors.New("could not get ext packet")
}
b.extPackets.PushBack(ep)
b.readCond.Broadcast()
if b.extPackets.Len() > b.bucket.Capacity() {
if (b.extPacketTooMuchCount.Inc()-1)%100 == 0 {
b.logger.Warnw("too much ext packets", nil, "count", b.extPackets.Len())
}
}
b.maybeGrowBucket(arrivalTime)
return ep.ExtSequenceNumber, nil
}
func (b *BufferBase) updateNACKState(sequenceNumber uint16, flowState rtpstats.RTPFlowState) {
if b.nacker == nil {
return
}
b.nacker.Remove(sequenceNumber)
for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ {
b.nacker.Push(uint16(lost))
}
}
func (b *BufferBase) updateOOBNACKState(sequenceNumber uint16, arrivalTime int64, size int) {
if b.nacker == nil || !b.params.IsOOBSequenceNumber {
return
}
fsLite := b.rtpStatsLite.Update(arrivalTime, size, sequenceNumber)
if fsLite.IsNotHandled {
return
}
b.nacker.Remove(sequenceNumber)
for lost := fsLite.LossStartInclusive; lost != fsLite.LossEndExclusive; lost++ {
b.nacker.Push(uint16(lost))
}
}
func (b *BufferBase) processAudioSsrcLevelHeaderExtension(p *rtp.Packet, arrivalTime int64) {
if b.audioLevelExtID == 0 {
return
}
if e := p.GetExtension(b.audioLevelExtID); e != nil {
ext := rtp.AudioLevelExtension{}
if err := ext.Unmarshal(e); err == nil {
b.audioLevel.ObserveWithRTPTimestamp(ext.Level, p.Timestamp, arrivalTime)
}
}
}
func (b *BufferBase) handleCodecChange(newPT uint8) {
var (
codecFound, rtxFound bool
rtxPt uint8
newCodec webrtc.RTPCodecParameters
)
for _, codec := range b.rtpParameters.Codecs {
if !codecFound && uint8(codec.PayloadType) == newPT {
newCodec = codec
codecFound = true
}
if mime.IsMimeTypeStringRTX(codec.MimeType) && strings.Contains(codec.SDPFmtpLine, fmt.Sprintf("apt=%d", newPT)) {
rtxFound = true
rtxPt = uint8(codec.PayloadType)
}
if codecFound && rtxFound {
break
}
}
if !codecFound {
b.logger.Errorw(
"could not find codec for new payload type", nil,
"pt", newPT,
"rtpParameters", b.rtpParameters,
)
return
}
b.logger.Infow(
"codec changed",
"oldPayload", b.payloadType, "newPayload", newPT,
"oldRtxPayload", b.rtxPayloadType, "newRtxPayload", rtxPt,
"oldMime", b.mime, "newMime", newCodec.MimeType,
)
b.payloadType = newPT
b.rtxPayloadType = rtxPt
b.mime = mime.NormalizeMimeType(newCodec.MimeType)
if f := b.onCodecChange; f != nil {
go f(newCodec)
}
}
func (b *BufferBase) getExtPacket(
rtpPacket *rtp.Packet,
arrivalTime int64,
isBuffered bool,
flowState rtpstats.RTPFlowState,
) *ExtPacket {
ep := ExtPacketFactory.Get().(*ExtPacket)
*ep = ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: flowState.ExtSequenceNumber,
ExtTimestamp: flowState.ExtTimestamp,
Packet: rtpPacket,
VideoLayer: VideoLayer{
Spatial: InvalidLayerSpatial,
Temporal: InvalidLayerTemporal,
},
IsOutOfOrder: flowState.IsOutOfOrder,
IsBuffered: isBuffered,
}
if len(ep.Packet.Payload) == 0 {
// padding only packet, nothing else to do
return ep
}
if err := b.processVideoPacket(ep); err != nil {
ReleaseExtPacket(ep)
return nil
}
if b.absCaptureTimeExtID != 0 {
extData := rtpPacket.GetExtension(b.absCaptureTimeExtID)
var actExt act.AbsCaptureTime
if err := actExt.Unmarshal(extData); err == nil {
ep.AbsCaptureTimeExt = &actExt
}
}
return ep
}
func (b *BufferBase) processVideoPacket(ep *ExtPacket) error {
if b.codecType != webrtc.RTPCodecTypeVideo {
return nil
}
ep.Temporal = 0
var videoSize []VideoSize
if b.ddParser != nil {
ddVal, videoLayer, err := b.ddParser.Parse(ep.Packet)
if err != nil {
if errors.Is(err, ErrDDExtentionNotFound) {
if b.mime == mime.MimeTypeVP8 || b.mime == mime.MimeTypeVP9 {
b.logger.Infow("dd extension not found, disable dd parser")
b.ddParser = nil
b.createFrameRateCalculator()
}
} else {
return err
}
} else if ddVal != nil {
ep.DependencyDescriptor = ddVal
ep.VideoLayer = videoLayer
videoSize = ExtractDependencyDescriptorVideoSize(ddVal.Descriptor)
// DD-TODO : notify active decode target change if changed.
}
}
switch b.mime {
case mime.MimeTypeVP8:
vp8Packet := VP8{}
if err := vp8Packet.Unmarshal(ep.Packet.Payload); err != nil {
b.logger.Warnw("could not unmarshal VP8 packet", err)
return err
}
ep.IsKeyFrame = vp8Packet.IsKeyFrame
if ep.DependencyDescriptor == nil {
ep.Temporal = int32(vp8Packet.TID)
if ep.IsKeyFrame {
if sz := ExtractVP8VideoSize(&vp8Packet, ep.Packet.Payload); sz.Width > 0 && sz.Height > 0 {
videoSize = append(videoSize, sz)
}
}
} else {
// vp8 with DependencyDescriptor enabled, use the TID from the descriptor
vp8Packet.TID = uint8(ep.Temporal)
}
ep.Payload = vp8Packet
ep.Spatial = InvalidLayerSpatial // vp8 don't have spatial scalability, reset to invalid
case mime.MimeTypeVP9:
if ep.DependencyDescriptor == nil {
var vp9Packet codecs.VP9Packet
_, err := vp9Packet.Unmarshal(ep.Packet.Payload)
if err != nil {
b.logger.Warnw("could not unmarshal VP9 packet", err)
return err
}
ep.VideoLayer = VideoLayer{
Spatial: int32(vp9Packet.SID),
Temporal: int32(vp9Packet.TID),
}
ep.Payload = vp9Packet
ep.IsKeyFrame = IsVP9KeyFrame(&vp9Packet, ep.Packet.Payload)
if ep.IsKeyFrame {
for i := 0; i < len(vp9Packet.Width); i++ {
videoSize = append(videoSize, VideoSize{
Width: uint32(vp9Packet.Width[i]),
Height: uint32(vp9Packet.Height[i]),
})
}
}
} else {
ep.IsKeyFrame = IsVP9KeyFrame(nil, ep.Packet.Payload)
}
case mime.MimeTypeH264:
ep.IsKeyFrame = IsH264KeyFrame(ep.Packet.Payload)
ep.Spatial = InvalidLayerSpatial // h.264 don't have spatial scalability, reset to invalid
// Check H264 key frame video size
if ep.IsKeyFrame {
if sz := ExtractH264VideoSize(ep.Packet.Payload); sz.Width > 0 && sz.Height > 0 {
videoSize = append(videoSize, sz)
}
}
case mime.MimeTypeAV1:
ep.IsKeyFrame = IsAV1KeyFrame(ep.Packet.Payload)
case mime.MimeTypeH265:
ep.IsKeyFrame = IsH265KeyFrame(ep.Packet.Payload)
if ep.DependencyDescriptor == nil {
if len(ep.Packet.Payload) < 2 {
b.logger.Warnw("invalid H265 packet", nil, "payloadLen", len(ep.Packet.Payload))
return errors.New("invalid H265 packet")
}
ep.VideoLayer = VideoLayer{
Temporal: int32(ep.Packet.Payload[1]&0x07) - 1,
}
ep.Spatial = InvalidLayerSpatial
if ep.IsKeyFrame {
if sz := ExtractH265VideoSize(ep.Packet.Payload); sz.Width > 0 && sz.Height > 0 {
videoSize = append(videoSize, sz)
}
}
}
}
if ep.IsKeyFrame {
if b.rtpStats != nil {
b.rtpStats.UpdateKeyFrame(1)
}
}
if len(videoSize) > 0 {
b.checkVideoSizeChange(videoSize)
}
b.doFpsCalc(ep)
return nil
}
func (b *BufferBase) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket {
n, err := b.getPacketLocked(buf, ep.ExtSequenceNumber)
if err != nil {
packetNotFoundCount := b.packetNotFoundCount.Inc()
if (packetNotFoundCount-1)%20 == 0 {
b.logger.Warnw(
"could not get packet from bucket", err,
"sn", ep.Packet.SequenceNumber,
"headSN", b.bucket.HeadSequenceNumber(),
"count", packetNotFoundCount,
"rtpStats", b.rtpStats,
"rtpStatsLite", b.rtpStatsLite,
"snRangeMap", b.snRangeMap,
)
}
return nil
}
ep.RawPacket = buf[:n]
// patch RTP packet to point payload to new buffer
pkt := *ep.Packet
payloadStart := ep.Packet.Header.MarshalSize()
payloadEnd := payloadStart + len(ep.Packet.Payload)
if payloadEnd > n {
b.logger.Warnw("unexpected marshal size", nil, "max", n, "need", payloadEnd)
return nil
}
pkt.Payload = buf[payloadStart:payloadEnd]
ep.Packet = &pkt
return ep
}
func (b *BufferBase) flushExtPackets() {
b.Lock()
defer b.Unlock()
b.flushExtPacketsLocked()
}
func (b *BufferBase) flushExtPacketsLocked() {
for b.extPackets.Len() > 0 {
ep := b.extPackets.PopFront()
ReleaseExtPacket(ep)
}
b.extPackets.Clear()
}
func (b *BufferBase) maybeGrowBucket(now int64) {
if now-b.lastBucketCapCheckAt < bucketCapCheckInterval {
return
}
// check and allocate in a go routine, away from the forwarding path
go func() {
b.Lock()
defer b.Unlock()
b.lastBucketCapCheckAt = now
cap := b.bucket.Capacity()
maxPkts := b.params.MaxVideoPkts
if b.codecType == webrtc.RTPCodecTypeAudio {
maxPkts = b.params.MaxAudioPkts
}
if cap >= maxPkts {
return
}
oldCap := cap
if deltaInfo := b.rtpStats.DeltaInfo(b.ppsSnapshotId); deltaInfo != nil {
duration := deltaInfo.EndTime.Sub(deltaInfo.StartTime)
if duration < 500*time.Millisecond {
return
}
pps := int(time.Duration(deltaInfo.Packets) * time.Second / duration)
for pps > cap && cap < maxPkts {
cap = b.bucket.Grow()
}
if cap > oldCap {
b.logger.Infow(
"grow bucket",
"from", oldCap,
"to", cap,
"pps", pps,
"deltaInfo", deltaInfo,
"rtpStats", b.rtpStats,
)
}
}
}()
}
func (b *BufferBase) doFpsCalc(ep *ExtPacket) {
if b.isPaused || b.frameRateCalculated || len(ep.Packet.Payload) == 0 {
return
}
spatial := ep.Spatial
if spatial < 0 || int(spatial) >= len(b.frameRateCalculator) {
spatial = 0
}
if fr := b.frameRateCalculator[spatial]; fr != nil {
if fr.RecvPacket(ep) {
complete := true
for _, fr2 := range b.frameRateCalculator {
if fr2 != nil && !fr2.Completed() {
complete = false
break
}
}
if complete {
b.frameRateCalculated = true
if f := b.onFpsChanged; f != nil {
go f()
}
}
}
}
}
func (b *BufferBase) SetSenderReportData(srData *livekit.RTCPSenderReportState) {
b.RLock()
didSet := false
if b.rtpStats != nil {
didSet = b.rtpStats.SetRtcpSenderReportData(srData)
}
b.RUnlock()
if didSet {
if cb := b.getOnRtcpSenderReport(); cb != nil {
cb()
}
}
}
func (b *BufferBase) GetSenderReportData() *livekit.RTCPSenderReportState {
b.RLock()
defer b.RUnlock()
if b.rtpStats != nil {
return b.rtpStats.GetRtcpSenderReportData()
}
return nil
}
func (b *BufferBase) GetPacket(buff []byte, esn uint64) (int, error) {
b.Lock()
defer b.Unlock()
return b.getPacketLocked(buff, esn)
}
func (b *BufferBase) getPacketLocked(buff []byte, esn uint64) (int, error) {
if b.isClosed.Load() {
return 0, io.EOF
}
return b.bucket.GetPacket(buff, esn)
}
func (b *BufferBase) GetStats() *livekit.RTPStats {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return nil
}
return b.rtpStats.ToProto()
}
func (b *BufferBase) GetDeltaStats() *StreamStatsWithLayers {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return nil
}
deltaStats := b.rtpStats.DeltaInfo(b.deltaStatsSnapshotId)
if deltaStats == nil {
return nil
}
return &StreamStatsWithLayers{
RTPStats: deltaStats,
Layers: map[int32]*rtpstats.RTPDeltaInfo{
0: deltaStats,
},
}
}
func (b *BufferBase) GetDeltaStatsLite() *rtpstats.RTPDeltaInfoLite {
b.RLock()
defer b.RUnlock()
if b.rtpStatsLite == nil {
return nil
}
return b.rtpStatsLite.DeltaInfoLite(b.liteStatsSnapshotId)
}
func (b *BufferBase) GetLastSenderReportTime() time.Time {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return time.Time{}
}
return b.rtpStats.LastSenderReportTime()
}
func (b *BufferBase) GetAudioLevel() (float64, bool) {
b.RLock()
defer b.RUnlock()
if b.audioLevel == nil {
return 0, false
}
return b.audioLevel.GetLevel(mono.UnixNano())
}
func (b *BufferBase) OnRtcpSenderReport(fn func()) {
b.Lock()
b.onRtcpSenderReport = fn
b.Unlock()
}
func (b *BufferBase) getOnRtcpSenderReport() func() {
b.RLock()
defer b.RUnlock()
return b.onRtcpSenderReport
}
func (b *BufferBase) OnFpsChanged(f func()) {
b.Lock()
b.onFpsChanged = f
b.Unlock()
}
func (b *BufferBase) OnVideoSizeChanged(fn func([]VideoSize)) {
b.Lock()
b.onVideoSizeChanged = fn
b.Unlock()
}
func (b *BufferBase) OnCodecChange(fn func(webrtc.RTPCodecParameters)) {
b.Lock()
b.onCodecChange = fn
b.Unlock()
}
func (b *BufferBase) OnStreamRestart(fn func(string)) {
b.Lock()
b.onStreamRestart = fn
b.Unlock()
}
// checkVideoSizeChange checks if video size has changed for a specific spatial layer and fires callback
func (b *BufferBase) checkVideoSizeChange(videoSizes []VideoSize) {
if len(videoSizes) > len(b.currentVideoSize) {
b.logger.Warnw(
"video size index out of range", nil,
"newSize", videoSizes,
"currentVideoSize", b.currentVideoSize,
)
return
}
if len(videoSizes) < len(b.currentVideoSize) {
videoSizes = append(videoSizes, make([]VideoSize, len(b.currentVideoSize)-len(videoSizes))...)
}
changed := false
for i, sz := range videoSizes {
if b.currentVideoSize[i].Width != sz.Width || b.currentVideoSize[i].Height != sz.Height {
changed = true
break
}
}
if changed {
b.logger.Debugw("video size changed", "from", b.currentVideoSize, "to", videoSizes)
copy(b.currentVideoSize[:], videoSizes[:])
if b.onVideoSizeChanged != nil {
go b.onVideoSizeChanged(videoSizes)
}
}
}
func (b *BufferBase) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
b.RLock()
defer b.RUnlock()
if int(layer) >= len(b.frameRateCalculator) {
return nil
}
if fc := b.frameRateCalculator[layer]; fc != nil {
return fc.GetFrameRate()
}
return nil
}
func (b *BufferBase) StartKeyFrameSeeder() {
if b.codecType == webrtc.RTPCodecTypeVideo {
go b.seedKeyFrame(b.keyFrameSeederGeneration.Inc())
}
}
func (b *BufferBase) StopKeyFrameSeeder() {
b.keyFrameSeederGeneration.Inc()
}
func (b *BufferBase) seedKeyFrame(keyFrameSeederGeneration int32) {
// a key frame is needed especially when using Dependency Descriptor
// to get the DD structure which is used in parsing subsequent packets,
// till then packets are dropped which results in stream tracker not
// getting any data which means it does not declare layer start.
//
// send gratuitous PLIs for some time or until a key frame is seen to
// get the engine rolling
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
initialCount := uint32(0)
b.RLock()
rtpStats := b.rtpStats
lgr := b.logger
b.RUnlock()
lgr.Debugw("starting key frame seeder", "generation", keyFrameSeederGeneration)
if rtpStats == nil {
lgr.Debugw("cannot do key frame seeding without stats", "generation", keyFrameSeederGeneration)
return
}
initialCount, _ = rtpStats.KeyFrame()
for {
if b.isClosed.Load() || b.keyFrameSeederGeneration.Load() != keyFrameSeederGeneration {
lgr.Debugw(
"stopping key frame seeder: stopped",
"generation", keyFrameSeederGeneration,
"currentGeneration", b.keyFrameSeederGeneration.Load(),
)
return
}
select {
case <-timer.C:
lgr.Debugw("stopping key frame seeder: timeout", "generation", keyFrameSeederGeneration)
return
case <-ticker.C:
cnt, last := rtpStats.KeyFrame()
if cnt > initialCount {
lgr.Debugw(
"stopping key frame seeder: received key frame",
"generation", keyFrameSeederGeneration,
"keyFrameCountInitial", initialCount,
"keyFrameCount", cnt,
"lastKeyFrame", last,
)
return
}
b.SendPLI(false)
}
}
}
func (b *BufferBase) GetNACKPairs() []rtcp.NackPair {
b.RLock()
defer b.RUnlock()
return b.GetNACKPairsLocked()
}
func (b *BufferBase) GetNACKPairsLocked() []rtcp.NackPair {
if b.nacker == nil {
return nil
}
pairs, numSeqNumsNacked := b.nacker.Pairs()
if !b.params.IsOOBSequenceNumber {
if b.rtpStats != nil {
b.rtpStats.UpdateNack(uint32(numSeqNumsNacked))
}
} else {
if b.rtpStatsLite != nil {
b.rtpStatsLite.UpdateNack(uint32(numSeqNumsNacked))
}
}
return pairs
}
func (b *BufferBase) GetRtcpReceptionReportLocked(proxyLoss uint8) *rtcp.ReceptionReport {
if b.rtpStats == nil {
return nil
}
return b.rtpStats.GetRtcpReceptionReport(b.params.SSRC, proxyLoss, b.rrSnapshotId)
}
// ---------------------------------------------------------------