diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 2cfe68567..baae76b46 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -101,7 +101,7 @@ type Buffer struct { // callbacks onClose func() onRtcpFeedback func([]rtcp.Packet) - onRtcpSenderReport func(*RTCPSenderReportData) + onRtcpSenderReport func() onFpsChanged func() onFinalRtpStats func(*RTPStats) @@ -675,7 +675,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { b.RUnlock() if b.onRtcpSenderReport != nil { - b.onRtcpSenderReport(srData) + b.onRtcpSenderReport() } } @@ -729,7 +729,7 @@ func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) { b.onRtcpFeedback = fn } -func (b *Buffer) OnRtcpSenderReport(fn func(srData *RTCPSenderReportData)) { +func (b *Buffer) OnRtcpSenderReport(fn func()) { b.onRtcpSenderReport = fn } diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d1069c00a..938f1e4bd 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -36,6 +36,9 @@ const ( FirstSnapshotId = 1 SnInfoSize = 8192 SnInfoMask = SnInfoSize - 1 + + firstPacketTimeAdjustWindow = 2 * time.Minute + firstPacketTimeAdjustThreshold = 5 * time.Second ) // ------------------------------------------------------- @@ -763,6 +766,40 @@ func (r *RTPStats) GetRtt() uint32 { return r.rtt } +func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) { + r.lock.Lock() + defer r.lock.Unlock() + + r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt) +} + +func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) { + if time.Since(r.startTime) > firstPacketTimeAdjustWindow { + return + } + + // for some time after the start, adjust time of first packet. + // Helps improve accuracy of expected timestamp calculation. + // Adjusting only one way, i. e. if the first sample experienced + // abnormal delay (maybe due to pacing or maybe due to queuing + // in some network element along the way), push back first time + // to an earlier instance. + samplesDuration := time.Duration(float64(extTS-r.extStartTS) / float64(r.params.ClockRate) * float64(time.Second)) + firstTime := time.Now().Add(-samplesDuration) + if firstTime.Before(r.firstTime) { + r.logger.Infow( + "adjusting first packet time", + "before", r.firstTime.String(), + "after", firstTime.String(), + ) + if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold { + r.logger.Infow("first packet time adjustment too big, ignoring", "adjustment", r.firstTime.Sub(firstTime)) + } else { + r.firstTime = firstTime + } + } +} + func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { r.lock.Lock() defer r.lock.Unlock() @@ -792,6 +829,8 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) + // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration var rtpDiffSinceLast uint32 diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5ab7b0cfb..97a5ba747 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1842,7 +1842,10 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { } } -func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, _layer int32, _srData *buffer.RTCPSenderReportData) error { +func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error { + if layer == d.forwarder.GetReferenceLayerSpatial() { + d.rtpStats.MaybeAdjustFirstPacketTime(srData) + } return nil } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index ec09ce48b..ede9a8f18 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -15,6 +15,7 @@ package sfu import ( + "errors" "fmt" "math" "math/rand" @@ -40,7 +41,7 @@ const ( FlagFilterRTX = true TransitionCostSpatial = 10 - ResumeBehindThresholdSeconds = float64(0.1) // 100ms + ResumeBehindThresholdSeconds = float64(0.2) // 200ms LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms SwitchAheadThresholdSeconds = float64(0.025) // 25ms ) @@ -1425,7 +1426,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( f.referenceLayerSpatial = layer f.rtpMunger.SetLastSnTs(extPkt) f.codecMunger.SetLast(extPkt) - f.logger.Debugw( + f.logger.Infow( "starting forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, "timestamp", extPkt.Packet.Timestamp, @@ -1457,14 +1458,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) - if err == nil { - refTS = ts + if err != nil { + // error out if refTS is not available. It can happen when there is no sender report + // for the layer being switched to. Can especially happen at the start of the track when layer switches are + // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been + // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending + // on how often publisher/remote side sends RTCP sender report. + return nil, err } - // AVSYNC-TODO: can error out here if refTS is not available. It can happen when there is no sender report - // for the layer being switched to. Can especially happen at the start of the track when layer switches are - // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been - // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending - // on how often publisher/remote side sends RTCP sender report. + + refTS = ts } if f.getExpectedRTPTimestamp != nil { @@ -1510,11 +1513,11 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( // between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2 // and above as Case 1. // - // In the event of refTS > expectedTS, another threshold is used to pick the next timestamp. + // In the event of refTS > expectedTS, use refTS. // Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's // wall clock time. So, if the first packet experienced abmormal latency, it is possible // for refTS > expectedTS - diffSeconds := float64(expectedTS-refTS) / float64(f.codec.ClockRate) + diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) if diffSeconds >= 0.0 { if diffSeconds > ResumeBehindThresholdSeconds { f.logger.Infow("resume, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", diffSeconds) @@ -1525,25 +1528,25 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( } else { if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { f.logger.Infow("resume, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds)) - nextTS = expectedTS - } else { - nextTS = refTS } + nextTS = refTS } } else { // switching between layers, check if refTS is too far behind the last sent - diffSeconds := float64(refTS-lastTS) / float64(f.codec.ClockRate) + diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 { if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds { - // AVSYNC-TODO: This could be due to pacer trickling out this layer. Should potentially return error here and wait for a more opportune time - // or some forcing function (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition) - // to do the switch. Just logging it for now. + // this could be due to pacer trickling out this layer. Error out and wait for a more opportune time. + // AVSYNC-TODO: Consider some forcing function to do the switch + // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). f.logger.Infow("layer switch, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) + return nil, errors.New("switch point too far behind") } // use a nominal increase to ensure that timestamp is always moving forward + f.logger.Infow("layer switch, reference is slghtly behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) nextTS = lastTS + 1 } else { - diffSeconds = float64(expectedTS-refTS) / float64(f.codec.ClockRate) + diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { f.logger.Infow("layer switch, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds)) nextTS = expectedTS @@ -1573,7 +1576,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( f.rtpMunger.UpdateSnTsOffsets(extPkt, snOffset, tsOffset) f.codecMunger.UpdateOffsets(extPkt) - f.logger.Debugw( + f.logger.Infow( "source switch", "switchingAt", switchingAt.String(), "layer", layer, diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 1b36811d5..e24d8b1b0 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -331,12 +331,12 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff SmoothIntervals: w.audioConfig.SmoothIntervals, }) buff.OnRtcpFeedback(w.sendRTCP) - buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { + buff.OnRtcpSenderReport(func() { srFirst, srNewest := buff.GetSenderReportData() w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest) w.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) + _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srNewest) }) }) diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index 3048367a8..ea8284295 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -56,6 +56,15 @@ var ( promRTT *prometheus.HistogramVec promParticipantJoin *prometheus.CounterVec promConnections *prometheus.GaugeVec + + promPacketTotalIncomingInitial prometheus.Counter + promPacketTotalIncomingRetransmit prometheus.Counter + promPacketTotalOutgoingInitial prometheus.Counter + promPacketTotalOutgoingRetransmit prometheus.Counter + promPacketBytesIncomingInitial prometheus.Counter + promPacketBytesIncomingRetransmit prometheus.Counter + promPacketBytesOutgoingInitial prometheus.Counter + promPacketBytesOutgoingRetransmit prometheus.Counter ) func initPacketStats(nodeID string, nodeType livekit.NodeType, env string) { @@ -140,13 +149,32 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType, env string) { prometheus.MustRegister(promRTT) prometheus.MustRegister(promParticipantJoin) prometheus.MustRegister(promConnections) + + promPacketTotalIncomingInitial = promPacketTotal.WithLabelValues(string(Incoming), transmissionInitial) + promPacketTotalIncomingRetransmit = promPacketTotal.WithLabelValues(string(Incoming), transmissionRetransmit) + promPacketTotalOutgoingInitial = promPacketTotal.WithLabelValues(string(Outgoing), transmissionInitial) + promPacketTotalOutgoingRetransmit = promPacketTotal.WithLabelValues(string(Outgoing), transmissionRetransmit) + promPacketBytesIncomingInitial = promPacketBytes.WithLabelValues(string(Incoming), transmissionInitial) + promPacketBytesIncomingRetransmit = promPacketBytes.WithLabelValues(string(Incoming), transmissionRetransmit) + promPacketBytesOutgoingInitial = promPacketBytes.WithLabelValues(string(Outgoing), transmissionInitial) + promPacketBytesOutgoingRetransmit = promPacketBytes.WithLabelValues(string(Outgoing), transmissionRetransmit) } func IncrementPackets(direction Direction, count uint64, retransmit bool) { - promPacketTotal.WithLabelValues( - string(direction), - transmissionLabel(retransmit), - ).Add(float64(count)) + if direction == Incoming { + if retransmit { + promPacketTotalIncomingRetransmit.Add(float64(count)) + } else { + promPacketTotalIncomingInitial.Add(float64(count)) + } + } else { + if retransmit { + promPacketTotalOutgoingRetransmit.Add(float64(count)) + } else { + promPacketTotalOutgoingInitial.Add(float64(count)) + } + } + if direction == Incoming { packetsIn.Add(count) } else { @@ -158,10 +186,20 @@ func IncrementPackets(direction Direction, count uint64, retransmit bool) { } func IncrementBytes(direction Direction, count uint64, retransmit bool) { - promPacketBytes.WithLabelValues( - string(direction), - transmissionLabel(retransmit), - ).Add(float64(count)) + if direction == Incoming { + if retransmit { + promPacketBytesIncomingRetransmit.Add(float64(count)) + } else { + promPacketBytesIncomingInitial.Add(float64(count)) + } + } else { + if retransmit { + promPacketBytesOutgoingRetransmit.Add(float64(count)) + } else { + promPacketBytesOutgoingInitial.Add(float64(count)) + } + } + if direction == Incoming { bytesIn.Add(count) } else { @@ -240,11 +278,3 @@ func AddConnection(direction Direction) { func SubConnection(direction Direction) { promConnections.WithLabelValues(string(direction)).Sub(1) } - -func transmissionLabel(retransmit bool) string { - if !retransmit { - return transmissionInitial - } else { - return transmissionRetransmit - } -}