E2EE trailer for server injected packets. (#1908)

* Ability to use trailer with server injected frames

A 32-byte trailer generated per room.
Trailer appended when track encryption is enabled.

* E2EE trailer for server injected packets.

- Generate a 32-byte per room trailer. Too reasons for longer length
  o Laziness: utils generates a 32 byte string.
  o Longer length random string reduces chances of colliding with real data.
- Trailer sent in JoinResponse
- Trailer added to server injected frames (not to padding only packets)

* generate

* add a length check

* pass trailer in as an argument
This commit is contained in:
Raja Subramanian
2023-07-27 16:50:18 +05:30
committed by GitHub
parent 38c4eba5a3
commit fc7d4bd01e
10 changed files with 257 additions and 9 deletions

View File

@@ -800,4 +800,11 @@ func (t *MediaTrackReceiver) GetTemporalLayerForSpatialFps(spatial int32, fps ui
return buffer.DefaultMaxLayerTemporal
}
func (t *MediaTrackReceiver) IsEncrypted() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.trackInfo.Encryption != livekit.Encryption_NONE
}
// ---------------------------

View File

@@ -98,6 +98,10 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
for _, c := range codecs {
c.RTCPFeedback = rtcpFeedback
}
var trailer []byte
if t.params.MediaTrack.IsEncrypted() {
trailer = sub.GetTrailer()
}
downTrack, err := sfu.NewDownTrack(
codecs,
wr,
@@ -105,6 +109,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
subscriberID,
t.params.ReceiverConfig.PacketBufferSize,
sub.GetPacer(),
trailer,
LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed),
)
if err != nil {

View File

@@ -68,6 +68,7 @@ type ParticipantParams struct {
VideoConfig config.VideoConfig
ProtocolVersion types.ProtocolVersion
Telemetry telemetry.TelemetryService
Trailer []byte
PLIThrottleConfig config.PLIThrottleConfig
CongestionControlConfig config.CongestionControlConfig
EnabledCodecs []*livekit.Codec
@@ -224,6 +225,12 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return p, nil
}
func (p *ParticipantImpl) GetTrailer() []byte {
trailer := make([]byte, len(p.params.Trailer))
copy(trailer, p.params.Trailer)
return trailer
}
func (p *ParticipantImpl) GetLogger() logger.Logger {
return p.params.Logger
}

View File

@@ -77,6 +77,8 @@ type Room struct {
leftAt atomic.Int64
closed chan struct{}
trailer []byte
onParticipantChanged func(p types.LocalParticipant)
onRoomUpdated func()
onClose func()
@@ -111,6 +113,7 @@ func NewRoom(
bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSize),
batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
closed: make(chan struct{}),
trailer: []byte(utils.RandomSecret()),
}
r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto)
if r.protoRoom.EmptyTimeout == 0 {
@@ -139,6 +142,15 @@ func (r *Room) ID() livekit.RoomID {
return livekit.RoomID(r.protoRoom.Sid)
}
func (r *Room) Trailer() []byte {
r.lock.RLock()
defer r.lock.RUnlock()
trailer := make([]byte, len(r.trailer))
copy(trailer, r.trailer)
return trailer
}
func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.LocalParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -821,6 +833,7 @@ func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceS
ServerInfo: r.serverInfo,
ServerVersion: r.serverInfo.Version,
ServerRegion: r.serverInfo.Region,
SifTrailer: r.trailer,
}
}

View File

@@ -277,6 +277,7 @@ type LocalParticipant interface {
ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion)
// getters
GetTrailer() []byte
GetLogger() logger.Logger
GetAdaptiveStream() bool
ProtocolVersion() ProtocolVersion
@@ -449,6 +450,8 @@ type MediaTrack interface {
Receivers() []sfu.TrackReceiver
ClearAllReceivers(willBeResumed bool)
IsEncrypted() bool
}
//counterfeiter:generate . LocalMediaTrack

View File

