From bdbc9dcbc707639d534d41c099f7780db1803fe0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 10 Mar 2024 23:18:54 +0530 Subject: [PATCH] Use start time stamp to calculate down stream sender report. (#2564) * Use start time stamp to calculate down stream sender report. With first packet time adjustment, using the first time stamp is more accurate. This still suffers if the up stream clock rate changes (happens in cases like noise suppression which is not well understood). Will be looking at pass through of sender report from publisher to subscriber. * similar log strings * avoid early sender reports * log messages * Reduce first packet adjustment threshold to 15 seconds --- pkg/sfu/buffer/rtpstats_base.go | 24 ++++---- pkg/sfu/buffer/rtpstats_receiver.go | 4 +- pkg/sfu/buffer/rtpstats_sender.go | 85 +++++++++++------------------ 3 files changed, 47 insertions(+), 66 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 8cc73db9d..7af312c11 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -34,7 +34,7 @@ const ( cFirstSnapshotID = 1 cFirstPacketTimeAdjustWindow = 2 * time.Minute - cFirstPacketTimeAdjustThreshold = 5 * time.Minute + cFirstPacketTimeAdjustThreshold = 15 * time.Second ) // ------------------------------------------------------- @@ -506,18 +506,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) { now := r.firstTime.Add(timeSinceFirst) firstTime := now.Add(-samplesDuration) if firstTime.Before(r.firstTime) { - r.logger.Debugw( - "adjusting first packet time", - "startTime", r.startTime.String(), - "nowTime", now.String(), - "before", r.firstTime.String(), - "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime).String(), - "nowTS", ts, - "startTS", startTS, - ) if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold { - r.logger.Infow("first packet time adjustment too big, ignoring", + r.logger.Infow("adjusting first packet time, too big, ignoring", "startTime", r.startTime.String(), "nowTime", now.String(), "before", r.firstTime.String(), @@ -527,6 +517,16 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) { "startTS", startTS, ) } else { + r.logger.Debugw( + "adjusting first packet time", + "startTime", r.startTime.String(), + "nowTime", now.String(), + "before", r.firstTime.String(), + "after", firstTime.String(), + "adjustment", r.firstTime.Sub(firstTime).String(), + "nowTS", ts, + "startTS", startTS, + ) r.firstTime = firstTime } } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 12ab315af..5b5e502ca 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -251,7 +251,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) // prevent against extreme case of anachronous sender reports if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( - "received anachronous sender report", + "received sender report, anachronous, dropping", "first", r.srFirst, "last", r.srNewest, "current", srData, @@ -311,7 +311,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { if r.clockSkewCount%100 == 0 { r.logger.Infow( - "clock rate skew", + "received sender report, clock skew", "first", r.srFirst, "last", r.srNewest, "current", &srDataCopy, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 7e8b22510..c748b483a 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -29,6 +29,8 @@ import ( const ( cSnInfoSize = 4096 cSnInfoMask = cSnInfoSize - 1 + + cSenderReportInitialWait = time.Second ) type snInfoFlag byte @@ -157,9 +159,8 @@ type RTPStatsSender struct { nextSenderSnapshotID uint32 senderSnapshots []senderSnapshot - clockSkewCount int - outOfOrderSenderReportCount int - metadataCacheOverflowCount int + clockSkewCount int + metadataCacheOverflowCount int srFeedFirst *RTCPSenderReportData srFeedNewest *RTCPSenderReportData @@ -648,13 +649,13 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui // construct current time based on monotonic clock timeSinceFirst := time.Since(r.firstTime) + if timeSinceFirst < cSenderReportInitialWait { + return nil + } now := r.firstTime.Add(timeSinceFirst) nowNTP := mediatransportutil.ToNtpTime(now) - - timeSinceHighest := now.Sub(r.highestTime) - nowRTPExt := r.extHighestTS + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) - nowRTPExtUsingTime := nowRTPExt - nowRTP := uint32(nowRTPExt) + nowRTPExtUsingTime := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) + nowRTPExt := nowRTPExtUsingTime // It is possible that publisher is pacing at a slower rate. // That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher. @@ -664,13 +665,12 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds()) if nowRTPExtUsingRate > nowRTPExt { nowRTPExt = nowRTPExtUsingRate - nowRTP = uint32(nowRTPExt) } } srData := &RTCPSenderReportData{ NTPTimestamp: nowNTP, - RTPTimestamp: nowRTP, + RTPTimestamp: uint32(nowRTPExt), RTPTimestampExt: nowRTPExt, At: now, } @@ -691,7 +691,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui "extStartTS", r.extStartTS, "extHighestTS", r.extHighestTS, "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest.String(), + "timeSinceHighest", now.Sub(r.highestTime).String(), "firstTime", r.firstTime.String(), "timeSinceFirst", timeSinceFirst.String(), "nowRTPExtUsingTime", nowRTPExtUsingTime, @@ -708,46 +708,27 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui } if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt { - // If report being generated is behind, use the time difference and - // clock rate of codec to produce next report. - // - // Current report could be behind due to the following - // - Publisher pacing - // - Due to above, report from publisher side is ahead of packet timestamps. - // Note that report will map wall clock to timestamp at capture time and happens before the pacer. - // - Pause/Mute followed by resume, some combination of events that could - // result in this module not having calculated clock rate of publisher side. - // - When the above happens, current will be generated using highestTS which could be behind. - // That could end up behind the last report's timestamp in extreme cases - if r.outOfOrderSenderReportCount%10 == 0 { - r.logger.Infow( - "sending sender report, out-of-order, repairing", - "first", r.srFirst, - "last", r.srNewest, - "curr", srData, - "firstFeed", r.srFeedFirst, - "lastFeed", r.srFeedNewest, - "timeNow", time.Now().String(), - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest.String(), - "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst.String(), - "nowRTPExtUsingTime", nowRTPExtUsingTime, - "calculatedClockRate", calculatedClockRate, - "nowRTPExtUsingRate", nowRTPExtUsingRate, - "count", r.outOfOrderSenderReportCount, - ) - } - r.outOfOrderSenderReportCount++ - - ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) - nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate)) - nowRTP = uint32(nowRTPExt) - - srData.RTPTimestamp = nowRTP - srData.RTPTimestampExt = nowRTPExt + // If report being generated is behind the last report, skip it. + // Should not happen. + r.logger.Infow( + "sending sender report, out-of-order, skipping", + "first", r.srFirst, + "last", r.srNewest, + "curr", srData, + "firstFeed", r.srFeedFirst, + "lastFeed", r.srFeedNewest, + "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "timeSinceHighest", now.Sub(r.highestTime).String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, + ) + return nil } r.srNewest = srData @@ -758,7 +739,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui return &rtcp.SenderReport{ SSRC: ssrc, NTPTime: uint64(nowNTP), - RTPTime: nowRTP, + RTPTime: uint32(nowRTPExt), PacketCount: uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding), OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding), }