Some logging tweaks (#1048)

This commit is contained in:
Raja Subramanian
2022-09-28 23:54:40 +05:30
committed by GitHub
parent ab40ea7e7a
commit d70843bc5a
8 changed files with 183 additions and 226 deletions

View File

@@ -174,7 +174,7 @@ func (d *DynacastManager) updateMaxQualityForMime(mime string, maxQuality liveki
func (d *DynacastManager) update(force bool) {
d.lock.Lock()
d.params.Logger.Infow("processing quality change",
d.params.Logger.Debugw("processing quality change",
"force", force,
"committedMaxSubscribedQuality", d.committedMaxSubscribedQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,

View File

@@ -58,7 +58,7 @@ func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(maxSubscribedQuali
}
func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) {
d.params.Logger.Infow(
d.params.Logger.Debugw(
"setting subscriber max quality",
"mime", d.params.MimeType,
"subscriberID", subscriberID,
@@ -77,7 +77,7 @@ func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.Partic
}
func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) {
d.params.Logger.Infow(
d.params.Logger.Debugw(
"setting subscriber node max quality",
"mime", d.params.MimeType,
"subscriberNodeID", nodeID,
@@ -124,7 +124,7 @@ func (d *DynacastQuality) updateQualityChange(force bool) {
d.initialized = true
d.maxSubscribedQuality = maxSubscribedQuality
d.params.Logger.Infow("notifying quality change",
d.params.Logger.Debugw("notifying quality change",
"mime", d.params.MimeType,
"maxSubscriberQuality", d.maxSubscriberQuality,
"maxSubscriberNodeQuality", d.maxSubscriberNodeQuality,

View File

@@ -1503,7 +1503,7 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID,
)
}
p.params.Logger.Debugw(
p.params.Logger.Infow(
"sending max subscribed quality",
"trackID", trackID,
"qualities", subscribedQualities,
@@ -2068,7 +2068,7 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() {
}
func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func(sub types.LocalParticipant) error) {
p.params.Logger.Infow("queuing subscribe", "trackID", trackID)
p.params.Logger.Debugw("queuing subscribe", "trackID", trackID)
p.supervisor.UpdateSubscription(trackID, true)
@@ -2083,7 +2083,7 @@ func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func(
}
func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) {
p.params.Logger.Infow("queuing unsubscribe", "trackID", trackID)
p.params.Logger.Debugw("queuing unsubscribe", "trackID", trackID)
p.supervisor.UpdateSubscription(trackID, false)

View File

@@ -843,7 +843,6 @@ func (d *DownTrack) DistanceToDesired() int32 {
func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation {
allocation := d.forwarder.AllocateOptimal(d.receiver.GetBitrateTemporalCumulative(), allowOvershoot)
d.logger.Debugw("stream: allocation optimal available", "allocation", allocation)
d.maybeStartKeyFrameRequester()
return allocation
}
@@ -870,14 +869,12 @@ func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation {
allocation := d.forwarder.ProvisionalAllocateCommit()
d.logger.Debugw("stream: allocation commit", "allocation", allocation)
d.maybeStartKeyFrameRequester()
return allocation
}
func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) {
allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetBitrateTemporalCumulative(), allowOvershoot)
d.logger.Debugw("stream: allocation next higher layer", "allocation", allocation, "available", available)
d.maybeStartKeyFrameRequester()
return allocation, available
}
@@ -890,7 +887,6 @@ func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransitio
func (d *DownTrack) Pause() VideoAllocation {
allocation := d.forwarder.Pause(d.receiver.GetBitrateTemporalCumulative())
d.logger.Debugw("stream: pause", "allocation", allocation)
d.maybeStartKeyFrameRequester()
return allocation
}

View File

@@ -102,8 +102,8 @@ type VideoAllocation struct {
}
func (v VideoAllocation) String() string {
return fmt.Sprintf("VideoAllocation{state: %s, change: %s, bw: %d, del: %d, avail: %+v, exmpt: %+v, rates: %+v, target: %s}",
v.state, v.change, v.bandwidthRequested, v.bandwidthDelta, v.availableLayers, v.exemptedLayers, v.bitrates, v.targetLayers)
return fmt.Sprintf("VideoAllocation{state: %s, change: %s, bw: %d, del: %d, avail: %+v, exmpt: %+v, rates: %+v, target: %s, dist: %d}",
v.state, v.change, v.bandwidthRequested, v.bandwidthDelta, v.availableLayers, v.exemptedLayers, v.bitrates, v.targetLayers, v.distanceToDesired)
}
var (
@@ -132,6 +132,10 @@ type VideoTransition struct {
bandwidthDelta int64
}
func (v VideoTransition) String() string {
return fmt.Sprintf("VideoTransition{from: %s, to: %s, del: %d}", v.from, v.to, v.bandwidthDelta)
}
// -------------------------------------------------------------------
type TranslationParams struct {
@@ -543,31 +547,34 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo
return f.lastAllocation
}
state := VideoAllocationStateNone
change := VideoStreamingChangeNone
bandwidthRequested := int64(0)
targetLayers := InvalidLayers
alloc := VideoAllocation{
availableLayers: f.availableLayers,
exemptedLayers: f.exemptedLayers,
bitrates: brs,
targetLayers: InvalidLayers,
}
switch {
case f.muted:
state = VideoAllocationStateMuted
alloc.state = VideoAllocationStateMuted
case len(f.availableLayers) == 0:
// feed is dry
state = VideoAllocationStateFeedDry
alloc.state = VideoAllocationStateFeedDry
case !f.bitrateAvailable(brs):
// feed bitrate not yet calculated for all available layers
state = VideoAllocationStateAwaitingMeasurement
alloc.state = VideoAllocationStateAwaitingMeasurement
//
// Resume with the highest layer available <= max subscribed layer
// If already resumed, move allocation to the highest available layer <= max subscribed layer
//
targetLayers.Spatial = int32(math.Min(float64(f.maxLayers.Spatial), float64(f.availableLayers[len(f.availableLayers)-1])))
targetLayers.Temporal = int32(math.Max(0, float64(f.maxLayers.Temporal)))
if f.targetLayers == InvalidLayers && targetLayers.IsValid() {
change = VideoStreamingChangeResuming
alloc.targetLayers = VideoLayers{
Spatial: int32(math.Min(float64(f.maxLayers.Spatial), float64(f.availableLayers[len(f.availableLayers)-1]))),
Temporal: int32(math.Max(0, float64(f.maxLayers.Temporal))),
}
default:
// allocate best layer available
for s := f.maxLayers.Spatial; s >= 0; s-- {
@@ -576,22 +583,22 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo
continue
}
targetLayers = VideoLayers{
alloc.targetLayers = VideoLayers{
Spatial: s,
Temporal: t,
}
bandwidthRequested = brs[s][t]
state = VideoAllocationStateOptimal
alloc.bandwidthRequested = brs[s][t]
alloc.state = VideoAllocationStateOptimal
break
}
if bandwidthRequested != 0 {
if alloc.bandwidthRequested != 0 {
break
}
}
if bandwidthRequested == 0 && f.maxLayers.IsValid() && allowOvershoot {
if alloc.bandwidthRequested == 0 && f.maxLayers.IsValid() && allowOvershoot {
// if we cannot allocate anything below max layer,
// look for a layer above. It is okay to overshoot
// in optimal allocation (i.e. no bandwidth restrictions).
@@ -604,24 +611,24 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo
continue
}
targetLayers = VideoLayers{
alloc.targetLayers = VideoLayers{
Spatial: s,
Temporal: t,
}
bandwidthRequested = brs[s][t]
state = VideoAllocationStateOptimal
f.logger.Infow("allowing overshoot", "maxLayer", f.maxLayers, "targetLayers", targetLayers)
alloc.bandwidthRequested = brs[s][t]
alloc.state = VideoAllocationStateOptimal
f.logger.Infow("allowing overshoot", "maxLayer", f.maxLayers, "targetLayers", alloc.targetLayers)
break
}
if bandwidthRequested != 0 {
if alloc.bandwidthRequested != 0 {
break
}
}
}
if bandwidthRequested == 0 && f.maxLayers.IsValid() {
if alloc.bandwidthRequested == 0 && f.maxLayers.IsValid() {
// if overshoot was allowed and it did not also find a layer,
// keep target at exempted layer (if available) and the current layer is at that level.
// i. e. exempted layer may really have stopped, so a layer switch to an exempted layer should
@@ -631,53 +638,27 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllo
if f.currentLayers.IsValid() {
for _, s := range f.exemptedLayers {
if s <= f.maxLayers.Spatial && f.currentLayers.Spatial == s {
targetLayers = f.currentLayers
bandwidthRequested = brs[targetLayers.Spatial][targetLayers.Temporal]
state = VideoAllocationStateDeficient
alloc.targetLayers = f.currentLayers
alloc.bandwidthRequested = brs[alloc.targetLayers.Spatial][alloc.targetLayers.Temporal]
alloc.state = VideoAllocationStateDeficient
break
}
}
}
}
if f.targetLayers == InvalidLayers && targetLayers.IsValid() {
change = VideoStreamingChangeResuming
} else if f.targetLayers != InvalidLayers && !targetLayers.IsValid() {
change = VideoStreamingChangePausing
}
if !targetLayers.IsValid() && f.maxLayers.IsValid() {
state = VideoAllocationStateDeficient
if !alloc.targetLayers.IsValid() && f.maxLayers.IsValid() {
alloc.state = VideoAllocationStateDeficient
}
}
if !targetLayers.IsValid() {
targetLayers = InvalidLayers
}
f.lastAllocation = VideoAllocation{
state: state,
change: change,
bandwidthRequested: bandwidthRequested,
bandwidthDelta: bandwidthRequested - f.lastAllocation.bandwidthRequested,
bitrates: brs,
targetLayers: targetLayers,
distanceToDesired: f.getDistanceToDesired(brs, targetLayers, f.maxLayers),
}
if len(f.availableLayers) > 0 {
f.lastAllocation.availableLayers = make([]int32, len(f.availableLayers))
copy(f.lastAllocation.availableLayers, f.availableLayers)
}
if len(f.exemptedLayers) > 0 {
f.lastAllocation.exemptedLayers = make([]int32, len(f.exemptedLayers))
copy(f.lastAllocation.exemptedLayers, f.exemptedLayers)
if !alloc.targetLayers.IsValid() {
alloc.targetLayers = InvalidLayers
}
alloc.bandwidthDelta = alloc.bandwidthRequested - f.lastAllocation.bandwidthRequested
alloc.distanceToDesired = f.getDistanceToDesired(brs, alloc.targetLayers, f.maxLayers)
f.setTargetLayers(f.lastAllocation.targetLayers)
if f.targetLayers == InvalidLayers {
f.resyncLocked()
}
return f.lastAllocation
return f.updateAllocation(alloc, "optimal")
}
func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates) {
@@ -1006,65 +987,47 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
f.lock.Lock()
defer f.lock.Unlock()
state := VideoAllocationStateNone
change := VideoStreamingChangeNone
bandwidthRequested := int64(0)
alloc := VideoAllocation{
bandwidthRequested: 0,
bandwidthDelta: -f.lastAllocation.bandwidthRequested,
availableLayers: f.provisional.availableLayers,
exemptedLayers: f.provisional.exemptedLayers,
bitrates: f.provisional.bitrates,
targetLayers: f.provisional.allocatedLayers,
distanceToDesired: f.getDistanceToDesired(f.provisional.bitrates, f.provisional.allocatedLayers, f.provisional.maxLayers),
}
switch {
case f.provisional.muted:
state = VideoAllocationStateMuted
alloc.state = VideoAllocationStateMuted
case len(f.provisional.availableLayers) == 0:
// feed is dry
state = VideoAllocationStateFeedDry
case f.provisional.allocatedLayers == InvalidLayers:
state = VideoAllocationStateDeficient
alloc.state = VideoAllocationStateFeedDry
case f.provisional.allocatedLayers == InvalidLayers:
alloc.state = VideoAllocationStateDeficient
if f.targetLayers != InvalidLayers {
change = VideoStreamingChangePausing
}
default:
optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(f.provisional.bitrates, f.provisional.maxLayers)
bandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayers.Spatial][f.provisional.allocatedLayers.Temporal]
bandwidthRequested := f.provisional.bitrates[f.provisional.allocatedLayers.Spatial][f.provisional.allocatedLayers.Temporal]
alloc.bandwidthRequested = bandwidthRequested
alloc.bandwidthDelta = bandwidthRequested - f.lastAllocation.bandwidthRequested
if f.provisional.allocatedLayers.GreaterThan(f.provisional.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) {
// could be greater than optimal if overshooting
state = VideoAllocationStateOptimal
alloc.state = VideoAllocationStateOptimal
} else {
//
// Optimal bandwidth could be 0 if using exempted layer.
// Exempted layer is still treated as deficient.
//
state = VideoAllocationStateDeficient
}
if f.targetLayers == InvalidLayers {
change = VideoStreamingChangeResuming
alloc.state = VideoAllocationStateDeficient
}
}
f.lastAllocation = VideoAllocation{
state: state,
change: change,
bandwidthRequested: bandwidthRequested,
bandwidthDelta: bandwidthRequested - f.lastAllocation.bandwidthRequested,
bitrates: f.provisional.bitrates,
targetLayers: f.provisional.allocatedLayers,
distanceToDesired: f.getDistanceToDesired(f.provisional.bitrates, f.provisional.allocatedLayers, f.provisional.maxLayers),
}
if len(f.provisional.availableLayers) > 0 {
f.lastAllocation.availableLayers = make([]int32, len(f.provisional.availableLayers))
copy(f.lastAllocation.availableLayers, f.provisional.availableLayers)
}
if len(f.provisional.exemptedLayers) > 0 {
f.lastAllocation.exemptedLayers = make([]int32, len(f.provisional.exemptedLayers))
copy(f.lastAllocation.exemptedLayers, f.provisional.exemptedLayers)
}
f.setTargetLayers(f.lastAllocation.targetLayers)
if f.targetLayers == InvalidLayers {
f.resyncLocked()
}
return f.lastAllocation
return f.updateAllocation(alloc, "cooperative")
}
func (f *Forwarder) AllocateNextHigher(availableChannelCapacity int64, brs Bitrates, allowOvershoot bool) (VideoAllocation, bool) {
@@ -1113,36 +1076,21 @@ func (f *Forwarder) AllocateNextHigher(availableChannelCapacity int64, brs Bitra
}
targetLayers := VideoLayers{Spatial: s, Temporal: t}
state := VideoAllocationStateDeficient
if targetLayers.GreaterThan(f.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) {
state = VideoAllocationStateOptimal
}
change := VideoStreamingChangeNone
if f.targetLayers == InvalidLayers {
change = VideoStreamingChangeResuming
}
f.lastAllocation = VideoAllocation{
state: state,
change: change,
alloc := VideoAllocation{
state: VideoAllocationStateDeficient,
bandwidthRequested: bandwidthRequested,
bandwidthDelta: bandwidthRequested - alreadyAllocated,
availableLayers: f.availableLayers,
exemptedLayers: f.exemptedLayers,
bitrates: brs,
targetLayers: targetLayers,
distanceToDesired: f.getDistanceToDesired(brs, targetLayers, f.maxLayers),
}
if len(f.availableLayers) > 0 {
f.lastAllocation.availableLayers = make([]int32, len(f.availableLayers))
copy(f.lastAllocation.availableLayers, f.availableLayers)
}
if len(f.exemptedLayers) > 0 {
f.lastAllocation.exemptedLayers = make([]int32, len(f.exemptedLayers))
copy(f.lastAllocation.exemptedLayers, f.exemptedLayers)
if targetLayers.GreaterThan(f.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) {
alloc.state = VideoAllocationStateOptimal
}
f.setTargetLayers(f.lastAllocation.targetLayers)
return true, f.lastAllocation, true
return true, f.updateAllocation(alloc, "next-higher"), true
}
}
@@ -1275,40 +1223,51 @@ func (f *Forwarder) Pause(brs Bitrates) VideoAllocation {
f.lock.Lock()
defer f.lock.Unlock()
state := VideoAllocationStateNone
change := VideoStreamingChangeNone
switch {
case f.muted:
state = VideoAllocationStateMuted
case len(f.availableLayers) == 0:
// feed is dry
state = VideoAllocationStateFeedDry
default:
// feed bitrate is not yet calculated or pausing due to lack of bandwidth
state = VideoAllocationStateDeficient
if f.targetLayers != InvalidLayers {
change = VideoStreamingChangePausing
}
}
f.lastAllocation = VideoAllocation{
state: state,
change: change,
alloc := VideoAllocation{
bandwidthRequested: 0,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
availableLayers: f.availableLayers,
exemptedLayers: f.exemptedLayers,
bitrates: brs,
targetLayers: InvalidLayers,
distanceToDesired: f.getDistanceToDesired(brs, InvalidLayers, f.maxLayers),
}
if len(f.availableLayers) > 0 {
f.lastAllocation.availableLayers = make([]int32, len(f.availableLayers))
copy(f.lastAllocation.availableLayers, f.availableLayers)
switch {
case f.muted:
alloc.state = VideoAllocationStateMuted
case len(f.availableLayers) == 0:
// feed is dry
alloc.state = VideoAllocationStateFeedDry
default:
// feed bitrate is not yet calculated or pausing due to lack of bandwidth
alloc.state = VideoAllocationStateDeficient
}
if len(f.exemptedLayers) > 0 {
f.lastAllocation.exemptedLayers = make([]int32, len(f.exemptedLayers))
copy(f.lastAllocation.exemptedLayers, f.exemptedLayers)
return f.updateAllocation(alloc, "pause")
}
func (f *Forwarder) updateAllocation(alloc VideoAllocation, reason string) VideoAllocation {
if f.targetLayers == InvalidLayers && alloc.targetLayers.IsValid() {
alloc.change = VideoStreamingChangeResuming
} else if f.targetLayers != InvalidLayers && !alloc.targetLayers.IsValid() {
alloc.change = VideoStreamingChangePausing
}
if alloc.state != f.lastAllocation.state || alloc.targetLayers != f.lastAllocation.targetLayers {
f.logger.Infow(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc)
}
f.lastAllocation = alloc
if len(alloc.availableLayers) > 0 {
f.lastAllocation.availableLayers = make([]int32, len(alloc.availableLayers))
copy(f.lastAllocation.availableLayers, alloc.availableLayers)
}
if len(alloc.exemptedLayers) > 0 {
f.lastAllocation.exemptedLayers = make([]int32, len(alloc.exemptedLayers))
copy(f.lastAllocation.exemptedLayers, alloc.exemptedLayers)
}
f.setTargetLayers(f.lastAllocation.targetLayers)

View File

@@ -1153,7 +1153,7 @@ func TestForwarderPauseMute(t *testing.T) {
f.Mute(true)
expectedResult := VideoAllocation{
state: VideoAllocationStateMuted,
change: VideoStreamingChangeNone,
change: VideoStreamingChangePausing,
bandwidthRequested: 0,
bandwidthDelta: 0 - bitrates[0][0],
availableLayers: availableLayers,

View File

@@ -68,51 +68,51 @@ var (
}
)
type State int
type streamAllocatorState int
const (
StateStable State = iota
StateDeficient
streamAllocatorStateStable streamAllocatorState = iota
streamAllocatorStateDeficient
)
func (s State) String() string {
func (s streamAllocatorState) String() string {
switch s {
case StateStable:
case streamAllocatorStateStable:
return "STABLE"
case StateDeficient:
case streamAllocatorStateDeficient:
return "DEFICIENT"
default:
return fmt.Sprintf("%d", int(s))
}
}
type Signal int
type streamAllocatorSignal int
const (
SignalAllocateTrack Signal = iota
SignalAllocateAllTracks
SignalAdjustState
SignalEstimate
SignalPeriodicPing
SignalSendProbe
SignalProbeClusterDone
streamAllocatorSignalAllocateTrack streamAllocatorSignal = iota
streamAllocatorSignalAllocateAllTracks
streamAllocatorSignalAdjustState
streamAllocatorSignalEstimate
streamAllocatorSignalPeriodicPing
streamAllocatorSignalSendProbe
streamAllocatorSignalProbeClusterDone
)
func (s Signal) String() string {
func (s streamAllocatorSignal) String() string {
switch s {
case SignalAllocateTrack:
case streamAllocatorSignalAllocateTrack:
return "ALLOCATE_TRACK"
case SignalAllocateAllTracks:
case streamAllocatorSignalAllocateAllTracks:
return "ALLOCATE_ALL_TRACKS"
case SignalAdjustState:
case streamAllocatorSignalAdjustState:
return "ADJUST_STATE"
case SignalEstimate:
case streamAllocatorSignalEstimate:
return "ESTIMATE"
case SignalPeriodicPing:
case streamAllocatorSignalPeriodicPing:
return "PERIODIC_PING"
case SignalSendProbe:
case streamAllocatorSignalSendProbe:
return "SEND_PROBE"
case SignalProbeClusterDone:
case streamAllocatorSignalProbeClusterDone:
return "PROBE_CLUSTER_DONE"
default:
return fmt.Sprintf("%d", int(s))
@@ -120,7 +120,7 @@ func (s Signal) String() string {
}
type Event struct {
Signal Signal
Signal streamAllocatorSignal
TrackID livekit.TrackID
Data interface{}
}
@@ -161,7 +161,7 @@ type StreamAllocator struct {
isAllocateAllPending bool
rembTrackingSSRC uint32
state State
state streamAllocatorState
eventChMu sync.RWMutex
eventCh chan Event
@@ -253,7 +253,7 @@ func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) {
// LK-TODO: use any saved bandwidth to re-distribute
s.postEvent(Event{
Signal: SignalAdjustState,
Signal: streamAllocatorSignalAdjustState,
})
}
@@ -265,7 +265,7 @@ func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8)
// do a full allocation on a track priority change to keep it simple
s.isAllocateAllPending = true
s.postEvent(Event{
Signal: SignalAllocateAllTracks,
Signal: streamAllocatorSignalAllocateAllTracks,
})
}
}
@@ -276,7 +276,7 @@ func (s *StreamAllocator) resetState() {
s.channelObserver = s.newChannelObserverNonProbe()
s.resetProbe()
s.state = StateStable
s.state = streamAllocatorStateStable
}
// called when a new REMB is received (receive side bandwidth estimation)
@@ -326,7 +326,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima
}
if !found {
if len(remb.SSRCs) == 0 {
s.params.Logger.Warnw("no SSRC to track REMB", nil)
s.params.Logger.Warnw("stream allocator: no SSRC to track REMB", nil)
s.videoTracksMu.Unlock()
return
}
@@ -354,7 +354,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima
s.videoTracksMu.Unlock()
s.postEvent(Event{
Signal: SignalEstimate,
Signal: streamAllocatorSignalEstimate,
Data: int64(remb.Bitrate),
})
}
@@ -369,7 +369,7 @@ func (s *StreamAllocator) onTransportCCFeedback(downTrack *DownTrack, fb *rtcp.T
// called when target bitrate changes (send side bandwidth estimation)
func (s *StreamAllocator) onTargetBitrateChange(bitrate int) {
s.postEvent(Event{
Signal: SignalEstimate,
Signal: streamAllocatorSignalEstimate,
Data: int64(bitrate),
})
}
@@ -402,7 +402,7 @@ func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, layers
if shouldPost {
s.postEvent(Event{
Signal: SignalAllocateTrack,
Signal: streamAllocatorSignalAllocateTrack,
TrackID: livekit.TrackID(downTrack.ID()),
})
}
@@ -416,7 +416,7 @@ func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) {
// called when prober wants to send packet(s)
func (s *StreamAllocator) onSendProbe(bytesToSend int) {
s.postEvent(Event{
Signal: SignalSendProbe,
Signal: streamAllocatorSignalSendProbe,
Data: bytesToSend,
})
}
@@ -424,7 +424,7 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) {
// called when prober wants to send packet(s)
func (s *StreamAllocator) onProbeClusterDone(info ProbeClusterInfo) {
s.postEvent(Event{
Signal: SignalProbeClusterDone,
Signal: streamAllocatorSignalProbeClusterDone,
Data: info,
})
}
@@ -441,7 +441,7 @@ func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *DownTrack) {
if shouldPost {
s.postEvent(Event{
Signal: SignalAllocateTrack,
Signal: streamAllocatorSignalAllocateTrack,
TrackID: livekit.TrackID(downTrack.ID()),
})
}
@@ -457,7 +457,7 @@ func (s *StreamAllocator) postEvent(event Event) {
select {
case s.eventCh <- event:
default:
s.params.Logger.Warnw("event queue full", nil)
s.params.Logger.Warnw("stream allocator: event queue full", nil)
}
s.eventChMu.RUnlock()
}
@@ -479,26 +479,26 @@ func (s *StreamAllocator) ping() {
}
s.postEvent(Event{
Signal: SignalPeriodicPing,
Signal: streamAllocatorSignalPeriodicPing,
})
}
}
func (s *StreamAllocator) handleEvent(event *Event) {
switch event.Signal {
case SignalAllocateTrack:
case streamAllocatorSignalAllocateTrack:
s.handleSignalAllocateTrack(event)
case SignalAllocateAllTracks:
case streamAllocatorSignalAllocateAllTracks:
s.handleSignalAllocateAllTracks(event)
case SignalAdjustState:
case streamAllocatorSignalAdjustState:
s.handleSignalAdjustState(event)
case SignalEstimate:
case streamAllocatorSignalEstimate:
s.handleSignalEstimate(event)
case SignalPeriodicPing:
case streamAllocatorSignalPeriodicPing:
s.handleSignalPeriodicPing(event)
case SignalSendProbe:
case streamAllocatorSignalSendProbe:
s.handleSignalSendProbe(event)
case SignalProbeClusterDone:
case streamAllocatorSignalProbeClusterDone:
s.handleSignalProbeClusterDone(event)
}
}
@@ -521,7 +521,7 @@ func (s *StreamAllocator) handleSignalAllocateAllTracks(event *Event) {
s.isAllocateAllPending = false
s.videoTracksMu.Unlock()
if s.state == StateDeficient {
if s.state == streamAllocatorStateDeficient {
s.allocateAllTracks()
}
}
@@ -549,7 +549,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) {
}
// probe if necessary and timing is right
if s.state == StateDeficient {
if s.state == streamAllocatorStateDeficient {
s.maybeProbe()
}
}
@@ -599,12 +599,12 @@ func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) {
s.probeEndTime = s.lastProbeStartTime.Add(queueWait)
}
func (s *StreamAllocator) setState(state State) {
func (s *StreamAllocator) setState(state streamAllocatorState) {
if s.state == state {
return
}
s.params.Logger.Debugw("state change", "from", s.state, "to", state)
s.params.Logger.Infow("stream allocator: state change", "from", s.state, "to", state)
s.state = state
// reset probe to enforce a delay after state change before probing
@@ -614,12 +614,12 @@ func (s *StreamAllocator) setState(state State) {
func (s *StreamAllocator) adjustState() {
for _, track := range s.getTracks() {
if track.IsDeficient() {
s.setState(StateDeficient)
s.setState(streamAllocatorStateDeficient)
return
}
}
s.setState(StateStable)
s.setState(streamAllocatorStateStable)
}
func (s *StreamAllocator) handleNewEstimateInProbe() {
@@ -646,16 +646,18 @@ func (s *StreamAllocator) handleNewEstimateInProbe() {
// In rare cases, the estimate gets stuck. Prevent from probe running amok
// LK-TODO: Need more testing here to ensure that probe does not cause a lot of damage
//
s.params.Logger.Debugw("probe: aborting, no trend", "cluster", s.probeClusterId)
s.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", s.probeClusterId)
s.abortProbe()
case trend == ChannelTrendCongesting:
// stop immediately if the probe is congesting channel more
s.params.Logger.Debugw("probe: aborting, channel is congesting", "cluster", s.probeClusterId)
s.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", s.probeClusterId)
s.abortProbe()
case s.channelObserver.GetHighestEstimate() > s.probeGoalBps:
// reached goal, stop probing
s.params.Logger.Debugw(
"probe: stopping, goal reached",
s.params.Logger.Infow(
"stream allocator: probe: stopping, goal reached",
"cluster", s.probeClusterId,
"goal", s.probeGoalBps,
"highest", s.channelObserver.GetHighestEstimate(),
@@ -688,7 +690,7 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
}
s.params.Logger.Infow(
"channel congestion detected, updating channel capacity",
"stream allocator: channel congestion detected, updating channel capacity",
"reason", reason,
"old(bps)", s.committedChannelCapacity,
"new(bps)", estimateToCommit,
@@ -714,7 +716,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
s.abortProbe()
// if not deficient, free pass allocate track
if !s.params.Config.Enabled || s.state == StateStable || !track.IsManaged() {
if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() {
update := NewStreamStateUpdate()
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal)
update.HandleStreamingChange(allocation.change, track)
@@ -844,7 +846,7 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() {
if s.params.Config.MinChannelCapacity > committedChannelCapacity {
committedChannelCapacity = s.params.Config.MinChannelCapacity
s.params.Logger.Debugw(
"overriding channel capacity",
"stream allocator: overriding channel capacity",
"actual", s.committedChannelCapacity,
"override", committedChannelCapacity,
)
@@ -899,7 +901,7 @@ func (s *StreamAllocator) allocateAllTracks() {
if s.params.Config.MinChannelCapacity > availableChannelCapacity {
availableChannelCapacity = s.params.Config.MinChannelCapacity
s.params.Logger.Debugw(
"overriding channel capacity",
"stream allocator: overriding channel capacity",
"actual", s.committedChannelCapacity,
"override", availableChannelCapacity,
)
@@ -1034,8 +1036,8 @@ func (s *StreamAllocator) initProbe(probeRateBps int64) {
ProbeMinDuration,
ProbeMaxDuration,
)
s.params.Logger.Debugw(
"starting probe",
s.params.Logger.Infow(
"stream allocator: starting probe",
"probeClusterId", s.probeClusterId,
"current usage", expectedBandwidthUsage,
"committed", s.committedChannelCapacity,
@@ -1613,7 +1615,7 @@ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
switch {
case estimateDirection == TrendDirectionDownward:
c.logger.Debugw(
"channel observer: estimate is trending downward",
"stream allocator: channel observer: estimate is trending downward",
"name", c.params.Name,
"estimate", c.estimateTrend.ToString(),
"packets", packets,
@@ -1623,7 +1625,7 @@ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
return ChannelTrendCongesting, ChannelCongestionReasonEstimate
case c.params.NackWindowMinDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMinDuration && nackRatio > c.params.NackRatioThreshold:
c.logger.Debugw(
"channel observer: high rate of repeated NACKs",
"stream allocator: channel observer: high rate of repeated NACKs",
"name", c.params.Name,
"estimate", c.estimateTrend.ToString(),
"packets", packets,

View File

@@ -379,7 +379,7 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) {
exemptedLayers = append(exemptedLayers, s.exemptedLayers...)
s.lock.Unlock()
s.logger.Debugw(
s.logger.Infow(
"available layers changed - layer seen",
"added", layer,
"availableLayers", availableLayers,
@@ -461,7 +461,7 @@ func (s *StreamTrackerManager) removeAvailableLayer(layer int32) {
}
s.lock.Unlock()
s.logger.Debugw(
s.logger.Infow(
"available layers changed - layer gone",
"removed", layer,
"availableLayers", newLayers,