From f622fc2490baa98295fb5f1d1f5e6953d9616fdc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 23 Oct 2023 16:58:02 +0530 Subject: [PATCH 01/13] Sample clock skew down by an order of magnitude (#2173) --- pkg/sfu/buffer/rtpstats_receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 77300f7ab..a5d102c61 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -333,7 +333,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) || (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { - if r.clockSkewCount%10 == 0 { + if r.clockSkewCount%100 == 0 { r.logger.Infow( "clock rate skew", "first", r.srFirst.ToString(), From 0f27dda2812c8e6c58924de4d43bd450523e78c4 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 23 Oct 2023 09:15:18 -0700 Subject: [PATCH 02/13] move CreateEgress call (#2168) --- pkg/service/egress.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 0a77a231f..3dfa2b6ca 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -190,7 +190,17 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa } func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { - return s.StartEgressWithClusterId(ctx, "", req) + info, err := s.StartEgressWithClusterId(ctx, "", req) + if err != nil { + return nil, err + } + + _, err = s.io.CreateEgress(ctx, info) + if err != nil { + logger.Errorw("failed to create egress", err) + } + + return info, nil } func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { @@ -203,17 +213,7 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId req.EgressId = utils.NewGuid(utils.EgressPrefix) } - info, err := s.client.StartEgress(ctx, clusterId, req) - if err != nil { - return nil, err - } - - _, err = s.io.CreateEgress(ctx, info) - if err != nil { - logger.Errorw("failed to create egress", err) - } - - return info, nil + return s.client.StartEgress(ctx, clusterId, req) } type LayoutMetadata struct { From f4a361800000f269cc6b52614698e97f90970ad0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 23 Oct 2023 23:00:03 +0530 Subject: [PATCH 03/13] Log error on 0 time stamp. (#2174) Need backtrace for source of it. Also, do not reset start if 0, that is incorrect. --- pkg/sfu/buffer/rtpstats_sender.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 59e01a831..8efb6cef2 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -394,7 +394,18 @@ func (r *RTPStatsSender) Update( "tsBefore", r.extStartTS, "tsAfter", extTimestamp, ) - r.extStartTS = extTimestamp + if extTimestamp == 0 { // TODO-REMOVE-AFTER-DEBUG + r.logger.Errorw( + "invalid start timestamp", nil, + "snBefore", r.extStartSN, + "snAfter", extSequenceNumber, + "tsBefore", r.extStartTS, + "tsAfter", extTimestamp, + ) + } + if extTimestamp != 0 { + r.extStartTS = extTimestamp + } } if extTimestamp > r.extHighestTS { From 1ee808ec7d3132c875642b0935543e982917e37b Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 24 Oct 2023 14:09:40 +0800 Subject: [PATCH 04/13] Fix frame chain can't detect broken if currentLayer is not valid (#2176) --- config-sample.yaml | 6 ------ pkg/sfu/videolayerselector/dependencydescriptor.go | 5 ----- 2 files changed, 11 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index b204340c4..4f9fb3a1a 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -112,12 +112,6 @@ rtc: # batch_io: # batch_size: 128 # max_flush_interval: 2ms - # # force a reconnect on a publication error - # reconnect_on_publication_error: true - # # force a reconnect on a subscription error - # reconnect_on_subscription_error: true - # # force a reconnect on a data channel error - # reconnect_on_data_channel_error: true # # max number of bytes to buffer for data channel. 0 means unlimited. # # when this limit is breached, data messages will be dropped till the buffered amount drops below this limit. # data_channel_max_buffered_amount: 0 diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 9d29427be..1306d94b6 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -88,11 +88,6 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r return } - if !d.currentLayer.IsValid() && !extPkt.KeyFrame { - d.decisions.AddDropped(extFrameNum) - return - } - if ddwdt.StructureUpdated { d.updateDependencyStructure(dd.AttachedStructure, ddwdt.DecodeTargets) } From df9d6ee0f4ac5429295b8311d1cbc036b573ad02 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 24 Oct 2023 12:59:25 +0530 Subject: [PATCH 05/13] Update protocol. (#2177) --- go.mod | 6 +++--- go.sum | 12 ++++++------ pkg/service/egress.go | 4 ---- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 8a990ac1a..8ec9cd47e 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca - github.com/livekit/psrpc v0.3.3 + github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd + github.com/livekit/psrpc v0.4.0 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.7.0 @@ -78,7 +78,7 @@ require ( github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect github.com/nats-io/nats.go v1.31.0 // indirect - github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/logging v0.2.2 // indirect diff --git a/go.sum b/go.sum index 9927c1032..53e811338 100644 --- a/go.sum +++ b/go.sum @@ -125,10 +125,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca h1:M3gymSJFriayMERPin3BSGgYTUpLVfGaMgmqp+3JV6I= -github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= -github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= -github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= +github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd h1:5By2nxIMS9MApyJ654STmtwb0V4MeuaH77lqUTxV3nw= +github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd/go.mod h1:oTWtPGfpZSJGKRrbSvDQK0jiuUylYzhiw/bnGB4Cqko= +github.com/livekit/psrpc v0.4.0 h1:oC4l99HSot/aUza4ZR3ZcZW1SRZm34KCHX3wAbmw6Lo= +github.com/livekit/psrpc v0.4.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -162,8 +162,8 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= -github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= +github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 3dfa2b6ca..d971aaa34 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -283,10 +283,6 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr return info, nil } -func (s *EgressService) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutputsRequest) (*livekit.EgressInfo, error) { - return nil, twirp.NewError(twirp.Unimplemented, "Update Outputs unimplemented") -} - func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) { if req.RoomName != "" { AppendLogFields(ctx, "room", req.RoomName) From d6ad857506bb0ebebcc47cd6a2f5ebec091d2eca Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 24 Oct 2023 18:21:59 +0530 Subject: [PATCH 06/13] Do not post to closed channels. (#2179) * Do not post to closed channels. Perils of atomics. Hard to imagine, but I guess it could happen. The postMaxLayerNotifier checked for closed and down track was not closed. But, between that check and posting to channel (which is a very small window), the down track could have been closed and the channel (maxLayerNotiferCh) is closed. Protect that channel post + close with the bind lock. * reduce the change * Check for closed inside lock --- pkg/sfu/downtrack.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 880f047e8..98f3f4c12 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -637,14 +637,18 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { } func (d *DownTrack) postMaxLayerNotifierEvent() { - if d.IsClosed() || d.kind != webrtc.RTPCodecTypeVideo { + if d.kind != webrtc.RTPCodecTypeVideo { return } - select { - case d.maxLayerNotifierCh <- struct{}{}: - default: + d.bindLock.Lock() + if !d.IsClosed() { + select { + case d.maxLayerNotifierCh <- struct{}{}: + default: + } } + d.bindLock.Unlock() } func (d *DownTrack) maxLayerNotifierWorker() { @@ -930,12 +934,13 @@ func (d *DownTrack) Close() { // 2. in case of session migration, participant migrate from other node, video track should // be resumed with same participant, set flush=false since we don't need to flush decoder. func (d *DownTrack) CloseWithFlush(flush bool) { + d.bindLock.Lock() if d.isClosed.Swap(true) { + d.bindLock.Unlock() // already closed return } - d.bindLock.Lock() d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) if d.bound.Load() { d.forwarder.Mute(true, true) @@ -959,6 +964,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } d.bound.Store(false) + d.onBindAndConnectedChange() d.params.Logger.Debugw("closing sender", "kind", d.kind) } d.params.Receiver.DeleteDownTrack(d.params.SubID) @@ -969,13 +975,12 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtcpReader.OnPacket(nil) } + close(d.maxLayerNotifierCh) d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) - close(d.maxLayerNotifierCh) - if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { onCloseHandler(!flush) } From 66750e4ba8b8fdedb0398babd683dddb99ecfac4 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 24 Oct 2023 22:12:38 +0530 Subject: [PATCH 07/13] Fix deadlock (#2180) * Fix deadlock My previous PR to wrap layer notifier post in bind lock was problematic as `onBinding` callback happens within that lock and that onBinding callback can call set max layer which will post to channel. Use a separate mutex. * RUnlock --- pkg/sfu/downtrack.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 98f3f4c12..86188797b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -269,7 +269,9 @@ type DownTrack struct { pacer pacer.Pacer - maxLayerNotifierCh chan struct{} + maxLayerNotifierChMu sync.RWMutex + maxLayerNotifierCh chan struct{} + maxLayerNotifierChClosed bool cbMu sync.RWMutex onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) @@ -641,14 +643,14 @@ func (d *DownTrack) postMaxLayerNotifierEvent() { return } - d.bindLock.Lock() - if !d.IsClosed() { + d.maxLayerNotifierChMu.RLock() + if !d.maxLayerNotifierChClosed { select { case d.maxLayerNotifierCh <- struct{}{}: default: } } - d.bindLock.Unlock() + d.maxLayerNotifierChMu.RUnlock() } func (d *DownTrack) maxLayerNotifierWorker() { @@ -934,13 +936,12 @@ func (d *DownTrack) Close() { // 2. in case of session migration, participant migrate from other node, video track should // be resumed with same participant, set flush=false since we don't need to flush decoder. func (d *DownTrack) CloseWithFlush(flush bool) { - d.bindLock.Lock() if d.isClosed.Swap(true) { - d.bindLock.Unlock() // already closed return } + d.bindLock.Lock() d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) if d.bound.Load() { d.forwarder.Mute(true, true) @@ -975,12 +976,16 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtcpReader.OnPacket(nil) } - close(d.maxLayerNotifierCh) d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) + d.maxLayerNotifierChMu.Lock() + d.maxLayerNotifierChClosed = true + close(d.maxLayerNotifierCh) + d.maxLayerNotifierChMu.Unlock() + if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { onCloseHandler(!flush) } From f80e87b2167da68486afa0c4369f0b9efa2d9fb2 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 24 Oct 2023 11:41:52 -0700 Subject: [PATCH 08/13] skip psrpc service registration unless the config is enabled (#2181) --- pkg/service/roommanager.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 8a21933dd..14caa049e 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -433,10 +433,12 @@ func (r *RoomManager) StartSession( _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false) return err } - if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil { - pLogger.Errorw("could not join register participant topic", err) - _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) - return err + if r.config.PSRPC.Enabled { + if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil { + pLogger.Errorw("could not join register participant topic", err) + _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) + return err + } } if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { pLogger.Errorw("could not store participant", err) @@ -461,7 +463,9 @@ func (r *RoomManager) StartSession( pLogger.Errorw("could not delete participant", err) } - r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())) + if r.config.PSRPC.Enabled { + r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())) + } // update room store with new numParticipants proto := room.ToProto() @@ -503,8 +507,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room return nil, err } - if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil { - return nil, err + if r.config.PSRPC.Enabled { + if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil { + return nil, err + } } r.lock.Lock() @@ -525,7 +531,9 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher) newRoom.OnClose(func() { - r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName)) + if r.config.PSRPC.Enabled { + r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName)) + } roomInfo := newRoom.ToProto() r.telemetry.RoomEnded(ctx, roomInfo) From b8ac836b9bd5820ea28c6cafdcd98521e174cd2e Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 24 Oct 2023 13:05:23 -0700 Subject: [PATCH 09/13] Only launch room egress once (#2175) * only launch room egress once * regenerate fakes --- pkg/service/interfaces.go | 2 +- pkg/service/roomallocator.go | 22 ++++++------- pkg/service/roomallocator_test.go | 6 ++-- pkg/service/roomservice.go | 6 ++-- pkg/service/rtcservice.go | 2 +- .../servicefakes/fake_room_allocator.go | 33 +++++++++++-------- 6 files changed, 38 insertions(+), 33 deletions(-) diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index ba390554f..653269225 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -83,6 +83,6 @@ type IOClient interface { //counterfeiter:generate . RoomAllocator type RoomAllocator interface { - CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) + CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error } diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 2d538cdd1..f222f7b15 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -50,10 +50,10 @@ func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore // CreateRoom creates a new room from a request and allocates it to a node to handle // it'll also monitor its state, and cleans it up when appropriate -func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) { +func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) { token, err := r.roomStore.LockRoom(ctx, livekit.RoomName(req.Name), 5*time.Second) if err != nil { - return nil, err + return nil, false, err } defer func() { _ = r.roomStore.UnlockRoom(ctx, livekit.RoomName(req.Name), token) @@ -71,7 +71,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre internal = &livekit.RoomInternal{} applyDefaultRoomConfig(rm, internal, &r.config.Room) } else if err != nil { - return nil, err + return nil, false, err } if req.EmptyTimeout > 0 { @@ -98,23 +98,23 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre } if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil { - return nil, err + return nil, false, err } // check if room already assigned existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name)) if err != routing.ErrNotFound && err != nil { - return nil, err + return nil, false, err } // if already assigned and still available, keep it on that node if err == nil && selector.IsAvailable(existing) { // if node hosting the room is full, deny entry if selector.LimitsReached(r.config.Limit, existing.Stats) { - return nil, routing.ErrNodeLimitReached + return nil, false, routing.ErrNodeLimitReached } - return rm, nil + return rm, false, nil } // select a new node @@ -122,12 +122,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre if nodeID == "" { nodes, err := r.router.ListNodes() if err != nil { - return nil, err + return nil, false, err } node, err := r.selector.SelectNode(nodes) if err != nil { - return nil, err + return nil, false, err } nodeID = livekit.NodeID(node.Id) @@ -136,10 +136,10 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre logger.Infow("selected node for room", "room", rm.Name, "roomID", rm.Sid, "selectedNodeID", nodeID) err = r.router.SetNodeForRoom(ctx, livekit.RoomName(rm.Name), nodeID) if err != nil { - return nil, err + return nil, false, err } - return rm, nil + return rm, true, nil } func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error { diff --git a/pkg/service/roomallocator_test.go b/pkg/service/roomallocator_test.go index 4397e22c4..8c87dc8c2 100644 --- a/pkg/service/roomallocator_test.go +++ b/pkg/service/roomallocator_test.go @@ -39,7 +39,7 @@ func TestCreateRoom(t *testing.T) { ra, conf := newTestRoomAllocator(t, conf, node) - room, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) + room, _, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) require.NoError(t, err) require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout) require.NotEmpty(t, room.EnabledCodecs) @@ -57,7 +57,7 @@ func TestCreateRoom(t *testing.T) { ra, _ := newTestRoomAllocator(t, conf, node) - _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) + _, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) require.ErrorIs(t, err, routing.ErrNodeLimitReached) }) @@ -73,7 +73,7 @@ func TestCreateRoom(t *testing.T) { ra, _ := newTestRoomAllocator(t, conf, node) - _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) + _, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) require.ErrorIs(t, err, routing.ErrNodeLimitReached) }) } diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 2f75ac00c..dd9cefebc 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -79,7 +79,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, ErrEgressNotConnected } - rm, err := s.roomAllocator.CreateRoom(ctx, req) + rm, created, err := s.roomAllocator.CreateRoom(ctx, req) if err != nil { err = errors.Wrap(err, "could not create room") return nil, err @@ -109,7 +109,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, err } - if req.Egress != nil && req.Egress.Room != nil { + if created && req.Egress != nil && req.Egress.Room != nil { egress := &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_RoomComposite{ RoomComposite: req.Egress.Room, @@ -424,7 +424,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat // no one has joined the room, would not have been created on an RTC node. // in this case, we'd want to run create again - _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{ + _, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{ Name: req.Room, Metadata: req.Metadata, }) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index dc203fec9..b0683115b 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -517,7 +517,7 @@ type connectionResult struct { func (s *RTCService) startConnection(ctx context.Context, roomName livekit.RoomName, pi routing.ParticipantInit, timeout time.Duration) (connectionResult, *livekit.SignalResponse, error) { var cr connectionResult var err error - cr.Room, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)}) + cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)}) if err != nil { return cr, nil, err } diff --git a/pkg/service/servicefakes/fake_room_allocator.go b/pkg/service/servicefakes/fake_room_allocator.go index e88dde554..134b9649b 100644 --- a/pkg/service/servicefakes/fake_room_allocator.go +++ b/pkg/service/servicefakes/fake_room_allocator.go @@ -10,7 +10,7 @@ import ( ) type FakeRoomAllocator struct { - CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error) + CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error) createRoomMutex sync.RWMutex createRoomArgsForCall []struct { arg1 context.Context @@ -18,11 +18,13 @@ type FakeRoomAllocator struct { } createRoomReturns struct { result1 *livekit.Room - result2 error + result2 bool + result3 error } createRoomReturnsOnCall map[int]struct { result1 *livekit.Room - result2 error + result2 bool + result3 error } ValidateCreateRoomStub func(context.Context, livekit.RoomName) error validateCreateRoomMutex sync.RWMutex @@ -40,7 +42,7 @@ type FakeRoomAllocator struct { invocationsMutex sync.RWMutex } -func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, error) { +func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, bool, error) { fake.createRoomMutex.Lock() ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)] fake.createRoomArgsForCall = append(fake.createRoomArgsForCall, struct { @@ -55,9 +57,9 @@ func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.Cr return stub(arg1, arg2) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1, ret.result2, ret.result3 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } func (fake *FakeRoomAllocator) CreateRoomCallCount() int { @@ -66,7 +68,7 @@ func (fake *FakeRoomAllocator) CreateRoomCallCount() int { return len(fake.createRoomArgsForCall) } -func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error)) { +func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error)) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = stub @@ -79,30 +81,33 @@ func (fake *FakeRoomAllocator) CreateRoomArgsForCall(i int) (context.Context, *l return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 error) { +func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 bool, result3 error) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = nil fake.createRoomReturns = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 bool + result3 error + }{result1, result2, result3} } -func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) { +func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 bool, result3 error) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = nil if fake.createRoomReturnsOnCall == nil { fake.createRoomReturnsOnCall = make(map[int]struct { result1 *livekit.Room - result2 error + result2 bool + result3 error }) } fake.createRoomReturnsOnCall[i] = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 bool + result3 error + }{result1, result2, result3} } func (fake *FakeRoomAllocator) ValidateCreateRoom(arg1 context.Context, arg2 livekit.RoomName) error { From 48dba9d58984a0ff9d3fff56b570711771cde852 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 24 Oct 2023 17:46:07 -0700 Subject: [PATCH 10/13] reduce closing signal stream log level (#2182) --- pkg/routing/signal.go | 2 +- pkg/service/signal.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index ecb1c6c7c..e7181038d 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -133,7 +133,7 @@ func (r *signalClient) StartParticipantSignal( signalResponseMessageReader{}, r.config, ) - l.Infow("signal stream closed", "error", err) + l.Debugw("signal stream closed", "error", err) resChan.Close() }() diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 862d79e44..4c9e085be 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -164,7 +164,7 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe signalRequestMessageReader{}, r.config, ) - l.Infow("signal stream closed", "error", err) + l.Debugw("signal stream closed", "error", err) reqChan.Close() }() From 0296a5bd8672453dcea01e30582ae2bd8238754b Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 25 Oct 2023 16:59:37 +0800 Subject: [PATCH 11/13] Remove un-preferred codecs for android firefox (#2183) * Remove un-preferred codecs for android firefox Android firefox don't comply with the codec order in answer sdp and has problem to publish h.264, remove other codecs to fix this. * false(false) is true --- pkg/rtc/clientinfo.go | 8 ++++++++ pkg/rtc/participant_sdp.go | 5 ++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/rtc/clientinfo.go b/pkg/rtc/clientinfo.go index 7912968b0..62117e0d1 100644 --- a/pkg/rtc/clientinfo.go +++ b/pkg/rtc/clientinfo.go @@ -37,6 +37,10 @@ func (c ClientInfo) isGo() bool { return c.ClientInfo != nil && c.ClientInfo.Sdk == livekit.ClientInfo_GO } +func (c ClientInfo) isLinux() bool { + return c.ClientInfo != nil && strings.EqualFold(c.ClientInfo.Os, "linux") +} + func (c ClientInfo) SupportsAudioRED() bool { return !c.isFirefox() && !c.isSafari() } @@ -80,6 +84,10 @@ func (c ClientInfo) SupportsChangeRTPSenderEncodingActive() bool { return !c.isFirefox() } +func (c ClientInfo) ComplyWithCodecOrderInSDPAnswer() bool { + return !(c.isLinux() && c.isFirefox()) +} + // compareVersion compares a semver against the current client SDK version // returning 1 if current version is greater than version // 0 if they are the same, and -1 if it's an earlier version diff --git a/pkg/rtc/participant_sdp.go b/pkg/rtc/participant_sdp.go index 9bacce907..70829a34c 100644 --- a/pkg/rtc/participant_sdp.go +++ b/pkg/rtc/participant_sdp.go @@ -180,7 +180,10 @@ func (p *ParticipantImpl) setCodecPreferencesVideoForPublisher(offer webrtc.Sess } unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats[:0], preferredCodecs...) - unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats, leftCodecs...) + // if the client don't comply with codec order in SDP answer, only keep preferred codecs to force client to use it + if p.params.ClientInfo.ComplyWithCodecOrderInSDPAnswer() { + unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats, leftCodecs...) + } } } From fa01297d96624a6773336d61b8706778db65cb2f Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 25 Oct 2023 23:12:14 +0530 Subject: [PATCH 12/13] Slight sequencer tweaks. (#2184) The buffer is not for padding packets. So, calculate adjusted sequence numbers before comparing against size. Also, it is possible that invalidated slot is accessed due to not being able to exclude padding range. This was causing time stamp reset to 0. Will remove the error log after this goes out and the condition does not show up for a few days. --- pkg/sfu/buffer/rtpstats_receiver.go | 4 +- pkg/sfu/buffer/rtpstats_sender.go | 8 +- pkg/sfu/sequencer.go | 114 ++++++++++++++++++++-------- 3 files changed, 89 insertions(+), 37 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index a5d102c61..3869ebae8 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -154,7 +154,7 @@ func (r *RTPStatsReceiver) Update( return } } - if -gapSN >= cNumSequenceNumbers { + if -gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap negative", nil, "extStartSN", r.sequenceNumber.GetExtendedStart(), @@ -226,7 +226,7 @@ func (r *RTPStatsReceiver) Update( flowState.ExtSequenceNumber = resSN.ExtendedVal flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order - if gapSN >= cNumSequenceNumbers { + if gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap", nil, "extStartSN", r.sequenceNumber.GetExtendedStart(), diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 8efb6cef2..ff262c11c 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -286,7 +286,7 @@ func (r *RTPStatsSender) Update( // do not start on a padding only packet return } - if -gapSN >= cNumSequenceNumbers { + if -gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap negative", nil, "extStartSN", r.extStartSN, @@ -352,7 +352,7 @@ func (r *RTPStatsSender) Update( r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, true) } } else { // in-order - if gapSN >= cNumSequenceNumbers { + if gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap", nil, "extStartSN", r.extStartSN, @@ -399,8 +399,12 @@ func (r *RTPStatsSender) Update( "invalid start timestamp", nil, "snBefore", r.extStartSN, "snAfter", extSequenceNumber, + "snHighest", r.extHighestSN, "tsBefore", r.extStartTS, "tsAfter", extTimestamp, + "tsHighest", r.extHighestTS, + "firstTime", r.firstTime.String(), + "startTime", r.startTime.String(), ) } if extTimestamp != 0 { diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index ee6ed8bdb..4e26a8735 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -84,6 +84,7 @@ type sequencer struct { size int startTime int64 initialized bool + extStartSN uint64 extHighestSN uint64 snOffset uint64 extHighestTS uint64 @@ -133,46 +134,61 @@ func (s *sequencer) push( if !s.initialized { s.initialized = true - s.extHighestSN = extModifiedSN - 1 + s.extStartSN = extModifiedSN + s.extHighestSN = extModifiedSN s.extHighestTS = extModifiedTS s.updateSNOffset() } - snOffset := s.snOffset - diff := int64(extModifiedSN - s.extHighestSN) - if diff >= 0 { - s.extHighestSN = extModifiedSN - } else { - if diff < -int64(s.size) { - s.logger.Warnw( - "old packet, cannot be sequenced", nil, - "extHighestSN", s.extHighestSN, - "extIncomingSN", extIncomingSN, - "extModifiedSN", extModifiedSN, - ) - return - } + if extModifiedSN < s.extStartSN { + // old packet, should not happen + return + } + extHighestSNAdjusted := s.extHighestSN - s.snOffset + extModifiedSNAdjusted := extModifiedSN - s.snOffset + if extModifiedSN < s.extHighestSN { if s.snRangeMap != nil { - var err error - snOffset, err = s.snRangeMap.GetValue(extModifiedSN) + snOffset, err := s.snRangeMap.GetValue(extModifiedSN) if err != nil { s.logger.Errorw( "could not get sequence number offset", err, + "extStartSN", s.extStartSN, "extHighestSN", s.extHighestSN, "extIncomingSN", extIncomingSN, "extModifiedSN", extModifiedSN, + "snOffset", s.snOffset, ) return } + + extModifiedSNAdjusted = extModifiedSN - snOffset + } + } + + if int64(extModifiedSNAdjusted-extHighestSNAdjusted) <= -int64(s.size) { + s.logger.Warnw( + "old packet, cannot be sequenced", nil, + "extHighestSN", s.extHighestSN, + "extIncomingSN", extIncomingSN, + "extModifiedSN", extModifiedSN, + ) + return + } + + // invalidate missing sequence numbers + if extModifiedSNAdjusted > extHighestSNAdjusted { + numInvalidated := 0 + for esn := extHighestSNAdjusted + 1; esn != extModifiedSNAdjusted; esn++ { + s.invalidateSlot(int(esn % uint64(s.size))) + numInvalidated++ + if numInvalidated >= s.size { + break + } } } - if int64(extModifiedTS-s.extHighestTS) >= 0 { - s.extHighestTS = extModifiedTS - } - - slot := (extModifiedSN - snOffset) % uint64(s.size) + slot := extModifiedSNAdjusted % uint64(s.size) s.meta[slot] = packetMeta{ sourceSeqNo: uint16(extIncomingSN), targetSeqNo: uint16(extModifiedSN), @@ -183,6 +199,13 @@ func (s *sequencer) push( ddBytes: append([]byte{}, ddBytes...), lastNack: s.getRefTime(packetTime), // delay retransmissions after the original transmission } + + if extModifiedSN > s.extHighestSN { + s.extHighestSN = extModifiedSN + } + if extModifiedTS > s.extHighestTS { + s.extHighestTS = extModifiedTS + } } func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive uint64) { @@ -223,10 +246,7 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui } slot := (sn - snOffset) % uint64(s.size) - s.meta[slot] = packetMeta{ - sourceSeqNo: 0, - targetSeqNo: 0, - } + s.invalidateSlot(int(slot)) } return } @@ -244,6 +264,10 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { s.Lock() defer s.Unlock() + if !s.initialized { + return nil + } + snOffset := uint64(0) var err error extPacketMetas := make([]extPacketMeta, 0, len(seqNo)) @@ -263,11 +287,6 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { extSN -= (1 << 16) } - if s.extHighestSN-extSN >= uint64(s.size) { - // too old - continue - } - if s.snRangeMap != nil { snOffset, err = s.snRangeMap.GetValue(extSN) if err != nil { @@ -276,9 +295,17 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { } } - slot := (extSN - snOffset) % uint64(s.size) + extSNAdjusted := extSN - snOffset + extHighestSNAdjusted := s.extHighestSN - s.snOffset + if extHighestSNAdjusted-extSNAdjusted >= uint64(s.size) { + // too old + continue + } + + slot := extSNAdjusted % uint64(s.size) meta := &s.meta[slot] - if meta.targetSeqNo != sn { + if meta.targetSeqNo != sn || s.isInvalidSlot(int(slot)) { + // invalid slot access could happen if padding packets exclusion range could not be recorded continue } @@ -320,3 +347,24 @@ func (s *sequencer) updateSNOffset() { } s.snOffset = snOffset } + +func (s *sequencer) invalidateSlot(slot int) { + if slot >= len(s.meta) { + return + } + + s.meta[slot] = packetMeta{ + sourceSeqNo: 0, + targetSeqNo: 0, + lastNack: 0, + } +} + +func (s *sequencer) isInvalidSlot(slot int) bool { + if slot >= len(s.meta) { + return true + } + + meta := &s.meta[slot] + return meta.sourceSeqNo == 0 && meta.targetSeqNo == 0 && meta.lastNack == 0 +} From d8e4933dd1a9e8907607679ccd473f781b019dbd Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 25 Oct 2023 23:27:43 +0530 Subject: [PATCH 13/13] Reference time stamp for SVC. (#2185) SVC has only one stream and when calculating reference time stamp, irrespective of reference layer, reference time stamp will be the same as the given time stamp as there is only one stream and no offset. TODO: Need better all around SVC handling. --- pkg/sfu/streamtrackermanager.go | 6 ++++++ pkg/sfu/videolayerselector/dependencydescriptor.go | 5 +++-- pkg/sfu/videolayerselector/framechain.go | 4 ++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 9aa82f91e..0aba20ff9 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -651,6 +651,12 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer) } + // SVC-TODO: better SVC detection + if s.isSVC { + // there is only one stream in SVC + return ts, nil + } + if layer != referenceLayer && s.layerOffsets[referenceLayer][layer] == 0 { return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer) } diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 1306d94b6..afff336c3 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -144,7 +144,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r return } - // // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame + // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame if dti == dede.DecodeTargetNotPresent { // d.logger.Debugw(fmt.Sprintf("drop packet for decode target not present, highestDecodeTarget %d, incoming %v, fn: %d/%d", // highestDecodeTarget, @@ -188,6 +188,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r "req", d.requestSpatial, "maxSeen", d.maxSeenLayer, "feed", extPkt.Packet.SSRC, + "frame", extFrameNum, ) } @@ -196,7 +197,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r d.previousActiveDecodeTargetsBitmask = d.activeDecodeTargetsBitmask d.activeDecodeTargetsBitmask = buffer.GetActiveDecodeTargetBitmask(d.currentLayer, ddwdt.DecodeTargets) - d.logger.Debugw("switch to target", "highest", highestDecodeTarget.Layer, "current", d.currentLayer, "bitmask", *d.activeDecodeTargetsBitmask) + d.logger.Debugw("switch to target", "highest", highestDecodeTarget.Layer, "current", d.currentLayer, "bitmask", *d.activeDecodeTargetsBitmask, "frame", extFrameNum) } ddExtension := &dede.DependencyDescriptorExtension{ diff --git a/pkg/sfu/videolayerselector/framechain.go b/pkg/sfu/videolayerselector/framechain.go index 5edddfad9..613f2d264 100644 --- a/pkg/sfu/videolayerselector/framechain.go +++ b/pkg/sfu/videolayerselector/framechain.go @@ -49,7 +49,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate if fd.ChainDiffs[fc.chainIdx] == 0 { if fc.broken { fc.broken = false - fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx) + fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx, "frame", extFrameNum) } fc.expectFrames = fc.expectFrames[:0] return true @@ -62,7 +62,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate prevFrameInChain := extFrameNum - uint64(fd.ChainDiffs[fc.chainIdx]) sd, err := fc.decisions.GetDecision(prevFrameInChain) if err != nil { - fc.logger.Debugw("could not get decision", "err", err, "frame", extFrameNum, "prevFrame", prevFrameInChain) + fc.logger.Debugw("could not get decision", "err", err, "chanIdx", fc.chainIdx, "frame", extFrameNum, "prevFrame", prevFrameInChain) } var intact bool