add packet trailer stripping support (#4361)

* bump protocol version to 17 to enable packet trailer stripping functionality
* check subscriber protocol version for trailer stripping
This commit is contained in:
David Chen
2026-03-23 13:33:42 -07:00
committed by GitHub
parent 0e3d765d03
commit a5333a86bb
10 changed files with 407 additions and 26 deletions
+4
View File
@@ -518,6 +518,10 @@ func (t *MediaTrackReceiver) IsEncrypted() bool {
return t.TrackInfo().Encryption != livekit.Encryption_NONE
}
func (t *MediaTrackReceiver) HasPacketTrailer() bool {
return len(t.TrackInfo().GetPacketTrailerFeatures()) > 0
}
func (t *MediaTrackReceiver) AddOnClose(f func(isExpectedToResume bool)) {
if f == nil {
return
+15 -14
View File
@@ -2832,20 +2832,21 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
}
ti := &livekit.TrackInfo{
Type: req.Type,
Name: req.Name,
Width: req.Width,
Height: req.Height,
Muted: req.Muted,
DisableDtx: req.DisableDtx,
Source: req.Source,
Layers: cloneLayers(req.Layers),
DisableRed: req.DisableRed,
Stereo: req.Stereo,
Encryption: req.Encryption,
Stream: req.Stream,
BackupCodecPolicy: backupCodecPolicy,
AudioFeatures: sutils.DedupeSlice(req.AudioFeatures),
Type: req.Type,
Name: req.Name,
Width: req.Width,
Height: req.Height,
Muted: req.Muted,
DisableDtx: req.DisableDtx,
Source: req.Source,
Layers: cloneLayers(req.Layers),
DisableRed: req.DisableRed,
Stereo: req.Stereo,
Encryption: req.Encryption,
Stream: req.Stream,
BackupCodecPolicy: backupCodecPolicy,
AudioFeatures: sutils.DedupeSlice(req.AudioFeatures),
PacketTrailerFeatures: sutils.DedupeSlice(req.PacketTrailerFeatures),
}
if req.Stereo && !slices.Contains(ti.AudioFeatures, livekit.AudioTrackFeature_TF_STEREO) {
ti.AudioFeatures = append(ti.AudioFeatures, livekit.AudioTrackFeature_TF_STEREO)
+14 -11
View File
@@ -129,18 +129,21 @@ func NewSubscribedTrack(params SubscribedTrackParams) (*SubscribedTrack, error)
if isEncrypted {
trailer = params.Subscriber.GetTrailer()
}
stripPacketTrailer := params.MediaTrack.HasPacketTrailer() &&
!params.Subscriber.ProtocolVersion().SupportsPacketTrailer()
downTrack, err := sfu.NewDownTrack(sfu.DownTrackParams{
Codecs: codecs,
IsEncrypted: isEncrypted,
Source: params.MediaTrack.Source(),
Receiver: params.WrappedReceiver,
BufferFactory: params.Subscriber.GetBufferFactory(),
SubID: params.Subscriber.ID(),
StreamID: streamID,
MaxTrack: maxTrack,
PlayoutDelayLimit: params.Subscriber.GetPlayoutDelayConfig(),
Pacer: params.Subscriber.GetPacer(),
Trailer: trailer,
Codecs: codecs,
IsEncrypted: isEncrypted,
Source: params.MediaTrack.Source(),
Receiver: params.WrappedReceiver,
BufferFactory: params.Subscriber.GetBufferFactory(),
SubID: params.Subscriber.ID(),
StreamID: streamID,
MaxTrack: maxTrack,
PlayoutDelayLimit: params.Subscriber.GetPlayoutDelayConfig(),
Pacer: params.Subscriber.GetPacer(),
Trailer: trailer,
StripPacketTrailer: stripPacketTrailer,
Logger: LoggerWithTrack(
params.Subscriber.GetLogger().WithComponent(sutils.ComponentSub),
params.MediaTrack.ID(),
+1
View File
@@ -766,6 +766,7 @@ type MediaTrack interface {
ClearAllReceivers(isExpectedToResume bool)
IsEncrypted() bool
HasPacketTrailer() bool
}
//counterfeiter:generate . LocalMediaTrack
+5 -1
View File
@@ -16,7 +16,7 @@ package types
type ProtocolVersion int
const CurrentProtocol = 16
const CurrentProtocol = 17
func (v ProtocolVersion) SupportsPackedStreamId() bool {
return v > 0
@@ -99,3 +99,7 @@ func (v ProtocolVersion) SupportsNonErrorSignalResponse() bool {
func (v ProtocolVersion) SupportsMoving() bool {
return v > 15
}
func (v ProtocolVersion) SupportsPacketTrailer() bool {
return v > 16
}
@@ -124,6 +124,16 @@ type FakeLocalMediaTrack struct {
getTrackStatsReturnsOnCall map[int]struct {
result1 *livekit.RTPStats
}
HasPacketTrailerStub func() bool
hasPacketTrailerMutex sync.RWMutex
hasPacketTrailerArgsForCall []struct {
}
hasPacketTrailerReturns struct {
result1 bool
}
hasPacketTrailerReturnsOnCall map[int]struct {
result1 bool
}
HasSdpCidStub func(string) bool
hasSdpCidMutex sync.RWMutex
hasSdpCidArgsForCall []struct {
@@ -950,6 +960,59 @@ func (fake *FakeLocalMediaTrack) GetTrackStatsReturnsOnCall(i int, result1 *live
}{result1}
}
func (fake *FakeLocalMediaTrack) HasPacketTrailer() bool {
fake.hasPacketTrailerMutex.Lock()
ret, specificReturn := fake.hasPacketTrailerReturnsOnCall[len(fake.hasPacketTrailerArgsForCall)]
fake.hasPacketTrailerArgsForCall = append(fake.hasPacketTrailerArgsForCall, struct {
}{})
stub := fake.HasPacketTrailerStub
fakeReturns := fake.hasPacketTrailerReturns
fake.recordInvocation("HasPacketTrailer", []interface{}{})
fake.hasPacketTrailerMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalMediaTrack) HasPacketTrailerCallCount() int {
fake.hasPacketTrailerMutex.RLock()
defer fake.hasPacketTrailerMutex.RUnlock()
return len(fake.hasPacketTrailerArgsForCall)
}
func (fake *FakeLocalMediaTrack) HasPacketTrailerCalls(stub func() bool) {
fake.hasPacketTrailerMutex.Lock()
defer fake.hasPacketTrailerMutex.Unlock()
fake.HasPacketTrailerStub = stub
}
func (fake *FakeLocalMediaTrack) HasPacketTrailerReturns(result1 bool) {
fake.hasPacketTrailerMutex.Lock()
defer fake.hasPacketTrailerMutex.Unlock()
fake.HasPacketTrailerStub = nil
fake.hasPacketTrailerReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalMediaTrack) HasPacketTrailerReturnsOnCall(i int, result1 bool) {
fake.hasPacketTrailerMutex.Lock()
defer fake.hasPacketTrailerMutex.Unlock()
fake.HasPacketTrailerStub = nil
if fake.hasPacketTrailerReturnsOnCall == nil {
fake.hasPacketTrailerReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.hasPacketTrailerReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalMediaTrack) HasSdpCid(arg1 string) bool {
fake.hasSdpCidMutex.Lock()
ret, specificReturn := fake.hasSdpCidReturnsOnCall[len(fake.hasSdpCidArgsForCall)]
@@ -98,6 +98,16 @@ type FakeMediaTrack struct {
getTemporalLayerForSpatialFpsReturnsOnCall map[int]struct {
result1 int32
}
HasPacketTrailerStub func() bool
hasPacketTrailerMutex sync.RWMutex
hasPacketTrailerArgsForCall []struct {
}
hasPacketTrailerReturns struct {
result1 bool
}
hasPacketTrailerReturnsOnCall map[int]struct {
result1 bool
}
IDStub func() livekit.TrackID
iDMutex sync.RWMutex
iDArgsForCall []struct {
@@ -742,6 +752,59 @@ func (fake *FakeMediaTrack) GetTemporalLayerForSpatialFpsReturnsOnCall(i int, re
}{result1}
}
func (fake *FakeMediaTrack) HasPacketTrailer() bool {
fake.hasPacketTrailerMutex.Lock()
ret, specificReturn := fake.hasPacketTrailerReturnsOnCall[len(fake.hasPacketTrailerArgsForCall)]
fake.hasPacketTrailerArgsForCall = append(fake.hasPacketTrailerArgsForCall, struct {
}{})
stub := fake.HasPacketTrailerStub
fakeReturns := fake.hasPacketTrailerReturns
fake.recordInvocation("HasPacketTrailer", []interface{}{})
fake.hasPacketTrailerMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeMediaTrack) HasPacketTrailerCallCount() int {
fake.hasPacketTrailerMutex.RLock()
defer fake.hasPacketTrailerMutex.RUnlock()
return len(fake.hasPacketTrailerArgsForCall)
}
func (fake *FakeMediaTrack) HasPacketTrailerCalls(stub func() bool) {
fake.hasPacketTrailerMutex.Lock()
defer fake.hasPacketTrailerMutex.Unlock()
fake.HasPacketTrailerStub = stub
}
func (fake *FakeMediaTrack) HasPacketTrailerReturns(result1 bool) {
fake.hasPacketTrailerMutex.Lock()
defer fake.hasPacketTrailerMutex.Unlock()
fake.HasPacketTrailerStub = nil
fake.hasPacketTrailerReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeMediaTrack) HasPacketTrailerReturnsOnCall(i int, result1 bool) {
fake.hasPacketTrailerMutex.Lock()
defer fake.hasPacketTrailerMutex.Unlock()
fake.HasPacketTrailerStub = nil
if fake.hasPacketTrailerReturnsOnCall == nil {
fake.hasPacketTrailerReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.hasPacketTrailerReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeMediaTrack) ID() livekit.TrackID {
fake.iDMutex.Lock()
ret, specificReturn := fake.iDReturnsOnCall[len(fake.iDArgsForCall)]
+14
View File
@@ -43,6 +43,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/sfu/packettrailer"
act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
@@ -309,6 +310,7 @@ type DownTrackParams struct {
RTCPWriter func([]rtcp.Packet) error
DisableSenderReportPassThrough bool
SupportsCodecChange bool
StripPacketTrailer bool
Listener DownTrackListener
}
@@ -1088,6 +1090,12 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 {
}
payload = payload[:len(tp.codecBytes)+n]
if d.params.StripPacketTrailer {
if strip := packettrailer.StripTrailer(payload, tp.marker); strip > 0 {
payload = payload[:len(payload)-strip]
}
}
// translate RTP header
hdr := RTPHeaderFactory.Get().(*rtp.Header)
*hdr = rtp.Header{
@@ -2206,6 +2214,12 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro
payload = payload[:rtxOffset+int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)]
}
if d.params.StripPacketTrailer {
if strip := packettrailer.StripTrailer(payload[rtxOffset:], epm.marker); strip > 0 {
payload = payload[:len(payload)-strip]
}
}
headerSize := hdr.MarshalSize()
var (
payloadSize, paddingSize int
+46
View File
@@ -0,0 +1,46 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package packettrailer
var Magic = [4]byte{'L', 'K', 'T', 'S'}
const (
xorByte = 0xFF
envelopeSize = 5 // 1B trailer_len + 4B magic
)
// StripTrailer returns the number of bytes to strip from the end of an RTP
// payload if it contains an LKTS trailer. The trailer is located by checking
// for the "LKTS" magic suffix and then reading the XORed trailer_len byte
// immediately before it. Returns 0 if absent or ineligible.
func StripTrailer(payload []byte, marker bool) int {
if !marker || len(payload) < envelopeSize {
return 0
}
tail := payload[len(payload)-4:]
if tail[0] != Magic[0] || tail[1] != Magic[1] ||
tail[2] != Magic[2] || tail[3] != Magic[3] {
return 0
}
trailerLen := int(payload[len(payload)-5] ^ xorByte)
if trailerLen < envelopeSize || trailerLen > len(payload) {
return 0
}
return trailerLen
}
@@ -0,0 +1,182 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package packettrailer
import (
"encoding/binary"
"testing"
)
const (
tagTimestampUs = 0x01
tagFrameID = 0x02
)
// appendTLV appends a single XORed TLV element to dst.
func appendTLV(dst []byte, tag byte, value []byte) []byte {
dst = append(dst, tag^xorByte, byte(len(value))^xorByte)
for _, b := range value {
dst = append(dst, b^xorByte)
}
return dst
}
// appendEnvelope appends the 5-byte envelope (XORed trailer_len + magic).
func appendEnvelope(dst []byte, trailerLen byte) []byte {
dst = append(dst, trailerLen^xorByte)
dst = append(dst, Magic[:]...)
return dst
}
// makeTrailer builds a complete LKTS trailer with both timestamp and frame_id TLVs.
func makeTrailer(timestampUs int64, frameID uint32) []byte {
var trailer []byte
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs))
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
var fidBuf [4]byte
binary.BigEndian.PutUint32(fidBuf[:], frameID)
trailer = appendTLV(trailer, tagFrameID, fidBuf[:])
trailerLen := byte(len(trailer) + envelopeSize)
trailer = appendEnvelope(trailer, trailerLen)
return trailer
}
// makePayloadWithTrailer builds a fake video payload followed by a full LKTS trailer.
func makePayloadWithTrailer(videoLen int, timestampUs int64, frameID uint32) []byte {
video := make([]byte, videoLen)
for i := range video {
video[i] = byte(i)
}
return append(video, makeTrailer(timestampUs, frameID)...)
}
// makeTimestampOnlyTrailer builds a trailer with only the timestamp TLV.
func makeTimestampOnlyTrailer(timestampUs int64) []byte {
var trailer []byte
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs))
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
trailerLen := byte(len(trailer) + envelopeSize)
trailer = appendEnvelope(trailer, trailerLen)
return trailer
}
func TestStripTrailer(t *testing.T) {
fullTrailerSize := 21 // (1+1+8) + (1+1+4) + 5
tsOnlyTrailerSize := 15 // (1+1+8) + 5
tests := []struct {
name string
payload []byte
marker bool
wantStrip int
}{
{
name: "marker set with full trailer (timestamp + frame_id)",
payload: makePayloadWithTrailer(20, 1700000000000000, 42),
marker: true,
wantStrip: fullTrailerSize,
},
{
name: "marker set with timestamp-only trailer",
payload: func() []byte {
video := make([]byte, 20)
return append(video, makeTimestampOnlyTrailer(1700000000000000)...)
}(),
marker: true,
wantStrip: tsOnlyTrailerSize,
},
{
name: "marker not set with valid trailer",
payload: makePayloadWithTrailer(20, 1700000000000000, 42),
marker: false,
wantStrip: 0,
},
{
name: "marker set without magic",
payload: make([]byte, 32),
marker: true,
wantStrip: 0,
},
{
name: "marker set but payload too short for envelope",
payload: []byte{0x4C, 0x4B, 0x54, 0x53},
marker: true,
wantStrip: 0,
},
{
name: "marker set with partial magic mismatch",
payload: func() []byte {
p := makePayloadWithTrailer(20, 1700000000000000, 42)
p[len(p)-1] = 'x'
return p
}(),
marker: true,
wantStrip: 0,
},
{
name: "trailer_len exceeds payload length",
payload: func() []byte {
var trailer []byte
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], 42)
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
trailer = appendEnvelope(trailer, 200)
return trailer
}(),
marker: true,
wantStrip: 0,
},
{
name: "trailer_len smaller than envelope (invalid)",
payload: func() []byte {
video := make([]byte, 20)
var trailer []byte
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], 42)
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
trailer = appendEnvelope(trailer, 3)
return append(video, trailer...)
}(),
marker: true,
wantStrip: 0,
},
{
name: "exactly envelope-only trailer",
payload: appendEnvelope(nil, byte(envelopeSize)),
marker: true,
wantStrip: envelopeSize,
},
{
name: "empty payload",
payload: []byte{},
marker: true,
wantStrip: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := StripTrailer(tt.payload, tt.marker)
if got != tt.wantStrip {
t.Errorf("StripTrailer() = %d, want %d", got, tt.wantStrip)
}
})
}
}