diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 0c77299ae..3dac1c4af 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -105,6 +105,8 @@ type snapshot struct { maxJitter float64 } +// ------------------------------------------------------------------ + type RTCPSenderReportData struct { RTPTimestamp uint32 RTPTimestampExt uint64 @@ -112,6 +114,16 @@ type RTCPSenderReportData struct { At time.Time } +func (r *RTCPSenderReportData) ToString() string { + if r == nil { + return "" + } + + return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s", r.NTPTimestamp.Time().String(), r.RTPTimestamp, r.RTPTimestampExt, r.At.String()) +} + +// ------------------------------------------------------------------ + type RTPStatsParams struct { ClockRate uint32 Logger logger.Logger @@ -527,8 +539,8 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes packetsExpected := now.extStartSN - then.extStartSN if packetsExpected > cNumSequenceNumbers { - r.logger.Errorw( - "too many packets expected in delta", nil, + r.logger.Infow( + "too many packets expected in delta", "startSN", then.extStartSN, "endSN", now.extStartSN, "packetsExpected", packetsExpected, diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 5902f57e8..77300f7ab 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -16,6 +16,7 @@ package buffer import ( "fmt" + "math" "time" "github.com/pion/rtcp" @@ -51,6 +52,9 @@ type RTPStatsReceiver struct { timestamp *utils.WrapAround[uint32, uint64] history *protoutils.Bitmap[uint64] + + clockSkewCount int + outOfOrderSsenderReportCount int } func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { @@ -110,7 +114,7 @@ func (r *RTPStatsReceiver) Update( r.snapshots[i] = r.initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart()) } - r.logger.Infow( + r.logger.Debugw( "rtp receiver stream start", "startTime", r.startTime.String(), "firstTime", r.firstTime.String(), @@ -153,6 +157,12 @@ func (r *RTPStatsReceiver) Update( if -gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap negative", nil, + "extStartSN", r.sequenceNumber.GetExtendedStart(), + "extHighestSN", r.sequenceNumber.GetExtendedHighest(), + "extStartTS", r.timestamp.GetExtendedStart(), + "extHighestTS", r.timestamp.GetExtendedHighest(), + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN, @@ -219,6 +229,12 @@ func (r *RTPStatsReceiver) Update( if gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap", nil, + "extStartSN", r.sequenceNumber.GetExtendedStart(), + "extHighestSN", r.sequenceNumber.GetExtendedHighest(), + "extStartTS", r.timestamp.GetExtendedStart(), + "extHighestTS", r.timestamp.GetExtendedHighest(), + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN, @@ -287,12 +303,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( "received anachronous sender report", - "currentNTP", srData.NTPTimestamp.Time().String(), - "currentRTP", srData.RTPTimestamp, - "currentAt", srData.At.String(), - "lastNTP", r.srNewest.NTPTimestamp.Time().String(), - "lastRTP", r.srNewest.RTPTimestamp, - "lastAt", r.srNewest.At.String(), + "last", r.srNewest.ToString(), + "current", srData.ToString(), ) return } @@ -310,22 +322,47 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp, r.timestamp.GetStart()) + if r.srNewest != nil { + timeSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds() + rtpDiffSinceLast := srDataCopy.RTPTimestampExt - r.srNewest.RTPTimestampExt + calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast + + timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds() + rtpDiffSinceFirst := srDataCopy.RTPTimestampExt - r.srFirst.RTPTimestampExt + calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst + + if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) || + (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { + if r.clockSkewCount%10 == 0 { + r.logger.Infow( + "clock rate skew", + "first", r.srFirst.ToString(), + "last", r.srNewest.ToString(), + "current", srDataCopy.ToString(), + "calculatedFirst", calculatedClockRateFromFirst, + "calculatedLast", calculatedClockRateFromLast, + "count", r.clockSkewCount, + ) + } + r.clockSkewCount++ + } + } + if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { // This can happen when a track is replaced with a null and then restored - // i. e. muting replacing with null and unmute restoring the original track. // Under such a condition reset the sender reports to start from this point. // Resetting will ensure sample rate calculations do not go haywire due to negative time. - r.logger.Infow( - "received sender report, out-of-order, resetting", - "prevTSExt", r.srNewest.RTPTimestampExt, - "prevRTP", r.srNewest.RTPTimestamp, - "prevNTP", r.srNewest.NTPTimestamp.Time().String(), - "prevAt", r.srNewest.At.String(), - "currTSExt", srDataCopy.RTPTimestampExt, - "currRTP", srDataCopy.RTPTimestamp, - "currNTP", srDataCopy.NTPTimestamp.Time().String(), - "currentAt", srDataCopy.At.String(), - ) + if r.outOfOrderSsenderReportCount%10 == 0 { + r.logger.Infow( + "received sender report, out-of-order, resetting", + "last", r.srNewest.ToString(), + "current", srDataCopy.ToString(), + "count", r.outOfOrderSsenderReportCount, + ) + } + r.outOfOrderSsenderReportCount++ + r.srFirst = nil } diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index b1c4c5df9..59e01a831 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -17,6 +17,7 @@ package buffer import ( "errors" "fmt" + "math" "time" "github.com/pion/rtcp" @@ -155,6 +156,10 @@ type RTPStatsSender struct { nextSenderSnapshotID uint32 senderSnapshots []senderSnapshot + + clockSkewCount int + outOfOrderSenderReportCount int + metadataCacheOverflowCount int } func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender { @@ -264,7 +269,7 @@ func (r *RTPStatsSender) Update( r.senderSnapshots[i] = r.initSenderSnapshot(r.startTime, r.extStartSN) } - r.logger.Infow( + r.logger.Debugw( "rtp sender stream start", "startTime", r.startTime.String(), "firstTime", r.firstTime.String(), @@ -284,6 +289,12 @@ func (r *RTPStatsSender) Update( if -gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap negative", nil, + "extStartSN", r.extStartSN, + "extHighestSN", r.extHighestSN, + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN, @@ -344,6 +355,12 @@ func (r *RTPStatsSender) Update( if gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap", nil, + "extStartSN", r.extStartSN, + "extHighestSN", r.extHighestSN, + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN, @@ -509,22 +526,26 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt eis := &s.intervalStats eis.aggregate(&is) if is.packetsNotFound != 0 { - r.logger.Warnw( - "potential sequence number de-sync", nil, - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, - "sinceLastRR", time.Since(r.lastRRTime).String(), - "receivedRR", rr, - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extLastRRSN", s.extLastRRSN, - "extReceivedRRSN", extReceivedRRSN, - "packetsInInterval", extReceivedRRSN-s.extLastRRSN, - "intervalStats", is.ToString(), - "aggregateIntervalStats", eis.ToString(), - "extHighestSNFromRR", r.extHighestSNFromRR, - "packetsLostFromRR", r.packetsLostFromRR, - ) + if r.metadataCacheOverflowCount%10 == 0 { + r.logger.Infow( + "metadata cache overflow", + "lastRRTime", r.lastRRTime.String(), + "lastRR", r.lastRR, + "sinceLastRR", time.Since(r.lastRRTime).String(), + "receivedRR", rr, + "extStartSN", r.extStartSN, + "extHighestSN", r.extHighestSN, + "extLastRRSN", s.extLastRRSN, + "extReceivedRRSN", extReceivedRRSN, + "packetsInInterval", extReceivedRRSN-s.extLastRRSN, + "intervalStats", is.ToString(), + "aggregateIntervalStats", eis.ToString(), + "extHighestSNFromRR", r.extHighestSNFromRR, + "packetsLostFromRR", r.packetsLostFromRR, + "count", r.metadataCacheOverflowCount, + ) + } + r.metadataCacheOverflowCount++ } s.extLastRRSN = extReceivedRRSN } @@ -593,6 +614,42 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui } } + srData := &RTCPSenderReportData{ + NTPTimestamp: nowNTP, + RTPTimestamp: nowRTP, + RTPTimestampExt: nowRTPExt, + At: now, + } + if r.srNewest != nil { + timeSinceLastReport := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds() + rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RTPTimestampExt + windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport + if timeSinceLastReport > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) { + if r.clockSkewCount%10 == 0 { + r.logger.Infow( + "sending sender report, clock skew", + "last", r.srNewest.ToString(), + "curr", srData.ToString(), + "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "timeSinceHighest", timeSinceHighest.String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, + "timeSinceLastReport", timeSinceLastReport, + "rtpDiffSinceLastReport", rtpDiffSinceLastReport, + "windowClockRate", windowClockRate, + "count", r.clockSkewCount, + ) + } + r.clockSkewCount++ + } + } + if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt { // If report being generated is behind, use the time difference and // clock rate of codec to produce next report. @@ -605,35 +662,32 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui // result in this module not having calculated clock rate of publisher side. // - When the above happens, current will be generated using highestTS which could be behind. // That could end up behind the last report's timestamp in extreme cases - r.logger.Infow( - "sending sender report, out-of-order, repairing", - "prevTSExt", r.srNewest.RTPTimestampExt, - "prevRTP", r.srNewest.RTPTimestamp, - "prevNTP", r.srNewest.NTPTimestamp.Time().String(), - "extHighestTS", r.extHighestTS, - "currTSExt", nowRTPExt, - "currRTP", nowRTP, - "currNTP", nowNTP.Time().String(), - "timeNow", time.Now().String(), - "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst.String(), - "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest.String(), - "nowRTPExtUsingTime", nowRTPExtUsingTime, - "calculatedClockRate", calculatedClockRate, - "nowRTPExtUsingRate", nowRTPExtUsingRate, - ) + if r.outOfOrderSenderReportCount%10 == 0 { + r.logger.Infow( + "sending sender report, out-of-order, repairing", + "last", r.srNewest.ToString(), + "curr", srData.ToString(), + "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "timeSinceHighest", timeSinceHighest.String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, + "count", r.outOfOrderSenderReportCount, + ) + } + r.outOfOrderSenderReportCount++ + ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate)) nowRTP = uint32(nowRTPExt) } - r.srNewest = &RTCPSenderReportData{ - NTPTimestamp: nowNTP, - RTPTimestamp: nowRTP, - RTPTimestampExt: nowRTPExt, - At: now, - } + r.srNewest = srData if r.srFirst == nil { r.srFirst = r.srNewest } @@ -844,6 +898,7 @@ func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrS snInfo := &r.snInfos[slot] snInfo.pktSize = pktSize snInfo.hdrSize = hdrSize + snInfo.flags = 0 if marker { snInfo.flags |= snInfoFlagMarker } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 3a3d3fa98..35af72df7 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -716,7 +716,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { tp, err := d.forwarder.GetTranslationParams(extPkt, layer) if tp.shouldDrop { if err != nil { - d.params.Logger.Errorw("write rtp packet failed", err) + d.params.Logger.Errorw("could not get translation params", err) } return err } @@ -741,7 +741,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { hdr, err := d.getTranslatedRTPHeader(extPkt, tp) if err != nil { - d.params.Logger.Errorw("write rtp packet failed", err) + d.params.Logger.Errorw("could not get translated RTP header", err) if poolEntity != nil { PacketFactory.Put(poolEntity) } @@ -1496,7 +1496,7 @@ func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) { func (d *DownTrack) handleRTCP(bytes []byte) { pkts, err := rtcp.Unmarshal(bytes) if err != nil { - d.params.Logger.Errorw("unmarshal rtcp receiver packets err", err) + d.params.Logger.Errorw("could not unmarshal rtcp receiver packets", err) return } @@ -1660,7 +1660,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { var pkt rtp.Packet if err = pkt.Unmarshal(pktBuff[:n]); err != nil { - d.params.Logger.Errorw("unmarshalling rtp packet failed in retransmit", err) + d.params.Logger.Errorw("could not unmarshal rtp packet in retransmit", err) continue } pkt.Header.Marker = epm.marker @@ -1674,7 +1674,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { if d.mime == "video/vp8" && len(pkt.Payload) > 0 && len(epm.codecBytes) != 0 { var incomingVP8 buffer.VP8 if err = incomingVP8.Unmarshal(pkt.Payload); err != nil { - d.params.Logger.Errorw("unmarshalling VP8 packet err", err) + d.params.Logger.Errorw("could not unmarshal VP8 packet", err) PacketFactory.Put(poolEntity) continue } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 47c8c6de9..ff1a89935 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -38,7 +38,8 @@ import ( // Forwarder const ( FlagPauseOnDowngrade = true - FlagFilterRTX = true + FlagFilterRTX = false + FlagFilterRTXLayers = true TransitionCostSpatial = 10 ResumeBehindThresholdSeconds = float64(0.2) // 200ms @@ -1400,15 +1401,14 @@ func (f *Forwarder) CheckSync() (locked bool, layer int32) { } func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) { - if !FlagFilterRTX { - filtered = nacks - return - } - f.lock.RLock() defer f.lock.RUnlock() - filtered = f.rtpMunger.FilterRTX(nacks) + if !FlagFilterRTX { + filtered = nacks + } else { + filtered = f.rtpMunger.FilterRTX(nacks) + } // // Curb RTX when deficient for two cases @@ -1418,14 +1418,15 @@ func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLaye // // Without the curb, when congestion hits, RTX rate could be so high that it further congests the channel. // - currentLayer := f.vls.GetCurrent() - targetLayer := f.vls.GetTarget() - for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ { - if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) { - disallowedLayers[layer] = true + if FlagFilterRTXLayers { + currentLayer := f.vls.GetCurrent() + targetLayer := f.vls.GetTarget() + for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ { + if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) { + disallowedLayers[layer] = true + } } } - return } @@ -1581,14 +1582,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( extNextTS = extExpectedTS } else if diffSeconds > ResumeBehindHighTresholdSeconds { // could be due to incorrect reference calculation - f.logger.Infow( - "resume, reference very far behind", - "layer", layer, - "extExpectedTS", extExpectedTS, - "extRefTS", extRefTS, - "extLastTS", extLastTS, - "diffSeconds", diffSeconds, - ) + logTransition("resume, reference very far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) extNextTS = extExpectedTS } else { extNextTS = extRefTS diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index e7ba1fb93..560a9246d 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -210,9 +210,6 @@ func NewWebRTCReceiver( isRED: IsRedCodec(track.Codec().MimeType), } - w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) - w.streamTrackerManager.SetListener(w) - for _, opt := range opts { w = opt(w) } @@ -235,6 +232,8 @@ func NewWebRTCReceiver( }) w.connectionStats.Start(w.trackInfo) + w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) + w.streamTrackerManager.SetListener(w) // SVC-TODO: Handle DD for non-SVC cases??? if w.isSVC { for _, ext := range receiver.GetParameters().HeaderExtensions { diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 73919b2b5..738b33e33 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -220,7 +220,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara r.logger.Errorw( "unexpected packet ordering", nil, "extIncomingSN", extPkt.ExtSequenceNumber, - "extHighestIncominSN", r.extHighestIncomingSN, + "extHighestIncomingSN", r.extHighestIncomingSN, "extLastSN", r.extLastSN, "snOffsetIncoming", snOffset, "snOffsetHighest", r.snOffset, diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 7dad24aea..ee6ed8bdb 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -286,7 +286,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { meta.nacked++ meta.lastNack = refTime - extTS := uint64(meta.timestamp) + (s.extHighestTS & 0xFFFF_FFFF_FFFF_0000) + extTS := uint64(meta.timestamp) + (s.extHighestTS & 0xFFFF_FFFF_0000_0000) if meta.timestamp > highestTS { extTS -= (1 << 32) }