@@ -128,6 +128,16 @@ type FakeLocalMediaTrack struct {
iDReturnsOnCall map[int]struct {
result1 livekit.TrackID
}
IsEncryptedStub func() bool
isEncryptedMutex sync.RWMutex
isEncryptedArgsForCall []struct {
}
isEncryptedReturns struct {
result1 bool
}
isEncryptedReturnsOnCall map[int]struct {
result1 bool
}
IsMutedStub func() bool
isMutedMutex sync.RWMutex
isMutedArgsForCall []struct {
@@ -928,6 +938,59 @@ func (fake *FakeLocalMediaTrack) IDReturnsOnCall(i int, result1 livekit.TrackID)
}{result1}
}
func (fake *FakeLocalMediaTrack) IsEncrypted() bool {
fake.isEncryptedMutex.Lock()
ret, specificReturn := fake.isEncryptedReturnsOnCall[len(fake.isEncryptedArgsForCall)]
fake.isEncryptedArgsForCall = append(fake.isEncryptedArgsForCall, struct {
}{})
stub := fake.IsEncryptedStub
fakeReturns := fake.isEncryptedReturns
fake.recordInvocation("IsEncrypted", []interface{}{})
fake.isEncryptedMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalMediaTrack) IsEncryptedCallCount() int {
fake.isEncryptedMutex.RLock()
defer fake.isEncryptedMutex.RUnlock()
return len(fake.isEncryptedArgsForCall)
}
func (fake *FakeLocalMediaTrack) IsEncryptedCalls(stub func() bool) {
fake.isEncryptedMutex.Lock()
defer fake.isEncryptedMutex.Unlock()
fake.IsEncryptedStub = stub
}
func (fake *FakeLocalMediaTrack) IsEncryptedReturns(result1 bool) {
fake.isEncryptedMutex.Lock()
defer fake.isEncryptedMutex.Unlock()
fake.IsEncryptedStub = nil
fake.isEncryptedReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalMediaTrack) IsEncryptedReturnsOnCall(i int, result1 bool) {
fake.isEncryptedMutex.Lock()
defer fake.isEncryptedMutex.Unlock()
fake.IsEncryptedStub = nil
if fake.isEncryptedReturnsOnCall == nil {
fake.isEncryptedReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.isEncryptedReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalMediaTrack) IsMuted() bool {
fake.isMutedMutex.Lock()
ret, specificReturn := fake.isMutedReturnsOnCall[len(fake.isMutedArgsForCall)]
@@ -1947,6 +2010,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} {
defer fake.hasSdpCidMutex.RUnlock()
fake.iDMutex.RLock()
defer fake.iDMutex.RUnlock()
fake.isEncryptedMutex.RLock()
defer fake.isEncryptedMutex.RUnlock()
fake.isMutedMutex.RLock()
defer fake.isMutedMutex.RUnlock()
fake.isOpenMutex.RLock()

View File

@@ -304,6 +304,16 @@ type FakeLocalParticipant struct {
getSubscribedTracksReturnsOnCall map[int]struct {
result1 []types.SubscribedTrack
}
GetTrailerStub func() []byte
getTrailerMutex sync.RWMutex
getTrailerArgsForCall []struct {
}
getTrailerReturns struct {
result1 []byte
}
getTrailerReturnsOnCall map[int]struct {
result1 []byte
}
HandleAnswerStub func(webrtc.SessionDescription)
handleAnswerMutex sync.RWMutex
handleAnswerArgsForCall []struct {
@@ -2359,6 +2369,59 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result
}{result1}
}
func (fake *FakeLocalParticipant) GetTrailer() []byte {
fake.getTrailerMutex.Lock()
ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)]
fake.getTrailerArgsForCall = append(fake.getTrailerArgsForCall, struct {
}{})
stub := fake.GetTrailerStub
fakeReturns := fake.getTrailerReturns
fake.recordInvocation("GetTrailer", []interface{}{})
fake.getTrailerMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetTrailerCallCount() int {
fake.getTrailerMutex.RLock()
defer fake.getTrailerMutex.RUnlock()
return len(fake.getTrailerArgsForCall)
}
func (fake *FakeLocalParticipant) GetTrailerCalls(stub func() []byte) {
fake.getTrailerMutex.Lock()
defer fake.getTrailerMutex.Unlock()
fake.GetTrailerStub = stub
}
func (fake *FakeLocalParticipant) GetTrailerReturns(result1 []byte) {
fake.getTrailerMutex.Lock()
defer fake.getTrailerMutex.Unlock()
fake.GetTrailerStub = nil
fake.getTrailerReturns = struct {
result1 []byte
}{result1}
}
func (fake *FakeLocalParticipant) GetTrailerReturnsOnCall(i int, result1 []byte) {
fake.getTrailerMutex.Lock()
defer fake.getTrailerMutex.Unlock()
fake.GetTrailerStub = nil
if fake.getTrailerReturnsOnCall == nil {
fake.getTrailerReturnsOnCall = make(map[int]struct {
result1 []byte
})
}
fake.getTrailerReturnsOnCall[i] = struct {
result1 []byte
}{result1}
}
func (fake *FakeLocalParticipant) HandleAnswer(arg1 webrtc.SessionDescription) {
fake.handleAnswerMutex.Lock()
fake.handleAnswerArgsForCall = append(fake.handleAnswerArgsForCall, struct {
@@ -5648,6 +5711,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getSubscribedParticipantsMutex.RUnlock()
fake.getSubscribedTracksMutex.RLock()
defer fake.getSubscribedTracksMutex.RUnlock()
fake.getTrailerMutex.RLock()
defer fake.getTrailerMutex.RUnlock()
fake.handleAnswerMutex.RLock()
defer fake.handleAnswerMutex.RUnlock()
fake.handleOfferMutex.RLock()

View File

@@ -93,6 +93,16 @@ type FakeMediaTrack struct {
iDReturnsOnCall map[int]struct {
result1 livekit.TrackID
}
IsEncryptedStub func() bool
isEncryptedMutex sync.RWMutex
isEncryptedArgsForCall []struct {
}
isEncryptedReturns struct {
result1 bool
}
isEncryptedReturnsOnCall map[int]struct {
result1 bool
}
IsMutedStub func() bool
isMutedMutex sync.RWMutex
isMutedArgsForCall []struct {
@@ -689,6 +699,59 @@ func (fake *FakeMediaTrack) IDReturnsOnCall(i int, result1 livekit.TrackID) {
}{result1}
}
func (fake *FakeMediaTrack) IsEncrypted() bool {
fake.isEncryptedMutex.Lock()
ret, specificReturn := fake.isEncryptedReturnsOnCall[len(fake.isEncryptedArgsForCall)]
fake.isEncryptedArgsForCall = append(fake.isEncryptedArgsForCall, struct {
}{})
stub := fake.IsEncryptedStub
fakeReturns := fake.isEncryptedReturns
fake.recordInvocation("IsEncrypted", []interface{}{})
fake.isEncryptedMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeMediaTrack) IsEncryptedCallCount() int {
fake.isEncryptedMutex.RLock()
defer fake.isEncryptedMutex.RUnlock()
return len(fake.isEncryptedArgsForCall)
}
func (fake *FakeMediaTrack) IsEncryptedCalls(stub func() bool) {
fake.isEncryptedMutex.Lock()
defer fake.isEncryptedMutex.Unlock()
fake.IsEncryptedStub = stub
}
func (fake *FakeMediaTrack) IsEncryptedReturns(result1 bool) {
fake.isEncryptedMutex.Lock()
defer fake.isEncryptedMutex.Unlock()
fake.IsEncryptedStub = nil
fake.isEncryptedReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeMediaTrack) IsEncryptedReturnsOnCall(i int, result1 bool) {
fake.isEncryptedMutex.Lock()
defer fake.isEncryptedMutex.Unlock()
fake.IsEncryptedStub = nil
if fake.isEncryptedReturnsOnCall == nil {
fake.isEncryptedReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.isEncryptedReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeMediaTrack) IsMuted() bool {
fake.isMutedMutex.Lock()
ret, specificReturn := fake.isMutedReturnsOnCall[len(fake.isMutedArgsForCall)]
@@ -1522,6 +1585,8 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} {
defer fake.getTemporalLayerForSpatialFpsMutex.RUnlock()
fake.iDMutex.RLock()
defer fake.iDMutex.RUnlock()
fake.isEncryptedMutex.RLock()
defer fake.isEncryptedMutex.RUnlock()
fake.isMutedMutex.RLock()
defer fake.isMutedMutex.RUnlock()
fake.isOpenMutex.RLock()

View File

@@ -351,6 +351,7 @@ func (r *RoomManager) StartSession(
VideoConfig: r.config.Video,
ProtocolVersion: pv,
Telemetry: r.telemetry,
Trailer: room.Trailer(),
PLIThrottleConfig: r.config.RTC.PLIThrottle,
CongestionControlConfig: r.config.RTC.CongestionControl,
EnabledCodecs: protoRoom.EnabledCodecs,

View File

@@ -240,6 +240,8 @@ type DownTrack struct {
maxLayerNotifierCh chan struct{}
trailer []byte
cbMu sync.RWMutex
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32)
@@ -255,6 +257,7 @@ func NewDownTrack(
subID livekit.ParticipantID,
mt int,
pacer pacer.Pacer,
trailer []byte,
logger logger.Logger,
) (*DownTrack, error) {
var kind webrtc.RTPCodecType
@@ -279,6 +282,7 @@ func NewDownTrack(
kind: kind,
codec: codecs[0].RTPCodecCapability,
pacer: pacer,
trailer: trailer,
maxLayerNotifierCh: make(chan struct{}, 20),
}
d.forwarder = NewForwarder(
@@ -1273,19 +1277,30 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
return done
}
func (d *DownTrack) maybeAddTrailer(buf []byte) int {
if len(buf) < len(d.trailer) {
d.logger.Warnw("trailer too big", nil, "bufLen", len(buf), "trailerLen", len(d.trailer))
return 0
}
copy(buf, d.trailer)
return len(d.trailer)
}
func (d *DownTrack) getOpusBlankFrame(_frameEndNeeded bool) ([]byte, error) {
// silence frame
// Used shortly after muting to ensure residual noise does not keep
// generating noise at the decoder after the stream is stopped
// i. e. comfort noise generation actually not producing something comfortable.
payload := make([]byte, len(OpusSilenceFrame))
payload := make([]byte, 1000)
copy(payload[0:], OpusSilenceFrame)
return payload, nil
trailerLen := d.maybeAddTrailer(payload[len(OpusSilenceFrame):])
return payload[:len(OpusSilenceFrame)+trailerLen], nil
}
func (d *DownTrack) getOpusRedBlankFrame(_frameEndNeeded bool) ([]byte, error) {
// primary only silence frame for opus/red, there is no need to contain redundant silent frames
payload := make([]byte, len(OpusSilenceFrame)+1)
payload := make([]byte, 1000)
// primary header
// 0 1 2 3 4 5 6 7
@@ -1294,7 +1309,8 @@ func (d *DownTrack) getOpusRedBlankFrame(_frameEndNeeded bool) ([]byte, error) {
// +-+-+-+-+-+-+-+-+
payload[0] = opusPT
copy(payload[1:], OpusSilenceFrame)
return payload, nil
trailerLen := d.maybeAddTrailer(payload[1+len(OpusSilenceFrame):])
return payload[:1+len(OpusSilenceFrame)+trailerLen], nil
}
func (d *DownTrack) getVP8BlankFrame(frameEndNeeded bool) ([]byte, error) {
@@ -1307,17 +1323,18 @@ func (d *DownTrack) getVP8BlankFrame(frameEndNeeded bool) ([]byte, error) {
// Used even when closing out a previous frame. Looks like receivers
// do not care about content (it will probably end up being an undecodable
// frame, but that should be okay as there are key frames following)
payload := make([]byte, len(blankVP8)+len(VP8KeyFrame8x8))
payload := make([]byte, 1000)
copy(payload[:len(blankVP8)], blankVP8)
copy(payload[len(blankVP8):], VP8KeyFrame8x8)
return payload, nil
trailerLen := d.maybeAddTrailer(payload[len(blankVP8)+len(VP8KeyFrame8x8):])
return payload[:len(blankVP8)+len(VP8KeyFrame8x8)+trailerLen], nil
}
func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) {
// TODO - Jie Zeng
// now use STAP-A to compose sps, pps, idr together, most decoder support packetization-mode 1.
// if client only support packetization-mode 0, use single nalu unit packet
buf := make([]byte, 1462)
buf := make([]byte, 1000)
offset := 0
buf[0] = 0x18 // STAP-A
offset++
@@ -1327,8 +1344,8 @@ func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) {
copy(buf[offset:offset+len(payload)], payload)
offset += len(payload)
}
payload := buf[:offset]
return payload, nil
offset += d.maybeAddTrailer(buf[offset:])
return buf[:offset], nil
}
func (d *DownTrack) handleRTCP(bytes []byte) {