From f4a984d446545e10bf4c8244e65e1ac9cac9dd59 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 8 Aug 2023 01:06:14 -0700 Subject: [PATCH 1/4] preallocate prometheus packet counters (#1942) --- pkg/telemetry/prometheus/packets.go | 62 +++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index 3048367a8..ea8284295 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -56,6 +56,15 @@ var ( promRTT *prometheus.HistogramVec promParticipantJoin *prometheus.CounterVec promConnections *prometheus.GaugeVec + + promPacketTotalIncomingInitial prometheus.Counter + promPacketTotalIncomingRetransmit prometheus.Counter + promPacketTotalOutgoingInitial prometheus.Counter + promPacketTotalOutgoingRetransmit prometheus.Counter + promPacketBytesIncomingInitial prometheus.Counter + promPacketBytesIncomingRetransmit prometheus.Counter + promPacketBytesOutgoingInitial prometheus.Counter + promPacketBytesOutgoingRetransmit prometheus.Counter ) func initPacketStats(nodeID string, nodeType livekit.NodeType, env string) { @@ -140,13 +149,32 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType, env string) { prometheus.MustRegister(promRTT) prometheus.MustRegister(promParticipantJoin) prometheus.MustRegister(promConnections) + + promPacketTotalIncomingInitial = promPacketTotal.WithLabelValues(string(Incoming), transmissionInitial) + promPacketTotalIncomingRetransmit = promPacketTotal.WithLabelValues(string(Incoming), transmissionRetransmit) + promPacketTotalOutgoingInitial = promPacketTotal.WithLabelValues(string(Outgoing), transmissionInitial) + promPacketTotalOutgoingRetransmit = promPacketTotal.WithLabelValues(string(Outgoing), transmissionRetransmit) + promPacketBytesIncomingInitial = promPacketBytes.WithLabelValues(string(Incoming), transmissionInitial) + promPacketBytesIncomingRetransmit = promPacketBytes.WithLabelValues(string(Incoming), transmissionRetransmit) + promPacketBytesOutgoingInitial = promPacketBytes.WithLabelValues(string(Outgoing), transmissionInitial) + promPacketBytesOutgoingRetransmit = promPacketBytes.WithLabelValues(string(Outgoing), transmissionRetransmit) } func IncrementPackets(direction Direction, count uint64, retransmit bool) { - promPacketTotal.WithLabelValues( - string(direction), - transmissionLabel(retransmit), - ).Add(float64(count)) + if direction == Incoming { + if retransmit { + promPacketTotalIncomingRetransmit.Add(float64(count)) + } else { + promPacketTotalIncomingInitial.Add(float64(count)) + } + } else { + if retransmit { + promPacketTotalOutgoingRetransmit.Add(float64(count)) + } else { + promPacketTotalOutgoingInitial.Add(float64(count)) + } + } + if direction == Incoming { packetsIn.Add(count) } else { @@ -158,10 +186,20 @@ func IncrementPackets(direction Direction, count uint64, retransmit bool) { } func IncrementBytes(direction Direction, count uint64, retransmit bool) { - promPacketBytes.WithLabelValues( - string(direction), - transmissionLabel(retransmit), - ).Add(float64(count)) + if direction == Incoming { + if retransmit { + promPacketBytesIncomingRetransmit.Add(float64(count)) + } else { + promPacketBytesIncomingInitial.Add(float64(count)) + } + } else { + if retransmit { + promPacketBytesOutgoingRetransmit.Add(float64(count)) + } else { + promPacketBytesOutgoingInitial.Add(float64(count)) + } + } + if direction == Incoming { bytesIn.Add(count) } else { @@ -240,11 +278,3 @@ func AddConnection(direction Direction) { func SubConnection(direction Direction) { promConnections.WithLabelValues(string(direction)).Sub(1) } - -func transmissionLabel(retransmit bool) string { - if !retransmit { - return transmissionInitial - } else { - return transmissionRetransmit - } -} From 9a96abc11fe0b095de81e8eb43bbad39dac5d367 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 8 Aug 2023 23:44:03 +0530 Subject: [PATCH 2/4] Intermediate signed type casting (#1944) --- pkg/sfu/forwarder.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 6ab522da1..68c646808 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1512,7 +1512,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's // wall clock time. So, if the first packet experienced abmormal latency, it is possible // for refTS > expectedTS - diffSeconds := float64(expectedTS-refTS) / float64(f.codec.ClockRate) + diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) if diffSeconds >= 0.0 { if diffSeconds > ResumeBehindThresholdSeconds { f.logger.Infow("resume, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", diffSeconds) @@ -1530,7 +1530,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } } else { // switching between layers, check if refTS is too far behind the last sent - diffSeconds := float64(refTS-lastTS) / float64(f.codec.ClockRate) + diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 { if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds { // AVSYNC-TODO: This could be due to pacer trickling out this layer. Should potentially return error here and wait for a more opportune time @@ -1541,7 +1541,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // use a nominal increase to ensure that timestamp is always moving forward nextTS = lastTS + 1 } else { - diffSeconds = float64(expectedTS-refTS) / float64(f.codec.ClockRate) + diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { f.logger.Infow("layer switch, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds)) nextTS = expectedTS From c14c58b4ae1499c61e79506784ce163b3a09a403 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 9 Aug 2023 11:28:48 +0530 Subject: [PATCH 3/4] Layer switches at log info to better understand A/V sync issues. (#1947) --- pkg/sfu/forwarder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 68c646808..33131bd71 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1424,7 +1424,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.referenceLayerSpatial = layer f.rtpMunger.SetLastSnTs(extPkt) f.codecMunger.SetLast(extPkt) - f.logger.Debugw( + f.logger.Infow( "starting forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, "timestamp", extPkt.Packet.Timestamp, @@ -1556,7 +1556,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // nominal increase nextTS = lastTS + 1 } - f.logger.Debugw( + f.logger.Infow( "next timestamp on switch", "switchingAt", switchingAt.String(), "layer", layer, From 0e9ec9a21e2b2687ae4bfacdb1f129c41a0995c0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 9 Aug 2023 17:42:33 +0530 Subject: [PATCH 4/4] Ignore lagging layer switches. (#1948) --- pkg/sfu/buffer/buffer.go | 6 +++--- pkg/sfu/buffer/rtpstats.go | 39 ++++++++++++++++++++++++++++++++++++++ pkg/sfu/downtrack.go | 5 ++++- pkg/sfu/forwarder.go | 33 +++++++++++++++++--------------- pkg/sfu/receiver.go | 4 ++-- 5 files changed, 66 insertions(+), 21 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 2cfe68567..baae76b46 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -101,7 +101,7 @@ type Buffer struct { // callbacks onClose func() onRtcpFeedback func([]rtcp.Packet) - onRtcpSenderReport func(*RTCPSenderReportData) + onRtcpSenderReport func() onFpsChanged func() onFinalRtpStats func(*RTPStats) @@ -675,7 +675,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { b.RUnlock() if b.onRtcpSenderReport != nil { - b.onRtcpSenderReport(srData) + b.onRtcpSenderReport() } } @@ -729,7 +729,7 @@ func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) { b.onRtcpFeedback = fn } -func (b *Buffer) OnRtcpSenderReport(fn func(srData *RTCPSenderReportData)) { +func (b *Buffer) OnRtcpSenderReport(fn func()) { b.onRtcpSenderReport = fn } diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d1069c00a..938f1e4bd 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -36,6 +36,9 @@ const ( FirstSnapshotId = 1 SnInfoSize = 8192 SnInfoMask = SnInfoSize - 1 + + firstPacketTimeAdjustWindow = 2 * time.Minute + firstPacketTimeAdjustThreshold = 5 * time.Second ) // ------------------------------------------------------- @@ -763,6 +766,40 @@ func (r *RTPStats) GetRtt() uint32 { return r.rtt } +func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) { + r.lock.Lock() + defer r.lock.Unlock() + + r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt) +} + +func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) { + if time.Since(r.startTime) > firstPacketTimeAdjustWindow { + return + } + + // for some time after the start, adjust time of first packet. + // Helps improve accuracy of expected timestamp calculation. + // Adjusting only one way, i. e. if the first sample experienced + // abnormal delay (maybe due to pacing or maybe due to queuing + // in some network element along the way), push back first time + // to an earlier instance. + samplesDuration := time.Duration(float64(extTS-r.extStartTS) / float64(r.params.ClockRate) * float64(time.Second)) + firstTime := time.Now().Add(-samplesDuration) + if firstTime.Before(r.firstTime) { + r.logger.Infow( + "adjusting first packet time", + "before", r.firstTime.String(), + "after", firstTime.String(), + ) + if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold { + r.logger.Infow("first packet time adjustment too big, ignoring", "adjustment", r.firstTime.Sub(firstTime)) + } else { + r.firstTime = firstTime + } + } +} + func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { r.lock.Lock() defer r.lock.Unlock() @@ -792,6 +829,8 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) + // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration var rtpDiffSinceLast uint32 diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d204b8c34..ff287ae20 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1802,7 +1802,10 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { } } -func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, _layer int32, _srData *buffer.RTCPSenderReportData) error { +func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error { + if layer == d.forwarder.GetReferenceLayerSpatial() { + d.rtpStats.MaybeAdjustFirstPacketTime(srData) + } return nil } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 33131bd71..abfd9a4e3 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -15,6 +15,7 @@ package sfu import ( + "errors" "fmt" "math" "math/rand" @@ -40,7 +41,7 @@ const ( FlagFilterRTX = true TransitionCostSpatial = 10 - ResumeBehindThresholdSeconds = float64(0.1) // 100ms + ResumeBehindThresholdSeconds = float64(0.2) // 200ms LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms SwitchAheadThresholdSeconds = float64(0.025) // 25ms ) @@ -1455,14 +1456,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) - if err == nil { - refTS = ts + if err != nil { + // error out if refTS is not available. It can happen when there is no sender report + // for the layer being switched to. Can especially happen at the start of the track when layer switches are + // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been + // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending + // on how often publisher/remote side sends RTCP sender report. + return err } - // AVSYNC-TODO: can error out here if refTS is not available. It can happen when there is no sender report - // for the layer being switched to. Can especially happen at the start of the track when layer switches are - // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been - // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending - // on how often publisher/remote side sends RTCP sender report. + + refTS = ts } if f.getExpectedRTPTimestamp != nil { @@ -1508,7 +1511,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2 // and above as Case 1. // - // In the event of refTS > expectedTS, another threshold is used to pick the next timestamp. + // In the event of refTS > expectedTS, use refTS. // Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's // wall clock time. So, if the first packet experienced abmormal latency, it is possible // for refTS > expectedTS @@ -1523,22 +1526,22 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } else { if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { f.logger.Infow("resume, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds)) - nextTS = expectedTS - } else { - nextTS = refTS } + nextTS = refTS } } else { // switching between layers, check if refTS is too far behind the last sent diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 { if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds { - // AVSYNC-TODO: This could be due to pacer trickling out this layer. Should potentially return error here and wait for a more opportune time - // or some forcing function (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition) - // to do the switch. Just logging it for now. + // this could be due to pacer trickling out this layer. Error out and wait for a more opportune time. + // AVSYNC-TODO: Consider some forcing function to do the switch + // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). f.logger.Infow("layer switch, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) + return errors.New("switch point too far behind") } // use a nominal increase to ensure that timestamp is always moving forward + f.logger.Infow("layer switch, reference is slghtly behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) nextTS = lastTS + 1 } else { diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 1b36811d5..e24d8b1b0 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -331,12 +331,12 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff SmoothIntervals: w.audioConfig.SmoothIntervals, }) buff.OnRtcpFeedback(w.sendRTCP) - buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { + buff.OnRtcpSenderReport(func() { srFirst, srNewest := buff.GetSenderReportData() w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest) w.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) + _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srNewest) }) })