From fc7d4bd01eca3b043ae67dd2f03ac32fed6de96e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 27 Jul 2023 16:50:18 +0530 Subject: [PATCH] E2EE trailer for server injected packets. (#1908) * Ability to use trailer with server injected frames A 32-byte trailer generated per room. Trailer appended when track encryption is enabled. * E2EE trailer for server injected packets. - Generate a 32-byte per room trailer. Too reasons for longer length o Laziness: utils generates a 32 byte string. o Longer length random string reduces chances of colliding with real data. - Trailer sent in JoinResponse - Trailer added to server injected frames (not to padding only packets) * generate * add a length check * pass trailer in as an argument --- pkg/rtc/mediatrackreceiver.go | 7 ++ pkg/rtc/mediatracksubscriptions.go | 5 ++ pkg/rtc/participant.go | 7 ++ pkg/rtc/room.go | 13 ++++ pkg/rtc/types/interfaces.go | 3 + .../typesfakes/fake_local_media_track.go | 65 +++++++++++++++++++ .../typesfakes/fake_local_participant.go | 65 +++++++++++++++++++ pkg/rtc/types/typesfakes/fake_media_track.go | 65 +++++++++++++++++++ pkg/service/roommanager.go | 1 + pkg/sfu/downtrack.go | 35 +++++++--- 10 files changed, 257 insertions(+), 9 deletions(-) diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 8fc33f20f..a35b328a4 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -800,4 +800,11 @@ func (t *MediaTrackReceiver) GetTemporalLayerForSpatialFps(spatial int32, fps ui return buffer.DefaultMaxLayerTemporal } +func (t *MediaTrackReceiver) IsEncrypted() bool { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.trackInfo.Encryption != livekit.Encryption_NONE +} + // --------------------------- diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 8887176cd..2b107d9bd 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -98,6 +98,10 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * for _, c := range codecs { c.RTCPFeedback = rtcpFeedback } + var trailer []byte + if t.params.MediaTrack.IsEncrypted() { + trailer = sub.GetTrailer() + } downTrack, err := sfu.NewDownTrack( codecs, wr, @@ -105,6 +109,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * subscriberID, t.params.ReceiverConfig.PacketBufferSize, sub.GetPacer(), + trailer, LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed), ) if err != nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index fbc17ac6c..759b39523 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -68,6 +68,7 @@ type ParticipantParams struct { VideoConfig config.VideoConfig ProtocolVersion types.ProtocolVersion Telemetry telemetry.TelemetryService + Trailer []byte PLIThrottleConfig config.PLIThrottleConfig CongestionControlConfig config.CongestionControlConfig EnabledCodecs []*livekit.Codec @@ -224,6 +225,12 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { return p, nil } +func (p *ParticipantImpl) GetTrailer() []byte { + trailer := make([]byte, len(p.params.Trailer)) + copy(trailer, p.params.Trailer) + return trailer +} + func (p *ParticipantImpl) GetLogger() logger.Logger { return p.params.Logger } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 7602659ac..a609a25d2 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -77,6 +77,8 @@ type Room struct { leftAt atomic.Int64 closed chan struct{} + trailer []byte + onParticipantChanged func(p types.LocalParticipant) onRoomUpdated func() onClose func() @@ -111,6 +113,7 @@ func NewRoom( bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSize), batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), closed: make(chan struct{}), + trailer: []byte(utils.RandomSecret()), } r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto) if r.protoRoom.EmptyTimeout == 0 { @@ -139,6 +142,15 @@ func (r *Room) ID() livekit.RoomID { return livekit.RoomID(r.protoRoom.Sid) } +func (r *Room) Trailer() []byte { + r.lock.RLock() + defer r.lock.RUnlock() + + trailer := make([]byte, len(r.trailer)) + copy(trailer, r.trailer) + return trailer +} + func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.LocalParticipant { r.lock.RLock() defer r.lock.RUnlock() @@ -821,6 +833,7 @@ func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceS ServerInfo: r.serverInfo, ServerVersion: r.serverInfo.Version, ServerRegion: r.serverInfo.Region, + SifTrailer: r.trailer, } } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index eb9336121..6a426b521 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -277,6 +277,7 @@ type LocalParticipant interface { ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion) // getters + GetTrailer() []byte GetLogger() logger.Logger GetAdaptiveStream() bool ProtocolVersion() ProtocolVersion @@ -449,6 +450,8 @@ type MediaTrack interface { Receivers() []sfu.TrackReceiver ClearAllReceivers(willBeResumed bool) + + IsEncrypted() bool } //counterfeiter:generate . LocalMediaTrack diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index f6ee5b324..0b9a4517c 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -128,6 +128,16 @@ type FakeLocalMediaTrack struct { iDReturnsOnCall map[int]struct { result1 livekit.TrackID } + IsEncryptedStub func() bool + isEncryptedMutex sync.RWMutex + isEncryptedArgsForCall []struct { + } + isEncryptedReturns struct { + result1 bool + } + isEncryptedReturnsOnCall map[int]struct { + result1 bool + } IsMutedStub func() bool isMutedMutex sync.RWMutex isMutedArgsForCall []struct { @@ -928,6 +938,59 @@ func (fake *FakeLocalMediaTrack) IDReturnsOnCall(i int, result1 livekit.TrackID) }{result1} } +func (fake *FakeLocalMediaTrack) IsEncrypted() bool { + fake.isEncryptedMutex.Lock() + ret, specificReturn := fake.isEncryptedReturnsOnCall[len(fake.isEncryptedArgsForCall)] + fake.isEncryptedArgsForCall = append(fake.isEncryptedArgsForCall, struct { + }{}) + stub := fake.IsEncryptedStub + fakeReturns := fake.isEncryptedReturns + fake.recordInvocation("IsEncrypted", []interface{}{}) + fake.isEncryptedMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalMediaTrack) IsEncryptedCallCount() int { + fake.isEncryptedMutex.RLock() + defer fake.isEncryptedMutex.RUnlock() + return len(fake.isEncryptedArgsForCall) +} + +func (fake *FakeLocalMediaTrack) IsEncryptedCalls(stub func() bool) { + fake.isEncryptedMutex.Lock() + defer fake.isEncryptedMutex.Unlock() + fake.IsEncryptedStub = stub +} + +func (fake *FakeLocalMediaTrack) IsEncryptedReturns(result1 bool) { + fake.isEncryptedMutex.Lock() + defer fake.isEncryptedMutex.Unlock() + fake.IsEncryptedStub = nil + fake.isEncryptedReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalMediaTrack) IsEncryptedReturnsOnCall(i int, result1 bool) { + fake.isEncryptedMutex.Lock() + defer fake.isEncryptedMutex.Unlock() + fake.IsEncryptedStub = nil + if fake.isEncryptedReturnsOnCall == nil { + fake.isEncryptedReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.isEncryptedReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalMediaTrack) IsMuted() bool { fake.isMutedMutex.Lock() ret, specificReturn := fake.isMutedReturnsOnCall[len(fake.isMutedArgsForCall)] @@ -1947,6 +2010,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} { defer fake.hasSdpCidMutex.RUnlock() fake.iDMutex.RLock() defer fake.iDMutex.RUnlock() + fake.isEncryptedMutex.RLock() + defer fake.isEncryptedMutex.RUnlock() fake.isMutedMutex.RLock() defer fake.isMutedMutex.RUnlock() fake.isOpenMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index b071877d9..6e3863a45 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -304,6 +304,16 @@ type FakeLocalParticipant struct { getSubscribedTracksReturnsOnCall map[int]struct { result1 []types.SubscribedTrack } + GetTrailerStub func() []byte + getTrailerMutex sync.RWMutex + getTrailerArgsForCall []struct { + } + getTrailerReturns struct { + result1 []byte + } + getTrailerReturnsOnCall map[int]struct { + result1 []byte + } HandleAnswerStub func(webrtc.SessionDescription) handleAnswerMutex sync.RWMutex handleAnswerArgsForCall []struct { @@ -2359,6 +2369,59 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result }{result1} } +func (fake *FakeLocalParticipant) GetTrailer() []byte { + fake.getTrailerMutex.Lock() + ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)] + fake.getTrailerArgsForCall = append(fake.getTrailerArgsForCall, struct { + }{}) + stub := fake.GetTrailerStub + fakeReturns := fake.getTrailerReturns + fake.recordInvocation("GetTrailer", []interface{}{}) + fake.getTrailerMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetTrailerCallCount() int { + fake.getTrailerMutex.RLock() + defer fake.getTrailerMutex.RUnlock() + return len(fake.getTrailerArgsForCall) +} + +func (fake *FakeLocalParticipant) GetTrailerCalls(stub func() []byte) { + fake.getTrailerMutex.Lock() + defer fake.getTrailerMutex.Unlock() + fake.GetTrailerStub = stub +} + +func (fake *FakeLocalParticipant) GetTrailerReturns(result1 []byte) { + fake.getTrailerMutex.Lock() + defer fake.getTrailerMutex.Unlock() + fake.GetTrailerStub = nil + fake.getTrailerReturns = struct { + result1 []byte + }{result1} +} + +func (fake *FakeLocalParticipant) GetTrailerReturnsOnCall(i int, result1 []byte) { + fake.getTrailerMutex.Lock() + defer fake.getTrailerMutex.Unlock() + fake.GetTrailerStub = nil + if fake.getTrailerReturnsOnCall == nil { + fake.getTrailerReturnsOnCall = make(map[int]struct { + result1 []byte + }) + } + fake.getTrailerReturnsOnCall[i] = struct { + result1 []byte + }{result1} +} + func (fake *FakeLocalParticipant) HandleAnswer(arg1 webrtc.SessionDescription) { fake.handleAnswerMutex.Lock() fake.handleAnswerArgsForCall = append(fake.handleAnswerArgsForCall, struct { @@ -5648,6 +5711,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getSubscribedParticipantsMutex.RUnlock() fake.getSubscribedTracksMutex.RLock() defer fake.getSubscribedTracksMutex.RUnlock() + fake.getTrailerMutex.RLock() + defer fake.getTrailerMutex.RUnlock() fake.handleAnswerMutex.RLock() defer fake.handleAnswerMutex.RUnlock() fake.handleOfferMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_media_track.go b/pkg/rtc/types/typesfakes/fake_media_track.go index de0e870f4..bb0c07e31 100644 --- a/pkg/rtc/types/typesfakes/fake_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_media_track.go @@ -93,6 +93,16 @@ type FakeMediaTrack struct { iDReturnsOnCall map[int]struct { result1 livekit.TrackID } + IsEncryptedStub func() bool + isEncryptedMutex sync.RWMutex + isEncryptedArgsForCall []struct { + } + isEncryptedReturns struct { + result1 bool + } + isEncryptedReturnsOnCall map[int]struct { + result1 bool + } IsMutedStub func() bool isMutedMutex sync.RWMutex isMutedArgsForCall []struct { @@ -689,6 +699,59 @@ func (fake *FakeMediaTrack) IDReturnsOnCall(i int, result1 livekit.TrackID) { }{result1} } +func (fake *FakeMediaTrack) IsEncrypted() bool { + fake.isEncryptedMutex.Lock() + ret, specificReturn := fake.isEncryptedReturnsOnCall[len(fake.isEncryptedArgsForCall)] + fake.isEncryptedArgsForCall = append(fake.isEncryptedArgsForCall, struct { + }{}) + stub := fake.IsEncryptedStub + fakeReturns := fake.isEncryptedReturns + fake.recordInvocation("IsEncrypted", []interface{}{}) + fake.isEncryptedMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeMediaTrack) IsEncryptedCallCount() int { + fake.isEncryptedMutex.RLock() + defer fake.isEncryptedMutex.RUnlock() + return len(fake.isEncryptedArgsForCall) +} + +func (fake *FakeMediaTrack) IsEncryptedCalls(stub func() bool) { + fake.isEncryptedMutex.Lock() + defer fake.isEncryptedMutex.Unlock() + fake.IsEncryptedStub = stub +} + +func (fake *FakeMediaTrack) IsEncryptedReturns(result1 bool) { + fake.isEncryptedMutex.Lock() + defer fake.isEncryptedMutex.Unlock() + fake.IsEncryptedStub = nil + fake.isEncryptedReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeMediaTrack) IsEncryptedReturnsOnCall(i int, result1 bool) { + fake.isEncryptedMutex.Lock() + defer fake.isEncryptedMutex.Unlock() + fake.IsEncryptedStub = nil + if fake.isEncryptedReturnsOnCall == nil { + fake.isEncryptedReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.isEncryptedReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeMediaTrack) IsMuted() bool { fake.isMutedMutex.Lock() ret, specificReturn := fake.isMutedReturnsOnCall[len(fake.isMutedArgsForCall)] @@ -1522,6 +1585,8 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} { defer fake.getTemporalLayerForSpatialFpsMutex.RUnlock() fake.iDMutex.RLock() defer fake.iDMutex.RUnlock() + fake.isEncryptedMutex.RLock() + defer fake.isEncryptedMutex.RUnlock() fake.isMutedMutex.RLock() defer fake.isMutedMutex.RUnlock() fake.isOpenMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index b0cefd31c..1920ecda3 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -351,6 +351,7 @@ func (r *RoomManager) StartSession( VideoConfig: r.config.Video, ProtocolVersion: pv, Telemetry: r.telemetry, + Trailer: room.Trailer(), PLIThrottleConfig: r.config.RTC.PLIThrottle, CongestionControlConfig: r.config.RTC.CongestionControl, EnabledCodecs: protoRoom.EnabledCodecs, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 74f47f84b..4147b1650 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -240,6 +240,8 @@ type DownTrack struct { maxLayerNotifierCh chan struct{} + trailer []byte + cbMu sync.RWMutex onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32) @@ -255,6 +257,7 @@ func NewDownTrack( subID livekit.ParticipantID, mt int, pacer pacer.Pacer, + trailer []byte, logger logger.Logger, ) (*DownTrack, error) { var kind webrtc.RTPCodecType @@ -279,6 +282,7 @@ func NewDownTrack( kind: kind, codec: codecs[0].RTPCodecCapability, pacer: pacer, + trailer: trailer, maxLayerNotifierCh: make(chan struct{}, 20), } d.forwarder = NewForwarder( @@ -1273,19 +1277,30 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan return done } +func (d *DownTrack) maybeAddTrailer(buf []byte) int { + if len(buf) < len(d.trailer) { + d.logger.Warnw("trailer too big", nil, "bufLen", len(buf), "trailerLen", len(d.trailer)) + return 0 + } + + copy(buf, d.trailer) + return len(d.trailer) +} + func (d *DownTrack) getOpusBlankFrame(_frameEndNeeded bool) ([]byte, error) { // silence frame // Used shortly after muting to ensure residual noise does not keep // generating noise at the decoder after the stream is stopped // i. e. comfort noise generation actually not producing something comfortable. - payload := make([]byte, len(OpusSilenceFrame)) + payload := make([]byte, 1000) copy(payload[0:], OpusSilenceFrame) - return payload, nil + trailerLen := d.maybeAddTrailer(payload[len(OpusSilenceFrame):]) + return payload[:len(OpusSilenceFrame)+trailerLen], nil } func (d *DownTrack) getOpusRedBlankFrame(_frameEndNeeded bool) ([]byte, error) { // primary only silence frame for opus/red, there is no need to contain redundant silent frames - payload := make([]byte, len(OpusSilenceFrame)+1) + payload := make([]byte, 1000) // primary header // 0 1 2 3 4 5 6 7 @@ -1294,7 +1309,8 @@ func (d *DownTrack) getOpusRedBlankFrame(_frameEndNeeded bool) ([]byte, error) { // +-+-+-+-+-+-+-+-+ payload[0] = opusPT copy(payload[1:], OpusSilenceFrame) - return payload, nil + trailerLen := d.maybeAddTrailer(payload[1+len(OpusSilenceFrame):]) + return payload[:1+len(OpusSilenceFrame)+trailerLen], nil } func (d *DownTrack) getVP8BlankFrame(frameEndNeeded bool) ([]byte, error) { @@ -1307,17 +1323,18 @@ func (d *DownTrack) getVP8BlankFrame(frameEndNeeded bool) ([]byte, error) { // Used even when closing out a previous frame. Looks like receivers // do not care about content (it will probably end up being an undecodable // frame, but that should be okay as there are key frames following) - payload := make([]byte, len(blankVP8)+len(VP8KeyFrame8x8)) + payload := make([]byte, 1000) copy(payload[:len(blankVP8)], blankVP8) copy(payload[len(blankVP8):], VP8KeyFrame8x8) - return payload, nil + trailerLen := d.maybeAddTrailer(payload[len(blankVP8)+len(VP8KeyFrame8x8):]) + return payload[:len(blankVP8)+len(VP8KeyFrame8x8)+trailerLen], nil } func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) { // TODO - Jie Zeng // now use STAP-A to compose sps, pps, idr together, most decoder support packetization-mode 1. // if client only support packetization-mode 0, use single nalu unit packet - buf := make([]byte, 1462) + buf := make([]byte, 1000) offset := 0 buf[0] = 0x18 // STAP-A offset++ @@ -1327,8 +1344,8 @@ func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) { copy(buf[offset:offset+len(payload)], payload) offset += len(payload) } - payload := buf[:offset] - return payload, nil + offset += d.maybeAddTrailer(buf[offset:]) + return buf[:offset], nil } func (d *DownTrack) handleRTCP(bytes []byte) {