From a5333a86bb63519521cdd9bc0809fe40d9364253 Mon Sep 17 00:00:00 2001 From: David Chen Date: Mon, 23 Mar 2026 13:33:42 -0700 Subject: [PATCH] add packet trailer stripping support (#4361) * bump protocol version to 17 to enable packet trailer stripping functionality * check subscriber protocol version for trailer stripping --- pkg/rtc/mediatrackreceiver.go | 4 + pkg/rtc/participant.go | 29 +-- pkg/rtc/subscribedtrack.go | 25 +-- pkg/rtc/types/interfaces.go | 1 + pkg/rtc/types/protocol_version.go | 6 +- .../typesfakes/fake_local_media_track.go | 63 ++++++ pkg/rtc/types/typesfakes/fake_media_track.go | 63 ++++++ pkg/sfu/downtrack.go | 14 ++ pkg/sfu/packettrailer/packet_trailer.go | 46 +++++ pkg/sfu/packettrailer/packet_trailer_test.go | 182 ++++++++++++++++++ 10 files changed, 407 insertions(+), 26 deletions(-) create mode 100644 pkg/sfu/packettrailer/packet_trailer.go create mode 100644 pkg/sfu/packettrailer/packet_trailer_test.go diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 7705f742c..b71e02ccf 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -518,6 +518,10 @@ func (t *MediaTrackReceiver) IsEncrypted() bool { return t.TrackInfo().Encryption != livekit.Encryption_NONE } +func (t *MediaTrackReceiver) HasPacketTrailer() bool { + return len(t.TrackInfo().GetPacketTrailerFeatures()) > 0 +} + func (t *MediaTrackReceiver) AddOnClose(f func(isExpectedToResume bool)) { if f == nil { return diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 3bf18eb4f..e58dcb1c5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2832,20 +2832,21 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } ti := &livekit.TrackInfo{ - Type: req.Type, - Name: req.Name, - Width: req.Width, - Height: req.Height, - Muted: req.Muted, - DisableDtx: req.DisableDtx, - Source: req.Source, - Layers: cloneLayers(req.Layers), - DisableRed: req.DisableRed, - Stereo: req.Stereo, - Encryption: req.Encryption, - Stream: req.Stream, - BackupCodecPolicy: backupCodecPolicy, - AudioFeatures: sutils.DedupeSlice(req.AudioFeatures), + Type: req.Type, + Name: req.Name, + Width: req.Width, + Height: req.Height, + Muted: req.Muted, + DisableDtx: req.DisableDtx, + Source: req.Source, + Layers: cloneLayers(req.Layers), + DisableRed: req.DisableRed, + Stereo: req.Stereo, + Encryption: req.Encryption, + Stream: req.Stream, + BackupCodecPolicy: backupCodecPolicy, + AudioFeatures: sutils.DedupeSlice(req.AudioFeatures), + PacketTrailerFeatures: sutils.DedupeSlice(req.PacketTrailerFeatures), } if req.Stereo && !slices.Contains(ti.AudioFeatures, livekit.AudioTrackFeature_TF_STEREO) { ti.AudioFeatures = append(ti.AudioFeatures, livekit.AudioTrackFeature_TF_STEREO) diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 6f566b81e..eae084956 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -129,18 +129,21 @@ func NewSubscribedTrack(params SubscribedTrackParams) (*SubscribedTrack, error) if isEncrypted { trailer = params.Subscriber.GetTrailer() } + stripPacketTrailer := params.MediaTrack.HasPacketTrailer() && + !params.Subscriber.ProtocolVersion().SupportsPacketTrailer() downTrack, err := sfu.NewDownTrack(sfu.DownTrackParams{ - Codecs: codecs, - IsEncrypted: isEncrypted, - Source: params.MediaTrack.Source(), - Receiver: params.WrappedReceiver, - BufferFactory: params.Subscriber.GetBufferFactory(), - SubID: params.Subscriber.ID(), - StreamID: streamID, - MaxTrack: maxTrack, - PlayoutDelayLimit: params.Subscriber.GetPlayoutDelayConfig(), - Pacer: params.Subscriber.GetPacer(), - Trailer: trailer, + Codecs: codecs, + IsEncrypted: isEncrypted, + Source: params.MediaTrack.Source(), + Receiver: params.WrappedReceiver, + BufferFactory: params.Subscriber.GetBufferFactory(), + SubID: params.Subscriber.ID(), + StreamID: streamID, + MaxTrack: maxTrack, + PlayoutDelayLimit: params.Subscriber.GetPlayoutDelayConfig(), + Pacer: params.Subscriber.GetPacer(), + Trailer: trailer, + StripPacketTrailer: stripPacketTrailer, Logger: LoggerWithTrack( params.Subscriber.GetLogger().WithComponent(sutils.ComponentSub), params.MediaTrack.ID(), diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index d579ccb80..cbb25346f 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -766,6 +766,7 @@ type MediaTrack interface { ClearAllReceivers(isExpectedToResume bool) IsEncrypted() bool + HasPacketTrailer() bool } //counterfeiter:generate . LocalMediaTrack diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index e7f569854..9edab8b07 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -16,7 +16,7 @@ package types type ProtocolVersion int -const CurrentProtocol = 16 +const CurrentProtocol = 17 func (v ProtocolVersion) SupportsPackedStreamId() bool { return v > 0 @@ -99,3 +99,7 @@ func (v ProtocolVersion) SupportsNonErrorSignalResponse() bool { func (v ProtocolVersion) SupportsMoving() bool { return v > 15 } + +func (v ProtocolVersion) SupportsPacketTrailer() bool { + return v > 16 +} diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index a06094ec2..d66f4299a 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -124,6 +124,16 @@ type FakeLocalMediaTrack struct { getTrackStatsReturnsOnCall map[int]struct { result1 *livekit.RTPStats } + HasPacketTrailerStub func() bool + hasPacketTrailerMutex sync.RWMutex + hasPacketTrailerArgsForCall []struct { + } + hasPacketTrailerReturns struct { + result1 bool + } + hasPacketTrailerReturnsOnCall map[int]struct { + result1 bool + } HasSdpCidStub func(string) bool hasSdpCidMutex sync.RWMutex hasSdpCidArgsForCall []struct { @@ -950,6 +960,59 @@ func (fake *FakeLocalMediaTrack) GetTrackStatsReturnsOnCall(i int, result1 *live }{result1} } +func (fake *FakeLocalMediaTrack) HasPacketTrailer() bool { + fake.hasPacketTrailerMutex.Lock() + ret, specificReturn := fake.hasPacketTrailerReturnsOnCall[len(fake.hasPacketTrailerArgsForCall)] + fake.hasPacketTrailerArgsForCall = append(fake.hasPacketTrailerArgsForCall, struct { + }{}) + stub := fake.HasPacketTrailerStub + fakeReturns := fake.hasPacketTrailerReturns + fake.recordInvocation("HasPacketTrailer", []interface{}{}) + fake.hasPacketTrailerMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalMediaTrack) HasPacketTrailerCallCount() int { + fake.hasPacketTrailerMutex.RLock() + defer fake.hasPacketTrailerMutex.RUnlock() + return len(fake.hasPacketTrailerArgsForCall) +} + +func (fake *FakeLocalMediaTrack) HasPacketTrailerCalls(stub func() bool) { + fake.hasPacketTrailerMutex.Lock() + defer fake.hasPacketTrailerMutex.Unlock() + fake.HasPacketTrailerStub = stub +} + +func (fake *FakeLocalMediaTrack) HasPacketTrailerReturns(result1 bool) { + fake.hasPacketTrailerMutex.Lock() + defer fake.hasPacketTrailerMutex.Unlock() + fake.HasPacketTrailerStub = nil + fake.hasPacketTrailerReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalMediaTrack) HasPacketTrailerReturnsOnCall(i int, result1 bool) { + fake.hasPacketTrailerMutex.Lock() + defer fake.hasPacketTrailerMutex.Unlock() + fake.HasPacketTrailerStub = nil + if fake.hasPacketTrailerReturnsOnCall == nil { + fake.hasPacketTrailerReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.hasPacketTrailerReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalMediaTrack) HasSdpCid(arg1 string) bool { fake.hasSdpCidMutex.Lock() ret, specificReturn := fake.hasSdpCidReturnsOnCall[len(fake.hasSdpCidArgsForCall)] diff --git a/pkg/rtc/types/typesfakes/fake_media_track.go b/pkg/rtc/types/typesfakes/fake_media_track.go index 95a4d0649..797c8120a 100644 --- a/pkg/rtc/types/typesfakes/fake_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_media_track.go @@ -98,6 +98,16 @@ type FakeMediaTrack struct { getTemporalLayerForSpatialFpsReturnsOnCall map[int]struct { result1 int32 } + HasPacketTrailerStub func() bool + hasPacketTrailerMutex sync.RWMutex + hasPacketTrailerArgsForCall []struct { + } + hasPacketTrailerReturns struct { + result1 bool + } + hasPacketTrailerReturnsOnCall map[int]struct { + result1 bool + } IDStub func() livekit.TrackID iDMutex sync.RWMutex iDArgsForCall []struct { @@ -742,6 +752,59 @@ func (fake *FakeMediaTrack) GetTemporalLayerForSpatialFpsReturnsOnCall(i int, re }{result1} } +func (fake *FakeMediaTrack) HasPacketTrailer() bool { + fake.hasPacketTrailerMutex.Lock() + ret, specificReturn := fake.hasPacketTrailerReturnsOnCall[len(fake.hasPacketTrailerArgsForCall)] + fake.hasPacketTrailerArgsForCall = append(fake.hasPacketTrailerArgsForCall, struct { + }{}) + stub := fake.HasPacketTrailerStub + fakeReturns := fake.hasPacketTrailerReturns + fake.recordInvocation("HasPacketTrailer", []interface{}{}) + fake.hasPacketTrailerMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeMediaTrack) HasPacketTrailerCallCount() int { + fake.hasPacketTrailerMutex.RLock() + defer fake.hasPacketTrailerMutex.RUnlock() + return len(fake.hasPacketTrailerArgsForCall) +} + +func (fake *FakeMediaTrack) HasPacketTrailerCalls(stub func() bool) { + fake.hasPacketTrailerMutex.Lock() + defer fake.hasPacketTrailerMutex.Unlock() + fake.HasPacketTrailerStub = stub +} + +func (fake *FakeMediaTrack) HasPacketTrailerReturns(result1 bool) { + fake.hasPacketTrailerMutex.Lock() + defer fake.hasPacketTrailerMutex.Unlock() + fake.HasPacketTrailerStub = nil + fake.hasPacketTrailerReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeMediaTrack) HasPacketTrailerReturnsOnCall(i int, result1 bool) { + fake.hasPacketTrailerMutex.Lock() + defer fake.hasPacketTrailerMutex.Unlock() + fake.HasPacketTrailerStub = nil + if fake.hasPacketTrailerReturnsOnCall == nil { + fake.hasPacketTrailerReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.hasPacketTrailerReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeMediaTrack) ID() livekit.TrackID { fake.iDMutex.Lock() ret, specificReturn := fake.iDReturnsOnCall[len(fake.iDArgsForCall)] diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index a3bad6d4d..4eb148a80 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -43,6 +43,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "github.com/livekit/livekit-server/pkg/sfu/pacer" + "github.com/livekit/livekit-server/pkg/sfu/packettrailer" act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime" dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" @@ -309,6 +310,7 @@ type DownTrackParams struct { RTCPWriter func([]rtcp.Packet) error DisableSenderReportPassThrough bool SupportsCodecChange bool + StripPacketTrailer bool Listener DownTrackListener } @@ -1088,6 +1090,12 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { } payload = payload[:len(tp.codecBytes)+n] + if d.params.StripPacketTrailer { + if strip := packettrailer.StripTrailer(payload, tp.marker); strip > 0 { + payload = payload[:len(payload)-strip] + } + } + // translate RTP header hdr := RTPHeaderFactory.Get().(*rtp.Header) *hdr = rtp.Header{ @@ -2206,6 +2214,12 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro payload = payload[:rtxOffset+int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)] } + if d.params.StripPacketTrailer { + if strip := packettrailer.StripTrailer(payload[rtxOffset:], epm.marker); strip > 0 { + payload = payload[:len(payload)-strip] + } + } + headerSize := hdr.MarshalSize() var ( payloadSize, paddingSize int diff --git a/pkg/sfu/packettrailer/packet_trailer.go b/pkg/sfu/packettrailer/packet_trailer.go new file mode 100644 index 000000000..3b58f6681 --- /dev/null +++ b/pkg/sfu/packettrailer/packet_trailer.go @@ -0,0 +1,46 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packettrailer + +var Magic = [4]byte{'L', 'K', 'T', 'S'} + +const ( + xorByte = 0xFF + + envelopeSize = 5 // 1B trailer_len + 4B magic +) + +// StripTrailer returns the number of bytes to strip from the end of an RTP +// payload if it contains an LKTS trailer. The trailer is located by checking +// for the "LKTS" magic suffix and then reading the XORed trailer_len byte +// immediately before it. Returns 0 if absent or ineligible. +func StripTrailer(payload []byte, marker bool) int { + if !marker || len(payload) < envelopeSize { + return 0 + } + + tail := payload[len(payload)-4:] + if tail[0] != Magic[0] || tail[1] != Magic[1] || + tail[2] != Magic[2] || tail[3] != Magic[3] { + return 0 + } + + trailerLen := int(payload[len(payload)-5] ^ xorByte) + if trailerLen < envelopeSize || trailerLen > len(payload) { + return 0 + } + + return trailerLen +} diff --git a/pkg/sfu/packettrailer/packet_trailer_test.go b/pkg/sfu/packettrailer/packet_trailer_test.go new file mode 100644 index 000000000..653b71fa2 --- /dev/null +++ b/pkg/sfu/packettrailer/packet_trailer_test.go @@ -0,0 +1,182 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packettrailer + +import ( + "encoding/binary" + "testing" +) + +const ( + tagTimestampUs = 0x01 + tagFrameID = 0x02 +) + +// appendTLV appends a single XORed TLV element to dst. +func appendTLV(dst []byte, tag byte, value []byte) []byte { + dst = append(dst, tag^xorByte, byte(len(value))^xorByte) + for _, b := range value { + dst = append(dst, b^xorByte) + } + return dst +} + +// appendEnvelope appends the 5-byte envelope (XORed trailer_len + magic). +func appendEnvelope(dst []byte, trailerLen byte) []byte { + dst = append(dst, trailerLen^xorByte) + dst = append(dst, Magic[:]...) + return dst +} + +// makeTrailer builds a complete LKTS trailer with both timestamp and frame_id TLVs. +func makeTrailer(timestampUs int64, frameID uint32) []byte { + var trailer []byte + + var tsBuf [8]byte + binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs)) + trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + + var fidBuf [4]byte + binary.BigEndian.PutUint32(fidBuf[:], frameID) + trailer = appendTLV(trailer, tagFrameID, fidBuf[:]) + + trailerLen := byte(len(trailer) + envelopeSize) + trailer = appendEnvelope(trailer, trailerLen) + return trailer +} + +// makePayloadWithTrailer builds a fake video payload followed by a full LKTS trailer. +func makePayloadWithTrailer(videoLen int, timestampUs int64, frameID uint32) []byte { + video := make([]byte, videoLen) + for i := range video { + video[i] = byte(i) + } + return append(video, makeTrailer(timestampUs, frameID)...) +} + +// makeTimestampOnlyTrailer builds a trailer with only the timestamp TLV. +func makeTimestampOnlyTrailer(timestampUs int64) []byte { + var trailer []byte + var tsBuf [8]byte + binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs)) + trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + trailerLen := byte(len(trailer) + envelopeSize) + trailer = appendEnvelope(trailer, trailerLen) + return trailer +} + +func TestStripTrailer(t *testing.T) { + fullTrailerSize := 21 // (1+1+8) + (1+1+4) + 5 + tsOnlyTrailerSize := 15 // (1+1+8) + 5 + + tests := []struct { + name string + payload []byte + marker bool + wantStrip int + }{ + { + name: "marker set with full trailer (timestamp + frame_id)", + payload: makePayloadWithTrailer(20, 1700000000000000, 42), + marker: true, + wantStrip: fullTrailerSize, + }, + { + name: "marker set with timestamp-only trailer", + payload: func() []byte { + video := make([]byte, 20) + return append(video, makeTimestampOnlyTrailer(1700000000000000)...) + }(), + marker: true, + wantStrip: tsOnlyTrailerSize, + }, + { + name: "marker not set with valid trailer", + payload: makePayloadWithTrailer(20, 1700000000000000, 42), + marker: false, + wantStrip: 0, + }, + { + name: "marker set without magic", + payload: make([]byte, 32), + marker: true, + wantStrip: 0, + }, + { + name: "marker set but payload too short for envelope", + payload: []byte{0x4C, 0x4B, 0x54, 0x53}, + marker: true, + wantStrip: 0, + }, + { + name: "marker set with partial magic mismatch", + payload: func() []byte { + p := makePayloadWithTrailer(20, 1700000000000000, 42) + p[len(p)-1] = 'x' + return p + }(), + marker: true, + wantStrip: 0, + }, + { + name: "trailer_len exceeds payload length", + payload: func() []byte { + var trailer []byte + var tsBuf [8]byte + binary.BigEndian.PutUint64(tsBuf[:], 42) + trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + trailer = appendEnvelope(trailer, 200) + return trailer + }(), + marker: true, + wantStrip: 0, + }, + { + name: "trailer_len smaller than envelope (invalid)", + payload: func() []byte { + video := make([]byte, 20) + var trailer []byte + var tsBuf [8]byte + binary.BigEndian.PutUint64(tsBuf[:], 42) + trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + trailer = appendEnvelope(trailer, 3) + return append(video, trailer...) + }(), + marker: true, + wantStrip: 0, + }, + { + name: "exactly envelope-only trailer", + payload: appendEnvelope(nil, byte(envelopeSize)), + marker: true, + wantStrip: envelopeSize, + }, + { + name: "empty payload", + payload: []byte{}, + marker: true, + wantStrip: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := StripTrailer(tt.payload, tt.marker) + if got != tt.wantStrip { + t.Errorf("StripTrailer() = %d, want %d", got, tt.wantStrip) + } + }) + } +}