mirror of
https://github.com/livekit/livekit.git
synced 2026-07-02 15:41:53 +00:00
Add dependency descriptor stream tracker for svc codecs (#1788)
* Add dependency descriptor stream tracker for svc codecs * Solve comments
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
)
|
||||
|
||||
@@ -206,6 +207,15 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority
|
||||
}
|
||||
|
||||
func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParameters, headers []webrtc.RTPHeaderExtensionParameter) {
|
||||
// The potential codecs have not published yet, so we can't get the actual Extensions, the client/browser uses same extensions
|
||||
// for all video codecs so we assume they will have same extensions as the primary codec except for the dependency descriptor
|
||||
// that is munged in svc codec.
|
||||
headersWithoutDD := make([]webrtc.RTPHeaderExtensionParameter, 0, len(headers))
|
||||
for _, h := range headers {
|
||||
if h.URI != dependencydescriptor.ExtensionUrl {
|
||||
headersWithoutDD = append(headersWithoutDD, h)
|
||||
}
|
||||
}
|
||||
t.lock.Lock()
|
||||
t.potentialCodecs = codecs
|
||||
for i, c := range codecs {
|
||||
@@ -217,8 +227,12 @@ func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParamete
|
||||
}
|
||||
}
|
||||
if !exist {
|
||||
extHeaders := headers
|
||||
if !sfu.IsSvcCodec(c.MimeType) {
|
||||
extHeaders = headersWithoutDD
|
||||
}
|
||||
t.receivers = append(t.receivers, &simulcastReceiver{
|
||||
TrackReceiver: NewDummyReceiver(livekit.TrackID(t.trackInfo.Sid), string(t.PublisherID()), c, headers),
|
||||
TrackReceiver: NewDummyReceiver(livekit.TrackID(t.trackInfo.Sid), string(t.PublisherID()), c, extHeaders),
|
||||
priority: i,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -94,9 +94,8 @@ type Buffer struct {
|
||||
logger logger.Logger
|
||||
|
||||
// dependency descriptor
|
||||
ddExt uint8
|
||||
ddParser *DependencyDescriptorParser
|
||||
maxLayerChangedCB func(int32, int32)
|
||||
ddExt uint8
|
||||
ddParser *DependencyDescriptorParser
|
||||
|
||||
paused bool
|
||||
frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator
|
||||
@@ -175,9 +174,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
|
||||
b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i))
|
||||
}
|
||||
b.ddParser = NewDependencyDescriptorParser(b.ddExt, b.logger, func(spatial, temporal int32) {
|
||||
if b.maxLayerChangedCB != nil {
|
||||
b.maxLayerChangedCB(spatial, temporal)
|
||||
}
|
||||
frc.SetMaxLayer(spatial, temporal)
|
||||
})
|
||||
|
||||
@@ -779,12 +775,6 @@ func (b *Buffer) GetAudioLevel() (float64, bool) {
|
||||
return b.audioLevel.GetLevel()
|
||||
}
|
||||
|
||||
// DD-TODO : now we rely on stream tracker for layer change, dependency still
|
||||
// work for that too. Do we keep it unchanged or use both methods?
|
||||
func (b *Buffer) OnMaxLayerChanged(fn func(int32, int32)) {
|
||||
b.maxLayerChangedCB = fn
|
||||
}
|
||||
|
||||
func (b *Buffer) OnFpsChanged(f func()) {
|
||||
b.Lock()
|
||||
b.onFpsChanged = f
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -216,6 +217,13 @@ func NewWebRTCReceiver(
|
||||
})
|
||||
w.connectionStats.Start(w.trackInfo, time.Now())
|
||||
|
||||
for _, ext := range receiver.GetParameters().HeaderExtensions {
|
||||
if ext.URI == dd.ExtensionUrl {
|
||||
w.streamTrackerManager.AddDependencyDescriptorTrackers()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
@@ -644,6 +652,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
len(pkt.Packet.Payload),
|
||||
pkt.Packet.Marker,
|
||||
pkt.Packet.Timestamp,
|
||||
pkt.DependencyDescriptor,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ package streamtracker
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
)
|
||||
|
||||
// ------------------------------------------------------------
|
||||
@@ -40,3 +42,15 @@ type StreamTrackerImpl interface {
|
||||
Observe(hasMarker bool, ts uint32) StreamStatusChange
|
||||
CheckStatus() StreamStatusChange
|
||||
}
|
||||
|
||||
type StreamTrackerWorker interface {
|
||||
Start()
|
||||
Stop()
|
||||
Reset()
|
||||
OnStatusChanged(f func(status StreamStatus))
|
||||
OnBitrateAvailable(f func())
|
||||
Status() StreamStatus
|
||||
BitrateTemporalCumulative() []int64
|
||||
SetPaused(paused bool)
|
||||
Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.DependencyDescriptorWithDecodeTarget)
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
@@ -175,6 +176,7 @@ func (s *StreamTracker) Observe(
|
||||
payloadSize int,
|
||||
hasMarker bool,
|
||||
ts uint32,
|
||||
_ *buffer.DependencyDescriptorWithDecodeTarget,
|
||||
) {
|
||||
s.lock.Lock()
|
||||
|
||||
|
||||
@@ -0,0 +1,272 @@
|
||||
package streamtracker
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
)
|
||||
|
||||
type StreamTrackerDependencyDescriptor struct {
|
||||
lock sync.RWMutex
|
||||
paused bool
|
||||
generation atomic.Uint32
|
||||
params StreamTrackerParams
|
||||
maxSpatialLayer int32
|
||||
maxTemporalLayer int32
|
||||
|
||||
onStatusChanged [buffer.DefaultMaxLayerSpatial + 1]func(status StreamStatus)
|
||||
onBitrateAvailable [buffer.DefaultMaxLayerSpatial + 1]func()
|
||||
|
||||
lastBitrateReport time.Time
|
||||
bytesForBitrate [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
|
||||
bitrate [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
|
||||
|
||||
isStopped bool
|
||||
}
|
||||
|
||||
func NewStreamTrackerDependencyDescriptor(params StreamTrackerParams) *StreamTrackerDependencyDescriptor {
|
||||
return &StreamTrackerDependencyDescriptor{
|
||||
params: params,
|
||||
maxSpatialLayer: buffer.InvalidLayerSpatial,
|
||||
maxTemporalLayer: buffer.InvalidLayerTemporal,
|
||||
}
|
||||
}
|
||||
func (s *StreamTrackerDependencyDescriptor) Start() {
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) Stop() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if s.isStopped {
|
||||
return
|
||||
}
|
||||
s.isStopped = true
|
||||
|
||||
// bump generation to trigger exit of worker
|
||||
s.generation.Inc()
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) OnStatusChanged(layer int32, f func(status StreamStatus)) {
|
||||
s.lock.Lock()
|
||||
s.onStatusChanged[layer] = f
|
||||
s.lock.Unlock()
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) OnBitrateAvailable(layer int32, f func()) {
|
||||
s.lock.Lock()
|
||||
s.onBitrateAvailable[layer] = f
|
||||
s.lock.Unlock()
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) Status(layer int32) StreamStatus {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
if layer > s.maxSpatialLayer {
|
||||
return StreamStatusStopped
|
||||
}
|
||||
|
||||
return StreamStatusActive
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) BitrateTemporalCumulative(layer int32) []int64 {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
if layer > s.maxSpatialLayer {
|
||||
brs := make([]int64, len(s.bitrate[0]))
|
||||
return brs
|
||||
}
|
||||
|
||||
return s.bitrate[layer][:]
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) Reset() {
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) resetLocked() {
|
||||
// bump generation to trigger exit of current worker
|
||||
s.generation.Inc()
|
||||
|
||||
for i := 0; i < len(s.bytesForBitrate); i++ {
|
||||
for j := 0; j < len(s.bytesForBitrate[i]); j++ {
|
||||
s.bytesForBitrate[i][j] = 0
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(s.bitrate); i++ {
|
||||
for j := 0; j < len(s.bitrate[i]); j++ {
|
||||
s.bitrate[i][j] = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) SetPaused(paused bool) {
|
||||
s.lock.Lock()
|
||||
if s.paused == paused {
|
||||
s.lock.Unlock()
|
||||
return
|
||||
}
|
||||
s.paused = paused
|
||||
if !paused {
|
||||
s.resetLocked()
|
||||
} else {
|
||||
s.lastBitrateReport = time.Now()
|
||||
go s.worker(s.generation.Inc())
|
||||
|
||||
}
|
||||
s.lock.Unlock()
|
||||
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.DependencyDescriptorWithDecodeTarget) {
|
||||
s.lock.Lock()
|
||||
|
||||
if s.isStopped || s.paused || payloadSize == 0 || ddVal == nil {
|
||||
s.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
var notifyFns []func(status StreamStatus)
|
||||
var notifyStatus StreamStatus
|
||||
if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil {
|
||||
var maxSpatial, maxTemporal int32
|
||||
for _, dt := range ddVal.DecodeTargets {
|
||||
if *mask&(1<<dt.Target) != uint32(dd.DecodeTargetNotPresent) {
|
||||
if maxSpatial < dt.Layer.Spatial {
|
||||
maxSpatial = dt.Layer.Spatial
|
||||
}
|
||||
if maxTemporal < dt.Layer.Temporal {
|
||||
maxTemporal = dt.Layer.Temporal
|
||||
}
|
||||
}
|
||||
}
|
||||
if maxSpatial > buffer.DefaultMaxLayerSpatial {
|
||||
maxSpatial = buffer.DefaultMaxLayerSpatial
|
||||
s.params.Logger.Warnw("max spatial layer exceeded", nil, "maxSpatial", maxSpatial)
|
||||
}
|
||||
if maxTemporal > buffer.DefaultMaxLayerTemporal {
|
||||
maxTemporal = buffer.DefaultMaxLayerTemporal
|
||||
s.params.Logger.Warnw("max temporal layer exceeded", nil, "maxTemporal", maxTemporal)
|
||||
}
|
||||
|
||||
s.params.Logger.Debugw("max layer changed", "maxSpatial", maxSpatial, "maxTemporal", maxTemporal)
|
||||
oldMaxSpatial := s.maxSpatialLayer
|
||||
s.maxSpatialLayer, s.maxTemporalLayer = maxSpatial, maxTemporal
|
||||
if oldMaxSpatial == -1 {
|
||||
s.lastBitrateReport = time.Now()
|
||||
go s.worker(s.generation.Inc())
|
||||
}
|
||||
|
||||
if oldMaxSpatial > s.maxSpatialLayer {
|
||||
notifyStatus = StreamStatusStopped
|
||||
for i := s.maxSpatialLayer + 1; i <= oldMaxSpatial; i++ {
|
||||
notifyFns = append(notifyFns, s.onStatusChanged[i])
|
||||
}
|
||||
} else if oldMaxSpatial < s.maxSpatialLayer {
|
||||
notifyStatus = StreamStatusActive
|
||||
for i := oldMaxSpatial + 1; i <= s.maxSpatialLayer; i++ {
|
||||
notifyFns = append(notifyFns, s.onStatusChanged[i])
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
dtis := ddVal.Descriptor.FrameDependencies.DecodeTargetIndications
|
||||
|
||||
for _, dt := range ddVal.DecodeTargets {
|
||||
// we are not dropping discardable frames now, so only ingore not present frames
|
||||
if dtis[dt.Target] == dd.DecodeTargetNotPresent {
|
||||
continue
|
||||
}
|
||||
|
||||
s.bytesForBitrate[dt.Layer.Spatial][dt.Layer.Temporal] += int64(pktSize)
|
||||
}
|
||||
|
||||
s.lock.Unlock()
|
||||
|
||||
for _, fn := range notifyFns {
|
||||
if fn != nil {
|
||||
fn(notifyStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) worker(generation uint32) {
|
||||
tickerBitrate := time.NewTicker(s.params.BitrateReportInterval)
|
||||
defer tickerBitrate.Stop()
|
||||
|
||||
for {
|
||||
<-tickerBitrate.C
|
||||
if generation != s.generation.Load() {
|
||||
return
|
||||
}
|
||||
s.bitrateReport()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) bitrateReport() {
|
||||
// run this even if paused to drain out bitrate if there are no packets coming in
|
||||
s.lock.Lock()
|
||||
now := time.Now()
|
||||
diff := now.Sub(s.lastBitrateReport)
|
||||
s.lastBitrateReport = now
|
||||
|
||||
var availableChangedFns []func()
|
||||
for spatial := 0; spatial < len(s.bytesForBitrate); spatial++ {
|
||||
bytesForBitrate := s.bytesForBitrate[spatial][:]
|
||||
bitrateAvailabilityChanged := false
|
||||
bitrates := s.bitrate[spatial][:]
|
||||
for i := 0; i < len(bytesForBitrate); i++ {
|
||||
bitrate := int64(float64(bytesForBitrate[i]*8) / diff.Seconds())
|
||||
if (bitrates[i] == 0 && bitrate > 0) || (bitrates[i] > 0 && bitrate == 0) {
|
||||
bitrateAvailabilityChanged = true
|
||||
}
|
||||
bitrates[i] = bitrate
|
||||
bytesForBitrate[i] = 0
|
||||
}
|
||||
|
||||
if bitrateAvailabilityChanged && s.onBitrateAvailable[spatial] != nil {
|
||||
availableChangedFns = append(availableChangedFns, s.onBitrateAvailable[spatial])
|
||||
}
|
||||
}
|
||||
s.lock.Unlock()
|
||||
|
||||
for _, fn := range availableChangedFns {
|
||||
fn()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) LayeredTracker(layer int32) *StreamTrackerDependencyDescriptorLayered {
|
||||
return &StreamTrackerDependencyDescriptorLayered{
|
||||
StreamTrackerDependencyDescriptor: s,
|
||||
layer: layer,
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------
|
||||
// Layered wrapper for StreamTrackerWorker
|
||||
type StreamTrackerDependencyDescriptorLayered struct {
|
||||
*StreamTrackerDependencyDescriptor
|
||||
layer int32
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptorLayered) OnStatusChanged(f func(status StreamStatus)) {
|
||||
s.StreamTrackerDependencyDescriptor.OnStatusChanged(s.layer, f)
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptorLayered) OnBitrateAvailable(f func()) {
|
||||
s.StreamTrackerDependencyDescriptor.OnBitrateAvailable(s.layer, f)
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptorLayered) Status() StreamStatus {
|
||||
return s.StreamTrackerDependencyDescriptor.Status(s.layer)
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptorLayered) BitrateTemporalCumulative() []int64 {
|
||||
return s.StreamTrackerDependencyDescriptor.BitrateTemporalCumulative(s.layer)
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package streamtracker
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
func createDescriptorDependencyForTargets(maxSpatial, maxTemporal int) *buffer.DependencyDescriptorWithDecodeTarget {
|
||||
var targets []buffer.DependencyDescriptorDecodeTarget
|
||||
var mask uint32
|
||||
for i := 0; i <= maxSpatial; i++ {
|
||||
for j := 0; j <= maxTemporal; j++ {
|
||||
targets = append(targets, buffer.DependencyDescriptorDecodeTarget{Target: len(targets), Layer: buffer.VideoLayer{Spatial: int32(i), Temporal: int32(j)}})
|
||||
mask |= 1 << uint32(len(targets)-1)
|
||||
}
|
||||
}
|
||||
|
||||
dtis := make([]dd.DecodeTargetIndication, len(targets))
|
||||
for _, t := range targets {
|
||||
dtis[t.Target] = dd.DecodeTargetRequired
|
||||
}
|
||||
|
||||
return &buffer.DependencyDescriptorWithDecodeTarget{
|
||||
Descriptor: &dd.DependencyDescriptor{
|
||||
ActiveDecodeTargetsBitmask: &mask,
|
||||
FrameDependencies: &dd.FrameDependencyTemplate{
|
||||
DecodeTargetIndications: dtis,
|
||||
},
|
||||
},
|
||||
DecodeTargets: targets,
|
||||
}
|
||||
}
|
||||
|
||||
func checkStatues(t *testing.T, statuses []StreamStatus, expected StreamStatus, maxSpatial int) {
|
||||
for i := 0; i <= maxSpatial; i++ {
|
||||
require.Equal(t, expected, statuses[i])
|
||||
}
|
||||
|
||||
for i := maxSpatial + 1; i < len(statuses); i++ {
|
||||
require.NotEqual(t, expected, statuses[i])
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamTrackerDD(t *testing.T) {
|
||||
ddTracker := NewStreamTrackerDependencyDescriptor(StreamTrackerParams{
|
||||
BitrateReportInterval: 1 * time.Second,
|
||||
Logger: logger.GetLogger(),
|
||||
})
|
||||
layeredTrackers := make([]StreamTrackerWorker, buffer.DefaultMaxLayerSpatial+1)
|
||||
statuses := make([]StreamStatus, buffer.DefaultMaxLayerSpatial+1)
|
||||
for i := 0; i <= int(buffer.DefaultMaxLayerSpatial); i++ {
|
||||
layeredTrack := ddTracker.LayeredTracker(int32(i))
|
||||
layer := i
|
||||
layeredTrack.OnStatusChanged(func(status StreamStatus) {
|
||||
statuses[layer] = status
|
||||
})
|
||||
layeredTrack.Start()
|
||||
layeredTrackers[i] = layeredTrack
|
||||
}
|
||||
defer ddTracker.Stop()
|
||||
|
||||
// no active layers
|
||||
ddTracker.Observe(0, 1000, 1000, false, 0, nil)
|
||||
checkStatues(t, statuses, StreamStatusActive, int(buffer.InvalidLayerSpatial))
|
||||
|
||||
// layer seen [0,1]
|
||||
ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(1, 1))
|
||||
checkStatues(t, statuses, StreamStatusActive, 1)
|
||||
|
||||
// layer seen [0,1,2]
|
||||
ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(2, 1))
|
||||
checkStatues(t, statuses, StreamStatusActive, 2)
|
||||
|
||||
// layer 2 gone, layer seen [0,1]
|
||||
ddTracker.Observe(0, 1000, 1000, false, 0, createDescriptorDependencyForTargets(1, 1))
|
||||
checkStatues(t, statuses, StreamStatusActive, 1)
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
// observe first packet
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if callbackCalled.Load() {
|
||||
@@ -73,7 +73,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
callbackStatusMu.Unlock()
|
||||
})
|
||||
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
testutils.WithTimeout(t, func() string {
|
||||
callbackStatusMu.RLock()
|
||||
defer callbackStatusMu.RUnlock()
|
||||
@@ -110,7 +110,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
tracker.Start()
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if tracker.Status() == StreamStatusActive {
|
||||
return ""
|
||||
@@ -121,11 +121,11 @@ func TestStreamTracker(t *testing.T) {
|
||||
|
||||
tracker.setStatusLocked(StreamStatusStopped)
|
||||
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
tracker.updateStatus()
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
tracker.updateStatus()
|
||||
require.Equal(t, StreamStatusActive, tracker.Status())
|
||||
|
||||
@@ -135,7 +135,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
t.Run("changes to inactive when paused", func(t *testing.T) {
|
||||
tracker := newStreamTrackerPacket(5, 60, 500*time.Millisecond)
|
||||
tracker.Start()
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if tracker.Status() == StreamStatusActive {
|
||||
return ""
|
||||
@@ -161,7 +161,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
// observe first packet
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if callbackCalled.Load() == 1 {
|
||||
@@ -175,10 +175,10 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, uint32(1), callbackCalled.Load())
|
||||
|
||||
// observe a few more
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
tracker.updateStatus()
|
||||
|
||||
// should still be active
|
||||
@@ -191,7 +191,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, uint32(2), callbackCalled.Load())
|
||||
|
||||
// first packet after reset
|
||||
tracker.Observe(0, 20, 10, false, 0)
|
||||
tracker.Observe(0, 20, 10, false, 0, nil)
|
||||
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if callbackCalled.Load() == 3 {
|
||||
|
||||
@@ -41,7 +41,8 @@ type StreamTrackerManager struct {
|
||||
maxPublishedLayer int32
|
||||
maxTemporalLayerSeen int32
|
||||
|
||||
trackers [buffer.DefaultMaxLayerSpatial + 1]*streamtracker.StreamTracker
|
||||
ddTracker *streamtracker.StreamTrackerDependencyDescriptor
|
||||
trackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker
|
||||
|
||||
availableLayers []int32
|
||||
maxExpectedLayer int32
|
||||
@@ -133,28 +134,58 @@ func (s *StreamTrackerManager) createStreamTrackerFrame(layer int32) streamtrack
|
||||
return streamtracker.NewStreamTrackerFrame(params)
|
||||
}
|
||||
|
||||
func (s *StreamTrackerManager) AddTracker(layer int32) *streamtracker.StreamTracker {
|
||||
func (s *StreamTrackerManager) AddDependencyDescriptorTrackers() {
|
||||
bitrateInterval, ok := s.trackerConfig.BitrateReportInterval[0]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
s.lock.Lock()
|
||||
var addAllTrackers bool
|
||||
if s.ddTracker == nil {
|
||||
s.ddTracker = streamtracker.NewStreamTrackerDependencyDescriptor(streamtracker.StreamTrackerParams{
|
||||
BitrateReportInterval: bitrateInterval,
|
||||
Logger: s.logger.WithValues("layer", 0),
|
||||
})
|
||||
addAllTrackers = true
|
||||
}
|
||||
s.lock.Unlock()
|
||||
if addAllTrackers {
|
||||
for i := 0; i <= int(buffer.DefaultMaxLayerSpatial); i++ {
|
||||
s.AddTracker(int32(i))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrackerWorker {
|
||||
bitrateInterval, ok := s.trackerConfig.BitrateReportInterval[layer]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
var trackerImpl streamtracker.StreamTrackerImpl
|
||||
switch s.trackerConfig.StreamTrackerType {
|
||||
case config.StreamTrackerTypePacket:
|
||||
trackerImpl = s.createStreamTrackerPacket(layer)
|
||||
case config.StreamTrackerTypeFrame:
|
||||
trackerImpl = s.createStreamTrackerFrame(layer)
|
||||
}
|
||||
if trackerImpl == nil {
|
||||
return nil
|
||||
var tracker streamtracker.StreamTrackerWorker
|
||||
s.lock.Lock()
|
||||
if s.ddTracker != nil {
|
||||
tracker = s.ddTracker.LayeredTracker(layer)
|
||||
}
|
||||
s.lock.Unlock()
|
||||
if tracker == nil {
|
||||
var trackerImpl streamtracker.StreamTrackerImpl
|
||||
switch s.trackerConfig.StreamTrackerType {
|
||||
case config.StreamTrackerTypePacket:
|
||||
trackerImpl = s.createStreamTrackerPacket(layer)
|
||||
case config.StreamTrackerTypeFrame:
|
||||
trackerImpl = s.createStreamTrackerFrame(layer)
|
||||
}
|
||||
if trackerImpl == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
tracker := streamtracker.NewStreamTracker(streamtracker.StreamTrackerParams{
|
||||
StreamTrackerImpl: trackerImpl,
|
||||
BitrateReportInterval: bitrateInterval,
|
||||
Logger: s.logger.WithValues("layer", layer),
|
||||
})
|
||||
tracker = streamtracker.NewStreamTracker(streamtracker.StreamTrackerParams{
|
||||
StreamTrackerImpl: trackerImpl,
|
||||
BitrateReportInterval: bitrateInterval,
|
||||
Logger: s.logger.WithValues("layer", layer),
|
||||
})
|
||||
}
|
||||
|
||||
s.logger.Debugw("StreamTrackerManager add track", "layer", layer)
|
||||
tracker.OnStatusChanged(func(status streamtracker.StreamStatus) {
|
||||
@@ -213,6 +244,8 @@ func (s *StreamTrackerManager) RemoveAllTrackers() {
|
||||
s.availableLayers = make([]int32, 0)
|
||||
s.maxExpectedLayerFromTrackInfo()
|
||||
s.paused = false
|
||||
ddTracker := s.ddTracker
|
||||
s.ddTracker = nil
|
||||
s.lock.Unlock()
|
||||
|
||||
for _, tracker := range trackers {
|
||||
@@ -220,9 +253,12 @@ func (s *StreamTrackerManager) RemoveAllTrackers() {
|
||||
tracker.Stop()
|
||||
}
|
||||
}
|
||||
if ddTracker != nil {
|
||||
ddTracker.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamTrackerManager) GetTracker(layer int32) *streamtracker.StreamTracker {
|
||||
func (s *StreamTrackerManager) GetTracker(layer int32) streamtracker.StreamTrackerWorker {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
@@ -270,7 +306,7 @@ func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32 {
|
||||
// But, those conditions should be rare. In those cases, the restart will
|
||||
// take longer.
|
||||
//
|
||||
var trackersToReset []*streamtracker.StreamTracker
|
||||
var trackersToReset []streamtracker.StreamTrackerWorker
|
||||
for l := s.maxExpectedLayer + 1; l <= layer; l++ {
|
||||
if s.hasSpatialLayerLocked(l) {
|
||||
continue
|
||||
@@ -367,7 +403,8 @@ func (s *StreamTrackerManager) getLayeredBitrateLocked() ([]int32, Bitrates) {
|
||||
}
|
||||
}
|
||||
|
||||
if s.isSVC {
|
||||
// accumulate bitrates for SVC streams without dependency descriptor
|
||||
if s.isSVC && s.ddTracker == nil {
|
||||
for i := len(br) - 1; i >= 1; i-- {
|
||||
for j := len(br[i]) - 1; j >= 0; j-- {
|
||||
if br[i][j] != 0 {
|
||||
|
||||
Reference in New Issue
Block a user