From c93df27329af22eef21e55be41132816bfbab08b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 29 Sep 2022 12:41:52 +0530 Subject: [PATCH] Accumulate spatial layers also for SVC codecs (#1053) * Accumulate spatial layers also for SVC codecs * Remove TODO * include temporal layer 0 --- pkg/rtc/wrappedreceiver.go | 4 ++-- pkg/sfu/downtrack.go | 12 ++++++------ pkg/sfu/receiver.go | 18 +++++++++--------- pkg/sfu/streamtrackermanager.go | 19 ++++++++++++++++--- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 4aa49b64d..3ea4f6c59 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -159,9 +159,9 @@ func (d *DummyReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) return 0, errors.New("no receiver") } -func (d *DummyReceiver) GetBitrateTemporalCumulative() sfu.Bitrates { +func (d *DummyReceiver) GetLayeredBitrate() sfu.Bitrates { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.GetBitrateTemporalCumulative() + return r.GetLayeredBitrate() } return sfu.Bitrates{} } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d04b33457..cca7258fa 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -834,7 +834,7 @@ func (d *DownTrack) IsDeficient() bool { } func (d *DownTrack) BandwidthRequested() int64 { - return d.forwarder.BandwidthRequested(d.receiver.GetBitrateTemporalCumulative()) + return d.forwarder.BandwidthRequested(d.receiver.GetLayeredBitrate()) } func (d *DownTrack) DistanceToDesired() int32 { @@ -842,13 +842,13 @@ func (d *DownTrack) DistanceToDesired() int32 { } func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation { - allocation := d.forwarder.AllocateOptimal(d.receiver.GetBitrateTemporalCumulative(), allowOvershoot) + allocation := d.forwarder.AllocateOptimal(d.receiver.GetLayeredBitrate(), allowOvershoot) d.maybeStartKeyFrameRequester() return allocation } func (d *DownTrack) ProvisionalAllocatePrepare() { - d.forwarder.ProvisionalAllocatePrepare(d.receiver.GetBitrateTemporalCumulative()) + d.forwarder.ProvisionalAllocatePrepare(d.receiver.GetLayeredBitrate()) } func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, allowOvershoot bool) int64 { @@ -874,19 +874,19 @@ func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation { } func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) { - allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetBitrateTemporalCumulative(), allowOvershoot) + allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetLayeredBitrate(), allowOvershoot) d.maybeStartKeyFrameRequester() return allocation, available } func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) { - transition, available := d.forwarder.GetNextHigherTransition(d.receiver.GetBitrateTemporalCumulative(), allowOvershoot) + transition, available := d.forwarder.GetNextHigherTransition(d.receiver.GetLayeredBitrate(), allowOvershoot) d.logger.Debugw("stream: get next higher layer", "transition", transition, "available", available) return transition, available } func (d *DownTrack) Pause() VideoAllocation { - allocation := d.forwarder.Pause(d.receiver.GetBitrateTemporalCumulative()) + allocation := d.forwarder.Pause(d.receiver.GetLayeredBitrate()) d.maybeStartKeyFrameRequester() return allocation } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index dcb823017..4b987126f 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -38,7 +38,7 @@ type TrackReceiver interface { HeaderExtensions() []webrtc.RTPHeaderExtensionParameter ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) - GetBitrateTemporalCumulative() Bitrates + GetLayeredBitrate() Bitrates GetAudioLevel() (float64, bool) @@ -177,14 +177,14 @@ func NewWebRTCReceiver( codec: track.Codec(), kind: track.Kind(), // LK-TODO: this should be based on VideoLayers protocol message rather than RID based - isSimulcast: len(track.RID()) > 0, - twcc: twcc, - streamTrackerManager: NewStreamTrackerManager(logger, trackInfo), - trackInfo: trackInfo, - isSVC: IsSvcCodec(track.Codec().MimeType), - isRED: IsRedCodec(track.Codec().MimeType), + isSimulcast: len(track.RID()) > 0, + twcc: twcc, + trackInfo: trackInfo, + isSVC: IsSvcCodec(track.Codec().MimeType), + isRED: IsRedCodec(track.Codec().MimeType), } + w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC) w.streamTrackerManager.OnMaxLayerChanged(w.onMaxLayerChange) w.streamTrackerManager.OnAvailableLayersChanged(w.downTrackLayerChange) w.streamTrackerManager.OnBitrateAvailabilityChanged(w.downTrackBitrateAvailabilityChange) @@ -382,8 +382,8 @@ func (w *WebRTCReceiver) downTrackBitrateAvailabilityChange() { } } -func (w *WebRTCReceiver) GetBitrateTemporalCumulative() Bitrates { - return w.streamTrackerManager.GetBitrateTemporalCumulative() +func (w *WebRTCReceiver) GetLayeredBitrate() Bitrates { + return w.streamTrackerManager.GetLayeredBitrate() } // OnCloseHandler method to be called on remote tracked removed diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 8b703ef49..476c7aa80 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -65,6 +65,7 @@ var ( type StreamTrackerManager struct { logger logger.Logger trackInfo *livekit.TrackInfo + isSVC bool maxPublishedLayer int32 lock sync.RWMutex @@ -81,10 +82,11 @@ type StreamTrackerManager struct { onMaxLayerChanged func(maxLayer int32) } -func NewStreamTrackerManager(logger logger.Logger, trackInfo *livekit.TrackInfo) *StreamTrackerManager { +func NewStreamTrackerManager(logger logger.Logger, trackInfo *livekit.TrackInfo, isSVC bool) *StreamTrackerManager { s := &StreamTrackerManager{ logger: logger, trackInfo: trackInfo, + isSVC: isSVC, maxPublishedLayer: 0, } @@ -293,11 +295,10 @@ func (s *StreamTrackerManager) GetMaxExpectedLayer() int32 { return maxExpectedLayer } -func (s *StreamTrackerManager) GetBitrateTemporalCumulative() Bitrates { +func (s *StreamTrackerManager) GetLayeredBitrate() Bitrates { s.lock.RLock() defer s.lock.RUnlock() - // LK-TODO: For SVC tracks, need to accumulate across spatial layers also var br Bitrates for i, tracker := range s.trackers { @@ -313,6 +314,18 @@ func (s *StreamTrackerManager) GetBitrateTemporalCumulative() Bitrates { } } + if s.isSVC { + for i := len(br) - 1; i >= 1; i-- { + for j := len(br[i]) - 1; j >= 0; j-- { + if br[i][j] != 0 { + for k := i - 1; k >= 0; k-- { + br[i][j] += br[k][j] + } + } + } + } + } + return br }