From d70843bc5a720b5d4c95f9a210e60741bb9c8e27 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 28 Sep 2022 23:54:40 +0530 Subject: [PATCH] Some logging tweaks (#1048) --- pkg/rtc/dynacastmanager.go | 2 +- pkg/rtc/dynacastquality.go | 6 +- pkg/rtc/participant.go | 6 +- pkg/sfu/downtrack.go | 4 - pkg/sfu/forwarder.go | 261 ++++++++++++++------------------ pkg/sfu/forwarder_test.go | 2 +- pkg/sfu/streamallocator.go | 124 +++++++-------- pkg/sfu/streamtrackermanager.go | 4 +- 8 files changed, 183 insertions(+), 226 deletions(-) diff --git a/pkg/rtc/dynacastmanager.go b/pkg/rtc/dynacastmanager.go index 2f0bb45e3..3e8e4bf80 100644 --- a/pkg/rtc/dynacastmanager.go +++ b/pkg/rtc/dynacastmanager.go @@ -174,7 +174,7 @@ func (d *DynacastManager) updateMaxQualityForMime(mime string, maxQuality liveki func (d *DynacastManager) update(force bool) { d.lock.Lock() - d.params.Logger.Infow("processing quality change", + d.params.Logger.Debugw("processing quality change", "force", force, "committedMaxSubscribedQuality", d.committedMaxSubscribedQuality, "maxSubscribedQuality", d.maxSubscribedQuality, diff --git a/pkg/rtc/dynacastquality.go b/pkg/rtc/dynacastquality.go index 5b55e1ae8..00d9734ef 100644 --- a/pkg/rtc/dynacastquality.go +++ b/pkg/rtc/dynacastquality.go @@ -58,7 +58,7 @@ func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(maxSubscribedQuali } func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) { - d.params.Logger.Infow( + d.params.Logger.Debugw( "setting subscriber max quality", "mime", d.params.MimeType, "subscriberID", subscriberID, @@ -77,7 +77,7 @@ func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.Partic } func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) { - d.params.Logger.Infow( + d.params.Logger.Debugw( "setting subscriber node max quality", "mime", d.params.MimeType, "subscriberNodeID", nodeID, @@ -124,7 +124,7 @@ func (d *DynacastQuality) updateQualityChange(force bool) { d.initialized = true d.maxSubscribedQuality = maxSubscribedQuality - d.params.Logger.Infow("notifying quality change", + d.params.Logger.Debugw("notifying quality change", "mime", d.params.MimeType, "maxSubscriberQuality", d.maxSubscriberQuality, "maxSubscriberNodeQuality", d.maxSubscriberNodeQuality, diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index cb1b1e817..2a18c70ae 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1503,7 +1503,7 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, ) } - p.params.Logger.Debugw( + p.params.Logger.Infow( "sending max subscribed quality", "trackID", trackID, "qualities", subscribedQualities, @@ -2068,7 +2068,7 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() { } func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func(sub types.LocalParticipant) error) { - p.params.Logger.Infow("queuing subscribe", "trackID", trackID) + p.params.Logger.Debugw("queuing subscribe", "trackID", trackID) p.supervisor.UpdateSubscription(trackID, true) @@ -2083,7 +2083,7 @@ func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func( } func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) { - p.params.Logger.Infow("queuing unsubscribe", "trackID", trackID) + p.params.Logger.Debugw("queuing unsubscribe", "trackID", trackID) p.supervisor.UpdateSubscription(trackID, false) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index fff74e851..d04b33457 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -843,7 +843,6 @@ func (d *DownTrack) DistanceToDesired() int32 { func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation { allocation := d.forwarder.AllocateOptimal(d.receiver.GetBitrateTemporalCumulative(), allowOvershoot) - d.logger.Debugw("stream: allocation optimal available", "allocation", allocation) d.maybeStartKeyFrameRequester() return allocation } @@ -870,14 +869,12 @@ func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation { allocation := d.forwarder.ProvisionalAllocateCommit() - d.logger.Debugw("stream: allocation commit", "allocation", allocation) d.maybeStartKeyFrameRequester() return allocation } func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) { allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetBitrateTemporalCumulative(), allowOvershoot) - d.logger.Debugw("stream: allocation next higher layer", "allocation", allocation, "available", available) d.maybeStartKeyFrameRequester() return allocation, available } @@ -890,7 +887,6 @@ func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransitio func (d *DownTrack) Pause() VideoAllocation { allocation := d.forwarder.Pause(d.receiver.GetBitrateTemporalCumulative()) - d.logger.Debugw("stream: pause", "allocation", allocation) d.maybeStartKeyFrameRequester() return allocation } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 7fc3e25c5..7dd39f210 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -102,8 +102,8 @@ type VideoAllocation struct { } func (v VideoAllocation) String() string { - return fmt.Sprintf("VideoAllocation{state: %s, change: %s, bw: %d, del: %d, avail: %+v, exmpt: %+v, rates: %+v, target: %s}", - v.state, v.change, v.bandwidthRequested, v.bandwidthDelta, v.availableLayers, v.exemptedLayers, v.bitrates, v.targetLayers) + return fmt.Sprintf("VideoAllocation{state: %s, change: %s, bw: %d, del: %d, avail: %+v, exmpt: %+v, rates: %+v, target: %s, dist: %d}", + v.state, v.change, v.bandwidthRequested, v.bandwidthDelta, v.availableLayers, v.exemptedLayers, v.bitrates, v.targetLayers, v.distanceToDesired) } var ( @@ -132,6 +132,10 @@ type VideoTransition struct { bandwidthDelta int64 } +func (v VideoTransition) String() string { + return fmt.Sprintf("VideoTransition{from: %s, to: %s, del: %d}", v.from, v.to, v.bandwidthDelta) +} + // ------------------------------------------------------------------- type TranslationParams struct { @@ -543,31 +547,34 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo return f.lastAllocation } - state := VideoAllocationStateNone - change := VideoStreamingChangeNone - bandwidthRequested := int64(0) - targetLayers := InvalidLayers + alloc := VideoAllocation{ + availableLayers: f.availableLayers, + exemptedLayers: f.exemptedLayers, + bitrates: brs, + targetLayers: InvalidLayers, + } switch { case f.muted: - state = VideoAllocationStateMuted + alloc.state = VideoAllocationStateMuted + case len(f.availableLayers) == 0: // feed is dry - state = VideoAllocationStateFeedDry + alloc.state = VideoAllocationStateFeedDry + case !f.bitrateAvailable(brs): // feed bitrate not yet calculated for all available layers - state = VideoAllocationStateAwaitingMeasurement + alloc.state = VideoAllocationStateAwaitingMeasurement // // Resume with the highest layer available <= max subscribed layer // If already resumed, move allocation to the highest available layer <= max subscribed layer // - targetLayers.Spatial = int32(math.Min(float64(f.maxLayers.Spatial), float64(f.availableLayers[len(f.availableLayers)-1]))) - targetLayers.Temporal = int32(math.Max(0, float64(f.maxLayers.Temporal))) - - if f.targetLayers == InvalidLayers && targetLayers.IsValid() { - change = VideoStreamingChangeResuming + alloc.targetLayers = VideoLayers{ + Spatial: int32(math.Min(float64(f.maxLayers.Spatial), float64(f.availableLayers[len(f.availableLayers)-1]))), + Temporal: int32(math.Max(0, float64(f.maxLayers.Temporal))), } + default: // allocate best layer available for s := f.maxLayers.Spatial; s >= 0; s-- { @@ -576,22 +583,22 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo continue } - targetLayers = VideoLayers{ + alloc.targetLayers = VideoLayers{ Spatial: s, Temporal: t, } - bandwidthRequested = brs[s][t] - state = VideoAllocationStateOptimal + alloc.bandwidthRequested = brs[s][t] + alloc.state = VideoAllocationStateOptimal break } - if bandwidthRequested != 0 { + if alloc.bandwidthRequested != 0 { break } } - if bandwidthRequested == 0 && f.maxLayers.IsValid() && allowOvershoot { + if alloc.bandwidthRequested == 0 && f.maxLayers.IsValid() && allowOvershoot { // if we cannot allocate anything below max layer, // look for a layer above. It is okay to overshoot // in optimal allocation (i.e. no bandwidth restrictions). @@ -604,24 +611,24 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo continue } - targetLayers = VideoLayers{ + alloc.targetLayers = VideoLayers{ Spatial: s, Temporal: t, } - bandwidthRequested = brs[s][t] - state = VideoAllocationStateOptimal - f.logger.Infow("allowing overshoot", "maxLayer", f.maxLayers, "targetLayers", targetLayers) + alloc.bandwidthRequested = brs[s][t] + alloc.state = VideoAllocationStateOptimal + f.logger.Infow("allowing overshoot", "maxLayer", f.maxLayers, "targetLayers", alloc.targetLayers) break } - if bandwidthRequested != 0 { + if alloc.bandwidthRequested != 0 { break } } } - if bandwidthRequested == 0 && f.maxLayers.IsValid() { + if alloc.bandwidthRequested == 0 && f.maxLayers.IsValid() { // if overshoot was allowed and it did not also find a layer, // keep target at exempted layer (if available) and the current layer is at that level. // i. e. exempted layer may really have stopped, so a layer switch to an exempted layer should @@ -631,53 +638,27 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo if f.currentLayers.IsValid() { for _, s := range f.exemptedLayers { if s <= f.maxLayers.Spatial && f.currentLayers.Spatial == s { - targetLayers = f.currentLayers - bandwidthRequested = brs[targetLayers.Spatial][targetLayers.Temporal] - state = VideoAllocationStateDeficient + alloc.targetLayers = f.currentLayers + alloc.bandwidthRequested = brs[alloc.targetLayers.Spatial][alloc.targetLayers.Temporal] + alloc.state = VideoAllocationStateDeficient break } } } } - if f.targetLayers == InvalidLayers && targetLayers.IsValid() { - change = VideoStreamingChangeResuming - } else if f.targetLayers != InvalidLayers && !targetLayers.IsValid() { - change = VideoStreamingChangePausing - } - - if !targetLayers.IsValid() && f.maxLayers.IsValid() { - state = VideoAllocationStateDeficient + if !alloc.targetLayers.IsValid() && f.maxLayers.IsValid() { + alloc.state = VideoAllocationStateDeficient } } - if !targetLayers.IsValid() { - targetLayers = InvalidLayers - } - f.lastAllocation = VideoAllocation{ - state: state, - change: change, - bandwidthRequested: bandwidthRequested, - bandwidthDelta: bandwidthRequested - f.lastAllocation.bandwidthRequested, - bitrates: brs, - targetLayers: targetLayers, - distanceToDesired: f.getDistanceToDesired(brs, targetLayers, f.maxLayers), - } - if len(f.availableLayers) > 0 { - f.lastAllocation.availableLayers = make([]int32, len(f.availableLayers)) - copy(f.lastAllocation.availableLayers, f.availableLayers) - } - if len(f.exemptedLayers) > 0 { - f.lastAllocation.exemptedLayers = make([]int32, len(f.exemptedLayers)) - copy(f.lastAllocation.exemptedLayers, f.exemptedLayers) + if !alloc.targetLayers.IsValid() { + alloc.targetLayers = InvalidLayers } + alloc.bandwidthDelta = alloc.bandwidthRequested - f.lastAllocation.bandwidthRequested + alloc.distanceToDesired = f.getDistanceToDesired(brs, alloc.targetLayers, f.maxLayers) - f.setTargetLayers(f.lastAllocation.targetLayers) - if f.targetLayers == InvalidLayers { - f.resyncLocked() - } - - return f.lastAllocation + return f.updateAllocation(alloc, "optimal") } func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates) { @@ -1006,65 +987,47 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { f.lock.Lock() defer f.lock.Unlock() - state := VideoAllocationStateNone - change := VideoStreamingChangeNone - bandwidthRequested := int64(0) + alloc := VideoAllocation{ + bandwidthRequested: 0, + bandwidthDelta: -f.lastAllocation.bandwidthRequested, + availableLayers: f.provisional.availableLayers, + exemptedLayers: f.provisional.exemptedLayers, + bitrates: f.provisional.bitrates, + targetLayers: f.provisional.allocatedLayers, + distanceToDesired: f.getDistanceToDesired(f.provisional.bitrates, f.provisional.allocatedLayers, f.provisional.maxLayers), + } switch { case f.provisional.muted: - state = VideoAllocationStateMuted + alloc.state = VideoAllocationStateMuted + case len(f.provisional.availableLayers) == 0: // feed is dry - state = VideoAllocationStateFeedDry - case f.provisional.allocatedLayers == InvalidLayers: - state = VideoAllocationStateDeficient + alloc.state = VideoAllocationStateFeedDry + + case f.provisional.allocatedLayers == InvalidLayers: + alloc.state = VideoAllocationStateDeficient - if f.targetLayers != InvalidLayers { - change = VideoStreamingChangePausing - } default: optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(f.provisional.bitrates, f.provisional.maxLayers) - bandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayers.Spatial][f.provisional.allocatedLayers.Temporal] + bandwidthRequested := f.provisional.bitrates[f.provisional.allocatedLayers.Spatial][f.provisional.allocatedLayers.Temporal] + + alloc.bandwidthRequested = bandwidthRequested + alloc.bandwidthDelta = bandwidthRequested - f.lastAllocation.bandwidthRequested + if f.provisional.allocatedLayers.GreaterThan(f.provisional.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) { // could be greater than optimal if overshooting - state = VideoAllocationStateOptimal + alloc.state = VideoAllocationStateOptimal } else { // // Optimal bandwidth could be 0 if using exempted layer. // Exempted layer is still treated as deficient. // - state = VideoAllocationStateDeficient - } - - if f.targetLayers == InvalidLayers { - change = VideoStreamingChangeResuming + alloc.state = VideoAllocationStateDeficient } } - f.lastAllocation = VideoAllocation{ - state: state, - change: change, - bandwidthRequested: bandwidthRequested, - bandwidthDelta: bandwidthRequested - f.lastAllocation.bandwidthRequested, - bitrates: f.provisional.bitrates, - targetLayers: f.provisional.allocatedLayers, - distanceToDesired: f.getDistanceToDesired(f.provisional.bitrates, f.provisional.allocatedLayers, f.provisional.maxLayers), - } - if len(f.provisional.availableLayers) > 0 { - f.lastAllocation.availableLayers = make([]int32, len(f.provisional.availableLayers)) - copy(f.lastAllocation.availableLayers, f.provisional.availableLayers) - } - if len(f.provisional.exemptedLayers) > 0 { - f.lastAllocation.exemptedLayers = make([]int32, len(f.provisional.exemptedLayers)) - copy(f.lastAllocation.exemptedLayers, f.provisional.exemptedLayers) - } - - f.setTargetLayers(f.lastAllocation.targetLayers) - if f.targetLayers == InvalidLayers { - f.resyncLocked() - } - - return f.lastAllocation + return f.updateAllocation(alloc, "cooperative") } func (f *Forwarder) AllocateNextHigher(availableChannelCapacity int64, brs Bitrates, allowOvershoot bool) (VideoAllocation, bool) { @@ -1113,36 +1076,21 @@ func (f *Forwarder) AllocateNextHigher(availableChannelCapacity int64, brs Bitra } targetLayers := VideoLayers{Spatial: s, Temporal: t} - state := VideoAllocationStateDeficient - if targetLayers.GreaterThan(f.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) { - state = VideoAllocationStateOptimal - } - - change := VideoStreamingChangeNone - if f.targetLayers == InvalidLayers { - change = VideoStreamingChangeResuming - } - - f.lastAllocation = VideoAllocation{ - state: state, - change: change, + alloc := VideoAllocation{ + state: VideoAllocationStateDeficient, bandwidthRequested: bandwidthRequested, bandwidthDelta: bandwidthRequested - alreadyAllocated, + availableLayers: f.availableLayers, + exemptedLayers: f.exemptedLayers, bitrates: brs, targetLayers: targetLayers, distanceToDesired: f.getDistanceToDesired(brs, targetLayers, f.maxLayers), } - if len(f.availableLayers) > 0 { - f.lastAllocation.availableLayers = make([]int32, len(f.availableLayers)) - copy(f.lastAllocation.availableLayers, f.availableLayers) - } - if len(f.exemptedLayers) > 0 { - f.lastAllocation.exemptedLayers = make([]int32, len(f.exemptedLayers)) - copy(f.lastAllocation.exemptedLayers, f.exemptedLayers) + if targetLayers.GreaterThan(f.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) { + alloc.state = VideoAllocationStateOptimal } - f.setTargetLayers(f.lastAllocation.targetLayers) - return true, f.lastAllocation, true + return true, f.updateAllocation(alloc, "next-higher"), true } } @@ -1275,40 +1223,51 @@ func (f *Forwarder) Pause(brs Bitrates) VideoAllocation { f.lock.Lock() defer f.lock.Unlock() - state := VideoAllocationStateNone - change := VideoStreamingChangeNone - - switch { - case f.muted: - state = VideoAllocationStateMuted - case len(f.availableLayers) == 0: - // feed is dry - state = VideoAllocationStateFeedDry - default: - // feed bitrate is not yet calculated or pausing due to lack of bandwidth - state = VideoAllocationStateDeficient - - if f.targetLayers != InvalidLayers { - change = VideoStreamingChangePausing - } - } - - f.lastAllocation = VideoAllocation{ - state: state, - change: change, + alloc := VideoAllocation{ bandwidthRequested: 0, bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested, + availableLayers: f.availableLayers, + exemptedLayers: f.exemptedLayers, bitrates: brs, targetLayers: InvalidLayers, distanceToDesired: f.getDistanceToDesired(brs, InvalidLayers, f.maxLayers), } - if len(f.availableLayers) > 0 { - f.lastAllocation.availableLayers = make([]int32, len(f.availableLayers)) - copy(f.lastAllocation.availableLayers, f.availableLayers) + + switch { + case f.muted: + alloc.state = VideoAllocationStateMuted + + case len(f.availableLayers) == 0: + // feed is dry + alloc.state = VideoAllocationStateFeedDry + + default: + // feed bitrate is not yet calculated or pausing due to lack of bandwidth + alloc.state = VideoAllocationStateDeficient } - if len(f.exemptedLayers) > 0 { - f.lastAllocation.exemptedLayers = make([]int32, len(f.exemptedLayers)) - copy(f.lastAllocation.exemptedLayers, f.exemptedLayers) + + return f.updateAllocation(alloc, "pause") +} + +func (f *Forwarder) updateAllocation(alloc VideoAllocation, reason string) VideoAllocation { + if f.targetLayers == InvalidLayers && alloc.targetLayers.IsValid() { + alloc.change = VideoStreamingChangeResuming + } else if f.targetLayers != InvalidLayers && !alloc.targetLayers.IsValid() { + alloc.change = VideoStreamingChangePausing + } + + if alloc.state != f.lastAllocation.state || alloc.targetLayers != f.lastAllocation.targetLayers { + f.logger.Infow(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc) + } + + f.lastAllocation = alloc + if len(alloc.availableLayers) > 0 { + f.lastAllocation.availableLayers = make([]int32, len(alloc.availableLayers)) + copy(f.lastAllocation.availableLayers, alloc.availableLayers) + } + if len(alloc.exemptedLayers) > 0 { + f.lastAllocation.exemptedLayers = make([]int32, len(alloc.exemptedLayers)) + copy(f.lastAllocation.exemptedLayers, alloc.exemptedLayers) } f.setTargetLayers(f.lastAllocation.targetLayers) diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 323ecd70e..d9cd50a9d 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1153,7 +1153,7 @@ func TestForwarderPauseMute(t *testing.T) { f.Mute(true) expectedResult := VideoAllocation{ state: VideoAllocationStateMuted, - change: VideoStreamingChangeNone, + change: VideoStreamingChangePausing, bandwidthRequested: 0, bandwidthDelta: 0 - bitrates[0][0], availableLayers: availableLayers, diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 64c488c54..529b9d5a4 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -68,51 +68,51 @@ var ( } ) -type State int +type streamAllocatorState int const ( - StateStable State = iota - StateDeficient + streamAllocatorStateStable streamAllocatorState = iota + streamAllocatorStateDeficient ) -func (s State) String() string { +func (s streamAllocatorState) String() string { switch s { - case StateStable: + case streamAllocatorStateStable: return "STABLE" - case StateDeficient: + case streamAllocatorStateDeficient: return "DEFICIENT" default: return fmt.Sprintf("%d", int(s)) } } -type Signal int +type streamAllocatorSignal int const ( - SignalAllocateTrack Signal = iota - SignalAllocateAllTracks - SignalAdjustState - SignalEstimate - SignalPeriodicPing - SignalSendProbe - SignalProbeClusterDone + streamAllocatorSignalAllocateTrack streamAllocatorSignal = iota + streamAllocatorSignalAllocateAllTracks + streamAllocatorSignalAdjustState + streamAllocatorSignalEstimate + streamAllocatorSignalPeriodicPing + streamAllocatorSignalSendProbe + streamAllocatorSignalProbeClusterDone ) -func (s Signal) String() string { +func (s streamAllocatorSignal) String() string { switch s { - case SignalAllocateTrack: + case streamAllocatorSignalAllocateTrack: return "ALLOCATE_TRACK" - case SignalAllocateAllTracks: + case streamAllocatorSignalAllocateAllTracks: return "ALLOCATE_ALL_TRACKS" - case SignalAdjustState: + case streamAllocatorSignalAdjustState: return "ADJUST_STATE" - case SignalEstimate: + case streamAllocatorSignalEstimate: return "ESTIMATE" - case SignalPeriodicPing: + case streamAllocatorSignalPeriodicPing: return "PERIODIC_PING" - case SignalSendProbe: + case streamAllocatorSignalSendProbe: return "SEND_PROBE" - case SignalProbeClusterDone: + case streamAllocatorSignalProbeClusterDone: return "PROBE_CLUSTER_DONE" default: return fmt.Sprintf("%d", int(s)) @@ -120,7 +120,7 @@ func (s Signal) String() string { } type Event struct { - Signal Signal + Signal streamAllocatorSignal TrackID livekit.TrackID Data interface{} } @@ -161,7 +161,7 @@ type StreamAllocator struct { isAllocateAllPending bool rembTrackingSSRC uint32 - state State + state streamAllocatorState eventChMu sync.RWMutex eventCh chan Event @@ -253,7 +253,7 @@ func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) { // LK-TODO: use any saved bandwidth to re-distribute s.postEvent(Event{ - Signal: SignalAdjustState, + Signal: streamAllocatorSignalAdjustState, }) } @@ -265,7 +265,7 @@ func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8) // do a full allocation on a track priority change to keep it simple s.isAllocateAllPending = true s.postEvent(Event{ - Signal: SignalAllocateAllTracks, + Signal: streamAllocatorSignalAllocateAllTracks, }) } } @@ -276,7 +276,7 @@ func (s *StreamAllocator) resetState() { s.channelObserver = s.newChannelObserverNonProbe() s.resetProbe() - s.state = StateStable + s.state = streamAllocatorStateStable } // called when a new REMB is received (receive side bandwidth estimation) @@ -326,7 +326,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima } if !found { if len(remb.SSRCs) == 0 { - s.params.Logger.Warnw("no SSRC to track REMB", nil) + s.params.Logger.Warnw("stream allocator: no SSRC to track REMB", nil) s.videoTracksMu.Unlock() return } @@ -354,7 +354,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima s.videoTracksMu.Unlock() s.postEvent(Event{ - Signal: SignalEstimate, + Signal: streamAllocatorSignalEstimate, Data: int64(remb.Bitrate), }) } @@ -369,7 +369,7 @@ func (s *StreamAllocator) onTransportCCFeedback(downTrack *DownTrack, fb *rtcp.T // called when target bitrate changes (send side bandwidth estimation) func (s *StreamAllocator) onTargetBitrateChange(bitrate int) { s.postEvent(Event{ - Signal: SignalEstimate, + Signal: streamAllocatorSignalEstimate, Data: int64(bitrate), }) } @@ -402,7 +402,7 @@ func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, layers if shouldPost { s.postEvent(Event{ - Signal: SignalAllocateTrack, + Signal: streamAllocatorSignalAllocateTrack, TrackID: livekit.TrackID(downTrack.ID()), }) } @@ -416,7 +416,7 @@ func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) { // called when prober wants to send packet(s) func (s *StreamAllocator) onSendProbe(bytesToSend int) { s.postEvent(Event{ - Signal: SignalSendProbe, + Signal: streamAllocatorSignalSendProbe, Data: bytesToSend, }) } @@ -424,7 +424,7 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) { // called when prober wants to send packet(s) func (s *StreamAllocator) onProbeClusterDone(info ProbeClusterInfo) { s.postEvent(Event{ - Signal: SignalProbeClusterDone, + Signal: streamAllocatorSignalProbeClusterDone, Data: info, }) } @@ -441,7 +441,7 @@ func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *DownTrack) { if shouldPost { s.postEvent(Event{ - Signal: SignalAllocateTrack, + Signal: streamAllocatorSignalAllocateTrack, TrackID: livekit.TrackID(downTrack.ID()), }) } @@ -457,7 +457,7 @@ func (s *StreamAllocator) postEvent(event Event) { select { case s.eventCh <- event: default: - s.params.Logger.Warnw("event queue full", nil) + s.params.Logger.Warnw("stream allocator: event queue full", nil) } s.eventChMu.RUnlock() } @@ -479,26 +479,26 @@ func (s *StreamAllocator) ping() { } s.postEvent(Event{ - Signal: SignalPeriodicPing, + Signal: streamAllocatorSignalPeriodicPing, }) } } func (s *StreamAllocator) handleEvent(event *Event) { switch event.Signal { - case SignalAllocateTrack: + case streamAllocatorSignalAllocateTrack: s.handleSignalAllocateTrack(event) - case SignalAllocateAllTracks: + case streamAllocatorSignalAllocateAllTracks: s.handleSignalAllocateAllTracks(event) - case SignalAdjustState: + case streamAllocatorSignalAdjustState: s.handleSignalAdjustState(event) - case SignalEstimate: + case streamAllocatorSignalEstimate: s.handleSignalEstimate(event) - case SignalPeriodicPing: + case streamAllocatorSignalPeriodicPing: s.handleSignalPeriodicPing(event) - case SignalSendProbe: + case streamAllocatorSignalSendProbe: s.handleSignalSendProbe(event) - case SignalProbeClusterDone: + case streamAllocatorSignalProbeClusterDone: s.handleSignalProbeClusterDone(event) } } @@ -521,7 +521,7 @@ func (s *StreamAllocator) handleSignalAllocateAllTracks(event *Event) { s.isAllocateAllPending = false s.videoTracksMu.Unlock() - if s.state == StateDeficient { + if s.state == streamAllocatorStateDeficient { s.allocateAllTracks() } } @@ -549,7 +549,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) { } // probe if necessary and timing is right - if s.state == StateDeficient { + if s.state == streamAllocatorStateDeficient { s.maybeProbe() } } @@ -599,12 +599,12 @@ func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) { s.probeEndTime = s.lastProbeStartTime.Add(queueWait) } -func (s *StreamAllocator) setState(state State) { +func (s *StreamAllocator) setState(state streamAllocatorState) { if s.state == state { return } - s.params.Logger.Debugw("state change", "from", s.state, "to", state) + s.params.Logger.Infow("stream allocator: state change", "from", s.state, "to", state) s.state = state // reset probe to enforce a delay after state change before probing @@ -614,12 +614,12 @@ func (s *StreamAllocator) setState(state State) { func (s *StreamAllocator) adjustState() { for _, track := range s.getTracks() { if track.IsDeficient() { - s.setState(StateDeficient) + s.setState(streamAllocatorStateDeficient) return } } - s.setState(StateStable) + s.setState(streamAllocatorStateStable) } func (s *StreamAllocator) handleNewEstimateInProbe() { @@ -646,16 +646,18 @@ func (s *StreamAllocator) handleNewEstimateInProbe() { // In rare cases, the estimate gets stuck. Prevent from probe running amok // LK-TODO: Need more testing here to ensure that probe does not cause a lot of damage // - s.params.Logger.Debugw("probe: aborting, no trend", "cluster", s.probeClusterId) + s.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", s.probeClusterId) s.abortProbe() + case trend == ChannelTrendCongesting: // stop immediately if the probe is congesting channel more - s.params.Logger.Debugw("probe: aborting, channel is congesting", "cluster", s.probeClusterId) + s.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", s.probeClusterId) s.abortProbe() + case s.channelObserver.GetHighestEstimate() > s.probeGoalBps: // reached goal, stop probing - s.params.Logger.Debugw( - "probe: stopping, goal reached", + s.params.Logger.Infow( + "stream allocator: probe: stopping, goal reached", "cluster", s.probeClusterId, "goal", s.probeGoalBps, "highest", s.channelObserver.GetHighestEstimate(), @@ -688,7 +690,7 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { } s.params.Logger.Infow( - "channel congestion detected, updating channel capacity", + "stream allocator: channel congestion detected, updating channel capacity", "reason", reason, "old(bps)", s.committedChannelCapacity, "new(bps)", estimateToCommit, @@ -714,7 +716,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { s.abortProbe() // if not deficient, free pass allocate track - if !s.params.Config.Enabled || s.state == StateStable || !track.IsManaged() { + if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() { update := NewStreamStateUpdate() allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal) update.HandleStreamingChange(allocation.change, track) @@ -844,7 +846,7 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() { if s.params.Config.MinChannelCapacity > committedChannelCapacity { committedChannelCapacity = s.params.Config.MinChannelCapacity s.params.Logger.Debugw( - "overriding channel capacity", + "stream allocator: overriding channel capacity", "actual", s.committedChannelCapacity, "override", committedChannelCapacity, ) @@ -899,7 +901,7 @@ func (s *StreamAllocator) allocateAllTracks() { if s.params.Config.MinChannelCapacity > availableChannelCapacity { availableChannelCapacity = s.params.Config.MinChannelCapacity s.params.Logger.Debugw( - "overriding channel capacity", + "stream allocator: overriding channel capacity", "actual", s.committedChannelCapacity, "override", availableChannelCapacity, ) @@ -1034,8 +1036,8 @@ func (s *StreamAllocator) initProbe(probeRateBps int64) { ProbeMinDuration, ProbeMaxDuration, ) - s.params.Logger.Debugw( - "starting probe", + s.params.Logger.Infow( + "stream allocator: starting probe", "probeClusterId", s.probeClusterId, "current usage", expectedBandwidthUsage, "committed", s.committedChannelCapacity, @@ -1613,7 +1615,7 @@ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) { switch { case estimateDirection == TrendDirectionDownward: c.logger.Debugw( - "channel observer: estimate is trending downward", + "stream allocator: channel observer: estimate is trending downward", "name", c.params.Name, "estimate", c.estimateTrend.ToString(), "packets", packets, @@ -1623,7 +1625,7 @@ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) { return ChannelTrendCongesting, ChannelCongestionReasonEstimate case c.params.NackWindowMinDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMinDuration && nackRatio > c.params.NackRatioThreshold: c.logger.Debugw( - "channel observer: high rate of repeated NACKs", + "stream allocator: channel observer: high rate of repeated NACKs", "name", c.params.Name, "estimate", c.estimateTrend.ToString(), "packets", packets, diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 3aeb303a9..8b703ef49 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -379,7 +379,7 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) { exemptedLayers = append(exemptedLayers, s.exemptedLayers...) s.lock.Unlock() - s.logger.Debugw( + s.logger.Infow( "available layers changed - layer seen", "added", layer, "availableLayers", availableLayers, @@ -461,7 +461,7 @@ func (s *StreamTrackerManager) removeAvailableLayer(layer int32) { } s.lock.Unlock() - s.logger.Debugw( + s.logger.Infow( "available layers changed - layer gone", "removed", layer, "availableLayers", newLayers,