Use start time stamp to calculate down stream sender report. (#2564)

* Use start time stamp to calculate down stream sender report.

With first packet time adjustment, using the first time stamp is more
accurate.

This still suffers if the up stream clock rate changes (happens in cases
like noise suppression which is not well understood). Will be looking at
pass through of sender report from publisher to subscriber.

* similar log strings

* avoid early sender reports

* log messages

* Reduce first packet adjustment threshold to 15 seconds
This commit is contained in:
Raja Subramanian
2024-03-10 23:18:54 +05:30
committed by GitHub
parent a08b058abc
commit bdbc9dcbc7
3 changed files with 47 additions and 66 deletions
+12 -12
View File
@@ -34,7 +34,7 @@ const (
cFirstSnapshotID = 1
cFirstPacketTimeAdjustWindow = 2 * time.Minute
cFirstPacketTimeAdjustThreshold = 5 * time.Minute
cFirstPacketTimeAdjustThreshold = 15 * time.Second
)
// -------------------------------------------------------
@@ -506,18 +506,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) {
now := r.firstTime.Add(timeSinceFirst)
firstTime := now.Add(-samplesDuration)
if firstTime.Before(r.firstTime) {
r.logger.Debugw(
"adjusting first packet time",
"startTime", r.startTime.String(),
"nowTime", now.String(),
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime).String(),
"nowTS", ts,
"startTS", startTS,
)
if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold {
r.logger.Infow("first packet time adjustment too big, ignoring",
r.logger.Infow("adjusting first packet time, too big, ignoring",
"startTime", r.startTime.String(),
"nowTime", now.String(),
"before", r.firstTime.String(),
@@ -527,6 +517,16 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) {
"startTS", startTS,
)
} else {
r.logger.Debugw(
"adjusting first packet time",
"startTime", r.startTime.String(),
"nowTime", now.String(),
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime).String(),
"nowTS", ts,
"startTS", startTS,
)
r.firstTime = firstTime
}
}
+2 -2
View File
@@ -251,7 +251,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
// prevent against extreme case of anachronous sender reports
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
r.logger.Infow(
"received anachronous sender report",
"received sender report, anachronous, dropping",
"first", r.srFirst,
"last", r.srNewest,
"current", srData,
@@ -311,7 +311,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
(timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) {
if r.clockSkewCount%100 == 0 {
r.logger.Infow(
"clock rate skew",
"received sender report, clock skew",
"first", r.srFirst,
"last", r.srNewest,
"current", &srDataCopy,
+33 -52
View File
@@ -29,6 +29,8 @@ import (
const (
cSnInfoSize = 4096
cSnInfoMask = cSnInfoSize - 1
cSenderReportInitialWait = time.Second
)
type snInfoFlag byte
@@ -157,9 +159,8 @@ type RTPStatsSender struct {
nextSenderSnapshotID uint32
senderSnapshots []senderSnapshot
clockSkewCount int
outOfOrderSenderReportCount int
metadataCacheOverflowCount int
clockSkewCount int
metadataCacheOverflowCount int
srFeedFirst *RTCPSenderReportData
srFeedNewest *RTCPSenderReportData
@@ -648,13 +649,13 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
// construct current time based on monotonic clock
timeSinceFirst := time.Since(r.firstTime)
if timeSinceFirst < cSenderReportInitialWait {
return nil
}
now := r.firstTime.Add(timeSinceFirst)
nowNTP := mediatransportutil.ToNtpTime(now)
timeSinceHighest := now.Sub(r.highestTime)
nowRTPExt := r.extHighestTS + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowRTPExtUsingTime := nowRTPExt
nowRTP := uint32(nowRTPExt)
nowRTPExtUsingTime := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowRTPExt := nowRTPExtUsingTime
// It is possible that publisher is pacing at a slower rate.
// That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher.
@@ -664,13 +665,12 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds())
if nowRTPExtUsingRate > nowRTPExt {
nowRTPExt = nowRTPExtUsingRate
nowRTP = uint32(nowRTPExt)
}
}
srData := &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: nowRTP,
RTPTimestamp: uint32(nowRTPExt),
RTPTimestampExt: nowRTPExt,
At: now,
}
@@ -691,7 +691,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest.String(),
"timeSinceHighest", now.Sub(r.highestTime).String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst.String(),
"nowRTPExtUsingTime", nowRTPExtUsingTime,
@@ -708,46 +708,27 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
}
if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt {
// If report being generated is behind, use the time difference and
// clock rate of codec to produce next report.
//
// Current report could be behind due to the following
// - Publisher pacing
// - Due to above, report from publisher side is ahead of packet timestamps.
// Note that report will map wall clock to timestamp at capture time and happens before the pacer.
// - Pause/Mute followed by resume, some combination of events that could
// result in this module not having calculated clock rate of publisher side.
// - When the above happens, current will be generated using highestTS which could be behind.
// That could end up behind the last report's timestamp in extreme cases
if r.outOfOrderSenderReportCount%10 == 0 {
r.logger.Infow(
"sending sender report, out-of-order, repairing",
"first", r.srFirst,
"last", r.srNewest,
"curr", srData,
"firstFeed", r.srFeedFirst,
"lastFeed", r.srFeedNewest,
"timeNow", time.Now().String(),
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest.String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst.String(),
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
"count", r.outOfOrderSenderReportCount,
)
}
r.outOfOrderSenderReportCount++
ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate))
nowRTP = uint32(nowRTPExt)
srData.RTPTimestamp = nowRTP
srData.RTPTimestampExt = nowRTPExt
// If report being generated is behind the last report, skip it.
// Should not happen.
r.logger.Infow(
"sending sender report, out-of-order, skipping",
"first", r.srFirst,
"last", r.srNewest,
"curr", srData,
"firstFeed", r.srFeedFirst,
"lastFeed", r.srFeedNewest,
"timeNow", time.Now().String(),
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"highestTime", r.highestTime.String(),
"timeSinceHighest", now.Sub(r.highestTime).String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst.String(),
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
return nil
}
r.srNewest = srData
@@ -758,7 +739,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
return &rtcp.SenderReport{
SSRC: ssrc,
NTPTime: uint64(nowNTP),
RTPTime: nowRTP,
RTPTime: uint32(nowRTPExt),
PacketCount: uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding),
OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding),
}