From c6ee34d083e80591be3914c1928194e2a65b1226 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 10 Apr 2024 13:31:25 +0530 Subject: [PATCH] Cleaning up stream allocator data. (#2639) * Cleaning up stream allocator data. Marking it with STREAM-ALLOCATOR-DATA for easier use later if needed. * clean up a bit more * wire_gen * wire_gen --- pkg/service/wire_gen.go | 2 +- pkg/sfu/downtrack.go | 18 ++++++++- pkg/sfu/streamallocator/channelobserver.go | 2 + pkg/sfu/streamallocator/nacktracker.go | 10 +++-- pkg/sfu/streamallocator/streamallocator.go | 46 ++++++++++++++-------- pkg/sfu/streamallocator/track.go | 24 +++++------ 6 files changed, 68 insertions(+), 34 deletions(-) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 80b8bd8c0..08d493b13 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,6 +1,6 @@ // Code generated by Wire. DO NOT EDIT. -//go:generate go run github.com/google/wire/cmd/wire +//go:generate go run -mod=mod github.com/google/wire/cmd/wire //go:build !wireinject // +build !wireinject diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d02e9de4d..7e9081901 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -144,11 +144,13 @@ func (d DownTrackState) String() string { // ------------------------------------------------------------------- +/* STREAM-ALLOCATOR-DATA type NackInfo struct { Timestamp uint32 SequenceNumber uint16 Attempts uint8 } +*/ type DownTrackStreamAllocatorListener interface { // RTCP received @@ -179,11 +181,13 @@ type DownTrackStreamAllocatorListener interface { // packet(s) sent OnPacketsSent(dt *DownTrack, size int) + /* STREAM-ALLOCATOR-DATA // NACKs received OnNACK(dt *DownTrack, nackInfos []NackInfo) // RTCP Receiver Report received OnRTCPReceiverReport(dt *DownTrack, rr rtcp.ReceptionReport) + */ // check if track should participate in BWE IsBWEEnabled(dt *DownTrack) bool @@ -266,8 +270,10 @@ type DownTrack struct { streamAllocatorListener DownTrackStreamAllocatorListener streamAllocatorReportGeneration int streamAllocatorBytesCounter atomic.Uint32 + /* STREAM-ALLOCATOR-DATA bytesSent atomic.Uint32 bytesRetransmitted atomic.Uint32 + */ playoutDelay *PlayoutDelayController @@ -1538,9 +1544,11 @@ func (d *DownTrack) handleRTCP(bytes []byte) { rttToReport = rtt } + /* STREAM-ALLOCATOR-DATA if sal := d.getStreamAllocatorListener(); sal != nil { sal.OnRTCPReceiverReport(d, r) } + */ if d.playoutDelay != nil { jitterMs := uint64(r.Jitter*1e3) / uint64(d.codec.ClockRate) @@ -1655,18 +1663,20 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { nackAcks := uint32(0) nackMisses := uint32(0) numRepeatedNACKs := uint32(0) - nackInfos := make([]NackInfo, 0, len(filtered)) + // STREAM-ALLOCATOR-DATA nackInfos := make([]NackInfo, 0, len(filtered)) for _, epm := range d.sequencer.getExtPacketMetas(filtered) { if disallowedLayers[epm.layer] { continue } nackAcks++ + /* STREAM-ALLOCATOR-DATA nackInfos = append(nackInfos, NackInfo{ SequenceNumber: epm.targetSeqNo, Timestamp: epm.timestamp, Attempts: epm.nacked, }) + */ pktBuff := *src n, err := d.params.Receiver.ReadRTP(pktBuff, uint8(epm.layer), epm.sourceSeqNo) @@ -1738,6 +1748,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { d.totalRepeatedNACKs.Add(numRepeatedNACKs) d.rtpStats.UpdateNackProcessed(nackAcks, nackMisses, numRepeatedNACKs) + /* STREAM-ALLOCATOR-DATA // STREAM-ALLOCATOR-EXPERIMENTAL-TODO-START // Need to check on the following // - get all NACKs from sequencer even if SFU is not acknowledging, @@ -1753,6 +1764,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { if sal := d.getStreamAllocatorListener(); sal != nil && len(nackInfos) != 0 { sal.OnNACK(d, nackInfos) } + */ } func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *TranslationParams) (*rtp.Header, error) { @@ -1841,9 +1853,11 @@ func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint return } +/* STREAM-ALLOCATOR-DATA func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) { return d.bytesSent.Swap(0), d.bytesRetransmitted.Swap(0) } +*/ func (d *DownTrack) onBindAndConnectedChange() { d.writable.Store(d.connected.Load() && d.bound.Load()) @@ -1981,11 +1995,13 @@ func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPa // STREAM-ALLOCATOR-TODO: remove this stream allocator bytes counter once stream allocator changes fully to pull bytes counter size := uint32(hdrSize + payloadSize) d.streamAllocatorBytesCounter.Add(size) + /* STREAM-ALLOCATOR-DATA if spmd.isRTX { d.bytesRetransmitted.Add(size) } else { d.bytesSent.Add(size) } + */ } // update RTPStats diff --git a/pkg/sfu/streamallocator/channelobserver.go b/pkg/sfu/streamallocator/channelobserver.go index 9afab9c49..585484408 100644 --- a/pkg/sfu/streamallocator/channelobserver.go +++ b/pkg/sfu/streamallocator/channelobserver.go @@ -134,9 +134,11 @@ func (c *ChannelObserver) GetNackRatio() float64 { return c.nackTracker.GetRatio() } +/* STREAM-ALLOCATOR-DATA func (c *ChannelObserver) GetNackHistory() []string { return c.nackTracker.GetHistory() } +*/ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) { estimateDirection := c.estimateTrend.GetDirection() diff --git a/pkg/sfu/streamallocator/nacktracker.go b/pkg/sfu/streamallocator/nacktracker.go index b353781e5..c7131a01b 100644 --- a/pkg/sfu/streamallocator/nacktracker.go +++ b/pkg/sfu/streamallocator/nacktracker.go @@ -38,20 +38,22 @@ type NackTracker struct { packets uint32 repeatedNacks uint32 + /* STREAM-ALLOCATOR-DATA // STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove when cleaning up experimental stuff history []string + */ } func NewNackTracker(params NackTrackerParams) *NackTracker { return &NackTracker{ - params: params, - history: make([]string, 0, 10), + params: params, + // STREAM-ALLOCATOR-DATA history: make([]string, 0, 10), } } func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) { if n.params.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.WindowMaxDuration { - n.updateHistory() + // STREAM-ALLOCATOR-DATA n.updateHistory() n.windowStartTime = time.Time{} n.packets = 0 @@ -104,6 +106,7 @@ func (n *NackTracker) ToString() string { return fmt.Sprintf("n: %s, %s, p: %d, rn: %d, rn/p: %.2f", n.params.Name, window, n.packets, n.repeatedNacks, n.GetRatio()) } +/* STREAM-ALLOCATOR-DATA func (n *NackTracker) GetHistory() []string { return n.history } @@ -115,5 +118,6 @@ func (n *NackTracker) updateHistory() { n.history = append(n.history, n.ToString()) } +*/ // ------------------------------------------------ diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 08ecd3421..3bff2aca5 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -85,8 +85,8 @@ const ( streamAllocatorSignalResume streamAllocatorSignalSetAllowPause streamAllocatorSignalSetChannelCapacity - streamAllocatorSignalNACK - streamAllocatorSignalRTCPReceiverReport + // STREAM-ALLOCATOR-DATA streamAllocatorSignalNACK + // STREAM-ALLOCATOR-DATA streamAllocatorSignalRTCPReceiverReport ) func (s streamAllocatorSignal) String() string { @@ -111,10 +111,12 @@ func (s streamAllocatorSignal) String() string { return "SET_ALLOW_PAUSE" case streamAllocatorSignalSetChannelCapacity: return "SET_CHANNEL_CAPACITY" - case streamAllocatorSignalNACK: - return "NACK" - case streamAllocatorSignalRTCPReceiverReport: - return "RTCP_RECEIVER_REPORT" + /* STREAM-ALLOCATOR-DATA + case streamAllocatorSignalNACK: + return "NACK" + case streamAllocatorSignalRTCPReceiverReport: + return "RTCP_RECEIVER_REPORT" + */ default: return fmt.Sprintf("%d", int(s)) } @@ -157,7 +159,7 @@ type StreamAllocator struct { prober *Prober channelObserver *ChannelObserver - // rateMonitor *RateMonitor + // STREAM-ALLOCATOR-DATA rateMonitor *RateMonitor videoTracksMu sync.RWMutex videoTracks map[livekit.TrackID]*Track @@ -178,7 +180,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { prober: NewProber(ProberParams{ Logger: params.Logger, }), - // rateMonitor: NewRateMonitor(), + // STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(), videoTracks: make(map[livekit.TrackID]*Track), eventsQueue: utils.NewOpsQueue(utils.OpsQueueParams{ Name: "stream-allocator", @@ -461,6 +463,7 @@ func (s *StreamAllocator) OnPacketsSent(downTrack *sfu.DownTrack, size int) { s.prober.PacketsSent(size) } +/* STREAM-ALLOCATOR-DATA // called by a video DownTrack when it processes NACKs func (s *StreamAllocator) OnNACK(downTrack *sfu.DownTrack, nackInfos []sfu.NackInfo) { s.postEvent(Event{ @@ -479,6 +482,7 @@ func (s *StreamAllocator) OnRTCPReceiverReport(downTrack *sfu.DownTrack, rr rtcp Data: rr, }) } +*/ // called when prober wants to send packet(s) func (s *StreamAllocator) OnSendProbe(bytesToSend int) { @@ -598,10 +602,12 @@ func (s *StreamAllocator) handleEvent(event *Event) { s.handleSignalSetAllowPause(event) case streamAllocatorSignalSetChannelCapacity: s.handleSignalSetChannelCapacity(event) - case streamAllocatorSignalNACK: - s.handleSignalNACK(event) - case streamAllocatorSignalRTCPReceiverReport: - s.handleSignalRTCPReceiverReport(event) + /* STREAM-ALLOCATOR-DATA + case streamAllocatorSignalNACK: + s.handleSignalNACK(event) + case streamAllocatorSignalRTCPReceiverReport: + s.handleSignalRTCPReceiverReport(event) + */ } } @@ -635,7 +641,7 @@ func (s *StreamAllocator) handleSignalAdjustState(event *Event) { func (s *StreamAllocator) handleSignalEstimate(event *Event) { receivedEstimate, _ := event.Data.(int64) s.lastReceivedEstimate = receivedEstimate - s.monitorRate(receivedEstimate) + // s.monitorRate(receivedEstimate) // while probing, maintain estimate separately to enable keeping current committed estimate if probe fails if s.probeController.IsInProbe() { @@ -662,7 +668,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) { s.maybeProbe() } - s.updateTracksHistory() + // s.updateTracksHistory() } func (s *StreamAllocator) handleSignalSendProbe(event *Event) { @@ -719,6 +725,7 @@ func (s *StreamAllocator) handleSignalSetChannelCapacity(event *Event) { } } +/* STREAM-ALLOCATOR-DATA func (s *StreamAllocator) handleSignalNACK(event *Event) { nackInfos := event.Data.([]sfu.NackInfo) @@ -742,6 +749,7 @@ func (s *StreamAllocator) handleSignalRTCPReceiverReport(event *Event) { track.ProcessRTCPReceiverReport(rr) } } +*/ func (s *StreamAllocator) setState(state streamAllocatorState) { if s.state == state { @@ -823,13 +831,15 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { "commitThreshold(bps)", commitThreshold, "channel", s.channelObserver.ToString(), ) + /* STREAM-ALLOCATOR-DATA s.params.Logger.Debugw( fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action), - // "rateHistory", s.rateMonitor.GetHistory(), - // "expectedQueuing", s.rateMonitor.GetQueuingGuess(), + "rateHistory", s.rateMonitor.GetHistory(), + "expectedQueuing", s.rateMonitor.GetQueuingGuess(), "nackHistory", s.channelObserver.GetNackHistory(), "trackHistory", s.getTracksHistory(), ) + */ if estimateToCommit > commitThreshold { // estimate to commit is either higher or within tolerance of expected uage, skip committing and re-allocating return @@ -1407,6 +1417,7 @@ func (s *StreamAllocator) getMaxDistanceSortedDeficient() MaxDistanceSorter { return maxDistanceSorter } +/* STREAM-ALLOCATOR-DATA // STREAM-ALLOCATOR-EXPERIMENTAL-TODO // Monitor sent rate vs estimate to figure out queuing on congestion. // Idea here is to pause all managed tracks on congestion detection immediately till queue drains. @@ -1431,7 +1442,7 @@ func (s *StreamAllocator) monitorRate(estimate int64) { } } - // s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted) + s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted) } func (s *StreamAllocator) updateTracksHistory() { @@ -1449,6 +1460,7 @@ func (s *StreamAllocator) getTracksHistory() map[livekit.TrackID]string { return history } +*/ // ------------------------------------------------ diff --git a/pkg/sfu/streamallocator/track.go b/pkg/sfu/streamallocator/track.go index be93b9e4b..1a0669585 100644 --- a/pkg/sfu/streamallocator/track.go +++ b/pkg/sfu/streamallocator/track.go @@ -15,14 +15,8 @@ package streamallocator import ( - "fmt" - "sort" - "time" - - "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/pion/rtcp" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -41,6 +35,7 @@ type Track struct { totalPackets uint32 totalRepeatedNacks uint32 + /* STREAM-ALLOCATOR-DATA nackInfos map[uint16]sfu.NackInfo // STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove after experimental nackHistory []string @@ -53,6 +48,7 @@ type Track struct { maxRTT uint32 // STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove after experimental receiverReportHistory []string + */ isDirty bool @@ -67,15 +63,17 @@ func NewTrack( logger logger.Logger, ) *Track { t := &Track{ - downTrack: downTrack, - source: source, - isSimulcast: isSimulcast, - publisherID: publisherID, - logger: logger, + downTrack: downTrack, + source: source, + isSimulcast: isSimulcast, + publisherID: publisherID, + logger: logger, + /* STREAM-ALLOCATOR-DATA nackInfos: make(map[uint16]sfu.NackInfo), nackHistory: make([]string, 0, 10), receiverReportHistory: make([]string, 0, 10), - streamState: StreamStateInactive, + */ + streamState: StreamStateInactive, } t.SetPriority(0) t.SetMaxLayer(downTrack.MaxLayer()) @@ -220,6 +218,7 @@ func (t *Track) GetNackDelta() (uint32, uint32) { return packetDelta, nackDelta } +/* STREAM-ALLOCATOR-DATA func (t *Track) UpdateNack(nackInfos []sfu.NackInfo) { for _, ni := range nackInfos { t.nackInfos[ni.SequenceNumber] = ni @@ -363,6 +362,7 @@ func (t *Track) updateReceiverReportHistory() { fmt.Sprintf("t: %+v, l: %d, p: %d, rtt: %d", time.Now().Format(time.UnixDate), dl, dp, maxRTT), ) } +*/ // ------------------------------------------------