diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go
index 0ae04a970..6a785ec9e 100644
--- a/pkg/rtc/mediatrackreceiver.go
+++ b/pkg/rtc/mediatrackreceiver.go
@@ -19,6 +19,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
+ "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/telemetry"
)
@@ -206,6 +207,15 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority
}
func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParameters, headers []webrtc.RTPHeaderExtensionParameter) {
+ // The potential codecs have not published yet, so we can't get the actual Extensions, the client/browser uses same extensions
+ // for all video codecs so we assume they will have same extensions as the primary codec except for the dependency descriptor
+ // that is munged in svc codec.
+ headersWithoutDD := make([]webrtc.RTPHeaderExtensionParameter, 0, len(headers))
+ for _, h := range headers {
+ if h.URI != dependencydescriptor.ExtensionUrl {
+ headersWithoutDD = append(headersWithoutDD, h)
+ }
+ }
t.lock.Lock()
t.potentialCodecs = codecs
for i, c := range codecs {
@@ -217,8 +227,12 @@ func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParamete
}
}
if !exist {
+ extHeaders := headers
+ if !sfu.IsSvcCodec(c.MimeType) {
+ extHeaders = headersWithoutDD
+ }
t.receivers = append(t.receivers, &simulcastReceiver{
- TrackReceiver: NewDummyReceiver(livekit.TrackID(t.trackInfo.Sid), string(t.PublisherID()), c, headers),
+ TrackReceiver: NewDummyReceiver(livekit.TrackID(t.trackInfo.Sid), string(t.PublisherID()), c, extHeaders),
priority: i,
})
}
diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go
index 4eabb04a1..0234be9f8 100644
--- a/pkg/sfu/buffer/buffer.go
+++ b/pkg/sfu/buffer/buffer.go
@@ -94,9 +94,8 @@ type Buffer struct {
logger logger.Logger
// dependency descriptor
- ddExt uint8
- ddParser *DependencyDescriptorParser
- maxLayerChangedCB func(int32, int32)
+ ddExt uint8
+ ddParser *DependencyDescriptorParser
paused bool
frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator
@@ -175,9 +174,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i))
}
b.ddParser = NewDependencyDescriptorParser(b.ddExt, b.logger, func(spatial, temporal int32) {
- if b.maxLayerChangedCB != nil {
- b.maxLayerChangedCB(spatial, temporal)
- }
frc.SetMaxLayer(spatial, temporal)
})
@@ -779,12 +775,6 @@ func (b *Buffer) GetAudioLevel() (float64, bool) {
return b.audioLevel.GetLevel()
}
-// DD-TODO : now we rely on stream tracker for layer change, dependency still
-// work for that too. Do we keep it unchanged or use both methods?
-func (b *Buffer) OnMaxLayerChanged(fn func(int32, int32)) {
- b.maxLayerChangedCB = fn
-}
-
func (b *Buffer) OnFpsChanged(f func()) {
b.Lock()
b.onFpsChanged = f
diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go
index 07c072501..80ef3f007 100644
--- a/pkg/sfu/receiver.go
+++ b/pkg/sfu/receiver.go
@@ -20,6 +20,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/audio"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
+ dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
)
var (
@@ -216,6 +217,13 @@ func NewWebRTCReceiver(
})
w.connectionStats.Start(w.trackInfo, time.Now())
+ for _, ext := range receiver.GetParameters().HeaderExtensions {
+ if ext.URI == dd.ExtensionUrl {
+ w.streamTrackerManager.AddDependencyDescriptorTrackers()
+ break
+ }
+ }
+
return w
}
@@ -644,6 +652,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
len(pkt.Packet.Payload),
pkt.Packet.Marker,
pkt.Packet.Timestamp,
+ pkt.DependencyDescriptor,
)
}
diff --git a/pkg/sfu/streamtracker/interfaces.go b/pkg/sfu/streamtracker/interfaces.go
index 3837f8470..934032f68 100644
--- a/pkg/sfu/streamtracker/interfaces.go
+++ b/pkg/sfu/streamtracker/interfaces.go
@@ -3,6 +3,8 @@ package streamtracker
import (
"fmt"
"time"
+
+ "github.com/livekit/livekit-server/pkg/sfu/buffer"
)
// ------------------------------------------------------------
@@ -40,3 +42,15 @@ type StreamTrackerImpl interface {
Observe(hasMarker bool, ts uint32) StreamStatusChange
CheckStatus() StreamStatusChange
}
+
+type StreamTrackerWorker interface {
+ Start()
+ Stop()
+ Reset()
+ OnStatusChanged(f func(status StreamStatus))
+ OnBitrateAvailable(f func())
+ Status() StreamStatus
+ BitrateTemporalCumulative() []int64
+ SetPaused(paused bool)
+ Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.DependencyDescriptorWithDecodeTarget)
+}
diff --git a/pkg/sfu/streamtracker/streamtracker.go b/pkg/sfu/streamtracker/streamtracker.go
index b1d16dae6..a1b645fb2 100644
--- a/pkg/sfu/streamtracker/streamtracker.go
+++ b/pkg/sfu/streamtracker/streamtracker.go
@@ -7,6 +7,7 @@ import (
"go.uber.org/atomic"
+ "github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/logger"
)
@@ -175,6 +176,7 @@ func (s *StreamTracker) Observe(
payloadSize int,
hasMarker bool,
ts uint32,
+ _ *buffer.DependencyDescriptorWithDecodeTarget,
) {
s.lock.Lock()
diff --git a/pkg/sfu/streamtracker/streamtracker_dd.go b/pkg/sfu/streamtracker/streamtracker_dd.go
new file mode 100644
index 000000000..b6daee387
--- /dev/null
+++ b/pkg/sfu/streamtracker/streamtracker_dd.go
@@ -0,0 +1,272 @@
+package streamtracker
+
+import (
+ "sync"
+ "time"
+
+ "go.uber.org/atomic"
+
+ "github.com/livekit/livekit-server/pkg/sfu/buffer"
+ dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
+)
+
+type StreamTrackerDependencyDescriptor struct {
+ lock sync.RWMutex
+ paused bool
+ generation atomic.Uint32
+ params StreamTrackerParams
+ maxSpatialLayer int32
+ maxTemporalLayer int32
+
+ onStatusChanged [buffer.DefaultMaxLayerSpatial + 1]func(status StreamStatus)
+ onBitrateAvailable [buffer.DefaultMaxLayerSpatial + 1]func()
+
+ lastBitrateReport time.Time
+ bytesForBitrate [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
+ bitrate [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
+
+ isStopped bool
+}
+
+func NewStreamTrackerDependencyDescriptor(params StreamTrackerParams) *StreamTrackerDependencyDescriptor {
+ return &StreamTrackerDependencyDescriptor{
+ params: params,
+ maxSpatialLayer: buffer.InvalidLayerSpatial,
+ maxTemporalLayer: buffer.InvalidLayerTemporal,
+ }
+}
+func (s *StreamTrackerDependencyDescriptor) Start() {
+}
+
+func (s *StreamTrackerDependencyDescriptor) Stop() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.isStopped {
+ return
+ }
+ s.isStopped = true
+
+ // bump generation to trigger exit of worker
+ s.generation.Inc()
+}
+
+func (s *StreamTrackerDependencyDescriptor) OnStatusChanged(layer int32, f func(status StreamStatus)) {
+ s.lock.Lock()
+ s.onStatusChanged[layer] = f
+ s.lock.Unlock()
+}
+
+func (s *StreamTrackerDependencyDescriptor) OnBitrateAvailable(layer int32, f func()) {
+ s.lock.Lock()
+ s.onBitrateAvailable[layer] = f
+ s.lock.Unlock()
+}
+
+func (s *StreamTrackerDependencyDescriptor) Status(layer int32) StreamStatus {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ if layer > s.maxSpatialLayer {
+ return StreamStatusStopped
+ }
+
+ return StreamStatusActive
+}
+
+func (s *StreamTrackerDependencyDescriptor) BitrateTemporalCumulative(layer int32) []int64 {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ if layer > s.maxSpatialLayer {
+ brs := make([]int64, len(s.bitrate[0]))
+ return brs
+ }
+
+ return s.bitrate[layer][:]
+}
+
+func (s *StreamTrackerDependencyDescriptor) Reset() {
+}
+
+func (s *StreamTrackerDependencyDescriptor) resetLocked() {
+ // bump generation to trigger exit of current worker
+ s.generation.Inc()
+
+ for i := 0; i < len(s.bytesForBitrate); i++ {
+ for j := 0; j < len(s.bytesForBitrate[i]); j++ {
+ s.bytesForBitrate[i][j] = 0
+ }
+ }
+ for i := 0; i < len(s.bitrate); i++ {
+ for j := 0; j < len(s.bitrate[i]); j++ {
+ s.bitrate[i][j] = 0
+ }
+ }
+}
+
+func (s *StreamTrackerDependencyDescriptor) SetPaused(paused bool) {
+ s.lock.Lock()
+ if s.paused == paused {
+ s.lock.Unlock()
+ return
+ }
+ s.paused = paused
+ if !paused {
+ s.resetLocked()
+ } else {
+ s.lastBitrateReport = time.Now()
+ go s.worker(s.generation.Inc())
+
+ }
+ s.lock.Unlock()
+
+}
+
+func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.DependencyDescriptorWithDecodeTarget) {
+ s.lock.Lock()
+
+ if s.isStopped || s.paused || payloadSize == 0 || ddVal == nil {
+ s.lock.Unlock()
+ return
+ }
+
+ var notifyFns []func(status StreamStatus)
+ var notifyStatus StreamStatus
+ if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil {
+ var maxSpatial, maxTemporal int32
+ for _, dt := range ddVal.DecodeTargets {
+ if *mask&(1<
buffer.DefaultMaxLayerSpatial {
+ maxSpatial = buffer.DefaultMaxLayerSpatial
+ s.params.Logger.Warnw("max spatial layer exceeded", nil, "maxSpatial", maxSpatial)
+ }
+ if maxTemporal > buffer.DefaultMaxLayerTemporal {
+ maxTemporal = buffer.DefaultMaxLayerTemporal
+ s.params.Logger.Warnw("max temporal layer exceeded", nil, "maxTemporal", maxTemporal)
+ }
+
+ s.params.Logger.Debugw("max layer changed", "maxSpatial", maxSpatial, "maxTemporal", maxTemporal)
+ oldMaxSpatial := s.maxSpatialLayer
+ s.maxSpatialLayer, s.maxTemporalLayer = maxSpatial, maxTemporal
+ if oldMaxSpatial == -1 {
+ s.lastBitrateReport = time.Now()
+ go s.worker(s.generation.Inc())
+ }
+
+ if oldMaxSpatial > s.maxSpatialLayer {
+ notifyStatus = StreamStatusStopped
+ for i := s.maxSpatialLayer + 1; i <= oldMaxSpatial; i++ {
+ notifyFns = append(notifyFns, s.onStatusChanged[i])
+ }
+ } else if oldMaxSpatial < s.maxSpatialLayer {
+ notifyStatus = StreamStatusActive
+ for i := oldMaxSpatial + 1; i <= s.maxSpatialLayer; i++ {
+ notifyFns = append(notifyFns, s.onStatusChanged[i])
+ }
+ }
+
+ }
+
+ dtis := ddVal.Descriptor.FrameDependencies.DecodeTargetIndications
+
+ for _, dt := range ddVal.DecodeTargets {
+ // we are not dropping discardable frames now, so only ingore not present frames
+ if dtis[dt.Target] == dd.DecodeTargetNotPresent {
+ continue
+ }
+
+ s.bytesForBitrate[dt.Layer.Spatial][dt.Layer.Temporal] += int64(pktSize)
+ }
+
+ s.lock.Unlock()
+
+ for _, fn := range notifyFns {
+ if fn != nil {
+ fn(notifyStatus)
+ }
+ }
+}
+
+func (s *StreamTrackerDependencyDescriptor) worker(generation uint32) {
+ tickerBitrate := time.NewTicker(s.params.BitrateReportInterval)
+ defer tickerBitrate.Stop()
+
+ for {
+ <-tickerBitrate.C
+ if generation != s.generation.Load() {
+ return
+ }
+ s.bitrateReport()
+ }
+}
+
+func (s *StreamTrackerDependencyDescriptor) bitrateReport() {
+ // run this even if paused to drain out bitrate if there are no packets coming in
+ s.lock.Lock()
+ now := time.Now()
+ diff := now.Sub(s.lastBitrateReport)
+ s.lastBitrateReport = now
+
+ var availableChangedFns []func()
+ for spatial := 0; spatial < len(s.bytesForBitrate); spatial++ {
+ bytesForBitrate := s.bytesForBitrate[spatial][:]
+ bitrateAvailabilityChanged := false
+ bitrates := s.bitrate[spatial][:]
+ for i := 0; i < len(bytesForBitrate); i++ {
+ bitrate := int64(float64(bytesForBitrate[i]*8) / diff.Seconds())
+ if (bitrates[i] == 0 && bitrate > 0) || (bitrates[i] > 0 && bitrate == 0) {
+ bitrateAvailabilityChanged = true
+ }
+ bitrates[i] = bitrate
+ bytesForBitrate[i] = 0
+ }
+
+ if bitrateAvailabilityChanged && s.onBitrateAvailable[spatial] != nil {
+ availableChangedFns = append(availableChangedFns, s.onBitrateAvailable[spatial])
+ }
+ }
+ s.lock.Unlock()
+
+ for _, fn := range availableChangedFns {
+ fn()
+ }
+}
+
+func (s *StreamTrackerDependencyDescriptor) LayeredTracker(layer int32) *StreamTrackerDependencyDescriptorLayered {
+ return &StreamTrackerDependencyDescriptorLayered{
+ StreamTrackerDependencyDescriptor: s,
+ layer: layer,
+ }
+}
+
+// ----------------------------
+// Layered wrapper for StreamTrackerWorker
+type StreamTrackerDependencyDescriptorLayered struct {
+ *StreamTrackerDependencyDescriptor
+ layer int32
+}
+
+func (s *StreamTrackerDependencyDescriptorLayered) OnStatusChanged(f func(status StreamStatus)) {
+ s.StreamTrackerDependencyDescriptor.OnStatusChanged(s.layer, f)
+}
+
+func (s *StreamTrackerDependencyDescriptorLayered) OnBitrateAvailable(f func()) {
+ s.StreamTrackerDependencyDescriptor.OnBitrateAvailable(s.layer, f)
+}
+
+func (s *StreamTrackerDependencyDescriptorLayered) Status() StreamStatus {
+ return s.StreamTrackerDependencyDescriptor.Status(s.layer)
+}
+
+func (s *StreamTrackerDependencyDescriptorLayered) BitrateTemporalCumulative() []int64 {
+ return s.StreamTrackerDependencyDescriptor.BitrateTemporalCumulative(s.layer)
+}
diff --git a/pkg/sfu/streamtracker/streamtracker_dd_test.go b/pkg/sfu/streamtracker/streamtracker_dd_test.go
new file mode 100644
index 000000000..6fdb6916d
--- /dev/null
+++ b/pkg/sfu/streamtracker/streamtracker_dd_test.go
@@ -0,0 +1,83 @@
+package streamtracker
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/livekit/livekit-server/pkg/sfu/buffer"
+ dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
+ "github.com/livekit/protocol/logger"
+)
+
+func createDescriptorDependencyForTargets(maxSpatial, maxTemporal int) *buffer.DependencyDescriptorWithDecodeTarget {
+ var targets []buffer.DependencyDescriptorDecodeTarget
+ var mask uint32
+ for i := 0; i <= maxSpatial; i++ {
+ for j := 0; j <= maxTemporal; j++ {
+ targets = append(targets, buffer.DependencyDescriptorDecodeTarget{Target: len(targets), Layer: buffer.VideoLayer{Spatial: int32(i), Temporal: int32(j)}})
+ mask |= 1 << uint32(len(targets)-1)
+ }
+ }
+
+ dtis := make([]dd.DecodeTargetIndication, len(targets))
+ for _, t := range targets {
+ dtis[t.Target] = dd.DecodeTargetRequired
+ }
+
+ return &buffer.DependencyDescriptorWithDecodeTarget{
+ Descriptor: &dd.DependencyDescriptor{
+ ActiveDecodeTargetsBitmask: &mask,
+ FrameDependencies: &dd.FrameDependencyTemplate{
+ DecodeTargetIndications: dtis,
+ },
+ },
+ DecodeTargets: targets,
+ }
+}
+
+func checkStatues(t *testing.T, statuses []StreamStatus, expected StreamStatus, maxSpatial int) {
+ for i := 0; i <= maxSpatial; i++ {
+ require.Equal(t, expected, statuses[i])
+ }
+
+ for i := maxSpatial + 1; i < len(statuses); i++ {
+ require.NotEqual(t, expected, statuses[i])
+ }
+}
+
+func TestStreamTrackerDD(t *testing.T) {
+ ddTracker := NewStreamTrackerDependencyDescriptor(StreamTrackerParams{
+ BitrateReportInterval: 1 * time.Second,
+ Logger: logger.GetLogger(),
+ })
+ layeredTrackers := make([]StreamTrackerWorker, buffer.DefaultMaxLayerSpatial+1)
+ statuses := make([]StreamStatus, buffer.DefaultMaxLayerSpatial+1)
+ for i := 0; i <= int(buffer.DefaultMaxLayerSpatial); i++ {
+ layeredTrack := ddTracker.LayeredTracker(int32(i))
+ layer := i
+ layeredTrack.OnStatusChanged(func(status StreamStatus) {
+ statuses[layer] = status
+ })
+ layeredTrack.Start()
+ layeredTrackers[i] = layeredTrack
+ }
+ defer ddTracker.Stop()
+
+ // no active layers
+ ddTracker.Observe(0, 1000, 1000, false, 0, nil)
+ checkStatues(t, statuses, StreamStatusActive, int(buffer.InvalidLayerSpatial))
+
+ // layer seen [0,1]
+ ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(1, 1))
+ checkStatues(t, statuses, StreamStatusActive, 1)
+
+ // layer seen [0,1,2]
+ ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(2, 1))
+ checkStatues(t, statuses, StreamStatusActive, 2)
+
+ // layer 2 gone, layer seen [0,1]
+ ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(1, 1))
+ checkStatues(t, statuses, StreamStatusActive, 1)
+}
diff --git a/pkg/sfu/streamtracker/streamtracker_packet_test.go b/pkg/sfu/streamtracker/streamtracker_packet_test.go
index 2aee13ecb..a7beb2d3c 100644
--- a/pkg/sfu/streamtracker/streamtracker_packet_test.go
+++ b/pkg/sfu/streamtracker/streamtracker_packet_test.go
@@ -42,7 +42,7 @@ func TestStreamTracker(t *testing.T) {
require.Equal(t, StreamStatusStopped, tracker.Status())
// observe first packet
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
testutils.WithTimeout(t, func() string {
if callbackCalled.Load() {
@@ -73,7 +73,7 @@ func TestStreamTracker(t *testing.T) {
callbackStatusMu.Unlock()
})
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
testutils.WithTimeout(t, func() string {
callbackStatusMu.RLock()
defer callbackStatusMu.RUnlock()
@@ -110,7 +110,7 @@ func TestStreamTracker(t *testing.T) {
tracker.Start()
require.Equal(t, StreamStatusStopped, tracker.Status())
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
testutils.WithTimeout(t, func() string {
if tracker.Status() == StreamStatusActive {
return ""
@@ -121,11 +121,11 @@ func TestStreamTracker(t *testing.T) {
tracker.setStatusLocked(StreamStatusStopped)
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
tracker.updateStatus()
require.Equal(t, StreamStatusStopped, tracker.Status())
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
tracker.updateStatus()
require.Equal(t, StreamStatusActive, tracker.Status())
@@ -135,7 +135,7 @@ func TestStreamTracker(t *testing.T) {
t.Run("changes to inactive when paused", func(t *testing.T) {
tracker := newStreamTrackerPacket(5, 60, 500*time.Millisecond)
tracker.Start()
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
testutils.WithTimeout(t, func() string {
if tracker.Status() == StreamStatusActive {
return ""
@@ -161,7 +161,7 @@ func TestStreamTracker(t *testing.T) {
require.Equal(t, StreamStatusStopped, tracker.Status())
// observe first packet
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
testutils.WithTimeout(t, func() string {
if callbackCalled.Load() == 1 {
@@ -175,10 +175,10 @@ func TestStreamTracker(t *testing.T) {
require.Equal(t, uint32(1), callbackCalled.Load())
// observe a few more
- tracker.Observe(0, 20, 10, false, 0)
- tracker.Observe(0, 20, 10, false, 0)
- tracker.Observe(0, 20, 10, false, 0)
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
+ tracker.Observe(0, 20, 10, false, 0, nil)
+ tracker.Observe(0, 20, 10, false, 0, nil)
+ tracker.Observe(0, 20, 10, false, 0, nil)
tracker.updateStatus()
// should still be active
@@ -191,7 +191,7 @@ func TestStreamTracker(t *testing.T) {
require.Equal(t, uint32(2), callbackCalled.Load())
// first packet after reset
- tracker.Observe(0, 20, 10, false, 0)
+ tracker.Observe(0, 20, 10, false, 0, nil)
testutils.WithTimeout(t, func() string {
if callbackCalled.Load() == 3 {
diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go
index cdb42f4ab..8c0784d40 100644
--- a/pkg/sfu/streamtrackermanager.go
+++ b/pkg/sfu/streamtrackermanager.go
@@ -41,7 +41,8 @@ type StreamTrackerManager struct {
maxPublishedLayer int32
maxTemporalLayerSeen int32
- trackers [buffer.DefaultMaxLayerSpatial + 1]*streamtracker.StreamTracker
+ ddTracker *streamtracker.StreamTrackerDependencyDescriptor
+ trackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker
availableLayers []int32
maxExpectedLayer int32
@@ -133,28 +134,58 @@ func (s *StreamTrackerManager) createStreamTrackerFrame(layer int32) streamtrack
return streamtracker.NewStreamTrackerFrame(params)
}
-func (s *StreamTrackerManager) AddTracker(layer int32) *streamtracker.StreamTracker {
+func (s *StreamTrackerManager) AddDependencyDescriptorTrackers() {
+ bitrateInterval, ok := s.trackerConfig.BitrateReportInterval[0]
+ if !ok {
+ return
+ }
+ s.lock.Lock()
+ var addAllTrackers bool
+ if s.ddTracker == nil {
+ s.ddTracker = streamtracker.NewStreamTrackerDependencyDescriptor(streamtracker.StreamTrackerParams{
+ BitrateReportInterval: bitrateInterval,
+ Logger: s.logger.WithValues("layer", 0),
+ })
+ addAllTrackers = true
+ }
+ s.lock.Unlock()
+ if addAllTrackers {
+ for i := 0; i <= int(buffer.DefaultMaxLayerSpatial); i++ {
+ s.AddTracker(int32(i))
+ }
+ }
+}
+
+func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrackerWorker {
bitrateInterval, ok := s.trackerConfig.BitrateReportInterval[layer]
if !ok {
return nil
}
- var trackerImpl streamtracker.StreamTrackerImpl
- switch s.trackerConfig.StreamTrackerType {
- case config.StreamTrackerTypePacket:
- trackerImpl = s.createStreamTrackerPacket(layer)
- case config.StreamTrackerTypeFrame:
- trackerImpl = s.createStreamTrackerFrame(layer)
- }
- if trackerImpl == nil {
- return nil
+ var tracker streamtracker.StreamTrackerWorker
+ s.lock.Lock()
+ if s.ddTracker != nil {
+ tracker = s.ddTracker.LayeredTracker(layer)
}
+ s.lock.Unlock()
+ if tracker == nil {
+ var trackerImpl streamtracker.StreamTrackerImpl
+ switch s.trackerConfig.StreamTrackerType {
+ case config.StreamTrackerTypePacket:
+ trackerImpl = s.createStreamTrackerPacket(layer)
+ case config.StreamTrackerTypeFrame:
+ trackerImpl = s.createStreamTrackerFrame(layer)
+ }
+ if trackerImpl == nil {
+ return nil
+ }
- tracker := streamtracker.NewStreamTracker(streamtracker.StreamTrackerParams{
- StreamTrackerImpl: trackerImpl,
- BitrateReportInterval: bitrateInterval,
- Logger: s.logger.WithValues("layer", layer),
- })
+ tracker = streamtracker.NewStreamTracker(streamtracker.StreamTrackerParams{
+ StreamTrackerImpl: trackerImpl,
+ BitrateReportInterval: bitrateInterval,
+ Logger: s.logger.WithValues("layer", layer),
+ })
+ }
s.logger.Debugw("StreamTrackerManager add track", "layer", layer)
tracker.OnStatusChanged(func(status streamtracker.StreamStatus) {
@@ -213,6 +244,8 @@ func (s *StreamTrackerManager) RemoveAllTrackers() {
s.availableLayers = make([]int32, 0)
s.maxExpectedLayerFromTrackInfo()
s.paused = false
+ ddTracker := s.ddTracker
+ s.ddTracker = nil
s.lock.Unlock()
for _, tracker := range trackers {
@@ -220,9 +253,12 @@ func (s *StreamTrackerManager) RemoveAllTrackers() {
tracker.Stop()
}
}
+ if ddTracker != nil {
+ ddTracker.Stop()
+ }
}
-func (s *StreamTrackerManager) GetTracker(layer int32) *streamtracker.StreamTracker {
+func (s *StreamTrackerManager) GetTracker(layer int32) streamtracker.StreamTrackerWorker {
s.lock.RLock()
defer s.lock.RUnlock()
@@ -270,7 +306,7 @@ func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32 {
// But, those conditions should be rare. In those cases, the restart will
// take longer.
//
- var trackersToReset []*streamtracker.StreamTracker
+ var trackersToReset []streamtracker.StreamTrackerWorker
for l := s.maxExpectedLayer + 1; l <= layer; l++ {
if s.hasSpatialLayerLocked(l) {
continue
@@ -367,7 +403,8 @@ func (s *StreamTrackerManager) getLayeredBitrateLocked() ([]int32, Bitrates) {
}
}
- if s.isSVC {
+ // accumulate bitrates for SVC streams without dependency descriptor
+ if s.isSVC && s.ddTracker == nil {
for i := len(br) - 1; i >= 1; i-- {
for j := len(br[i]) - 1; j >= 0; j-- {
if br[i][j] != 0 {