Cleaning up stream allocator data. (#2639)

* Cleaning up stream allocator data.

Marking it with STREAM-ALLOCATOR-DATA for easier use later if needed.

* clean up a bit more

* wire_gen

* wire_gen
This commit is contained in:
Raja Subramanian
2024-04-10 13:31:25 +05:30
committed by GitHub
parent 4b7e5dc1cc
commit c6ee34d083
6 changed files with 68 additions and 34 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//go:generate go run -mod=mod github.com/google/wire/cmd/wire
//go:build !wireinject
// +build !wireinject
+17 -1
View File
@@ -144,11 +144,13 @@ func (d DownTrackState) String() string {
// -------------------------------------------------------------------
/* STREAM-ALLOCATOR-DATA
type NackInfo struct {
Timestamp uint32
SequenceNumber uint16
Attempts uint8
}
*/
type DownTrackStreamAllocatorListener interface {
// RTCP received
@@ -179,11 +181,13 @@ type DownTrackStreamAllocatorListener interface {
// packet(s) sent
OnPacketsSent(dt *DownTrack, size int)
/* STREAM-ALLOCATOR-DATA
// NACKs received
OnNACK(dt *DownTrack, nackInfos []NackInfo)
// RTCP Receiver Report received
OnRTCPReceiverReport(dt *DownTrack, rr rtcp.ReceptionReport)
*/
// check if track should participate in BWE
IsBWEEnabled(dt *DownTrack) bool
@@ -266,8 +270,10 @@ type DownTrack struct {
streamAllocatorListener DownTrackStreamAllocatorListener
streamAllocatorReportGeneration int
streamAllocatorBytesCounter atomic.Uint32
/* STREAM-ALLOCATOR-DATA
bytesSent atomic.Uint32
bytesRetransmitted atomic.Uint32
*/
playoutDelay *PlayoutDelayController
@@ -1538,9 +1544,11 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
rttToReport = rtt
}
/* STREAM-ALLOCATOR-DATA
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnRTCPReceiverReport(d, r)
}
*/
if d.playoutDelay != nil {
jitterMs := uint64(r.Jitter*1e3) / uint64(d.codec.ClockRate)
@@ -1655,18 +1663,20 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
nackAcks := uint32(0)
nackMisses := uint32(0)
numRepeatedNACKs := uint32(0)
nackInfos := make([]NackInfo, 0, len(filtered))
// STREAM-ALLOCATOR-DATA nackInfos := make([]NackInfo, 0, len(filtered))
for _, epm := range d.sequencer.getExtPacketMetas(filtered) {
if disallowedLayers[epm.layer] {
continue
}
nackAcks++
/* STREAM-ALLOCATOR-DATA
nackInfos = append(nackInfos, NackInfo{
SequenceNumber: epm.targetSeqNo,
Timestamp: epm.timestamp,
Attempts: epm.nacked,
})
*/
pktBuff := *src
n, err := d.params.Receiver.ReadRTP(pktBuff, uint8(epm.layer), epm.sourceSeqNo)
@@ -1738,6 +1748,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
d.totalRepeatedNACKs.Add(numRepeatedNACKs)
d.rtpStats.UpdateNackProcessed(nackAcks, nackMisses, numRepeatedNACKs)
/* STREAM-ALLOCATOR-DATA
// STREAM-ALLOCATOR-EXPERIMENTAL-TODO-START
// Need to check on the following
// - get all NACKs from sequencer even if SFU is not acknowledging,
@@ -1753,6 +1764,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
if sal := d.getStreamAllocatorListener(); sal != nil && len(nackInfos) != 0 {
sal.OnNACK(d, nackInfos)
}
*/
}
func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *TranslationParams) (*rtp.Header, error) {
@@ -1841,9 +1853,11 @@ func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint
return
}
/* STREAM-ALLOCATOR-DATA
func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) {
return d.bytesSent.Swap(0), d.bytesRetransmitted.Swap(0)
}
*/
func (d *DownTrack) onBindAndConnectedChange() {
d.writable.Store(d.connected.Load() && d.bound.Load())
@@ -1981,11 +1995,13 @@ func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPa
// STREAM-ALLOCATOR-TODO: remove this stream allocator bytes counter once stream allocator changes fully to pull bytes counter
size := uint32(hdrSize + payloadSize)
d.streamAllocatorBytesCounter.Add(size)
/* STREAM-ALLOCATOR-DATA
if spmd.isRTX {
d.bytesRetransmitted.Add(size)
} else {
d.bytesSent.Add(size)
}
*/
}
// update RTPStats
@@ -134,9 +134,11 @@ func (c *ChannelObserver) GetNackRatio() float64 {
return c.nackTracker.GetRatio()
}
/* STREAM-ALLOCATOR-DATA
func (c *ChannelObserver) GetNackHistory() []string {
return c.nackTracker.GetHistory()
}
*/
func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
estimateDirection := c.estimateTrend.GetDirection()
+7 -3
View File
@@ -38,20 +38,22 @@ type NackTracker struct {
packets uint32
repeatedNacks uint32
/* STREAM-ALLOCATOR-DATA
// STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove when cleaning up experimental stuff
history []string
*/
}
func NewNackTracker(params NackTrackerParams) *NackTracker {
return &NackTracker{
params: params,
history: make([]string, 0, 10),
params: params,
// STREAM-ALLOCATOR-DATA history: make([]string, 0, 10),
}
}
func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) {
if n.params.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.WindowMaxDuration {
n.updateHistory()
// STREAM-ALLOCATOR-DATA n.updateHistory()
n.windowStartTime = time.Time{}
n.packets = 0
@@ -104,6 +106,7 @@ func (n *NackTracker) ToString() string {
return fmt.Sprintf("n: %s, %s, p: %d, rn: %d, rn/p: %.2f", n.params.Name, window, n.packets, n.repeatedNacks, n.GetRatio())
}
/* STREAM-ALLOCATOR-DATA
func (n *NackTracker) GetHistory() []string {
return n.history
}
@@ -115,5 +118,6 @@ func (n *NackTracker) updateHistory() {
n.history = append(n.history, n.ToString())
}
*/
// ------------------------------------------------
+29 -17
View File
@@ -85,8 +85,8 @@ const (
streamAllocatorSignalResume
streamAllocatorSignalSetAllowPause
streamAllocatorSignalSetChannelCapacity
streamAllocatorSignalNACK
streamAllocatorSignalRTCPReceiverReport
// STREAM-ALLOCATOR-DATA streamAllocatorSignalNACK
// STREAM-ALLOCATOR-DATA streamAllocatorSignalRTCPReceiverReport
)
func (s streamAllocatorSignal) String() string {
@@ -111,10 +111,12 @@ func (s streamAllocatorSignal) String() string {
return "SET_ALLOW_PAUSE"
case streamAllocatorSignalSetChannelCapacity:
return "SET_CHANNEL_CAPACITY"
case streamAllocatorSignalNACK:
return "NACK"
case streamAllocatorSignalRTCPReceiverReport:
return "RTCP_RECEIVER_REPORT"
/* STREAM-ALLOCATOR-DATA
case streamAllocatorSignalNACK:
return "NACK"
case streamAllocatorSignalRTCPReceiverReport:
return "RTCP_RECEIVER_REPORT"
*/
default:
return fmt.Sprintf("%d", int(s))
}
@@ -157,7 +159,7 @@ type StreamAllocator struct {
prober *Prober
channelObserver *ChannelObserver
// rateMonitor *RateMonitor
// STREAM-ALLOCATOR-DATA rateMonitor *RateMonitor
videoTracksMu sync.RWMutex
videoTracks map[livekit.TrackID]*Track
@@ -178,7 +180,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
prober: NewProber(ProberParams{
Logger: params.Logger,
}),
// rateMonitor: NewRateMonitor(),
// STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(),
videoTracks: make(map[livekit.TrackID]*Track),
eventsQueue: utils.NewOpsQueue(utils.OpsQueueParams{
Name: "stream-allocator",
@@ -461,6 +463,7 @@ func (s *StreamAllocator) OnPacketsSent(downTrack *sfu.DownTrack, size int) {
s.prober.PacketsSent(size)
}
/* STREAM-ALLOCATOR-DATA
// called by a video DownTrack when it processes NACKs
func (s *StreamAllocator) OnNACK(downTrack *sfu.DownTrack, nackInfos []sfu.NackInfo) {
s.postEvent(Event{
@@ -479,6 +482,7 @@ func (s *StreamAllocator) OnRTCPReceiverReport(downTrack *sfu.DownTrack, rr rtcp
Data: rr,
})
}
*/
// called when prober wants to send packet(s)
func (s *StreamAllocator) OnSendProbe(bytesToSend int) {
@@ -598,10 +602,12 @@ func (s *StreamAllocator) handleEvent(event *Event) {
s.handleSignalSetAllowPause(event)
case streamAllocatorSignalSetChannelCapacity:
s.handleSignalSetChannelCapacity(event)
case streamAllocatorSignalNACK:
s.handleSignalNACK(event)
case streamAllocatorSignalRTCPReceiverReport:
s.handleSignalRTCPReceiverReport(event)
/* STREAM-ALLOCATOR-DATA
case streamAllocatorSignalNACK:
s.handleSignalNACK(event)
case streamAllocatorSignalRTCPReceiverReport:
s.handleSignalRTCPReceiverReport(event)
*/
}
}
@@ -635,7 +641,7 @@ func (s *StreamAllocator) handleSignalAdjustState(event *Event) {
func (s *StreamAllocator) handleSignalEstimate(event *Event) {
receivedEstimate, _ := event.Data.(int64)
s.lastReceivedEstimate = receivedEstimate
s.monitorRate(receivedEstimate)
// s.monitorRate(receivedEstimate)
// while probing, maintain estimate separately to enable keeping current committed estimate if probe fails
if s.probeController.IsInProbe() {
@@ -662,7 +668,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) {
s.maybeProbe()
}
s.updateTracksHistory()
// s.updateTracksHistory()
}
func (s *StreamAllocator) handleSignalSendProbe(event *Event) {
@@ -719,6 +725,7 @@ func (s *StreamAllocator) handleSignalSetChannelCapacity(event *Event) {
}
}
/* STREAM-ALLOCATOR-DATA
func (s *StreamAllocator) handleSignalNACK(event *Event) {
nackInfos := event.Data.([]sfu.NackInfo)
@@ -742,6 +749,7 @@ func (s *StreamAllocator) handleSignalRTCPReceiverReport(event *Event) {
track.ProcessRTCPReceiverReport(rr)
}
}
*/
func (s *StreamAllocator) setState(state streamAllocatorState) {
if s.state == state {
@@ -823,13 +831,15 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
"commitThreshold(bps)", commitThreshold,
"channel", s.channelObserver.ToString(),
)
/* STREAM-ALLOCATOR-DATA
s.params.Logger.Debugw(
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action),
// "rateHistory", s.rateMonitor.GetHistory(),
// "expectedQueuing", s.rateMonitor.GetQueuingGuess(),
"rateHistory", s.rateMonitor.GetHistory(),
"expectedQueuing", s.rateMonitor.GetQueuingGuess(),
"nackHistory", s.channelObserver.GetNackHistory(),
"trackHistory", s.getTracksHistory(),
)
*/
if estimateToCommit > commitThreshold {
// estimate to commit is either higher or within tolerance of expected uage, skip committing and re-allocating
return
@@ -1407,6 +1417,7 @@ func (s *StreamAllocator) getMaxDistanceSortedDeficient() MaxDistanceSorter {
return maxDistanceSorter
}
/* STREAM-ALLOCATOR-DATA
// STREAM-ALLOCATOR-EXPERIMENTAL-TODO
// Monitor sent rate vs estimate to figure out queuing on congestion.
// Idea here is to pause all managed tracks on congestion detection immediately till queue drains.
@@ -1431,7 +1442,7 @@ func (s *StreamAllocator) monitorRate(estimate int64) {
}
}
// s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted)
s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted)
}
func (s *StreamAllocator) updateTracksHistory() {
@@ -1449,6 +1460,7 @@ func (s *StreamAllocator) getTracksHistory() map[livekit.TrackID]string {
return history
}
*/
// ------------------------------------------------
+12 -12
View File
@@ -15,14 +15,8 @@
package streamallocator
import (
"fmt"
"sort"
"time"
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -41,6 +35,7 @@ type Track struct {
totalPackets uint32
totalRepeatedNacks uint32
/* STREAM-ALLOCATOR-DATA
nackInfos map[uint16]sfu.NackInfo
// STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove after experimental
nackHistory []string
@@ -53,6 +48,7 @@ type Track struct {
maxRTT uint32
// STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove after experimental
receiverReportHistory []string
*/
isDirty bool
@@ -67,15 +63,17 @@ func NewTrack(
logger logger.Logger,
) *Track {
t := &Track{
downTrack: downTrack,
source: source,
isSimulcast: isSimulcast,
publisherID: publisherID,
logger: logger,
downTrack: downTrack,
source: source,
isSimulcast: isSimulcast,
publisherID: publisherID,
logger: logger,
/* STREAM-ALLOCATOR-DATA
nackInfos: make(map[uint16]sfu.NackInfo),
nackHistory: make([]string, 0, 10),
receiverReportHistory: make([]string, 0, 10),
streamState: StreamStateInactive,
*/
streamState: StreamStateInactive,
}
t.SetPriority(0)
t.SetMaxLayer(downTrack.MaxLayer())
@@ -220,6 +218,7 @@ func (t *Track) GetNackDelta() (uint32, uint32) {
return packetDelta, nackDelta
}
/* STREAM-ALLOCATOR-DATA
func (t *Track) UpdateNack(nackInfos []sfu.NackInfo) {
for _, ni := range nackInfos {
t.nackInfos[ni.SequenceNumber] = ni
@@ -363,6 +362,7 @@ func (t *Track) updateReceiverReportHistory() {
fmt.Sprintf("t: %+v, l: %d, p: %d, rtt: %d", time.Now().Format(time.UnixDate), dl, dp, maxRTT),
)
}
*/
// ------------------------------------------------