From ad1f508680eb4c0a32bb5378556130e81aa727a2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 11 Apr 2024 15:25:10 +0530 Subject: [PATCH] Add support for "abs-capture-time" extension. (#2640) * Add support for "abs-capture-time" extension. Currently, it is just passed through from publisher -> subscriber side. TODO: Need to store in sequencer and restore for retransmission. * abs-capture-time in retransmissions * clean up * fix test * more test fixes * more test fixes * more test fixes * log only when size is non-zero * log on both sides for debugging * add marshal/unmarshal * normalize abs capture time to SFU clock * comment out adding abs-capture-time from registered extensions --- pkg/rtc/config.go | 12 +- pkg/rtc/mediaengine.go | 80 +++++++++--- pkg/rtc/mediatrackreceiver.go | 2 +- pkg/rtc/participant_sdp.go | 2 +- pkg/rtc/transport.go | 5 +- pkg/sfu/buffer/buffer.go | 75 +++++++----- pkg/sfu/buffer/dependencydescriptorparser.go | 2 +- pkg/sfu/buffer/fps_test.go | 6 +- pkg/sfu/buffer/frameintegrity.go | 2 +- pkg/sfu/buffer/frameintegrity_test.go | 24 ++-- pkg/sfu/downtrack.go | 92 ++++++++++++-- pkg/sfu/forwarder.go | 2 +- pkg/sfu/playoutdelay.go | 10 +- pkg/sfu/playoutdelay_test.go | 6 +- pkg/sfu/receiver.go | 2 +- .../abscapturetime/abscapturetime.go | 114 ++++++++++++++++++ .../dependencydescriptor/bitstreamreader.go | 0 .../dependencydescriptor/bitstreamwriter.go | 0 .../dependencydescriptorextension.go | 0 .../dependencydescriptorextension_test.go | 0 .../dependencydescriptorreader.go | 0 .../dependencydescriptorwriter.go | 0 .../{ => playoutdelay}/playoutdelay.go | 2 +- .../{ => playoutdelay}/playoutdelay_test.go | 2 +- pkg/sfu/sequencer.go | 6 + pkg/sfu/sequencer_test.go | 28 ++++- pkg/sfu/streamtracker/streamtracker_dd.go | 2 +- .../streamtracker/streamtracker_dd_test.go | 2 +- pkg/sfu/videolayerselector/decodetarget.go | 2 +- .../dependencydescriptor.go | 2 +- .../dependencydescriptor_test.go | 2 +- pkg/sfu/videolayerselector/framechain.go | 2 +- 32 files changed, 383 insertions(+), 103 deletions(-) create mode 100644 pkg/sfu/rtpextension/abscapturetime/abscapturetime.go rename pkg/sfu/{ => rtpextension}/dependencydescriptor/bitstreamreader.go (100%) rename pkg/sfu/{ => rtpextension}/dependencydescriptor/bitstreamwriter.go (100%) rename pkg/sfu/{ => rtpextension}/dependencydescriptor/dependencydescriptorextension.go (100%) rename pkg/sfu/{ => rtpextension}/dependencydescriptor/dependencydescriptorextension_test.go (100%) rename pkg/sfu/{ => rtpextension}/dependencydescriptor/dependencydescriptorreader.go (100%) rename pkg/sfu/{ => rtpextension}/dependencydescriptor/dependencydescriptorwriter.go (100%) rename pkg/sfu/rtpextension/{ => playoutdelay}/playoutdelay.go (99%) rename pkg/sfu/rtpextension/{ => playoutdelay}/playoutdelay_test.go (98%) diff --git a/pkg/rtc/config.go b/pkg/rtc/config.go index f8a8054be..5c608d1f2 100644 --- a/pkg/rtc/config.go +++ b/pkg/rtc/config.go @@ -20,7 +20,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/sfu/buffer" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/mediatransportutil/pkg/rtcconfig" ) @@ -88,6 +88,7 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) { sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI, + //act.AbsCaptureTimeURI, }, Video: []string{ sdp.SDESMidURI, @@ -96,6 +97,7 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) { frameMarking, dd.ExtensionURI, repairedRTPStreamID, + //act.AbsCaptureTimeURI, }, }, RTCPFeedback: RTCPFeedbackConfig{ @@ -115,7 +117,13 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) { subscriberConfig := DirectionConfig{ StrictACKs: conf.RTC.StrictACKs, RTPHeaderExtension: RTPHeaderExtensionConfig{ - Video: []string{dd.ExtensionURI}, + Video: []string{ + dd.ExtensionURI, + //act.AbsCaptureTimeURI, + }, + Audio: []string{ + //act.AbsCaptureTimeURI, + }, }, RTCPFeedback: RTCPFeedbackConfig{ Video: []webrtc.RTCPFeedback{ diff --git a/pkg/rtc/mediaengine.go b/pkg/rtc/mediaengine.go index 55836b472..a9b1d623b 100644 --- a/pkg/rtc/mediaengine.go +++ b/pkg/rtc/mediaengine.go @@ -29,9 +29,22 @@ const ( videoRTXMimeType = "video/rtx" ) -var opusCodecCapability = webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "minptime=10;useinbandfec=1"} -var redCodecCapability = webrtc.RTPCodecCapability{MimeType: sfu.MimeTypeAudioRed, ClockRate: 48000, Channels: 2, SDPFmtpLine: "111/111"} -var videoRTX = webrtc.RTPCodecCapability{MimeType: videoRTXMimeType, ClockRate: 90000} +var opusCodecCapability = webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + Channels: 2, + SDPFmtpLine: "minptime=10;useinbandfec=1", +} +var redCodecCapability = webrtc.RTPCodecCapability{ + MimeType: sfu.MimeTypeAudioRed, + ClockRate: 48000, + Channels: 2, + SDPFmtpLine: "111/111", +} +var videoRTX = webrtc.RTPCodecCapability{ + MimeType: videoRTXMimeType, + ClockRate: 90000, +} func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedback RTCPFeedbackConfig, filterOutH264HighProfile bool) error { opusCodec := opusCodecCapability @@ -61,32 +74,65 @@ func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedbac h264HighProfileFmtp := "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032" for _, codec := range []webrtc.RTPCodecParameters{ { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, RTCPFeedback: rtcpFeedback.Video}, - PayloadType: 96, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: 90000, + RTCPFeedback: rtcpFeedback.Video, + }, + PayloadType: 96, }, { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP9, ClockRate: 90000, SDPFmtpLine: "profile-id=0", RTCPFeedback: rtcpFeedback.Video}, - PayloadType: 98, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, + SDPFmtpLine: "profile-id=0", + RTCPFeedback: rtcpFeedback.Video, + }, + PayloadType: 98, }, { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP9, ClockRate: 90000, SDPFmtpLine: "profile-id=1", RTCPFeedback: rtcpFeedback.Video}, - PayloadType: 100, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, + SDPFmtpLine: "profile-id=1", + RTCPFeedback: rtcpFeedback.Video, + }, + PayloadType: 100, }, { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", RTCPFeedback: rtcpFeedback.Video}, - PayloadType: 125, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", + RTCPFeedback: rtcpFeedback.Video, + }, + PayloadType: 125, }, { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", RTCPFeedback: rtcpFeedback.Video}, - PayloadType: 108, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", + RTCPFeedback: rtcpFeedback.Video, + }, + PayloadType: 108, }, { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: h264HighProfileFmtp, RTCPFeedback: rtcpFeedback.Video}, - PayloadType: 123, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + SDPFmtpLine: h264HighProfileFmtp, + RTCPFeedback: rtcpFeedback.Video, + }, + PayloadType: 123, }, { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1, ClockRate: 90000, RTCPFeedback: rtcpFeedback.Video}, - PayloadType: 35, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeAV1, + ClockRate: 90000, + RTCPFeedback: rtcpFeedback.Video, + }, + PayloadType: 35, }, } { if filterOutH264HighProfile && codec.RTPCodecCapability.SDPFmtpLine == h264HighProfileFmtp { diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index a29b2a630..1644ca4ca 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -34,7 +34,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/livekit-server/pkg/telemetry" ) diff --git a/pkg/rtc/participant_sdp.go b/pkg/rtc/participant_sdp.go index 2516291d3..d3fae1577 100644 --- a/pkg/rtc/participant_sdp.go +++ b/pkg/rtc/participant_sdp.go @@ -22,7 +22,7 @@ import ( "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/protocol/livekit" lksdp "github.com/livekit/protocol/sdp" ) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 257820320..74117c8f1 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -47,7 +47,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor" "github.com/livekit/livekit-server/pkg/sfu/pacer" - "github.com/livekit/livekit-server/pkg/sfu/rtpextension" + pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" @@ -222,9 +222,8 @@ type TransportParams struct { func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) { directionConfig := params.DirectionConfig - if params.AllowPlayoutDelay { - directionConfig.RTPHeaderExtension.Video = append(directionConfig.RTPHeaderExtension.Video, rtpextension.PlayoutDelayURI) + directionConfig.RTPHeaderExtension.Video = append(directionConfig.RTPHeaderExtension.Video, pd.PlayoutDelayURI) } // Some of the browser clients do not handle H.264 High Profile in signalling properly. diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 8b7678671..04e468d7e 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -31,7 +31,8 @@ import ( "go.uber.org/atomic" "github.com/livekit/livekit-server/pkg/sfu/audio" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/livekit-server/pkg/sfu/utils" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil" @@ -64,29 +65,30 @@ type ExtPacket struct { KeyFrame bool RawPacket []byte DependencyDescriptor *ExtDependencyDescriptor + AbsCaptureTimeExt *act.AbsCaptureTime } // Buffer contains all packets type Buffer struct { sync.RWMutex - readCond *sync.Cond - bucket *bucket.Bucket - nacker *nack.NackQueue - maxVideoPkts int - maxAudioPkts int - codecType webrtc.RTPCodecType - payloadType uint8 - extPackets deque.Deque[*ExtPacket] - pPackets []pendingPacket - closeOnce sync.Once - mediaSSRC uint32 - clockRate uint32 - lastReport time.Time - twccExt uint8 - audioLevelExt uint8 - bound bool - closed atomic.Bool - mime string + readCond *sync.Cond + bucket *bucket.Bucket + nacker *nack.NackQueue + maxVideoPkts int + maxAudioPkts int + codecType webrtc.RTPCodecType + payloadType uint8 + extPackets deque.Deque[*ExtPacket] + pPackets []pendingPacket + closeOnce sync.Once + mediaSSRC uint32 + clockRate uint32 + lastReport time.Time + twccExtID uint8 + audioLevelExtID uint8 + bound bool + closed atomic.Bool + mime string snRangeMap *utils.RangeMap[uint64, uint64] @@ -120,7 +122,7 @@ type Buffer struct { logger logger.Logger // dependency descriptor - ddExt uint8 + ddExtID uint8 ddParser *DependencyDescriptorParser paused bool @@ -133,6 +135,8 @@ type Buffer struct { primaryBufferForRTX *Buffer rtxPktBuf []byte + + absCaptureTimeExtID uint8 } // NewBuffer constructs a new Buffer @@ -173,7 +177,7 @@ func (b *Buffer) SetTWCCAndExtID(twcc *twcc.Responder, extID uint8) { defer b.Unlock() b.twcc = twcc - b.twccExt = extID + b.twccExtID = extID } func (b *Buffer) SetAudioLevelParams(audioLevelParams audio.AudioLevelParams) { @@ -223,18 +227,21 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili for _, ext := range params.HeaderExtensions { switch ext.URI { case dd.ExtensionURI: - b.ddExt = uint8(ext.ID) + b.ddExtID = uint8(ext.ID) frc := NewFrameRateCalculatorDD(b.clockRate, b.logger) for i := range b.frameRateCalculator { b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i)) } - b.ddParser = NewDependencyDescriptorParser(b.ddExt, b.logger, func(spatial, temporal int32) { + b.ddParser = NewDependencyDescriptorParser(b.ddExtID, b.logger, func(spatial, temporal int32) { frc.SetMaxLayer(spatial, temporal) }) case sdp.AudioLevelURI: - b.audioLevelExt = uint8(ext.ID) + b.audioLevelExtID = uint8(ext.ID) b.audioLevel = audio.NewAudioLevel(b.audioLevelParams) + + case act.AbsCaptureTimeURI: + b.absCaptureTimeExtID = uint8(ext.ID) } } @@ -302,8 +309,8 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { } now := time.Now() - if b.twcc != nil && b.twccExt != 0 && !b.closed.Load() { - if ext := rtpPacket.GetExtension(b.twccExt); ext != nil { + if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() { + if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil { b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now.UnixNano(), rtpPacket.Marker) } } @@ -664,12 +671,12 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlow } func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, isRTX bool) { - if b.audioLevelExt != 0 && !isRTX { + if b.audioLevelExtID != 0 && !isRTX { if !b.latestTSForAudioLevelInitialized { b.latestTSForAudioLevelInitialized = true b.latestTSForAudioLevel = p.Timestamp } - if e := p.GetExtension(b.audioLevelExt); e != nil { + if e := p.GetExtension(b.audioLevelExtID); e != nil { ext := rtp.AudioLevelExtension{} if err := ext.Unmarshal(e); err == nil { if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) { @@ -729,6 +736,7 @@ func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flow ep.Spatial = InvalidLayerSpatial // vp8 don't have spatial scalability, reset to invalid } ep.Payload = vp8Packet + case "video/vp9": if ep.DependencyDescriptor == nil { var vp9Packet codecs.VP9Packet @@ -744,8 +752,10 @@ func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flow ep.Payload = vp9Packet } ep.KeyFrame = IsVP9KeyFrame(rtpPacket.Payload) + case "video/h264": ep.KeyFrame = IsH264KeyFrame(rtpPacket.Payload) + case "video/av1": ep.KeyFrame = IsAV1KeyFrame(rtpPacket.Payload) } @@ -756,6 +766,15 @@ func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flow } } + if b.absCaptureTimeExtID != 0 { + extData := rtpPacket.GetExtension(b.absCaptureTimeExtID) + + var actExt act.AbsCaptureTime + if err := actExt.Unmarshal(extData); err == nil { + ep.AbsCaptureTimeExt = &actExt + } + } + return ep } diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index da79c301b..30851442c 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -21,7 +21,7 @@ import ( "github.com/pion/rtp" "go.uber.org/atomic" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/protocol/logger" diff --git a/pkg/sfu/buffer/fps_test.go b/pkg/sfu/buffer/fps_test.go index 4a85d2808..39dd93df3 100644 --- a/pkg/sfu/buffer/fps_test.go +++ b/pkg/sfu/buffer/fps_test.go @@ -20,7 +20,7 @@ import ( "github.com/pion/rtp" "github.com/stretchr/testify/require" - "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/protocol/logger" ) @@ -46,9 +46,9 @@ func (f *testFrameInfo) toDD() *ExtPacket { return &ExtPacket{ Packet: &rtp.Packet{Header: f.header}, DependencyDescriptor: &ExtDependencyDescriptor{ - Descriptor: &dependencydescriptor.DependencyDescriptor{ + Descriptor: &dd.DependencyDescriptor{ FrameNumber: f.framenumber, - FrameDependencies: &dependencydescriptor.FrameDependencyTemplate{ + FrameDependencies: &dd.FrameDependencyTemplate{ FrameDiffs: f.frameDiff, }, }, diff --git a/pkg/sfu/buffer/frameintegrity.go b/pkg/sfu/buffer/frameintegrity.go index 536f793cf..935263b7d 100644 --- a/pkg/sfu/buffer/frameintegrity.go +++ b/pkg/sfu/buffer/frameintegrity.go @@ -15,7 +15,7 @@ package buffer import ( - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" ) type FrameEntity struct { diff --git a/pkg/sfu/buffer/frameintegrity_test.go b/pkg/sfu/buffer/frameintegrity_test.go index 144a9f0f2..2815cf5e9 100644 --- a/pkg/sfu/buffer/frameintegrity_test.go +++ b/pkg/sfu/buffer/frameintegrity_test.go @@ -20,47 +20,47 @@ import ( "github.com/stretchr/testify/require" - "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" ) func TestFrameIntegrityChecker(t *testing.T) { fc := NewFrameIntegrityChecker(100, 1000) // first frame out of order - fc.AddPacket(10, 10, &dependencydescriptor.DependencyDescriptor{}) + fc.AddPacket(10, 10, &dd.DependencyDescriptor{}) require.False(t, fc.FrameIntegrity(10)) - fc.AddPacket(9, 10, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true}) + fc.AddPacket(9, 10, &dd.DependencyDescriptor{FirstPacketInFrame: true}) require.False(t, fc.FrameIntegrity(10)) - fc.AddPacket(11, 10, &dependencydescriptor.DependencyDescriptor{LastPacketInFrame: true}) + fc.AddPacket(11, 10, &dd.DependencyDescriptor{LastPacketInFrame: true}) require.True(t, fc.FrameIntegrity(10)) // single packet frame - fc.AddPacket(100, 100, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true}) + fc.AddPacket(100, 100, &dd.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true}) require.True(t, fc.FrameIntegrity(100)) require.False(t, fc.FrameIntegrity(101)) require.False(t, fc.FrameIntegrity(99)) // frame too old than first frame - fc.AddPacket(99, 99, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true}) + fc.AddPacket(99, 99, &dd.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true}) // multiple packet frame, out of order - fc.AddPacket(2001, 2001, &dependencydescriptor.DependencyDescriptor{}) + fc.AddPacket(2001, 2001, &dd.DependencyDescriptor{}) require.False(t, fc.FrameIntegrity(2001)) require.False(t, fc.FrameIntegrity(1999)) // out of frame count(100) require.False(t, fc.FrameIntegrity(100)) require.False(t, fc.FrameIntegrity(1900)) - fc.AddPacket(2000, 2001, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true}) + fc.AddPacket(2000, 2001, &dd.DependencyDescriptor{FirstPacketInFrame: true}) require.False(t, fc.FrameIntegrity(2001)) - fc.AddPacket(2002, 2001, &dependencydescriptor.DependencyDescriptor{LastPacketInFrame: true}) + fc.AddPacket(2002, 2001, &dd.DependencyDescriptor{LastPacketInFrame: true}) require.True(t, fc.FrameIntegrity(2001)) // duplicate packet - fc.AddPacket(2001, 2001, &dependencydescriptor.DependencyDescriptor{}) + fc.AddPacket(2001, 2001, &dd.DependencyDescriptor{}) require.True(t, fc.FrameIntegrity(2001)) // frame too old - fc.AddPacket(900, 1900, &dependencydescriptor.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true}) + fc.AddPacket(900, 1900, &dd.DependencyDescriptor{FirstPacketInFrame: true, LastPacketInFrame: true}) require.False(t, fc.FrameIntegrity(1900)) for frame := uint64(2002); frame < 2102; frame++ { @@ -75,7 +75,7 @@ func TestFrameIntegrityChecker(t *testing.T) { rand.Seed(int64(frame)) rand.Shuffle(len(frames), func(i, j int) { frames[i], frames[j] = frames[j], frames[i] }) for i, f := range frames { - fc.AddPacket(f, frame, &dependencydescriptor.DependencyDescriptor{ + fc.AddPacket(f, frame, &dd.DependencyDescriptor{ FirstPacketInFrame: f == firstFrame, LastPacketInFrame: f == lastFrame, }) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 7e9081901..878fcf19a 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -35,9 +35,10 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" "github.com/livekit/livekit-server/pkg/sfu/pacer" - "github.com/livekit/livekit-server/pkg/sfu/rtpextension" + act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" + pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" "github.com/livekit/livekit-server/pkg/sfu/utils" ) @@ -237,6 +238,7 @@ type DownTrack struct { transportWideExtID int dependencyDescriptorExtID int playoutDelayExtID int + absCaptureTimeExtID int transceiver atomic.Pointer[webrtc.RTPTransceiver] writeStream webrtc.TrackLocalWriter rtcpReader *buffer.RTCPReader @@ -553,7 +555,7 @@ func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeade } case dd.ExtensionURI: d.dependencyDescriptorExtID = ext.ID - case rtpextension.PlayoutDelayURI: + case pd.PlayoutDelayURI: d.playoutDelayExtID = ext.ID case sdp.TransportCCURI: if isBWEEnabled { @@ -561,6 +563,8 @@ func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeade } else { d.transportWideExtID = 0 } + case act.AbsCaptureTimeURI: + d.absCaptureTimeExtID = ext.ID } } } @@ -730,13 +734,59 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { var extensions []pacer.ExtensionData if tp.ddBytes != nil { - extensions = []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: tp.ddBytes}} + extensions = append( + extensions, + pacer.ExtensionData{ + ID: uint8(d.dependencyDescriptorExtID), + Payload: tp.ddBytes, + }, + ) } if d.playoutDelayExtID != 0 && d.playoutDelay != nil { if val := d.playoutDelay.GetDelayExtension(hdr.SequenceNumber); val != nil { - extensions = append(extensions, pacer.ExtensionData{ID: uint8(d.playoutDelayExtID), Payload: val}) + extensions = append( + extensions, + pacer.ExtensionData{ + ID: uint8(d.playoutDelayExtID), + Payload: val, + }, + ) + + // NOTE: play out delay extension is not cached in sequencer, + // i. e. they will not be added to retransmitted packet. + // But, it is okay as the extension is added till a RTCP Receiver Report for + // the corresponding sequence number is received. + // The extreme case is all packets containing the play out delay are lost and + // all of them retransmitted and an RTCP Receiver Report received for those + // retransmited sequence numbers. But, that is highly improbable, if not impossible. } } + var actBytes []byte + if extPkt.AbsCaptureTimeExt != nil && d.absCaptureTimeExtID != 0 { + // normalize capture time to SFU clock. + // NOTE: even if there is estimated offset populated, just re-map the + // absolute capture time stamp as it should be the same RTCP sender report + // clock domain of publisher. SFU is normalising sender reports of publisher + // to SFU clock before sending to subscribers. So, capture time should be + // normalized to the same clock. Clear out any offset. + _, _, refSenderReport := d.forwarder.GetSenderReportParams() + if refSenderReport != nil { + actExtCopy := *extPkt.AbsCaptureTimeExt + if err = actExtCopy.Rewrite(refSenderReport.AtAdjusted.Sub(refSenderReport.NTPTimestamp.Time())); err == nil { + actBytes, err = actExtCopy.Marshal() + if err == nil { + extensions = append( + extensions, + pacer.ExtensionData{ + ID: uint8(d.absCaptureTimeExtID), + Payload: actBytes, + }, + ) + } + } + } + } + if d.sequencer != nil { d.sequencer.push( extPkt.Arrival, @@ -748,6 +798,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { payload[:outgoingHeaderSize], incomingHeaderSize, tp.ddBytes, + actBytes, ) } @@ -1715,11 +1766,30 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)] } - var ddBytes []byte - if len(epm.ddBytesSlice) != 0 { - ddBytes = epm.ddBytesSlice - } else { - ddBytes = epm.ddBytes[:epm.ddBytesSize] + var extensions []pacer.ExtensionData + if d.dependencyDescriptorExtID != 0 { + var ddBytes []byte + if len(epm.ddBytesSlice) != 0 { + ddBytes = epm.ddBytesSlice + } else { + ddBytes = epm.ddBytes[:epm.ddBytesSize] + } + extensions = append( + extensions, + pacer.ExtensionData{ + ID: uint8(d.dependencyDescriptorExtID), + Payload: ddBytes, + }, + ) + } + if d.absCaptureTimeExtID != 0 && len(epm.actBytes) != 0 { + extensions = append( + extensions, + pacer.ExtensionData{ + ID: uint8(d.absCaptureTimeExtID), + Payload: epm.actBytes, + }, + ) } d.sendingPacket( @@ -1735,7 +1805,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { ) d.pacer.Enqueue(pacer.Packet{ Header: &pkt.Header, - Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: ddBytes}}, + Extensions: extensions, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 721dd6c9e..af610e34c 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -31,7 +31,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/codecmunger" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/livekit-server/pkg/sfu/videolayerselector" "github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector" ) diff --git a/pkg/sfu/playoutdelay.go b/pkg/sfu/playoutdelay.go index 991f1401b..c0ed985b0 100644 --- a/pkg/sfu/playoutdelay.go +++ b/pkg/sfu/playoutdelay.go @@ -20,7 +20,7 @@ import ( "time" "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/livekit/livekit-server/pkg/sfu/rtpextension" + pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" "github.com/livekit/protocol/logger" ) @@ -67,10 +67,10 @@ type PlayoutDelayController struct { func NewPlayoutDelayController(minDelay, maxDelay uint32, logger logger.Logger, rtpStats *buffer.RTPStatsSender) (*PlayoutDelayController, error) { if maxDelay == 0 && minDelay > 0 { - maxDelay = rtpextension.MaxPlayoutDelayDefault + maxDelay = pd.MaxPlayoutDelayDefault } - if maxDelay > rtpextension.PlayoutDelayMaxValue { - maxDelay = rtpextension.PlayoutDelayMaxValue + if maxDelay > pd.PlayoutDelayMaxValue { + maxDelay = pd.PlayoutDelayMaxValue } c := &PlayoutDelayController{ currentDelay: minDelay, @@ -153,7 +153,7 @@ func (c *PlayoutDelayController) GetDelayExtension(seq uint16) []byte { } func (c *PlayoutDelayController) createExtData() error { - delay := rtpextension.PlayoutDelayFromValue( + delay := pd.PlayoutDelayFromValue( uint16(c.currentDelay), uint16(c.maxDelay), ) diff --git a/pkg/sfu/playoutdelay_test.go b/pkg/sfu/playoutdelay_test.go index cc24d2848..338b4d64b 100644 --- a/pkg/sfu/playoutdelay_test.go +++ b/pkg/sfu/playoutdelay_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/livekit/livekit-server/pkg/sfu/rtpextension" + pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" "github.com/livekit/protocol/logger" ) @@ -60,7 +60,7 @@ func TestPlayoutDelay(t *testing.T) { c.SetJitter(50) t.Log(c.currentDelay, c.state.Load()) ext = c.GetDelayExtension(108) - var delay rtpextension.PlayOutDelay + var delay pd.PlayOutDelay require.NoError(t, delay.Unmarshal(ext)) require.Greater(t, delay.Min, uint16(100)) @@ -72,7 +72,7 @@ func TestPlayoutDelay(t *testing.T) { } func playoutDelayEqual(t *testing.T, data []byte, min, max uint16) { - var delay rtpextension.PlayOutDelay + var delay pd.PlayOutDelay require.NoError(t, delay.Unmarshal(data)) require.Equal(t, min, delay.Min) require.Equal(t, max, delay.Max) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index e6e9c54ae..78d49fb2b 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -35,7 +35,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" ) var ( diff --git a/pkg/sfu/rtpextension/abscapturetime/abscapturetime.go b/pkg/sfu/rtpextension/abscapturetime/abscapturetime.go new file mode 100644 index 000000000..1eb3f8d2a --- /dev/null +++ b/pkg/sfu/rtpextension/abscapturetime/abscapturetime.go @@ -0,0 +1,114 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package abscapturetime + +import ( + "encoding/binary" + "errors" + "time" + + "github.com/livekit/mediatransportutil" +) + +const ( + AbsCaptureTimeURI = "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time" +) + +var ( + errInvalidData = errors.New("invalid data") + errTooSmall = errors.New("buffer too small") +) + +// Reference: https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/abs-capture-time/ +// +// Data layout of the shortened version of abs-capture-time with a 1-byte header + 8 bytes of data: +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | ID | len=7 | absolute capture timestamp (bit 0-23) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | absolute capture timestamp (bit 24-55) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | ... (56-63) | +// +-+-+-+-+-+-+-+-+ +// +//Data layout of the extended version of abs-capture-time with a 1-byte header + 16 bytes of data: +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | ID | len=15| absolute capture timestamp (bit 0-23) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | absolute capture timestamp (bit 24-55) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | ... (56-63) | estimated capture clock offset (bit 0-23) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | estimated capture clock offset (bit 24-55) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | ... (56-63) | +// +-+-+-+-+-+-+-+-+ + +type AbsCaptureTime struct { + absoluteCaptureTimestamp mediatransportutil.NtpTime + estimatedCaptureClockOffset int64 +} + +func AbsCaptureTimeFromValue(absoluteCaptureTimestamp uint64, estimatedCaptureClockOffset int64) *AbsCaptureTime { + return &AbsCaptureTime{ + absoluteCaptureTimestamp: mediatransportutil.NtpTime(absoluteCaptureTimestamp), + estimatedCaptureClockOffset: estimatedCaptureClockOffset, + } +} + +func (a *AbsCaptureTime) Rewrite(offset time.Duration) error { + if a.absoluteCaptureTimestamp == 0 { + return errInvalidData + } + + capturedAt := a.absoluteCaptureTimestamp.Time().Add(offset) + a.absoluteCaptureTimestamp = mediatransportutil.ToNtpTime(capturedAt) + a.estimatedCaptureClockOffset = 0 + return nil +} + +func (a *AbsCaptureTime) Marshal() ([]byte, error) { + if a.absoluteCaptureTimestamp == 0 { + return nil, errInvalidData + } + + size := 8 + if a.estimatedCaptureClockOffset != 0 { + size += 8 + } + marshalled := make([]byte, size) + binary.BigEndian.PutUint64(marshalled, uint64(a.absoluteCaptureTimestamp)) + if a.estimatedCaptureClockOffset != 0 { + binary.BigEndian.PutUint64(marshalled[8:], uint64(a.estimatedCaptureClockOffset)) + } + return marshalled, nil +} + +func (a *AbsCaptureTime) Unmarshal(marshalled []byte) error { + if len(marshalled) < 8 { + return errTooSmall + } + + a.absoluteCaptureTimestamp = mediatransportutil.NtpTime(binary.BigEndian.Uint64(marshalled)) + if len(marshalled) >= 16 { + a.estimatedCaptureClockOffset = int64(binary.BigEndian.Uint64(marshalled[8:])) + } + return nil +} diff --git a/pkg/sfu/dependencydescriptor/bitstreamreader.go b/pkg/sfu/rtpextension/dependencydescriptor/bitstreamreader.go similarity index 100% rename from pkg/sfu/dependencydescriptor/bitstreamreader.go rename to pkg/sfu/rtpextension/dependencydescriptor/bitstreamreader.go diff --git a/pkg/sfu/dependencydescriptor/bitstreamwriter.go b/pkg/sfu/rtpextension/dependencydescriptor/bitstreamwriter.go similarity index 100% rename from pkg/sfu/dependencydescriptor/bitstreamwriter.go rename to pkg/sfu/rtpextension/dependencydescriptor/bitstreamwriter.go diff --git a/pkg/sfu/dependencydescriptor/dependencydescriptorextension.go b/pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorextension.go similarity index 100% rename from pkg/sfu/dependencydescriptor/dependencydescriptorextension.go rename to pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorextension.go diff --git a/pkg/sfu/dependencydescriptor/dependencydescriptorextension_test.go b/pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorextension_test.go similarity index 100% rename from pkg/sfu/dependencydescriptor/dependencydescriptorextension_test.go rename to pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorextension_test.go diff --git a/pkg/sfu/dependencydescriptor/dependencydescriptorreader.go b/pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorreader.go similarity index 100% rename from pkg/sfu/dependencydescriptor/dependencydescriptorreader.go rename to pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorreader.go diff --git a/pkg/sfu/dependencydescriptor/dependencydescriptorwriter.go b/pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorwriter.go similarity index 100% rename from pkg/sfu/dependencydescriptor/dependencydescriptorwriter.go rename to pkg/sfu/rtpextension/dependencydescriptor/dependencydescriptorwriter.go diff --git a/pkg/sfu/rtpextension/playoutdelay.go b/pkg/sfu/rtpextension/playoutdelay/playoutdelay.go similarity index 99% rename from pkg/sfu/rtpextension/playoutdelay.go rename to pkg/sfu/rtpextension/playoutdelay/playoutdelay.go index 2e311a621..d1017846e 100644 --- a/pkg/sfu/rtpextension/playoutdelay.go +++ b/pkg/sfu/rtpextension/playoutdelay/playoutdelay.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rtpextension +package playoutdelay import ( "encoding/binary" diff --git a/pkg/sfu/rtpextension/playoutdelay_test.go b/pkg/sfu/rtpextension/playoutdelay/playoutdelay_test.go similarity index 98% rename from pkg/sfu/rtpextension/playoutdelay_test.go rename to pkg/sfu/rtpextension/playoutdelay/playoutdelay_test.go index 7c07ed26c..5a2dc078b 100644 --- a/pkg/sfu/rtpextension/playoutdelay_test.go +++ b/pkg/sfu/rtpextension/playoutdelay/playoutdelay_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rtpextension +package playoutdelay import ( "testing" diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 0fafcfc81..a78b23eb6 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -75,6 +75,8 @@ type packetMeta struct { ddBytes [8]byte ddBytesSize uint8 ddBytesSlice []byte + // abs-capture-time of packet + actBytes []byte } type extPacketMeta struct { @@ -134,6 +136,7 @@ func (s *sequencer) push( codecBytes []byte, numCodecBytesIn int, ddBytes []byte, + actBytes []byte, ) { s.Lock() defer s.Unlock() @@ -220,6 +223,8 @@ func (s *sequencer) push( copy(pm.ddBytes[:pm.ddBytesSize], ddBytes) } + pm.actBytes = append([]byte{}, actBytes...) + if extModifiedSN > s.extHighestSN { s.extHighestSN = extModifiedSN } @@ -344,6 +349,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { } epm.codecBytesSlice = append([]byte{}, meta.codecBytesSlice...) epm.ddBytesSlice = append([]byte{}, meta.ddBytesSlice...) + epm.actBytes = append([]byte{}, meta.actBytes...) extPacketMetas = append(extPacketMetas, epm) } } diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index 39dcec906..f8eabe045 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -29,11 +29,11 @@ func Test_sequencer(t *testing.T) { off := uint16(15) for i := uint64(1); i < 518; i++ { - seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, 0, nil) + seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil) } // send the last two out-of-order - seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil) - seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil) + seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil) + seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil) req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517} res := seq.getExtPacketMetas(req) @@ -63,14 +63,14 @@ func Test_sequencer(t *testing.T) { require.Equal(t, val.extTimestamp, uint64(123)) } - seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil) + seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil) m := seq.getExtPacketMetas([]uint16{521 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) m = seq.getExtPacketMetas([]uint16{521 + off}) require.Equal(t, 1, len(m)) - seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil) + seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil) m = seq.getExtPacketMetas([]uint16{505 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) @@ -99,6 +99,8 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { ddBytesOdd []byte ddBytesEven []byte ddBytesOversized []byte + actBytesOdd []byte + actBytesEven []byte } tests := []struct { @@ -132,6 +134,8 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { ddBytesOdd: []byte{8, 9, 10}, ddBytesEven: []byte{11, 12}, ddBytesOversized: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19}, + actBytesOdd: []byte{0, 1, 2, 3, 4, 5, 6, 7}, + actBytesEven: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, }, args: args{ seqNo: []uint16{65526 + 5, 65527 + 5, 65530 + 5, 0 /* 65531 input */, 1 /* 65532 input */, 2 /* 65533 input */, 3 /* 65534 input */}, @@ -162,6 +166,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { tt.fields.codecBytesOversized, len(tt.fields.codecBytesOversized), tt.fields.ddBytesOversized, + tt.fields.actBytesOdd, ) } else { if i.seqNo%2 == 0 { @@ -175,6 +180,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { tt.fields.codecBytesEven, tt.fields.numCodecBytesInEven, tt.fields.ddBytesEven, + tt.fields.actBytesEven, ) } else { n.push( @@ -187,6 +193,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { tt.fields.codecBytesOdd, tt.fields.numCodecBytesInOdd, tt.fields.ddBytesOdd, + tt.fields.actBytesOdd, ) } } @@ -204,6 +211,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.numCodecBytesIn) require.Equal(t, tt.fields.ddBytesOversized, sn.ddBytesSlice) require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.ddBytesSize) + require.Equal(t, tt.fields.actBytesOdd, sn.actBytes) } else { if sn.sourceSeqNo%2 == 0 { require.Equal(t, tt.fields.markerEven, sn.marker) @@ -211,12 +219,14 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn) require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize]) require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize) + require.Equal(t, tt.fields.actBytesEven, sn.actBytes) } else { require.Equal(t, tt.fields.markerOdd, sn.marker) require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut]) require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn) require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize]) require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize) + require.Equal(t, tt.fields.actBytesOdd, sn.actBytes) } } } @@ -246,6 +256,8 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { numCodecBytesInEven int ddBytesOdd []byte ddBytesEven []byte + actBytesOdd []byte + actBytesEven []byte } tests := []struct { @@ -278,6 +290,8 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { numCodecBytesInEven: 4, ddBytesOdd: []byte{8, 9, 10}, ddBytesEven: []byte{11, 12}, + actBytesOdd: []byte{8, 9, 10}, + actBytesEven: []byte{11, 12}, }, args: args{ seqNo: []uint16{4 + 5, 5 + 5, 8 + 5, 9 + 5, 10 + 5, 11 + 5, 12 + 5}, @@ -306,6 +320,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { tt.fields.codecBytesEven, tt.fields.numCodecBytesInEven, tt.fields.ddBytesEven, + tt.fields.actBytesEven, ) } else { n.push( @@ -318,6 +333,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { tt.fields.codecBytesOdd, tt.fields.numCodecBytesInOdd, tt.fields.ddBytesOdd, + tt.fields.actBytesOdd, ) } } @@ -334,12 +350,14 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn) require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize]) require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize) + require.Equal(t, tt.fields.actBytesEven, sn.actBytes) } else { require.Equal(t, tt.fields.markerOdd, sn.marker) require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut]) require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn) require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize]) require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize) + require.Equal(t, tt.fields.actBytesOdd, sn.actBytes) } } if !reflect.DeepEqual(got, tt.want) { diff --git a/pkg/sfu/streamtracker/streamtracker_dd.go b/pkg/sfu/streamtracker/streamtracker_dd.go index be8009eb1..29b876c70 100644 --- a/pkg/sfu/streamtracker/streamtracker_dd.go +++ b/pkg/sfu/streamtracker/streamtracker_dd.go @@ -21,7 +21,7 @@ import ( "go.uber.org/atomic" "github.com/livekit/livekit-server/pkg/sfu/buffer" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" ) type StreamTrackerDependencyDescriptor struct { diff --git a/pkg/sfu/streamtracker/streamtracker_dd_test.go b/pkg/sfu/streamtracker/streamtracker_dd_test.go index f638e4e2d..8e3f4001c 100644 --- a/pkg/sfu/streamtracker/streamtracker_dd_test.go +++ b/pkg/sfu/streamtracker/streamtracker_dd_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/protocol/logger" ) diff --git a/pkg/sfu/videolayerselector/decodetarget.go b/pkg/sfu/videolayerselector/decodetarget.go index 7204b329f..70114f21f 100644 --- a/pkg/sfu/videolayerselector/decodetarget.go +++ b/pkg/sfu/videolayerselector/decodetarget.go @@ -18,7 +18,7 @@ import ( "fmt" "github.com/livekit/livekit-server/pkg/sfu/buffer" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" ) type DecodeTarget struct { diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 15d9df1eb..acfe5b85c 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -20,7 +20,7 @@ import ( "sync" "github.com/livekit/livekit-server/pkg/sfu/buffer" - dede "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dede "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/protocol/logger" ) diff --git a/pkg/sfu/videolayerselector/dependencydescriptor_test.go b/pkg/sfu/videolayerselector/dependencydescriptor_test.go index 2b346dedb..a562988f9 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor_test.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/protocol/logger" ) diff --git a/pkg/sfu/videolayerselector/framechain.go b/pkg/sfu/videolayerselector/framechain.go index 7b226ed0c..836783864 100644 --- a/pkg/sfu/videolayerselector/framechain.go +++ b/pkg/sfu/videolayerselector/framechain.go @@ -15,7 +15,7 @@ package videolayerselector import ( - dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/protocol/logger" )