mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
It is possible that the stream stops just after start and restarts much later, introducing a large gap in sequence number. That could look like an unhandled case because the wrap back handler does not have enough packets yet. Let other checks based on the timestamp gap take effect and, only if that also leaves the sequence number unhandled, drop the packet.
1452 lines
35 KiB
Go
1452 lines
35 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package buffer
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gammazero/deque"
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/rtp"
|
|
"github.com/pion/rtp/codecs"
|
|
"github.com/pion/sdp/v3"
|
|
"github.com/pion/webrtc/v4"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
|
"github.com/livekit/livekit-server/pkg/sfu/mime"
|
|
act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime"
|
|
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
|
|
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
|
|
"github.com/livekit/livekit-server/pkg/sfu/utils"
|
|
sutils "github.com/livekit/livekit-server/pkg/utils"
|
|
"github.com/livekit/mediatransportutil/pkg/bucket"
|
|
"github.com/livekit/mediatransportutil/pkg/nack"
|
|
"github.com/livekit/mediatransportutil/pkg/twcc"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/utils/mono"
|
|
)
|
|
|
|
var (
|
|
ExtPacketFactory = &sync.Pool{
|
|
New: func() any {
|
|
return &ExtPacket{}
|
|
},
|
|
}
|
|
)
|
|
|
|
// --------------------------------------
|
|
|
|
const (
	// ReportDelta is the minimum spacing between two rounds of RTCP reports.
	// It is compared against differences of packet arrival times.
	ReportDelta = 1e9

	// InitPacketBufferSizeVideo is the initial capacity of the packet bucket
	// created for video tracks.
	InitPacketBufferSizeVideo = 300
	// InitPacketBufferSizeAudio is the initial capacity of the packet bucket
	// created for audio tracks.
	InitPacketBufferSizeAudio = 70
)
|
|
|
|
// errInvalidCodec is returned by Bind when the codec has a zero clock rate.
var errInvalidCodec = errors.New("invalid codec")
|
|
|
|
// pendingPacket is a raw packet that was received before the buffer was bound,
// stored together with the time it arrived.
type pendingPacket struct {
	// arrivalTime is the arrival time recorded by Write for this packet.
	arrivalTime int64
	// packet is a private copy of the bytes handed to Write.
	packet []byte
}
|
|
|
|
type ExtPacket struct {
|
|
VideoLayer
|
|
Arrival int64
|
|
ExtSequenceNumber uint64
|
|
ExtTimestamp uint64
|
|
Packet *rtp.Packet
|
|
Payload any
|
|
KeyFrame bool
|
|
RawPacket []byte
|
|
DependencyDescriptor *ExtDependencyDescriptor
|
|
AbsCaptureTimeExt *act.AbsCaptureTime
|
|
IsOutOfOrder bool
|
|
IsBuffered bool
|
|
}
|
|
|
|
// VideoSize represents a video resolution.
type VideoSize struct {
	// Width of the video frame.
	Width uint32
	// Height of the video frame.
	Height uint32
}
|
|
|
|
// Buffer contains all packets
|
|
type Buffer struct {
|
|
sync.RWMutex
|
|
readCond *sync.Cond
|
|
bucket *bucket.Bucket[uint64, uint16]
|
|
nacker *nack.NackQueue
|
|
maxVideoPkts int
|
|
maxAudioPkts int
|
|
codecType webrtc.RTPCodecType
|
|
extPackets deque.Deque[*ExtPacket]
|
|
pPackets []pendingPacket
|
|
closeOnce sync.Once
|
|
mediaSSRC uint32
|
|
clockRate uint32
|
|
lastReport int64
|
|
twccExtID uint8
|
|
audioLevelExtID uint8
|
|
bound bool
|
|
closed atomic.Bool
|
|
|
|
rtpParameters webrtc.RTPParameters
|
|
payloadType uint8
|
|
rtxPayloadType uint8
|
|
mime mime.MimeType
|
|
|
|
snRangeMap *utils.RangeMap[uint64, uint64]
|
|
|
|
latestTSForAudioLevelInitialized bool
|
|
latestTSForAudioLevel uint32
|
|
|
|
twcc *twcc.Responder
|
|
audioLevelParams audio.AudioLevelParams
|
|
audioLevel *audio.AudioLevel
|
|
enableAudioLossProxying bool
|
|
|
|
lastPacketRead int
|
|
|
|
pliThrottle int64
|
|
|
|
rtpStats *rtpstats.RTPStatsReceiver
|
|
rrSnapshotId uint32
|
|
deltaStatsSnapshotId uint32
|
|
ppsSnapshotId uint32
|
|
|
|
lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only
|
|
|
|
// callbacks
|
|
onClose func()
|
|
onRtcpFeedback func([]rtcp.Packet)
|
|
onRtcpSenderReport func()
|
|
onFpsChanged func()
|
|
onFinalRtpStats func(*livekit.RTPStats)
|
|
onCodecChange func(webrtc.RTPCodecParameters)
|
|
onVideoSizeChanged func([]VideoSize)
|
|
|
|
// video size tracking for multiple spatial layers
|
|
currentVideoSize [DefaultMaxLayerSpatial + 1]VideoSize
|
|
|
|
// logger
|
|
logger logger.Logger
|
|
|
|
// dependency descriptor
|
|
ddExtID uint8
|
|
ddParser *DependencyDescriptorParser
|
|
|
|
paused bool
|
|
frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator
|
|
frameRateCalculated bool
|
|
|
|
packetNotFoundCount atomic.Uint32
|
|
packetTooOldCount atomic.Uint32
|
|
extPacketTooMuchCount atomic.Uint32
|
|
|
|
primaryBufferForRTX *Buffer
|
|
rtxPktBuf []byte
|
|
|
|
absCaptureTimeExtID uint8
|
|
|
|
keyFrameSeederGeneration atomic.Int32
|
|
}
|
|
|
|
// NewBuffer constructs a new Buffer
|
|
func NewBuffer(ssrc uint32, maxVideoPkts, maxAudioPkts int) *Buffer {
|
|
l := logger.GetLogger() // will be reset with correct context via SetLogger
|
|
b := &Buffer{
|
|
mediaSSRC: ssrc,
|
|
maxVideoPkts: maxVideoPkts,
|
|
maxAudioPkts: maxAudioPkts,
|
|
snRangeMap: utils.NewRangeMap[uint64, uint64](100),
|
|
pliThrottle: int64(500 * time.Millisecond),
|
|
logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU),
|
|
}
|
|
b.readCond = sync.NewCond(&b.RWMutex)
|
|
b.extPackets.SetBaseCap(128)
|
|
return b
|
|
}
|
|
|
|
func (b *Buffer) SetLogger(logger logger.Logger) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.logger = logger.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU).WithValues("ssrc", b.mediaSSRC)
|
|
if b.rtpStats != nil {
|
|
b.rtpStats.SetLogger(b.logger)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) SetPaused(paused bool) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.paused = paused
|
|
}
|
|
|
|
func (b *Buffer) SetTWCCAndExtID(twcc *twcc.Responder, extID uint8) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.twcc = twcc
|
|
b.twccExtID = extID
|
|
}
|
|
|
|
func (b *Buffer) SetAudioLevelParams(audioLevelParams audio.AudioLevelParams) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.audioLevelParams = audioLevelParams
|
|
}
|
|
|
|
func (b *Buffer) SetAudioLossProxying(enable bool) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.enableAudioLossProxying = enable
|
|
}
|
|
|
|
func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrates int) error {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
if b.bound {
|
|
return nil
|
|
}
|
|
|
|
b.logger.Debugw("binding track")
|
|
if codec.ClockRate == 0 {
|
|
b.logger.Warnw("invalid codec", nil, "params", params, "codec", codec, "bitrates", bitrates)
|
|
return errInvalidCodec
|
|
}
|
|
|
|
b.rtpStats = rtpstats.NewRTPStatsReceiver(rtpstats.RTPStatsParams{
|
|
ClockRate: codec.ClockRate,
|
|
Logger: b.logger,
|
|
})
|
|
b.rrSnapshotId = b.rtpStats.NewSnapshotId()
|
|
b.deltaStatsSnapshotId = b.rtpStats.NewSnapshotId()
|
|
b.ppsSnapshotId = b.rtpStats.NewSnapshotId()
|
|
|
|
b.clockRate = codec.ClockRate
|
|
b.lastReport = mono.UnixNano()
|
|
b.mime = mime.NormalizeMimeType(codec.MimeType)
|
|
b.rtpParameters = params
|
|
for _, codecParameter := range params.Codecs {
|
|
if mime.IsMimeTypeStringEqual(codecParameter.MimeType, codec.MimeType) {
|
|
b.payloadType = uint8(codecParameter.PayloadType)
|
|
break
|
|
}
|
|
}
|
|
|
|
if b.payloadType == 0 && !mime.IsMimeTypeStringEqual(codec.MimeType, webrtc.MimeTypePCMU) {
|
|
b.logger.Warnw("could not find payload type for codec", nil, "codec", codec.MimeType, "parameters", params)
|
|
b.payloadType = uint8(params.Codecs[0].PayloadType)
|
|
}
|
|
|
|
// find RTX payload type
|
|
for _, codec := range params.Codecs {
|
|
if mime.IsMimeTypeStringRTX(codec.MimeType) && strings.Contains(codec.SDPFmtpLine, fmt.Sprintf("apt=%d", b.payloadType)) {
|
|
b.rtxPayloadType = uint8(codec.PayloadType)
|
|
break
|
|
}
|
|
}
|
|
|
|
for _, ext := range params.HeaderExtensions {
|
|
switch ext.URI {
|
|
case dd.ExtensionURI:
|
|
if b.ddExtID != 0 {
|
|
b.logger.Warnw("multiple dependency descriptor extensions found", nil, "id", ext.ID, "previous", b.ddExtID)
|
|
continue
|
|
}
|
|
b.ddExtID = uint8(ext.ID)
|
|
b.createDDParserAndFrameRateCalculator()
|
|
|
|
case sdp.AudioLevelURI:
|
|
b.audioLevelExtID = uint8(ext.ID)
|
|
b.audioLevel = audio.NewAudioLevel(b.audioLevelParams)
|
|
|
|
case act.AbsCaptureTimeURI:
|
|
b.absCaptureTimeExtID = uint8(ext.ID)
|
|
}
|
|
}
|
|
|
|
switch {
|
|
case mime.IsMimeTypeAudio(b.mime):
|
|
b.codecType = webrtc.RTPCodecTypeAudio
|
|
b.bucket = bucket.NewBucket[uint64, uint16](InitPacketBufferSizeAudio, bucket.RTPMaxPktSize, bucket.RTPSeqNumOffset)
|
|
|
|
case mime.IsMimeTypeVideo(b.mime):
|
|
b.codecType = webrtc.RTPCodecTypeVideo
|
|
b.bucket = bucket.NewBucket[uint64, uint16](InitPacketBufferSizeVideo, bucket.RTPMaxPktSize, bucket.RTPSeqNumOffset)
|
|
if b.frameRateCalculator[0] == nil {
|
|
b.createFrameRateCalculator()
|
|
}
|
|
if bitrates > 0 {
|
|
pps := bitrates / 8 / 1200
|
|
for pps > b.bucket.Capacity() {
|
|
if b.bucket.Grow() >= b.maxVideoPkts {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
default:
|
|
b.codecType = webrtc.RTPCodecType(0)
|
|
}
|
|
|
|
for _, fb := range codec.RTCPFeedback {
|
|
switch fb.Type {
|
|
case webrtc.TypeRTCPFBGoogREMB:
|
|
b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBGoogREMB)
|
|
b.logger.Debugw("REMB not supported, RTCP feedback will not be generated")
|
|
case webrtc.TypeRTCPFBNACK:
|
|
// pion use a single mediaengine to manage negotiated codecs of peerconnection, that means we can't have different
|
|
// codec settings at track level for same codec type, so enable nack for all audio receivers but don't create nack queue
|
|
// for red codec.
|
|
if b.mime == mime.MimeTypeRED {
|
|
break
|
|
}
|
|
b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBNACK)
|
|
b.nacker = nack.NewNACKQueue(nack.NackQueueParamsDefault)
|
|
}
|
|
}
|
|
|
|
if len(b.pPackets) != 0 {
|
|
b.logger.Debugw("releasing queued packets on bind", "count", len(b.pPackets))
|
|
}
|
|
for _, pp := range b.pPackets {
|
|
b.calc(pp.packet, nil, pp.arrivalTime, false, true)
|
|
}
|
|
b.pPackets = nil
|
|
b.bound = true
|
|
|
|
if mime.IsMimeTypeVideo(b.mime) {
|
|
go b.seedKeyFrame(b.keyFrameSeederGeneration.Inc())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Buffer) OnCodecChange(fn func(webrtc.RTPCodecParameters)) {
|
|
b.Lock()
|
|
b.onCodecChange = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) createDDParserAndFrameRateCalculator() {
|
|
if mime.IsMimeTypeSVCCapable(b.mime) || b.mime == mime.MimeTypeVP8 {
|
|
frc := NewFrameRateCalculatorDD(b.clockRate, b.logger)
|
|
for i := range b.frameRateCalculator {
|
|
b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i))
|
|
}
|
|
b.ddParser = NewDependencyDescriptorParser(
|
|
b.ddExtID,
|
|
b.logger,
|
|
func(spatial, temporal int32) {
|
|
frc.SetMaxLayer(spatial, temporal)
|
|
},
|
|
false,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) createFrameRateCalculator() {
|
|
switch b.mime {
|
|
case mime.MimeTypeVP8:
|
|
b.frameRateCalculator[0] = NewFrameRateCalculatorVP8(b.clockRate, b.logger)
|
|
|
|
case mime.MimeTypeVP9:
|
|
frc := NewFrameRateCalculatorVP9(b.clockRate, b.logger)
|
|
for i := range b.frameRateCalculator {
|
|
b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i))
|
|
}
|
|
|
|
case mime.MimeTypeH265:
|
|
b.frameRateCalculator[0] = NewFrameRateCalculatorH26x(b.clockRate, b.logger)
|
|
}
|
|
}
|
|
|
|
// Write adds an RTP Packet, ordering is not guaranteed, newer packets may arrive later
|
|
//
|
|
//go:noinline
|
|
func (b *Buffer) Write(pkt []byte) (n int, err error) {
|
|
var rtpPacket rtp.Packet
|
|
err = rtpPacket.Unmarshal(pkt)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
b.Lock()
|
|
if b.closed.Load() {
|
|
b.Unlock()
|
|
err = io.EOF
|
|
return
|
|
}
|
|
|
|
now := mono.UnixNano()
|
|
if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() {
|
|
if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil {
|
|
b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now, rtpPacket.Marker)
|
|
}
|
|
}
|
|
|
|
// libwebrtc will use 0 ssrc for probing, don't push the packet to pending queue to avoid memory increasing since
|
|
// the Bind will not be called to consume the pending packets. More details in https://github.com/pion/webrtc/pull/2816
|
|
if rtpPacket.SSRC == 0 {
|
|
b.Unlock()
|
|
return
|
|
}
|
|
|
|
// handle RTX packet
|
|
if pb := b.primaryBufferForRTX; pb != nil {
|
|
b.Unlock()
|
|
|
|
// skip padding only packets
|
|
if rtpPacket.Padding && len(rtpPacket.Payload) == 0 {
|
|
return
|
|
}
|
|
|
|
pb.writeRTX(&rtpPacket, now)
|
|
return
|
|
}
|
|
|
|
if !b.bound {
|
|
packet := make([]byte, len(pkt))
|
|
copy(packet, pkt)
|
|
|
|
if len(b.pPackets) == 0 {
|
|
b.logger.Debugw("received first packet")
|
|
}
|
|
|
|
startIdx := 0
|
|
overflow := len(b.pPackets) - max(b.maxVideoPkts, b.maxAudioPkts)
|
|
if overflow > 0 {
|
|
startIdx = overflow
|
|
}
|
|
b.pPackets = append(b.pPackets[startIdx:], pendingPacket{
|
|
packet: packet,
|
|
arrivalTime: now,
|
|
})
|
|
|
|
b.readCond.Broadcast()
|
|
b.Unlock()
|
|
return
|
|
}
|
|
|
|
b.calc(pkt, &rtpPacket, now, false, false)
|
|
b.readCond.Broadcast()
|
|
b.Unlock()
|
|
return
|
|
}
|
|
|
|
func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) {
|
|
b.Lock()
|
|
b.primaryBufferForRTX = primaryBuffer
|
|
pkts := b.pPackets
|
|
b.pPackets = nil
|
|
b.Unlock()
|
|
for _, pp := range pkts {
|
|
var rtpPacket rtp.Packet
|
|
err := rtpPacket.Unmarshal(pp.packet)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if rtpPacket.Padding && len(rtpPacket.Payload) == 0 {
|
|
continue
|
|
}
|
|
primaryBuffer.writeRTX(&rtpPacket, pp.arrivalTime)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
if !b.bound {
|
|
return
|
|
}
|
|
|
|
if rtxPkt.PayloadType != b.rtxPayloadType {
|
|
b.logger.Debugw("unexpected rtx payload type", "expected", b.rtxPayloadType, "actual", rtxPkt.PayloadType)
|
|
return
|
|
}
|
|
|
|
if b.rtxPktBuf == nil {
|
|
b.rtxPktBuf = make([]byte, bucket.RTPMaxPktSize)
|
|
}
|
|
|
|
if len(rtxPkt.Payload) < 2 {
|
|
b.logger.Warnw("rtx payload too short", nil, "size", len(rtxPkt.Payload))
|
|
return
|
|
}
|
|
|
|
repairedPkt := *rtxPkt
|
|
repairedPkt.PayloadType = b.payloadType
|
|
repairedPkt.SequenceNumber = binary.BigEndian.Uint16(rtxPkt.Payload[:2])
|
|
repairedPkt.SSRC = b.mediaSSRC
|
|
repairedPkt.Payload = rtxPkt.Payload[2:]
|
|
n, err := repairedPkt.MarshalTo(b.rtxPktBuf)
|
|
if err != nil {
|
|
b.logger.Errorw("could not marshal repaired packet", err, "ssrc", b.mediaSSRC, "sn", repairedPkt.SequenceNumber)
|
|
return
|
|
}
|
|
|
|
b.calc(b.rtxPktBuf[:n], &repairedPkt, arrivalTime, true, false)
|
|
b.readCond.Broadcast()
|
|
}
|
|
|
|
func (b *Buffer) Read(buff []byte) (n int, err error) {
|
|
b.Lock()
|
|
for {
|
|
if b.closed.Load() {
|
|
b.Unlock()
|
|
return 0, io.EOF
|
|
}
|
|
if b.pPackets != nil && len(b.pPackets) > b.lastPacketRead {
|
|
if len(buff) < len(b.pPackets[b.lastPacketRead].packet) {
|
|
b.Unlock()
|
|
return 0, bucket.ErrBufferTooSmall
|
|
}
|
|
|
|
n = copy(buff, b.pPackets[b.lastPacketRead].packet)
|
|
b.lastPacketRead++
|
|
b.Unlock()
|
|
return
|
|
}
|
|
b.readCond.Wait()
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) ReadExtended(buf []byte) (*ExtPacket, error) {
|
|
b.Lock()
|
|
for {
|
|
if b.closed.Load() {
|
|
b.Unlock()
|
|
return nil, io.EOF
|
|
}
|
|
if b.extPackets.Len() > 0 {
|
|
ep := b.extPackets.PopFront()
|
|
patched := b.patchExtPacket(ep, buf)
|
|
if patched == nil {
|
|
ReleaseExtPacket(ep)
|
|
continue
|
|
}
|
|
|
|
b.Unlock()
|
|
return patched, nil
|
|
}
|
|
b.readCond.Wait()
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) Close() error {
|
|
b.closeOnce.Do(func() {
|
|
b.closed.Store(true)
|
|
|
|
b.RLock()
|
|
rtpStats := b.rtpStats
|
|
b.readCond.Broadcast()
|
|
b.RUnlock()
|
|
|
|
if rtpStats != nil {
|
|
rtpStats.Stop()
|
|
b.logger.Debugw("rtp stats",
|
|
"direction", "upstream",
|
|
"stats", rtpStats,
|
|
)
|
|
if cb := b.getOnFinalRtpStats(); cb != nil {
|
|
cb(rtpStats.ToProto())
|
|
}
|
|
}
|
|
|
|
if cb := b.getOnClose(); cb != nil {
|
|
cb()
|
|
}
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (b *Buffer) OnClose(fn func()) {
|
|
b.Lock()
|
|
b.onClose = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnClose() func() {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onClose
|
|
}
|
|
|
|
func (b *Buffer) SetPLIThrottle(duration int64) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.pliThrottle = duration
|
|
}
|
|
|
|
func (b *Buffer) SendPLI(force bool) {
|
|
b.RLock()
|
|
rtpStats := b.rtpStats
|
|
pliThrottle := b.pliThrottle
|
|
b.RUnlock()
|
|
|
|
if (rtpStats == nil && !force) || !rtpStats.CheckAndUpdatePli(pliThrottle, force) {
|
|
return
|
|
}
|
|
|
|
b.logger.Debugw("send pli", "ssrc", b.mediaSSRC, "force", force)
|
|
pli := []rtcp.Packet{
|
|
&rtcp.PictureLossIndication{SenderSSRC: b.mediaSSRC, MediaSSRC: b.mediaSSRC},
|
|
}
|
|
|
|
if cb := b.getOnRtcpFeedback(); cb != nil {
|
|
cb(pli)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) SetRTT(rtt uint32) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
if rtt == 0 {
|
|
return
|
|
}
|
|
|
|
if b.nacker != nil {
|
|
b.nacker.SetRTT(rtt)
|
|
}
|
|
|
|
if b.rtpStats != nil {
|
|
b.rtpStats.UpdateRtt(rtt)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, isRTX bool, isBuffered bool) {
|
|
defer func() {
|
|
b.doNACKs()
|
|
|
|
b.doReports(arrivalTime)
|
|
}()
|
|
|
|
if rtpPacket == nil {
|
|
rtpPacket = &rtp.Packet{}
|
|
if err := rtpPacket.Unmarshal(rawPkt); err != nil {
|
|
b.logger.Errorw("could not unmarshal RTP packet", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// process header extensions always as padding packets could be used for probing
|
|
b.processHeaderExtensions(rtpPacket, arrivalTime, isRTX)
|
|
|
|
flowState := b.updateStreamState(rtpPacket, arrivalTime)
|
|
if flowState.IsNotHandled {
|
|
return
|
|
}
|
|
|
|
if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) {
|
|
// drop padding only in-order or duplicate packet
|
|
if !flowState.IsOutOfOrder {
|
|
// in-order packet - increment sequence number offset for subsequent packets
|
|
// Example:
|
|
// 40 - regular packet - pass through as sequence number 40
|
|
// 41 - missing packet - don't know what it is, could be padding or not
|
|
// 42 - padding only packet - in-order - drop - increment sequence number offset to 1 -
|
|
// range[0, 42] = 0 offset
|
|
// 41 - arrives out of order - get offset 0 from cache - passed through as sequence number 41
|
|
// 43 - regular packet - offset = 1 (running offset) - passes through as sequence number 42
|
|
// 44 - padding only - in order - drop - increment sequence number offset to 2
|
|
// range[0, 42] = 0 offset, range[43, 44] = 1 offset
|
|
// 43 - regular packet - out of order + duplicate - offset = 1 from cache -
|
|
// adjusted sequence number is 42, will be dropped by RTX buffer AddPacket method as duplicate
|
|
// 45 - regular packet - offset = 2 (running offset) - passed through with adjusted sequence number as 43
|
|
// 44 - padding only - out-of-order + duplicate - dropped as duplicate
|
|
//
|
|
if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil {
|
|
b.logger.Errorw(
|
|
"could not exclude range", err,
|
|
"sn", rtpPacket.SequenceNumber,
|
|
"esn", flowState.ExtSequenceNumber,
|
|
"rtpStats", b.rtpStats,
|
|
"snRangeMap", b.snRangeMap,
|
|
)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
if !flowState.IsOutOfOrder && rtpPacket.PayloadType != b.payloadType && b.codecType == webrtc.RTPCodecTypeVideo {
|
|
b.handleCodecChange(rtpPacket.PayloadType)
|
|
}
|
|
|
|
// add to RTX buffer using sequence number after accounting for dropped padding only packets
|
|
snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber)
|
|
if err != nil {
|
|
b.logger.Errorw(
|
|
"could not get sequence number adjustment", err,
|
|
"sn", rtpPacket.SequenceNumber,
|
|
"esn", flowState.ExtSequenceNumber,
|
|
"payloadSize", len(rtpPacket.Payload),
|
|
"rtpStats", b.rtpStats,
|
|
"snRangeMap", b.snRangeMap,
|
|
)
|
|
return
|
|
}
|
|
flowState.ExtSequenceNumber -= snAdjustment
|
|
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber)
|
|
_, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, flowState.ExtSequenceNumber)
|
|
if err != nil {
|
|
if !flowState.IsDuplicate {
|
|
if errors.Is(err, bucket.ErrPacketTooOld) {
|
|
packetTooOldCount := b.packetTooOldCount.Inc()
|
|
if (packetTooOldCount-1)%100 == 0 {
|
|
b.logger.Warnw(
|
|
"could not add packet to bucket", err,
|
|
"count", packetTooOldCount,
|
|
"flowState", &flowState,
|
|
"snAdjustment", snAdjustment,
|
|
"incomingSequenceNumber", flowState.ExtSequenceNumber+snAdjustment,
|
|
"rtpStats", b.rtpStats,
|
|
"snRangeMap", b.snRangeMap,
|
|
)
|
|
}
|
|
} else if err != bucket.ErrRTXPacket {
|
|
b.logger.Warnw(
|
|
"could not add packet to bucket", err,
|
|
"flowState", &flowState,
|
|
"snAdjustment", snAdjustment,
|
|
"incomingSequenceNumber", flowState.ExtSequenceNumber+snAdjustment,
|
|
"rtpStats", b.rtpStats,
|
|
"snRangeMap", b.snRangeMap,
|
|
)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
ep := b.getExtPacket(rtpPacket, arrivalTime, isBuffered, flowState)
|
|
if ep == nil {
|
|
return
|
|
}
|
|
b.extPackets.PushBack(ep)
|
|
|
|
if b.extPackets.Len() > b.bucket.Capacity() {
|
|
if (b.extPacketTooMuchCount.Inc()-1)%100 == 0 {
|
|
b.logger.Warnw("too much ext packets", nil, "count", b.extPackets.Len())
|
|
}
|
|
}
|
|
|
|
b.doFpsCalc(ep)
|
|
}
|
|
|
|
func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket {
|
|
n, err := b.getPacket(buf, ep.ExtSequenceNumber)
|
|
if err != nil {
|
|
packetNotFoundCount := b.packetNotFoundCount.Inc()
|
|
if (packetNotFoundCount-1)%20 == 0 {
|
|
b.logger.Warnw(
|
|
"could not get packet from bucket", err,
|
|
"sn", ep.Packet.SequenceNumber,
|
|
"headSN", b.bucket.HeadSequenceNumber(),
|
|
"count", packetNotFoundCount,
|
|
"rtpStats", b.rtpStats,
|
|
"snRangeMap", b.snRangeMap,
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
ep.RawPacket = buf[:n]
|
|
|
|
// patch RTP packet to point payload to new buffer
|
|
pkt := *ep.Packet
|
|
payloadStart := ep.Packet.Header.MarshalSize()
|
|
payloadEnd := payloadStart + len(ep.Packet.Payload)
|
|
if payloadEnd > n {
|
|
b.logger.Warnw("unexpected marshal size", nil, "max", n, "need", payloadEnd)
|
|
return nil
|
|
}
|
|
pkt.Payload = buf[payloadStart:payloadEnd]
|
|
ep.Packet = &pkt
|
|
|
|
return ep
|
|
}
|
|
|
|
func (b *Buffer) doFpsCalc(ep *ExtPacket) {
|
|
if b.paused || b.frameRateCalculated || len(ep.Packet.Payload) == 0 {
|
|
return
|
|
}
|
|
spatial := ep.Spatial
|
|
if spatial < 0 || int(spatial) >= len(b.frameRateCalculator) {
|
|
spatial = 0
|
|
}
|
|
if fr := b.frameRateCalculator[spatial]; fr != nil {
|
|
if fr.RecvPacket(ep) {
|
|
complete := true
|
|
for _, fr2 := range b.frameRateCalculator {
|
|
if fr2 != nil && !fr2.Completed() {
|
|
complete = false
|
|
break
|
|
}
|
|
}
|
|
if complete {
|
|
b.frameRateCalculated = true
|
|
if f := b.onFpsChanged; f != nil {
|
|
go f()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) handleCodecChange(newPT uint8) {
|
|
var (
|
|
codecFound, rtxFound bool
|
|
rtxPt uint8
|
|
newCodec webrtc.RTPCodecParameters
|
|
)
|
|
for _, codec := range b.rtpParameters.Codecs {
|
|
if !codecFound && uint8(codec.PayloadType) == newPT {
|
|
newCodec = codec
|
|
codecFound = true
|
|
}
|
|
|
|
if mime.IsMimeTypeStringRTX(codec.MimeType) && strings.Contains(codec.SDPFmtpLine, fmt.Sprintf("apt=%d", newPT)) {
|
|
rtxFound = true
|
|
rtxPt = uint8(codec.PayloadType)
|
|
}
|
|
|
|
if codecFound && rtxFound {
|
|
break
|
|
}
|
|
}
|
|
if !codecFound {
|
|
b.logger.Errorw("could not find codec for new payload type", nil, "pt", newPT, "rtpParameters", b.rtpParameters)
|
|
return
|
|
}
|
|
b.logger.Infow(
|
|
"codec changed",
|
|
"oldPayload", b.payloadType, "newPayload", newPT,
|
|
"oldRtxPayload", b.rtxPayloadType, "newRtxPayload", rtxPt,
|
|
"oldMime", b.mime, "newMime", newCodec.MimeType)
|
|
b.payloadType = newPT
|
|
b.rtxPayloadType = rtxPt
|
|
b.mime = mime.NormalizeMimeType(newCodec.MimeType)
|
|
b.frameRateCalculated = false
|
|
|
|
if b.ddExtID != 0 {
|
|
b.createDDParserAndFrameRateCalculator()
|
|
}
|
|
|
|
if b.frameRateCalculator[0] == nil {
|
|
b.createFrameRateCalculator()
|
|
}
|
|
|
|
b.bucket.ResyncOnNextPacket()
|
|
|
|
if f := b.onCodecChange; f != nil {
|
|
go f(newCodec)
|
|
}
|
|
|
|
if mime.IsMimeTypeVideo(b.mime) {
|
|
go b.seedKeyFrame(b.keyFrameSeederGeneration.Inc())
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) rtpstats.RTPFlowState {
|
|
flowState := b.rtpStats.Update(
|
|
arrivalTime,
|
|
p.Header.SequenceNumber,
|
|
p.Header.Timestamp,
|
|
p.Header.Marker,
|
|
p.Header.MarshalSize(),
|
|
len(p.Payload),
|
|
int(p.PaddingSize),
|
|
)
|
|
|
|
if b.nacker != nil {
|
|
b.nacker.Remove(p.SequenceNumber)
|
|
|
|
for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ {
|
|
b.nacker.Push(uint16(lost))
|
|
}
|
|
}
|
|
|
|
return flowState
|
|
}
|
|
|
|
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX bool) {
|
|
if b.audioLevelExtID != 0 && !isRTX {
|
|
if !b.latestTSForAudioLevelInitialized {
|
|
b.latestTSForAudioLevelInitialized = true
|
|
b.latestTSForAudioLevel = p.Timestamp
|
|
}
|
|
if e := p.GetExtension(b.audioLevelExtID); e != nil {
|
|
ext := rtp.AudioLevelExtension{}
|
|
if err := ext.Unmarshal(e); err == nil {
|
|
if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) {
|
|
duration := (int64(p.Timestamp) - int64(b.latestTSForAudioLevel)) * 1e3 / int64(b.clockRate)
|
|
if duration > 0 {
|
|
b.audioLevel.Observe(ext.Level, uint32(duration), arrivalTime)
|
|
}
|
|
|
|
b.latestTSForAudioLevel = p.Timestamp
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, flowState rtpstats.RTPFlowState) *ExtPacket {
|
|
ep := ExtPacketFactory.Get().(*ExtPacket)
|
|
*ep = ExtPacket{
|
|
Arrival: arrivalTime,
|
|
ExtSequenceNumber: flowState.ExtSequenceNumber,
|
|
ExtTimestamp: flowState.ExtTimestamp,
|
|
Packet: rtpPacket,
|
|
VideoLayer: VideoLayer{
|
|
Spatial: InvalidLayerSpatial,
|
|
Temporal: InvalidLayerTemporal,
|
|
},
|
|
IsOutOfOrder: flowState.IsOutOfOrder,
|
|
IsBuffered: isBuffered,
|
|
}
|
|
|
|
if len(rtpPacket.Payload) == 0 {
|
|
// padding only packet, nothing else to do
|
|
return ep
|
|
}
|
|
|
|
ep.Temporal = 0
|
|
var videoSize []VideoSize
|
|
if b.ddParser != nil {
|
|
ddVal, videoLayer, err := b.ddParser.Parse(ep.Packet)
|
|
if err != nil {
|
|
if errors.Is(err, ErrDDExtentionNotFound) {
|
|
if b.mime == mime.MimeTypeVP8 || b.mime == mime.MimeTypeVP9 {
|
|
b.logger.Infow("dd extension not found, disable dd parser")
|
|
b.ddParser = nil
|
|
b.createFrameRateCalculator()
|
|
}
|
|
} else {
|
|
ReleaseExtPacket(ep)
|
|
return nil
|
|
}
|
|
} else if ddVal != nil {
|
|
ep.DependencyDescriptor = ddVal
|
|
ep.VideoLayer = videoLayer
|
|
videoSize = ExtractDependencyDescriptorVideoSize(ddVal.Descriptor)
|
|
// DD-TODO : notify active decode target change if changed.
|
|
}
|
|
}
|
|
|
|
switch b.mime {
|
|
case mime.MimeTypeVP8:
|
|
vp8Packet := VP8{}
|
|
if err := vp8Packet.Unmarshal(rtpPacket.Payload); err != nil {
|
|
b.logger.Warnw("could not unmarshal VP8 packet", err)
|
|
ReleaseExtPacket(ep)
|
|
return nil
|
|
}
|
|
ep.KeyFrame = vp8Packet.IsKeyFrame
|
|
if ep.DependencyDescriptor == nil {
|
|
ep.Temporal = int32(vp8Packet.TID)
|
|
|
|
if ep.KeyFrame {
|
|
if sz := ExtractVP8VideoSize(&vp8Packet, rtpPacket.Payload); sz.Width > 0 && sz.Height > 0 {
|
|
videoSize = append(videoSize, sz)
|
|
}
|
|
}
|
|
} else {
|
|
// vp8 with DependencyDescriptor enabled, use the TID from the descriptor
|
|
vp8Packet.TID = uint8(ep.Temporal)
|
|
}
|
|
ep.Payload = vp8Packet
|
|
ep.Spatial = InvalidLayerSpatial // vp8 don't have spatial scalability, reset to invalid
|
|
|
|
case mime.MimeTypeVP9:
|
|
if ep.DependencyDescriptor == nil {
|
|
var vp9Packet codecs.VP9Packet
|
|
_, err := vp9Packet.Unmarshal(rtpPacket.Payload)
|
|
if err != nil {
|
|
b.logger.Warnw("could not unmarshal VP9 packet", err)
|
|
ReleaseExtPacket(ep)
|
|
return nil
|
|
}
|
|
ep.VideoLayer = VideoLayer{
|
|
Spatial: int32(vp9Packet.SID),
|
|
Temporal: int32(vp9Packet.TID),
|
|
}
|
|
ep.Payload = vp9Packet
|
|
ep.KeyFrame = IsVP9KeyFrame(&vp9Packet, rtpPacket.Payload)
|
|
|
|
if ep.KeyFrame {
|
|
for i := 0; i < len(vp9Packet.Width); i++ {
|
|
videoSize = append(videoSize, VideoSize{
|
|
Width: uint32(vp9Packet.Width[i]),
|
|
Height: uint32(vp9Packet.Height[i]),
|
|
})
|
|
}
|
|
}
|
|
} else {
|
|
ep.KeyFrame = IsVP9KeyFrame(nil, rtpPacket.Payload)
|
|
}
|
|
|
|
case mime.MimeTypeH264:
|
|
ep.KeyFrame = IsH264KeyFrame(rtpPacket.Payload)
|
|
ep.Spatial = InvalidLayerSpatial // h.264 don't have spatial scalability, reset to invalid
|
|
|
|
// Check H264 key frame video size
|
|
if ep.KeyFrame {
|
|
if sz := ExtractH264VideoSize(rtpPacket.Payload); sz.Width > 0 && sz.Height > 0 {
|
|
videoSize = append(videoSize, sz)
|
|
}
|
|
}
|
|
|
|
case mime.MimeTypeAV1:
|
|
ep.KeyFrame = IsAV1KeyFrame(rtpPacket.Payload)
|
|
|
|
case mime.MimeTypeH265:
|
|
ep.KeyFrame = IsH265KeyFrame(rtpPacket.Payload)
|
|
if ep.DependencyDescriptor == nil {
|
|
if len(rtpPacket.Payload) < 2 {
|
|
b.logger.Warnw("invalid H265 packet", nil)
|
|
ReleaseExtPacket(ep)
|
|
return nil
|
|
}
|
|
ep.VideoLayer = VideoLayer{
|
|
Temporal: int32(rtpPacket.Payload[1]&0x07) - 1,
|
|
}
|
|
ep.Spatial = InvalidLayerSpatial
|
|
|
|
if ep.KeyFrame {
|
|
if sz := ExtractH265VideoSize(rtpPacket.Payload); sz.Width > 0 && sz.Height > 0 {
|
|
videoSize = append(videoSize, sz)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if ep.KeyFrame {
|
|
if b.rtpStats != nil {
|
|
b.rtpStats.UpdateKeyFrame(1)
|
|
}
|
|
}
|
|
|
|
if b.absCaptureTimeExtID != 0 {
|
|
extData := rtpPacket.GetExtension(b.absCaptureTimeExtID)
|
|
|
|
var actExt act.AbsCaptureTime
|
|
if err := actExt.Unmarshal(extData); err == nil {
|
|
ep.AbsCaptureTimeExt = &actExt
|
|
}
|
|
}
|
|
|
|
if len(videoSize) > 0 {
|
|
b.checkVideoSizeChange(videoSize)
|
|
}
|
|
|
|
return ep
|
|
}
|
|
|
|
func (b *Buffer) doNACKs() {
|
|
if b.nacker == nil {
|
|
return
|
|
}
|
|
|
|
if r, numSeqNumsNacked := b.buildNACKPacket(); r != nil {
|
|
if cb := b.onRtcpFeedback; cb != nil {
|
|
cb(r)
|
|
}
|
|
if b.rtpStats != nil {
|
|
b.rtpStats.UpdateNack(uint32(numSeqNumsNacked))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) doReports(arrivalTime int64) {
|
|
if arrivalTime-b.lastReport < ReportDelta {
|
|
return
|
|
}
|
|
|
|
b.lastReport = arrivalTime
|
|
|
|
// RTCP reports
|
|
pkts := b.getRTCP()
|
|
if pkts != nil {
|
|
if cb := b.onRtcpFeedback; cb != nil {
|
|
cb(pkts)
|
|
}
|
|
}
|
|
|
|
b.mayGrowBucket()
|
|
}
|
|
|
|
func (b *Buffer) mayGrowBucket() {
|
|
cap := b.bucket.Capacity()
|
|
maxPkts := b.maxVideoPkts
|
|
if b.codecType == webrtc.RTPCodecTypeAudio {
|
|
maxPkts = b.maxAudioPkts
|
|
}
|
|
if cap >= maxPkts {
|
|
return
|
|
}
|
|
oldCap := cap
|
|
if deltaInfo := b.rtpStats.DeltaInfo(b.ppsSnapshotId); deltaInfo != nil {
|
|
duration := deltaInfo.EndTime.Sub(deltaInfo.StartTime)
|
|
if duration > 500*time.Millisecond {
|
|
pps := int(time.Duration(deltaInfo.Packets) * time.Second / duration)
|
|
for pps > cap && cap < maxPkts {
|
|
cap = b.bucket.Grow()
|
|
}
|
|
if cap > oldCap {
|
|
b.logger.Infow(
|
|
"grow bucket",
|
|
"from", oldCap,
|
|
"to", cap,
|
|
"pps", pps,
|
|
"deltaInfo", deltaInfo,
|
|
"rtpStats", b.rtpStats,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) {
|
|
if nacks, numSeqNumsNacked := b.nacker.Pairs(); len(nacks) > 0 {
|
|
pkts := []rtcp.Packet{&rtcp.TransportLayerNack{
|
|
SenderSSRC: b.mediaSSRC,
|
|
MediaSSRC: b.mediaSSRC,
|
|
Nacks: nacks,
|
|
}}
|
|
return pkts, numSeqNumsNacked
|
|
}
|
|
return nil, 0
|
|
}
|
|
|
|
func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
|
|
if b.rtpStats == nil {
|
|
return nil
|
|
}
|
|
|
|
proxyLoss := b.lastFractionLostToReport
|
|
if b.codecType == webrtc.RTPCodecTypeAudio && !b.enableAudioLossProxying {
|
|
proxyLoss = 0
|
|
}
|
|
|
|
return b.rtpStats.GetRtcpReceptionReport(b.mediaSSRC, proxyLoss, b.rrSnapshotId)
|
|
}
|
|
|
|
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packets uint32, octets uint32) {
|
|
b.RLock()
|
|
srData := &livekit.RTCPSenderReportState{
|
|
RtpTimestamp: rtpTime,
|
|
NtpTimestamp: ntpTime,
|
|
At: mono.UnixNano(),
|
|
Packets: packets,
|
|
Octets: uint64(octets),
|
|
}
|
|
|
|
didSet := false
|
|
if b.rtpStats != nil {
|
|
didSet = b.rtpStats.SetRtcpSenderReportData(srData)
|
|
}
|
|
b.RUnlock()
|
|
|
|
if didSet {
|
|
if cb := b.getOnRtcpSenderReport(); cb != nil {
|
|
cb()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) GetSenderReportData() *livekit.RTCPSenderReportState {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
if b.rtpStats != nil {
|
|
return b.rtpStats.GetRtcpSenderReportData()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Buffer) SetLastFractionLostReport(lost uint8) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.lastFractionLostToReport = lost
|
|
}
|
|
|
|
func (b *Buffer) getRTCP() []rtcp.Packet {
|
|
var pkts []rtcp.Packet
|
|
|
|
rr := b.buildReceptionReport()
|
|
if rr != nil {
|
|
pkts = append(pkts, &rtcp.ReceiverReport{
|
|
SSRC: b.mediaSSRC,
|
|
Reports: []rtcp.ReceptionReport{*rr},
|
|
})
|
|
}
|
|
|
|
return pkts
|
|
}
|
|
|
|
func (b *Buffer) GetPacket(buff []byte, esn uint64) (int, error) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
return b.getPacket(buff, esn)
|
|
}
|
|
|
|
func (b *Buffer) getPacket(buff []byte, esn uint64) (int, error) {
|
|
if b.closed.Load() {
|
|
return 0, io.EOF
|
|
}
|
|
return b.bucket.GetPacket(buff, esn)
|
|
}
|
|
|
|
func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
|
|
b.Lock()
|
|
b.onRtcpFeedback = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnRtcpFeedback() func(fb []rtcp.Packet) {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onRtcpFeedback
|
|
}
|
|
|
|
func (b *Buffer) OnRtcpSenderReport(fn func()) {
|
|
b.Lock()
|
|
b.onRtcpSenderReport = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnRtcpSenderReport() func() {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onRtcpSenderReport
|
|
}
|
|
|
|
func (b *Buffer) OnFinalRtpStats(fn func(*livekit.RTPStats)) {
|
|
b.Lock()
|
|
b.onFinalRtpStats = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnFinalRtpStats() func(*livekit.RTPStats) {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onFinalRtpStats
|
|
}
|
|
|
|
// GetMediaSSRC returns the associated SSRC of the RTP stream
|
|
func (b *Buffer) GetMediaSSRC() uint32 {
|
|
return b.mediaSSRC
|
|
}
|
|
|
|
// GetClockRate returns the RTP clock rate
|
|
func (b *Buffer) GetClockRate() uint32 {
|
|
return b.clockRate
|
|
}
|
|
|
|
func (b *Buffer) GetStats() *livekit.RTPStats {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
if b.rtpStats == nil {
|
|
return nil
|
|
}
|
|
|
|
return b.rtpStats.ToProto()
|
|
}
|
|
|
|
func (b *Buffer) GetDeltaStats() *StreamStatsWithLayers {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
if b.rtpStats == nil {
|
|
return nil
|
|
}
|
|
|
|
deltaStats := b.rtpStats.DeltaInfo(b.deltaStatsSnapshotId)
|
|
if deltaStats == nil {
|
|
return nil
|
|
}
|
|
|
|
return &StreamStatsWithLayers{
|
|
RTPStats: deltaStats,
|
|
Layers: map[int32]*rtpstats.RTPDeltaInfo{
|
|
0: deltaStats,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) GetLastSenderReportTime() time.Time {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
if b.rtpStats == nil {
|
|
return time.Time{}
|
|
}
|
|
|
|
return b.rtpStats.LastSenderReportTime()
|
|
}
|
|
|
|
func (b *Buffer) GetAudioLevel() (float64, bool) {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
if b.audioLevel == nil {
|
|
return 0, false
|
|
}
|
|
|
|
return b.audioLevel.GetLevel(mono.UnixNano())
|
|
}
|
|
|
|
func (b *Buffer) OnFpsChanged(f func()) {
|
|
b.Lock()
|
|
b.onFpsChanged = f
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) OnVideoSizeChanged(fn func([]VideoSize)) {
|
|
b.Lock()
|
|
b.onVideoSizeChanged = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
// checkVideoSizeChange checks if video size has changed for a specific spatial layer and fires callback
|
|
func (b *Buffer) checkVideoSizeChange(videoSizes []VideoSize) {
|
|
if len(videoSizes) > len(b.currentVideoSize) {
|
|
b.logger.Warnw("video size index out of range", nil, "newSize", videoSizes, "currentVideoSize", b.currentVideoSize)
|
|
return
|
|
}
|
|
|
|
if len(videoSizes) < len(b.currentVideoSize) {
|
|
videoSizes = append(videoSizes, make([]VideoSize, len(b.currentVideoSize)-len(videoSizes))...)
|
|
}
|
|
|
|
changed := false
|
|
for i, sz := range videoSizes {
|
|
if b.currentVideoSize[i].Width != sz.Width || b.currentVideoSize[i].Height != sz.Height {
|
|
changed = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if changed {
|
|
b.logger.Debugw("video size changed", "from", b.currentVideoSize, "to", videoSizes)
|
|
copy(b.currentVideoSize[:], videoSizes[:])
|
|
if b.onVideoSizeChanged != nil {
|
|
go b.onVideoSizeChanged(videoSizes)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
|
|
if int(layer) >= len(b.frameRateCalculator) {
|
|
return nil
|
|
}
|
|
|
|
if fc := b.frameRateCalculator[layer]; fc != nil {
|
|
return fc.GetFrameRate()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Buffer) seedKeyFrame(keyFrameSeederGeneration int32) {
|
|
// a key frame is needed especially when using Dependency Descriptor
|
|
// to get the DD structure which is used in parsing subsequent packets,
|
|
// till then packets are dropped which results in stream tracker not
|
|
// getting any data which means it does not declare layer start.
|
|
//
|
|
// send gratuitous PLIs for some time or until a key frame is seen to
|
|
// get the engine rolling
|
|
b.logger.Debugw("starting key frame seeder")
|
|
timer := time.NewTimer(30 * time.Second)
|
|
defer timer.Stop()
|
|
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
initialCount := uint32(0)
|
|
b.RLock()
|
|
rtpStats := b.rtpStats
|
|
b.RUnlock()
|
|
if rtpStats != nil {
|
|
initialCount, _ = rtpStats.KeyFrame()
|
|
}
|
|
|
|
for {
|
|
if b.closed.Load() || b.keyFrameSeederGeneration.Load() != keyFrameSeederGeneration {
|
|
b.logger.Debugw("stopping key frame seeder: stopped")
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-timer.C:
|
|
b.logger.Debugw("stopping key frame seeder: timeout")
|
|
return
|
|
|
|
case <-ticker.C:
|
|
if rtpStats != nil {
|
|
cnt, last := rtpStats.KeyFrame()
|
|
if cnt > initialCount {
|
|
b.logger.Debugw(
|
|
"stopping key frame seeder: received key frame",
|
|
"keyFrameCountInitial", initialCount,
|
|
"keyFrameCount", cnt,
|
|
"lastKeyFrame", last,
|
|
)
|
|
return
|
|
}
|
|
|
|
b.SendPLI(false)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------
|
|
|
|
func ReleaseExtPacket(extPkt *ExtPacket) {
|
|
if extPkt == nil {
|
|
return
|
|
}
|
|
|
|
ReleaseExtDependencyDescriptor(extPkt.DependencyDescriptor)
|
|
|
|
*extPkt = ExtPacket{}
|
|
ExtPacketFactory.Put(extPkt)
|
|
}
|