diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 8cc73db9d..7af312c11 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -34,7 +34,7 @@ const ( cFirstSnapshotID = 1 cFirstPacketTimeAdjustWindow = 2 * time.Minute - cFirstPacketTimeAdjustThreshold = 5 * time.Minute + cFirstPacketTimeAdjustThreshold = 15 * time.Second ) // ------------------------------------------------------- @@ -506,18 +506,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) { now := r.firstTime.Add(timeSinceFirst) firstTime := now.Add(-samplesDuration) if firstTime.Before(r.firstTime) { - r.logger.Debugw( - "adjusting first packet time", - "startTime", r.startTime.String(), - "nowTime", now.String(), - "before", r.firstTime.String(), - "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime).String(), - "nowTS", ts, - "startTS", startTS, - ) if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold { - r.logger.Infow("first packet time adjustment too big, ignoring", + r.logger.Infow("adjusting first packet time, too big, ignoring", "startTime", r.startTime.String(), "nowTime", now.String(), "before", r.firstTime.String(), @@ -527,6 +517,16 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) { "startTS", startTS, ) } else { + r.logger.Debugw( + "adjusting first packet time", + "startTime", r.startTime.String(), + "nowTime", now.String(), + "before", r.firstTime.String(), + "after", firstTime.String(), + "adjustment", r.firstTime.Sub(firstTime).String(), + "nowTS", ts, + "startTS", startTS, + ) r.firstTime = firstTime } } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 12ab315af..5b5e502ca 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -251,7 +251,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) // prevent against extreme case of anachronous sender reports if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( - "received anachronous sender report", + "received sender report, anachronous, dropping", "first", r.srFirst, "last", r.srNewest, "current", srData, @@ -311,7 +311,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { if r.clockSkewCount%100 == 0 { r.logger.Infow( - "clock rate skew", + "received sender report, clock skew", "first", r.srFirst, "last", r.srNewest, "current", &srDataCopy, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 7e8b22510..c748b483a 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -29,6 +29,8 @@ import ( const ( cSnInfoSize = 4096 cSnInfoMask = cSnInfoSize - 1 + + cSenderReportInitialWait = time.Second ) type snInfoFlag byte @@ -157,9 +159,8 @@ type RTPStatsSender struct { nextSenderSnapshotID uint32 senderSnapshots []senderSnapshot - clockSkewCount int - outOfOrderSenderReportCount int - metadataCacheOverflowCount int + clockSkewCount int + metadataCacheOverflowCount int srFeedFirst *RTCPSenderReportData srFeedNewest *RTCPSenderReportData @@ -648,13 +649,13 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui // construct current time based on monotonic clock timeSinceFirst := time.Since(r.firstTime) + if timeSinceFirst < cSenderReportInitialWait { + return nil + } now := r.firstTime.Add(timeSinceFirst) nowNTP := mediatransportutil.ToNtpTime(now) - - timeSinceHighest := now.Sub(r.highestTime) - nowRTPExt := r.extHighestTS + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) - nowRTPExtUsingTime := nowRTPExt - nowRTP := uint32(nowRTPExt) + nowRTPExtUsingTime := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) + nowRTPExt := nowRTPExtUsingTime // It is possible that publisher is pacing at a slower rate. // That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher. @@ -664,13 +665,12 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds()) if nowRTPExtUsingRate > nowRTPExt { nowRTPExt = nowRTPExtUsingRate - nowRTP = uint32(nowRTPExt) } } srData := &RTCPSenderReportData{ NTPTimestamp: nowNTP, - RTPTimestamp: nowRTP, + RTPTimestamp: uint32(nowRTPExt), RTPTimestampExt: nowRTPExt, At: now, } @@ -691,7 +691,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui "extStartTS", r.extStartTS, "extHighestTS", r.extHighestTS, "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest.String(), + "timeSinceHighest", now.Sub(r.highestTime).String(), "firstTime", r.firstTime.String(), "timeSinceFirst", timeSinceFirst.String(), "nowRTPExtUsingTime", nowRTPExtUsingTime, @@ -708,46 +708,27 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui } if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt { - // If report being generated is behind, use the time difference and - // clock rate of codec to produce next report. - // - // Current report could be behind due to the following - // - Publisher pacing - // - Due to above, report from publisher side is ahead of packet timestamps. - // Note that report will map wall clock to timestamp at capture time and happens before the pacer. - // - Pause/Mute followed by resume, some combination of events that could - // result in this module not having calculated clock rate of publisher side. - // - When the above happens, current will be generated using highestTS which could be behind. - // That could end up behind the last report's timestamp in extreme cases - if r.outOfOrderSenderReportCount%10 == 0 { - r.logger.Infow( - "sending sender report, out-of-order, repairing", - "first", r.srFirst, - "last", r.srNewest, - "curr", srData, - "firstFeed", r.srFeedFirst, - "lastFeed", r.srFeedNewest, - "timeNow", time.Now().String(), - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest.String(), - "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst.String(), - "nowRTPExtUsingTime", nowRTPExtUsingTime, - "calculatedClockRate", calculatedClockRate, - "nowRTPExtUsingRate", nowRTPExtUsingRate, - "count", r.outOfOrderSenderReportCount, - ) - } - r.outOfOrderSenderReportCount++ - - ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) - nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate)) - nowRTP = uint32(nowRTPExt) - - srData.RTPTimestamp = nowRTP - srData.RTPTimestampExt = nowRTPExt + // If report being generated is behind the last report, skip it. + // Should not happen. + r.logger.Infow( + "sending sender report, out-of-order, skipping", + "first", r.srFirst, + "last", r.srNewest, + "curr", srData, + "firstFeed", r.srFeedFirst, + "lastFeed", r.srFeedNewest, + "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "timeSinceHighest", now.Sub(r.highestTime).String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, + ) + return nil } r.srNewest = srData @@ -758,7 +739,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui return &rtcp.SenderReport{ SSRC: ssrc, NTPTime: uint64(nowNTP), - RTPTime: nowRTP, + RTPTime: uint32(nowRTPExt), PacketCount: uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding), OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding), }