Add support for "abs-capture-time" extension. (#2640)

* Add support for "abs-capture-time" extension.

Currently, it is just passed through from publisher -> subscriber side.

TODO: Need to store in sequencer and restore for retransmission.

* abs-capture-time in retransmissions

* clean up

* fix test

* more test fixes

* more test fixes

* more test fixes

* log only when size is non-zero

* log on both sides for debugging

* add marshal/unmarshal

* normalize abs capture time to SFU clock

* comment out adding abs-capture-time from registered extensions
This commit is contained in:
Raja Subramanian
2024-04-11 15:25:10 +05:30
committed by GitHub
parent eaaf44d2a2
commit ad1f508680
32 changed files with 383 additions and 103 deletions
+10 -2
View File
@@ -20,7 +20,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
)
@@ -88,6 +88,7 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) {
sdp.SDESMidURI,
sdp.SDESRTPStreamIDURI,
sdp.AudioLevelURI,
//act.AbsCaptureTimeURI,
},
Video: []string{
sdp.SDESMidURI,
@@ -96,6 +97,7 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) {
frameMarking,
dd.ExtensionURI,
repairedRTPStreamID,
//act.AbsCaptureTimeURI,
},
},
RTCPFeedback: RTCPFeedbackConfig{
@@ -115,7 +117,13 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) {
subscriberConfig := DirectionConfig{
StrictACKs: conf.RTC.StrictACKs,
RTPHeaderExtension: RTPHeaderExtensionConfig{
Video: []string{dd.ExtensionURI},
Video: []string{
dd.ExtensionURI,
//act.AbsCaptureTimeURI,
},
Audio: []string{
//act.AbsCaptureTimeURI,
},
},
RTCPFeedback: RTCPFeedbackConfig{
Video: []webrtc.RTCPFeedback{
+63 -17
View File
@@ -29,9 +29,22 @@ const (
videoRTXMimeType = "video/rtx"
)
var opusCodecCapability = webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "minptime=10;useinbandfec=1"}
var redCodecCapability = webrtc.RTPCodecCapability{MimeType: sfu.MimeTypeAudioRed, ClockRate: 48000, Channels: 2, SDPFmtpLine: "111/111"}
var videoRTX = webrtc.RTPCodecCapability{MimeType: videoRTXMimeType, ClockRate: 90000}
var opusCodecCapability = webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1",
}
var redCodecCapability = webrtc.RTPCodecCapability{
MimeType: sfu.MimeTypeAudioRed,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "111/111",
}
var videoRTX = webrtc.RTPCodecCapability{
MimeType: videoRTXMimeType,
ClockRate: 90000,
}
func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedback RTCPFeedbackConfig, filterOutH264HighProfile bool) error {
opusCodec := opusCodecCapability
@@ -61,32 +74,65 @@ func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedbac
h264HighProfileFmtp := "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032"
for _, codec := range []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, RTCPFeedback: rtcpFeedback.Video},
PayloadType: 96,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
RTCPFeedback: rtcpFeedback.Video,
},
PayloadType: 96,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP9, ClockRate: 90000, SDPFmtpLine: "profile-id=0", RTCPFeedback: rtcpFeedback.Video},
PayloadType: 98,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=0",
RTCPFeedback: rtcpFeedback.Video,
},
PayloadType: 98,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP9, ClockRate: 90000, SDPFmtpLine: "profile-id=1", RTCPFeedback: rtcpFeedback.Video},
PayloadType: 100,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=1",
RTCPFeedback: rtcpFeedback.Video,
},
PayloadType: 100,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", RTCPFeedback: rtcpFeedback.Video},
PayloadType: 125,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
RTCPFeedback: rtcpFeedback.Video,
},
PayloadType: 125,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", RTCPFeedback: rtcpFeedback.Video},
PayloadType: 108,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f",
RTCPFeedback: rtcpFeedback.Video,
},
PayloadType: 108,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: h264HighProfileFmtp, RTCPFeedback: rtcpFeedback.Video},
PayloadType: 123,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: h264HighProfileFmtp,
RTCPFeedback: rtcpFeedback.Video,
},
PayloadType: 123,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1, ClockRate: 90000, RTCPFeedback: rtcpFeedback.Video},
PayloadType: 35,
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
ClockRate: 90000,
RTCPFeedback: rtcpFeedback.Video,
},
PayloadType: 35,
},
} {
if filterOutH264HighProfile && codec.RTPCodecCapability.SDPFmtpLine == h264HighProfileFmtp {
+1 -1
View File
@@ -34,7 +34,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/telemetry"
)
+1 -1
View File
@@ -22,7 +22,7 @@ import (
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/protocol/livekit"
lksdp "github.com/livekit/protocol/sdp"
)
+2 -3
View File
@@ -47,7 +47,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/sfu/rtpextension"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
@@ -222,9 +222,8 @@ type TransportParams struct {
func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
directionConfig := params.DirectionConfig
if params.AllowPlayoutDelay {
directionConfig.RTPHeaderExtension.Video = append(directionConfig.RTPHeaderExtension.Video, rtpextension.PlayoutDelayURI)
directionConfig.RTPHeaderExtension.Video = append(directionConfig.RTPHeaderExtension.Video, pd.PlayoutDelayURI)
}
// Some of the browser clients do not handle H.264 High Profile in signalling properly.
+47 -28
View File
@@ -31,7 +31,8 @@ import (
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/sfu/audio"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/utils"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/mediatransportutil"
@@ -64,29 +65,30 @@ type ExtPacket struct {
KeyFrame bool
RawPacket []byte
DependencyDescriptor *ExtDependencyDescriptor
AbsCaptureTimeExt *act.AbsCaptureTime
}
// Buffer contains all packets
type Buffer struct {
sync.RWMutex
readCond *sync.Cond
bucket *bucket.Bucket
nacker *nack.NackQueue
maxVideoPkts int
maxAudioPkts int
codecType webrtc.RTPCodecType
payloadType uint8
extPackets deque.Deque[*ExtPacket]
pPackets []pendingPacket
closeOnce sync.Once
mediaSSRC uint32
clockRate uint32
lastReport time.Time
twccExt uint8
audioLevelExt uint8
bound bool
closed atomic.Bool
mime string
readCond *sync.Cond
bucket *bucket.Bucket
nacker *nack.NackQueue
maxVideoPkts int
maxAudioPkts int
codecType webrtc.RTPCodecType
payloadType uint8
extPackets deque.Deque[*ExtPacket]
pPackets []pendingPacket
closeOnce sync.Once
mediaSSRC uint32
clockRate uint32
lastReport time.Time
twccExtID uint8
audioLevelExtID uint8
bound bool
closed atomic.Bool
mime string
snRangeMap *utils.RangeMap[uint64, uint64]
@@ -120,7 +122,7 @@ type Buffer struct {
logger logger.Logger
// dependency descriptor
ddExt uint8
ddExtID uint8
ddParser *DependencyDescriptorParser
paused bool
@@ -133,6 +135,8 @@ type Buffer struct {
primaryBufferForRTX *Buffer
rtxPktBuf []byte
absCaptureTimeExtID uint8
}
// NewBuffer constructs a new Buffer
@@ -173,7 +177,7 @@ func (b *Buffer) SetTWCCAndExtID(twcc *twcc.Responder, extID uint8) {
defer b.Unlock()
b.twcc = twcc
b.twccExt = extID
b.twccExtID = extID
}
func (b *Buffer) SetAudioLevelParams(audioLevelParams audio.AudioLevelParams) {
@@ -223,18 +227,21 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
for _, ext := range params.HeaderExtensions {
switch ext.URI {
case dd.ExtensionURI:
b.ddExt = uint8(ext.ID)
b.ddExtID = uint8(ext.ID)
frc := NewFrameRateCalculatorDD(b.clockRate, b.logger)
for i := range b.frameRateCalculator {
b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i))
}
b.ddParser = NewDependencyDescriptorParser(b.ddExt, b.logger, func(spatial, temporal int32) {
b.ddParser = NewDependencyDescriptorParser(b.ddExtID, b.logger, func(spatial, temporal int32) {
frc.SetMaxLayer(spatial, temporal)
})
case sdp.AudioLevelURI:
b.audioLevelExt = uint8(ext.ID)
b.audioLevelExtID = uint8(ext.ID)
b.audioLevel = audio.NewAudioLevel(b.audioLevelParams)
case act.AbsCaptureTimeURI:
b.absCaptureTimeExtID = uint8(ext.ID)
}
}
@@ -302,8 +309,8 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) {
}
now := time.Now()
if b.twcc != nil && b.twccExt != 0 && !b.closed.Load() {
if ext := rtpPacket.GetExtension(b.twccExt); ext != nil {
if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() {
if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil {
b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now.UnixNano(), rtpPacket.Marker)
}
}
@@ -664,12 +671,12 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlow
}
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, isRTX bool) {
if b.audioLevelExt != 0 && !isRTX {
if b.audioLevelExtID != 0 && !isRTX {
if !b.latestTSForAudioLevelInitialized {
b.latestTSForAudioLevelInitialized = true
b.latestTSForAudioLevel = p.Timestamp
}
if e := p.GetExtension(b.audioLevelExt); e != nil {
if e := p.GetExtension(b.audioLevelExtID); e != nil {
ext := rtp.AudioLevelExtension{}
if err := ext.Unmarshal(e); err == nil {
if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) {
@@ -729,6 +736,7 @@ func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flow
ep.Spatial = InvalidLayerSpatial // vp8 don't have spatial scalability, reset to invalid
}
ep.Payload = vp8Packet
case "video/vp9":
if ep.DependencyDescriptor == nil {
var vp9Packet codecs.VP9Packet
@@ -744,8 +752,10 @@ func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flow
ep.Payload = vp9Packet
}
ep.KeyFrame = IsVP9KeyFrame(rtpPacket.Payload)
case "video/h264":
ep.KeyFrame = IsH264KeyFrame(rtpPacket.Payload)
case "video/av1":
ep.KeyFrame = IsAV1KeyFrame(rtpPacket.Payload)
}
@@ -756,6 +766,15 @@ func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flow
}
}
if b.absCaptureTimeExtID != 0 {
extData := rtpPacket.GetExtension(b.absCaptureTimeExtID)
var actExt act.AbsCaptureTime
if err := actExt.Unmarshal(extData); err == nil {
ep.AbsCaptureTimeExt = &actExt
}
}
return ep
}
+1 -1
View File
@@ -21,7 +21,7 @@ import (
"github.com/pion/rtp"
"go.uber.org/atomic"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/logger"
+3 -3
View File
@@ -20,7 +20,7 @@ import (
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/protocol/logger"
)
@@ -46,9 +46,9 @@ func (f *testFrameInfo) toDD() *ExtPacket {
return &ExtPacket{
Packet: &rtp.Packet{Header: f.header},
DependencyDescriptor: &ExtDependencyDescriptor{
Descriptor: &dependencydescriptor.DependencyDescriptor{
Descriptor: &dd.DependencyDescriptor{
FrameNumber: f.framenumber,
FrameDependencies: &dependencydescriptor.FrameDependencyTemplate{
FrameDependencies: &dd.FrameDependencyTemplate{
FrameDiffs: f.frameDiff,
},
},
+1 -1
View File
@@ -15,7 +15,7 @@
package buffer
import (
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
)
type FrameEntity struct {
+12 -12
View File
@@ -20,47 +20,47 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
)
func TestFrameIntegrityChecker(t *testing.T) {
fc := NewFrameIntegrityChecker(100, 1000)
// first frame out of order
fc.AddPacket(10, 10, &dependencydescriptor.DependencyDescriptor{})
fc.AddPacket(10, 10, &dd.DependencyDescriptor{})
require.False(t, fc.FrameIntegrity(10))
fc.AddPacket(9, 10, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true})
fc.AddPacket(9, 10, &dd.DependencyDescriptor{FirstPacketInFrame: true})
require.False(t, fc.FrameIntegrity(10))
fc.AddPacket(11, 10, &dependencydescriptor.DependencyDescriptor{LastPacketInFrame: true})
fc.AddPacket(11, 10, &dd.DependencyDescriptor{LastPacketInFrame: true})
require.True(t, fc.FrameIntegrity(10))
// single packet frame
fc.AddPacket(100, 100, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true})
fc.AddPacket(100, 100, &dd.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true})
require.True(t, fc.FrameIntegrity(100))
require.False(t, fc.FrameIntegrity(101))
require.False(t, fc.FrameIntegrity(99))
// frame too old than first frame
fc.AddPacket(99, 99, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true})
fc.AddPacket(99, 99, &dd.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true})
// multiple packet frame, out of order
fc.AddPacket(2001, 2001, &dependencydescriptor.DependencyDescriptor{})
fc.AddPacket(2001, 2001, &dd.DependencyDescriptor{})
require.False(t, fc.FrameIntegrity(2001))
require.False(t, fc.FrameIntegrity(1999))
// out of frame count(100)
require.False(t, fc.FrameIntegrity(100))
require.False(t, fc.FrameIntegrity(1900))
fc.AddPacket(2000, 2001, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true})
fc.AddPacket(2000, 2001, &dd.DependencyDescriptor{FirstPacketInFrame: true})
require.False(t, fc.FrameIntegrity(2001))
fc.AddPacket(2002, 2001, &dependencydescriptor.DependencyDescriptor{LastPacketInFrame: true})
fc.AddPacket(2002, 2001, &dd.DependencyDescriptor{LastPacketInFrame: true})
require.True(t, fc.FrameIntegrity(2001))
// duplicate packet
fc.AddPacket(2001, 2001, &dependencydescriptor.DependencyDescriptor{})
fc.AddPacket(2001, 2001, &dd.DependencyDescriptor{})
require.True(t, fc.FrameIntegrity(2001))
// frame too old
fc.AddPacket(900, 1900, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true})
fc.AddPacket(900, 1900, &dd.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true})
require.False(t, fc.FrameIntegrity(1900))
for frame := uint64(2002); frame < 2102; frame++ {
@@ -75,7 +75,7 @@ func TestFrameIntegrityChecker(t *testing.T) {
rand.Seed(int64(frame))
rand.Shuffle(len(frames), func(i, j int) { frames[i], frames[j] = frames[j], frames[i] })
for i, f := range frames {
fc.AddPacket(f, frame, &dependencydescriptor.DependencyDescriptor{
fc.AddPacket(f, frame, &dd.DependencyDescriptor{
FirstPacketInFrame: f == firstFrame,
LastPacketInFrame: f == lastFrame,
})
+81 -11
View File
@@ -35,9 +35,10 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/sfu/rtpextension"
act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/utils"
)
@@ -237,6 +238,7 @@ type DownTrack struct {
transportWideExtID int
dependencyDescriptorExtID int
playoutDelayExtID int
absCaptureTimeExtID int
transceiver atomic.Pointer[webrtc.RTPTransceiver]
writeStream webrtc.TrackLocalWriter
rtcpReader *buffer.RTCPReader
@@ -553,7 +555,7 @@ func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeade
}
case dd.ExtensionURI:
d.dependencyDescriptorExtID = ext.ID
case rtpextension.PlayoutDelayURI:
case pd.PlayoutDelayURI:
d.playoutDelayExtID = ext.ID
case sdp.TransportCCURI:
if isBWEEnabled {
@@ -561,6 +563,8 @@ func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeade
} else {
d.transportWideExtID = 0
}
case act.AbsCaptureTimeURI:
d.absCaptureTimeExtID = ext.ID
}
}
}
@@ -730,13 +734,59 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
var extensions []pacer.ExtensionData
if tp.ddBytes != nil {
extensions = []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: tp.ddBytes}}
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.dependencyDescriptorExtID),
Payload: tp.ddBytes,
},
)
}
if d.playoutDelayExtID != 0 && d.playoutDelay != nil {
if val := d.playoutDelay.GetDelayExtension(hdr.SequenceNumber); val != nil {
extensions = append(extensions, pacer.ExtensionData{ID: uint8(d.playoutDelayExtID), Payload: val})
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.playoutDelayExtID),
Payload: val,
},
)
// NOTE: play out delay extension is not cached in sequencer,
// i. e. they will not be added to retransmitted packet.
// But, it is okay as the extension is added till a RTCP Receiver Report for
// the corresponding sequence number is received.
// The extreme case is all packets containing the play out delay are lost and
// all of them retransmitted and an RTCP Receiver Report received for those
// retransmited sequence numbers. But, that is highly improbable, if not impossible.
}
}
var actBytes []byte
if extPkt.AbsCaptureTimeExt != nil && d.absCaptureTimeExtID != 0 {
// normalize capture time to SFU clock.
// NOTE: even if there is estimated offset populated, just re-map the
// absolute capture time stamp as it should be the same RTCP sender report
// clock domain of publisher. SFU is normalising sender reports of publisher
// to SFU clock before sending to subscribers. So, capture time should be
// normalized to the same clock. Clear out any offset.
_, _, refSenderReport := d.forwarder.GetSenderReportParams()
if refSenderReport != nil {
actExtCopy := *extPkt.AbsCaptureTimeExt
if err = actExtCopy.Rewrite(refSenderReport.AtAdjusted.Sub(refSenderReport.NTPTimestamp.Time())); err == nil {
actBytes, err = actExtCopy.Marshal()
if err == nil {
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.absCaptureTimeExtID),
Payload: actBytes,
},
)
}
}
}
}
if d.sequencer != nil {
d.sequencer.push(
extPkt.Arrival,
@@ -748,6 +798,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
payload[:outgoingHeaderSize],
incomingHeaderSize,
tp.ddBytes,
actBytes,
)
}
@@ -1715,11 +1766,30 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)]
}
var ddBytes []byte
if len(epm.ddBytesSlice) != 0 {
ddBytes = epm.ddBytesSlice
} else {
ddBytes = epm.ddBytes[:epm.ddBytesSize]
var extensions []pacer.ExtensionData
if d.dependencyDescriptorExtID != 0 {
var ddBytes []byte
if len(epm.ddBytesSlice) != 0 {
ddBytes = epm.ddBytesSlice
} else {
ddBytes = epm.ddBytes[:epm.ddBytesSize]
}
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.dependencyDescriptorExtID),
Payload: ddBytes,
},
)
}
if d.absCaptureTimeExtID != 0 && len(epm.actBytes) != 0 {
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.absCaptureTimeExtID),
Payload: epm.actBytes,
},
)
}
d.sendingPacket(
@@ -1735,7 +1805,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
)
d.pacer.Enqueue(pacer.Packet{
Header: &pkt.Header,
Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: ddBytes}},
Extensions: extensions,
Payload: payload,
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
+1 -1
View File
@@ -31,7 +31,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/codecmunger"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/videolayerselector"
"github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector"
)
+5 -5
View File
@@ -20,7 +20,7 @@ import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/rtpextension"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/protocol/logger"
)
@@ -67,10 +67,10 @@ type PlayoutDelayController struct {
func NewPlayoutDelayController(minDelay, maxDelay uint32, logger logger.Logger, rtpStats *buffer.RTPStatsSender) (*PlayoutDelayController, error) {
if maxDelay == 0 && minDelay > 0 {
maxDelay = rtpextension.MaxPlayoutDelayDefault
maxDelay = pd.MaxPlayoutDelayDefault
}
if maxDelay > rtpextension.PlayoutDelayMaxValue {
maxDelay = rtpextension.PlayoutDelayMaxValue
if maxDelay > pd.PlayoutDelayMaxValue {
maxDelay = pd.PlayoutDelayMaxValue
}
c := &PlayoutDelayController{
currentDelay: minDelay,
@@ -153,7 +153,7 @@ func (c *PlayoutDelayController) GetDelayExtension(seq uint16) []byte {
}
func (c *PlayoutDelayController) createExtData() error {
delay := rtpextension.PlayoutDelayFromValue(
delay := pd.PlayoutDelayFromValue(
uint16(c.currentDelay),
uint16(c.maxDelay),
)
+3 -3
View File
@@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/rtpextension"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/protocol/logger"
)
@@ -60,7 +60,7 @@ func TestPlayoutDelay(t *testing.T) {
c.SetJitter(50)
t.Log(c.currentDelay, c.state.Load())
ext = c.GetDelayExtension(108)
var delay rtpextension.PlayOutDelay
var delay pd.PlayOutDelay
require.NoError(t, delay.Unmarshal(ext))
require.Greater(t, delay.Min, uint16(100))
@@ -72,7 +72,7 @@ func TestPlayoutDelay(t *testing.T) {
}
func playoutDelayEqual(t *testing.T, data []byte, min, max uint16) {
var delay rtpextension.PlayOutDelay
var delay pd.PlayOutDelay
require.NoError(t, delay.Unmarshal(data))
require.Equal(t, min, delay.Min)
require.Equal(t, max, delay.Max)
+1 -1
View File
@@ -35,7 +35,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/audio"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
)
var (
@@ -0,0 +1,114 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package abscapturetime
import (
"encoding/binary"
"errors"
"time"
"github.com/livekit/mediatransportutil"
)
const (
AbsCaptureTimeURI = "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time"
)
var (
errInvalidData = errors.New("invalid data")
errTooSmall = errors.New("buffer too small")
)
// Reference: https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/abs-capture-time/
//
// Data layout of the shortened version of abs-capture-time with a 1-byte header + 8 bytes of data:
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ID | len=7 | absolute capture timestamp (bit 0-23) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | absolute capture timestamp (bit 24-55) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ... (56-63) |
// +-+-+-+-+-+-+-+-+
//
//Data layout of the extended version of abs-capture-time with a 1-byte header + 16 bytes of data:
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ID | len=15| absolute capture timestamp (bit 0-23) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | absolute capture timestamp (bit 24-55) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ... (56-63) | estimated capture clock offset (bit 0-23) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | estimated capture clock offset (bit 24-55) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ... (56-63) |
// +-+-+-+-+-+-+-+-+
type AbsCaptureTime struct {
absoluteCaptureTimestamp mediatransportutil.NtpTime
estimatedCaptureClockOffset int64
}
func AbsCaptureTimeFromValue(absoluteCaptureTimestamp uint64, estimatedCaptureClockOffset int64) *AbsCaptureTime {
return &AbsCaptureTime{
absoluteCaptureTimestamp: mediatransportutil.NtpTime(absoluteCaptureTimestamp),
estimatedCaptureClockOffset: estimatedCaptureClockOffset,
}
}
func (a *AbsCaptureTime) Rewrite(offset time.Duration) error {
if a.absoluteCaptureTimestamp == 0 {
return errInvalidData
}
capturedAt := a.absoluteCaptureTimestamp.Time().Add(offset)
a.absoluteCaptureTimestamp = mediatransportutil.ToNtpTime(capturedAt)
a.estimatedCaptureClockOffset = 0
return nil
}
func (a *AbsCaptureTime) Marshal() ([]byte, error) {
if a.absoluteCaptureTimestamp == 0 {
return nil, errInvalidData
}
size := 8
if a.estimatedCaptureClockOffset != 0 {
size += 8
}
marshalled := make([]byte, size)
binary.BigEndian.PutUint64(marshalled, uint64(a.absoluteCaptureTimestamp))
if a.estimatedCaptureClockOffset != 0 {
binary.BigEndian.PutUint64(marshalled[8:], uint64(a.estimatedCaptureClockOffset))
}
return marshalled, nil
}
func (a *AbsCaptureTime) Unmarshal(marshalled []byte) error {
if len(marshalled) < 8 {
return errTooSmall
}
a.absoluteCaptureTimestamp = mediatransportutil.NtpTime(binary.BigEndian.Uint64(marshalled))
if len(marshalled) >= 16 {
a.estimatedCaptureClockOffset = int64(binary.BigEndian.Uint64(marshalled[8:]))
}
return nil
}
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rtpextension
package playoutdelay
import (
"encoding/binary"
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rtpextension
package playoutdelay
import (
"testing"
+6
View File
@@ -75,6 +75,8 @@ type packetMeta struct {
ddBytes [8]byte
ddBytesSize uint8
ddBytesSlice []byte
// abs-capture-time of packet
actBytes []byte
}
type extPacketMeta struct {
@@ -134,6 +136,7 @@ func (s *sequencer) push(
codecBytes []byte,
numCodecBytesIn int,
ddBytes []byte,
actBytes []byte,
) {
s.Lock()
defer s.Unlock()
@@ -220,6 +223,8 @@ func (s *sequencer) push(
copy(pm.ddBytes[:pm.ddBytesSize], ddBytes)
}
pm.actBytes = append([]byte{}, actBytes...)
if extModifiedSN > s.extHighestSN {
s.extHighestSN = extModifiedSN
}
@@ -344,6 +349,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
}
epm.codecBytesSlice = append([]byte{}, meta.codecBytesSlice...)
epm.ddBytesSlice = append([]byte{}, meta.ddBytesSlice...)
epm.actBytes = append([]byte{}, meta.actBytes...)
extPacketMetas = append(extPacketMetas, epm)
}
}
+23 -5
View File
@@ -29,11 +29,11 @@ func Test_sequencer(t *testing.T) {
off := uint16(15)
for i := uint64(1); i < 518; i++ {
seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, 0, nil)
seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil)
}
// send the last two out-of-order
seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil)
seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil)
seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil)
seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil)
req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517}
res := seq.getExtPacketMetas(req)
@@ -63,14 +63,14 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, val.extTimestamp, uint64(123))
}
seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil)
seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil)
m := seq.getExtPacketMetas([]uint16{521 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
m = seq.getExtPacketMetas([]uint16{521 + off})
require.Equal(t, 1, len(m))
seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil)
seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil)
m = seq.getExtPacketMetas([]uint16{505 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
@@ -99,6 +99,8 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
ddBytesOdd []byte
ddBytesEven []byte
ddBytesOversized []byte
actBytesOdd []byte
actBytesEven []byte
}
tests := []struct {
@@ -132,6 +134,8 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
ddBytesOdd: []byte{8, 9, 10},
ddBytesEven: []byte{11, 12},
ddBytesOversized: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19},
actBytesOdd: []byte{0, 1, 2, 3, 4, 5, 6, 7},
actBytesEven: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
},
args: args{
seqNo: []uint16{65526 + 5, 65527 + 5, 65530 + 5, 0 /* 65531 input */, 1 /* 65532 input */, 2 /* 65533 input */, 3 /* 65534 input */},
@@ -162,6 +166,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
tt.fields.codecBytesOversized,
len(tt.fields.codecBytesOversized),
tt.fields.ddBytesOversized,
tt.fields.actBytesOdd,
)
} else {
if i.seqNo%2 == 0 {
@@ -175,6 +180,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
tt.fields.codecBytesEven,
tt.fields.numCodecBytesInEven,
tt.fields.ddBytesEven,
tt.fields.actBytesEven,
)
} else {
n.push(
@@ -187,6 +193,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
tt.fields.codecBytesOdd,
tt.fields.numCodecBytesInOdd,
tt.fields.ddBytesOdd,
tt.fields.actBytesOdd,
)
}
}
@@ -204,6 +211,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.numCodecBytesIn)
require.Equal(t, tt.fields.ddBytesOversized, sn.ddBytesSlice)
require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.ddBytesSize)
require.Equal(t, tt.fields.actBytesOdd, sn.actBytes)
} else {
if sn.sourceSeqNo%2 == 0 {
require.Equal(t, tt.fields.markerEven, sn.marker)
@@ -211,12 +219,14 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn)
require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize])
require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize)
require.Equal(t, tt.fields.actBytesEven, sn.actBytes)
} else {
require.Equal(t, tt.fields.markerOdd, sn.marker)
require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut])
require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn)
require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize])
require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize)
require.Equal(t, tt.fields.actBytesOdd, sn.actBytes)
}
}
}
@@ -246,6 +256,8 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
numCodecBytesInEven int
ddBytesOdd []byte
ddBytesEven []byte
actBytesOdd []byte
actBytesEven []byte
}
tests := []struct {
@@ -278,6 +290,8 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
numCodecBytesInEven: 4,
ddBytesOdd: []byte{8, 9, 10},
ddBytesEven: []byte{11, 12},
actBytesOdd: []byte{8, 9, 10},
actBytesEven: []byte{11, 12},
},
args: args{
seqNo: []uint16{4 + 5, 5 + 5, 8 + 5, 9 + 5, 10 + 5, 11 + 5, 12 + 5},
@@ -306,6 +320,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
tt.fields.codecBytesEven,
tt.fields.numCodecBytesInEven,
tt.fields.ddBytesEven,
tt.fields.actBytesEven,
)
} else {
n.push(
@@ -318,6 +333,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
tt.fields.codecBytesOdd,
tt.fields.numCodecBytesInOdd,
tt.fields.ddBytesOdd,
tt.fields.actBytesOdd,
)
}
}
@@ -334,12 +350,14 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn)
require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize])
require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize)
require.Equal(t, tt.fields.actBytesEven, sn.actBytes)
} else {
require.Equal(t, tt.fields.markerOdd, sn.marker)
require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut])
require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn)
require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize])
require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize)
require.Equal(t, tt.fields.actBytesOdd, sn.actBytes)
}
}
if !reflect.DeepEqual(got, tt.want) {
+1 -1
View File
@@ -21,7 +21,7 @@ import (
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
)
type StreamTrackerDependencyDescriptor struct {
@@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/protocol/logger"
)
+1 -1
View File
@@ -18,7 +18,7 @@ import (
"fmt"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
)
type DecodeTarget struct {
@@ -20,7 +20,7 @@ import (
"sync"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
dede "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dede "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/protocol/logger"
)
@@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/protocol/logger"
)
+1 -1
View File
@@ -15,7 +15,7 @@
package videolayerselector
import (
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/protocol/logger"
)