From c91889edfdf9fd233bc62c0c1e4c03ecc99d7bcd Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 12 Jun 2023 15:07:47 +0800 Subject: [PATCH] Add dependency descriptor stream tracker for svc codecs (#1788) * Add dependency descriptor stream tracker for svc codecs * Solve comments --- pkg/rtc/mediatrackreceiver.go | 16 +- pkg/sfu/buffer/buffer.go | 14 +- pkg/sfu/receiver.go | 9 + pkg/sfu/streamtracker/interfaces.go | 14 + pkg/sfu/streamtracker/streamtracker.go | 2 + pkg/sfu/streamtracker/streamtracker_dd.go | 272 ++++++++++++++++++ .../streamtracker/streamtracker_dd_test.go | 83 ++++++ .../streamtracker_packet_test.go | 24 +- pkg/sfu/streamtrackermanager.go | 75 +++-- 9 files changed, 465 insertions(+), 44 deletions(-) create mode 100644 pkg/sfu/streamtracker/streamtracker_dd.go create mode 100644 pkg/sfu/streamtracker/streamtracker_dd_test.go diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 0ae04a970..6a785ec9e 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -19,6 +19,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" "github.com/livekit/livekit-server/pkg/telemetry" ) @@ -206,6 +207,15 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority } func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParameters, headers []webrtc.RTPHeaderExtensionParameter) { + // The potential codecs have not published yet, so we can't get the actual Extensions, the client/browser uses same extensions + // for all video codecs so we assume they will have same extensions as the primary codec except for the dependency descriptor + // that is munged in svc codec. + headersWithoutDD := make([]webrtc.RTPHeaderExtensionParameter, 0, len(headers)) + for _, h := range headers { + if h.URI != dependencydescriptor.ExtensionUrl { + headersWithoutDD = append(headersWithoutDD, h) + } + } t.lock.Lock() t.potentialCodecs = codecs for i, c := range codecs { @@ -217,8 +227,12 @@ func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParamete } } if !exist { + extHeaders := headers + if !sfu.IsSvcCodec(c.MimeType) { + extHeaders = headersWithoutDD + } t.receivers = append(t.receivers, &simulcastReceiver{ - TrackReceiver: NewDummyReceiver(livekit.TrackID(t.trackInfo.Sid), string(t.PublisherID()), c, headers), + TrackReceiver: NewDummyReceiver(livekit.TrackID(t.trackInfo.Sid), string(t.PublisherID()), c, extHeaders), priority: i, }) } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 4eabb04a1..0234be9f8 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -94,9 +94,8 @@ type Buffer struct { logger logger.Logger // dependency descriptor - ddExt uint8 - ddParser *DependencyDescriptorParser - maxLayerChangedCB func(int32, int32) + ddExt uint8 + ddParser *DependencyDescriptorParser paused bool frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator @@ -175,9 +174,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i)) } b.ddParser = NewDependencyDescriptorParser(b.ddExt, b.logger, func(spatial, temporal int32) { - if b.maxLayerChangedCB != nil { - b.maxLayerChangedCB(spatial, temporal) - } frc.SetMaxLayer(spatial, temporal) }) @@ -779,12 +775,6 @@ func (b *Buffer) GetAudioLevel() (float64, bool) { return b.audioLevel.GetLevel() } -// DD-TODO : now we rely on stream tracker for layer change, dependency still -// work for that too. Do we keep it unchanged or use both methods? -func (b *Buffer) OnMaxLayerChanged(fn func(int32, int32)) { - b.maxLayerChangedCB = fn -} - func (b *Buffer) OnFpsChanged(f func()) { b.Lock() b.onFpsChanged = f diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 07c072501..80ef3f007 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -20,6 +20,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" + dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" ) var ( @@ -216,6 +217,13 @@ func NewWebRTCReceiver( }) w.connectionStats.Start(w.trackInfo, time.Now()) + for _, ext := range receiver.GetParameters().HeaderExtensions { + if ext.URI == dd.ExtensionUrl { + w.streamTrackerManager.AddDependencyDescriptorTrackers() + break + } + } + return w } @@ -644,6 +652,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { len(pkt.Packet.Payload), pkt.Packet.Marker, pkt.Packet.Timestamp, + pkt.DependencyDescriptor, ) } diff --git a/pkg/sfu/streamtracker/interfaces.go b/pkg/sfu/streamtracker/interfaces.go index 3837f8470..934032f68 100644 --- a/pkg/sfu/streamtracker/interfaces.go +++ b/pkg/sfu/streamtracker/interfaces.go @@ -3,6 +3,8 @@ package streamtracker import ( "fmt" "time" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) // ------------------------------------------------------------ @@ -40,3 +42,15 @@ type StreamTrackerImpl interface { Observe(hasMarker bool, ts uint32) StreamStatusChange CheckStatus() StreamStatusChange } + +type StreamTrackerWorker interface { + Start() + Stop() + Reset() + OnStatusChanged(f func(status StreamStatus)) + OnBitrateAvailable(f func()) + Status() StreamStatus + BitrateTemporalCumulative() []int64 + SetPaused(paused bool) + Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.DependencyDescriptorWithDecodeTarget) +} diff --git a/pkg/sfu/streamtracker/streamtracker.go b/pkg/sfu/streamtracker/streamtracker.go index b1d16dae6..a1b645fb2 100644 --- a/pkg/sfu/streamtracker/streamtracker.go +++ b/pkg/sfu/streamtracker/streamtracker.go @@ -7,6 +7,7 @@ import ( "go.uber.org/atomic" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/protocol/logger" ) @@ -175,6 +176,7 @@ func (s *StreamTracker) Observe( payloadSize int, hasMarker bool, ts uint32, + _ *buffer.DependencyDescriptorWithDecodeTarget, ) { s.lock.Lock() diff --git a/pkg/sfu/streamtracker/streamtracker_dd.go b/pkg/sfu/streamtracker/streamtracker_dd.go new file mode 100644 index 000000000..b6daee387 --- /dev/null +++ b/pkg/sfu/streamtracker/streamtracker_dd.go @@ -0,0 +1,272 @@ +package streamtracker + +import ( + "sync" + "time" + + "go.uber.org/atomic" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" + dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" +) + +type StreamTrackerDependencyDescriptor struct { + lock sync.RWMutex + paused bool + generation atomic.Uint32 + params StreamTrackerParams + maxSpatialLayer int32 + maxTemporalLayer int32 + + onStatusChanged [buffer.DefaultMaxLayerSpatial + 1]func(status StreamStatus) + onBitrateAvailable [buffer.DefaultMaxLayerSpatial + 1]func() + + lastBitrateReport time.Time + bytesForBitrate [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64 + bitrate [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64 + + isStopped bool +} + +func NewStreamTrackerDependencyDescriptor(params StreamTrackerParams) *StreamTrackerDependencyDescriptor { + return &StreamTrackerDependencyDescriptor{ + params: params, + maxSpatialLayer: buffer.InvalidLayerSpatial, + maxTemporalLayer: buffer.InvalidLayerTemporal, + } +} +func (s *StreamTrackerDependencyDescriptor) Start() { +} + +func (s *StreamTrackerDependencyDescriptor) Stop() { + s.lock.Lock() + defer s.lock.Unlock() + + if s.isStopped { + return + } + s.isStopped = true + + // bump generation to trigger exit of worker + s.generation.Inc() +} + +func (s *StreamTrackerDependencyDescriptor) OnStatusChanged(layer int32, f func(status StreamStatus)) { + s.lock.Lock() + s.onStatusChanged[layer] = f + s.lock.Unlock() +} + +func (s *StreamTrackerDependencyDescriptor) OnBitrateAvailable(layer int32, f func()) { + s.lock.Lock() + s.onBitrateAvailable[layer] = f + s.lock.Unlock() +} + +func (s *StreamTrackerDependencyDescriptor) Status(layer int32) StreamStatus { + s.lock.RLock() + defer s.lock.RUnlock() + + if layer > s.maxSpatialLayer { + return StreamStatusStopped + } + + return StreamStatusActive +} + +func (s *StreamTrackerDependencyDescriptor) BitrateTemporalCumulative(layer int32) []int64 { + s.lock.RLock() + defer s.lock.RUnlock() + + if layer > s.maxSpatialLayer { + brs := make([]int64, len(s.bitrate[0])) + return brs + } + + return s.bitrate[layer][:] +} + +func (s *StreamTrackerDependencyDescriptor) Reset() { +} + +func (s *StreamTrackerDependencyDescriptor) resetLocked() { + // bump generation to trigger exit of current worker + s.generation.Inc() + + for i := 0; i < len(s.bytesForBitrate); i++ { + for j := 0; j < len(s.bytesForBitrate[i]); j++ { + s.bytesForBitrate[i][j] = 0 + } + } + for i := 0; i < len(s.bitrate); i++ { + for j := 0; j < len(s.bitrate[i]); j++ { + s.bitrate[i][j] = 0 + } + } +} + +func (s *StreamTrackerDependencyDescriptor) SetPaused(paused bool) { + s.lock.Lock() + if s.paused == paused { + s.lock.Unlock() + return + } + s.paused = paused + if !paused { + s.resetLocked() + } else { + s.lastBitrateReport = time.Now() + go s.worker(s.generation.Inc()) + + } + s.lock.Unlock() + +} + +func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.DependencyDescriptorWithDecodeTarget) { + s.lock.Lock() + + if s.isStopped || s.paused || payloadSize == 0 || ddVal == nil { + s.lock.Unlock() + return + } + + var notifyFns []func(status StreamStatus) + var notifyStatus StreamStatus + if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil { + var maxSpatial, maxTemporal int32 + for _, dt := range ddVal.DecodeTargets { + if *mask&(1< buffer.DefaultMaxLayerSpatial { + maxSpatial = buffer.DefaultMaxLayerSpatial + s.params.Logger.Warnw("max spatial layer exceeded", nil, "maxSpatial", maxSpatial) + } + if maxTemporal > buffer.DefaultMaxLayerTemporal { + maxTemporal = buffer.DefaultMaxLayerTemporal + s.params.Logger.Warnw("max temporal layer exceeded", nil, "maxTemporal", maxTemporal) + } + + s.params.Logger.Debugw("max layer changed", "maxSpatial", maxSpatial, "maxTemporal", maxTemporal) + oldMaxSpatial := s.maxSpatialLayer + s.maxSpatialLayer, s.maxTemporalLayer = maxSpatial, maxTemporal + if oldMaxSpatial == -1 { + s.lastBitrateReport = time.Now() + go s.worker(s.generation.Inc()) + } + + if oldMaxSpatial > s.maxSpatialLayer { + notifyStatus = StreamStatusStopped + for i := s.maxSpatialLayer + 1; i <= oldMaxSpatial; i++ { + notifyFns = append(notifyFns, s.onStatusChanged[i]) + } + } else if oldMaxSpatial < s.maxSpatialLayer { + notifyStatus = StreamStatusActive + for i := oldMaxSpatial + 1; i <= s.maxSpatialLayer; i++ { + notifyFns = append(notifyFns, s.onStatusChanged[i]) + } + } + + } + + dtis := ddVal.Descriptor.FrameDependencies.DecodeTargetIndications + + for _, dt := range ddVal.DecodeTargets { + // we are not dropping discardable frames now, so only ingore not present frames + if dtis[dt.Target] == dd.DecodeTargetNotPresent { + continue + } + + s.bytesForBitrate[dt.Layer.Spatial][dt.Layer.Temporal] += int64(pktSize) + } + + s.lock.Unlock() + + for _, fn := range notifyFns { + if fn != nil { + fn(notifyStatus) + } + } +} + +func (s *StreamTrackerDependencyDescriptor) worker(generation uint32) { + tickerBitrate := time.NewTicker(s.params.BitrateReportInterval) + defer tickerBitrate.Stop() + + for { + <-tickerBitrate.C + if generation != s.generation.Load() { + return + } + s.bitrateReport() + } +} + +func (s *StreamTrackerDependencyDescriptor) bitrateReport() { + // run this even if paused to drain out bitrate if there are no packets coming in + s.lock.Lock() + now := time.Now() + diff := now.Sub(s.lastBitrateReport) + s.lastBitrateReport = now + + var availableChangedFns []func() + for spatial := 0; spatial < len(s.bytesForBitrate); spatial++ { + bytesForBitrate := s.bytesForBitrate[spatial][:] + bitrateAvailabilityChanged := false + bitrates := s.bitrate[spatial][:] + for i := 0; i < len(bytesForBitrate); i++ { + bitrate := int64(float64(bytesForBitrate[i]*8) / diff.Seconds()) + if (bitrates[i] == 0 && bitrate > 0) || (bitrates[i] > 0 && bitrate == 0) { + bitrateAvailabilityChanged = true + } + bitrates[i] = bitrate + bytesForBitrate[i] = 0 + } + + if bitrateAvailabilityChanged && s.onBitrateAvailable[spatial] != nil { + availableChangedFns = append(availableChangedFns, s.onBitrateAvailable[spatial]) + } + } + s.lock.Unlock() + + for _, fn := range availableChangedFns { + fn() + } +} + +func (s *StreamTrackerDependencyDescriptor) LayeredTracker(layer int32) *StreamTrackerDependencyDescriptorLayered { + return &StreamTrackerDependencyDescriptorLayered{ + StreamTrackerDependencyDescriptor: s, + layer: layer, + } +} + +// ---------------------------- +// Layered wrapper for StreamTrackerWorker +type StreamTrackerDependencyDescriptorLayered struct { + *StreamTrackerDependencyDescriptor + layer int32 +} + +func (s *StreamTrackerDependencyDescriptorLayered) OnStatusChanged(f func(status StreamStatus)) { + s.StreamTrackerDependencyDescriptor.OnStatusChanged(s.layer, f) +} + +func (s *StreamTrackerDependencyDescriptorLayered) OnBitrateAvailable(f func()) { + s.StreamTrackerDependencyDescriptor.OnBitrateAvailable(s.layer, f) +} + +func (s *StreamTrackerDependencyDescriptorLayered) Status() StreamStatus { + return s.StreamTrackerDependencyDescriptor.Status(s.layer) +} + +func (s *StreamTrackerDependencyDescriptorLayered) BitrateTemporalCumulative() []int64 { + return s.StreamTrackerDependencyDescriptor.BitrateTemporalCumulative(s.layer) +} diff --git a/pkg/sfu/streamtracker/streamtracker_dd_test.go b/pkg/sfu/streamtracker/streamtracker_dd_test.go new file mode 100644 index 000000000..6fdb6916d --- /dev/null +++ b/pkg/sfu/streamtracker/streamtracker_dd_test.go @@ -0,0 +1,83 @@ +package streamtracker + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" + dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + "github.com/livekit/protocol/logger" +) + +func createDescriptorDependencyForTargets(maxSpatial, maxTemporal int) *buffer.DependencyDescriptorWithDecodeTarget { + var targets []buffer.DependencyDescriptorDecodeTarget + var mask uint32 + for i := 0; i <= maxSpatial; i++ { + for j := 0; j <= maxTemporal; j++ { + targets = append(targets, buffer.DependencyDescriptorDecodeTarget{Target: len(targets), Layer: buffer.VideoLayer{Spatial: int32(i), Temporal: int32(j)}}) + mask |= 1 << uint32(len(targets)-1) + } + } + + dtis := make([]dd.DecodeTargetIndication, len(targets)) + for _, t := range targets { + dtis[t.Target] = dd.DecodeTargetRequired + } + + return &buffer.DependencyDescriptorWithDecodeTarget{ + Descriptor: &dd.DependencyDescriptor{ + ActiveDecodeTargetsBitmask: &mask, + FrameDependencies: &dd.FrameDependencyTemplate{ + DecodeTargetIndications: dtis, + }, + }, + DecodeTargets: targets, + } +} + +func checkStatues(t *testing.T, statuses []StreamStatus, expected StreamStatus, maxSpatial int) { + for i := 0; i <= maxSpatial; i++ { + require.Equal(t, expected, statuses[i]) + } + + for i := maxSpatial + 1; i < len(statuses); i++ { + require.NotEqual(t, expected, statuses[i]) + } +} + +func TestStreamTrackerDD(t *testing.T) { + ddTracker := NewStreamTrackerDependencyDescriptor(StreamTrackerParams{ + BitrateReportInterval: 1 * time.Second, + Logger: logger.GetLogger(), + }) + layeredTrackers := make([]StreamTrackerWorker, buffer.DefaultMaxLayerSpatial+1) + statuses := make([]StreamStatus, buffer.DefaultMaxLayerSpatial+1) + for i := 0; i <= int(buffer.DefaultMaxLayerSpatial); i++ { + layeredTrack := ddTracker.LayeredTracker(int32(i)) + layer := i + layeredTrack.OnStatusChanged(func(status StreamStatus) { + statuses[layer] = status + }) + layeredTrack.Start() + layeredTrackers[i] = layeredTrack + } + defer ddTracker.Stop() + + // no active layers + ddTracker.Observe(0, 1000, 1000, false, 0, nil) + checkStatues(t, statuses, StreamStatusActive, int(buffer.InvalidLayerSpatial)) + + // layer seen [0,1] + ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(1, 1)) + checkStatues(t, statuses, StreamStatusActive, 1) + + // layer seen [0,1,2] + ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(2, 1)) + checkStatues(t, statuses, StreamStatusActive, 2) + + // layer 2 gone, layer seen [0,1] + ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(1, 1)) + checkStatues(t, statuses, StreamStatusActive, 1) +} diff --git a/pkg/sfu/streamtracker/streamtracker_packet_test.go b/pkg/sfu/streamtracker/streamtracker_packet_test.go index 2aee13ecb..a7beb2d3c 100644 --- a/pkg/sfu/streamtracker/streamtracker_packet_test.go +++ b/pkg/sfu/streamtracker/streamtracker_packet_test.go @@ -42,7 +42,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // observe first packet - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) testutils.WithTimeout(t, func() string { if callbackCalled.Load() { @@ -73,7 +73,7 @@ func TestStreamTracker(t *testing.T) { callbackStatusMu.Unlock() }) - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) testutils.WithTimeout(t, func() string { callbackStatusMu.RLock() defer callbackStatusMu.RUnlock() @@ -110,7 +110,7 @@ func TestStreamTracker(t *testing.T) { tracker.Start() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -121,11 +121,11 @@ func TestStreamTracker(t *testing.T) { tracker.setStatusLocked(StreamStatusStopped) - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) tracker.updateStatus() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) tracker.updateStatus() require.Equal(t, StreamStatusActive, tracker.Status()) @@ -135,7 +135,7 @@ func TestStreamTracker(t *testing.T) { t.Run("changes to inactive when paused", func(t *testing.T) { tracker := newStreamTrackerPacket(5, 60, 500*time.Millisecond) tracker.Start() - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -161,7 +161,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // observe first packet - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) testutils.WithTimeout(t, func() string { if callbackCalled.Load() == 1 { @@ -175,10 +175,10 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, uint32(1), callbackCalled.Load()) // observe a few more - tracker.Observe(0, 20, 10, false, 0) - tracker.Observe(0, 20, 10, false, 0) - tracker.Observe(0, 20, 10, false, 0) - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) + tracker.Observe(0, 20, 10, false, 0, nil) + tracker.Observe(0, 20, 10, false, 0, nil) + tracker.Observe(0, 20, 10, false, 0, nil) tracker.updateStatus() // should still be active @@ -191,7 +191,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, uint32(2), callbackCalled.Load()) // first packet after reset - tracker.Observe(0, 20, 10, false, 0) + tracker.Observe(0, 20, 10, false, 0, nil) testutils.WithTimeout(t, func() string { if callbackCalled.Load() == 3 { diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index cdb42f4ab..8c0784d40 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -41,7 +41,8 @@ type StreamTrackerManager struct { maxPublishedLayer int32 maxTemporalLayerSeen int32 - trackers [buffer.DefaultMaxLayerSpatial + 1]*streamtracker.StreamTracker + ddTracker *streamtracker.StreamTrackerDependencyDescriptor + trackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker availableLayers []int32 maxExpectedLayer int32 @@ -133,28 +134,58 @@ func (s *StreamTrackerManager) createStreamTrackerFrame(layer int32) streamtrack return streamtracker.NewStreamTrackerFrame(params) } -func (s *StreamTrackerManager) AddTracker(layer int32) *streamtracker.StreamTracker { +func (s *StreamTrackerManager) AddDependencyDescriptorTrackers() { + bitrateInterval, ok := s.trackerConfig.BitrateReportInterval[0] + if !ok { + return + } + s.lock.Lock() + var addAllTrackers bool + if s.ddTracker == nil { + s.ddTracker = streamtracker.NewStreamTrackerDependencyDescriptor(streamtracker.StreamTrackerParams{ + BitrateReportInterval: bitrateInterval, + Logger: s.logger.WithValues("layer", 0), + }) + addAllTrackers = true + } + s.lock.Unlock() + if addAllTrackers { + for i := 0; i <= int(buffer.DefaultMaxLayerSpatial); i++ { + s.AddTracker(int32(i)) + } + } +} + +func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrackerWorker { bitrateInterval, ok := s.trackerConfig.BitrateReportInterval[layer] if !ok { return nil } - var trackerImpl streamtracker.StreamTrackerImpl - switch s.trackerConfig.StreamTrackerType { - case config.StreamTrackerTypePacket: - trackerImpl = s.createStreamTrackerPacket(layer) - case config.StreamTrackerTypeFrame: - trackerImpl = s.createStreamTrackerFrame(layer) - } - if trackerImpl == nil { - return nil + var tracker streamtracker.StreamTrackerWorker + s.lock.Lock() + if s.ddTracker != nil { + tracker = s.ddTracker.LayeredTracker(layer) } + s.lock.Unlock() + if tracker == nil { + var trackerImpl streamtracker.StreamTrackerImpl + switch s.trackerConfig.StreamTrackerType { + case config.StreamTrackerTypePacket: + trackerImpl = s.createStreamTrackerPacket(layer) + case config.StreamTrackerTypeFrame: + trackerImpl = s.createStreamTrackerFrame(layer) + } + if trackerImpl == nil { + return nil + } - tracker := streamtracker.NewStreamTracker(streamtracker.StreamTrackerParams{ - StreamTrackerImpl: trackerImpl, - BitrateReportInterval: bitrateInterval, - Logger: s.logger.WithValues("layer", layer), - }) + tracker = streamtracker.NewStreamTracker(streamtracker.StreamTrackerParams{ + StreamTrackerImpl: trackerImpl, + BitrateReportInterval: bitrateInterval, + Logger: s.logger.WithValues("layer", layer), + }) + } s.logger.Debugw("StreamTrackerManager add track", "layer", layer) tracker.OnStatusChanged(func(status streamtracker.StreamStatus) { @@ -213,6 +244,8 @@ func (s *StreamTrackerManager) RemoveAllTrackers() { s.availableLayers = make([]int32, 0) s.maxExpectedLayerFromTrackInfo() s.paused = false + ddTracker := s.ddTracker + s.ddTracker = nil s.lock.Unlock() for _, tracker := range trackers { @@ -220,9 +253,12 @@ func (s *StreamTrackerManager) RemoveAllTrackers() { tracker.Stop() } } + if ddTracker != nil { + ddTracker.Stop() + } } -func (s *StreamTrackerManager) GetTracker(layer int32) *streamtracker.StreamTracker { +func (s *StreamTrackerManager) GetTracker(layer int32) streamtracker.StreamTrackerWorker { s.lock.RLock() defer s.lock.RUnlock() @@ -270,7 +306,7 @@ func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32 { // But, those conditions should be rare. In those cases, the restart will // take longer. // - var trackersToReset []*streamtracker.StreamTracker + var trackersToReset []streamtracker.StreamTrackerWorker for l := s.maxExpectedLayer + 1; l <= layer; l++ { if s.hasSpatialLayerLocked(l) { continue @@ -367,7 +403,8 @@ func (s *StreamTrackerManager) getLayeredBitrateLocked() ([]int32, Bitrates) { } } - if s.isSVC { + // accumulate bitrates for SVC streams without dependency descriptor + if s.isSVC && s.ddTracker == nil { for i := len(br) - 1; i >= 1; i-- { for j := len(br[i]) - 1; j >= 0; j-- { if br[i][j] != 0 {