Merge remote-tracking branch 'origin/master' into raja_1833

This commit is contained in:
boks1971
2023-10-21 11:05:12 +05:30
8 changed files with 189 additions and 92 deletions
+14 -2
View File
@@ -105,6 +105,8 @@ type snapshot struct {
maxJitter float64
}
// ------------------------------------------------------------------
type RTCPSenderReportData struct {
RTPTimestamp uint32
RTPTimestampExt uint64
@@ -112,6 +114,16 @@ type RTCPSenderReportData struct {
At time.Time
}
func (r *RTCPSenderReportData) ToString() string {
if r == nil {
return ""
}
return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s", r.NTPTimestamp.Time().String(), r.RTPTimestamp, r.RTPTimestampExt, r.At.String())
}
// ------------------------------------------------------------------
type RTPStatsParams struct {
ClockRate uint32
Logger logger.Logger
@@ -527,8 +539,8 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > cNumSequenceNumbers {
r.logger.Errorw(
"too many packets expected in delta", nil,
r.logger.Infow(
"too many packets expected in delta",
"startSN", then.extStartSN,
"endSN", now.extStartSN,
"packetsExpected", packetsExpected,
+55 -18
View File
@@ -16,6 +16,7 @@ package buffer
import (
"fmt"
"math"
"time"
"github.com/pion/rtcp"
@@ -51,6 +52,9 @@ type RTPStatsReceiver struct {
timestamp *utils.WrapAround[uint32, uint64]
history *protoutils.Bitmap[uint64]
clockSkewCount int
outOfOrderSsenderReportCount int
}
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
@@ -110,7 +114,7 @@ func (r *RTPStatsReceiver) Update(
r.snapshots[i] = r.initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart())
}
r.logger.Infow(
r.logger.Debugw(
"rtp receiver stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
@@ -153,6 +157,12 @@ func (r *RTPStatsReceiver) Update(
if -gapSN >= cNumSequenceNumbers {
r.logger.Warnw(
"large sequence number gap negative", nil,
"extStartSN", r.sequenceNumber.GetExtendedStart(),
"extHighestSN", r.sequenceNumber.GetExtendedHighest(),
"extStartTS", r.timestamp.GetExtendedStart(),
"extHighestTS", r.timestamp.GetExtendedHighest(),
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prev", resSN.PreExtendedHighest,
"curr", resSN.ExtendedVal,
"gap", gapSN,
@@ -219,6 +229,12 @@ func (r *RTPStatsReceiver) Update(
if gapSN >= cNumSequenceNumbers {
r.logger.Warnw(
"large sequence number gap", nil,
"extStartSN", r.sequenceNumber.GetExtendedStart(),
"extHighestSN", r.sequenceNumber.GetExtendedHighest(),
"extStartTS", r.timestamp.GetExtendedStart(),
"extHighestTS", r.timestamp.GetExtendedHighest(),
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prev", resSN.PreExtendedHighest,
"curr", resSN.ExtendedVal,
"gap", gapSN,
@@ -287,12 +303,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
r.logger.Infow(
"received anachronous sender report",
"currentNTP", srData.NTPTimestamp.Time().String(),
"currentRTP", srData.RTPTimestamp,
"currentAt", srData.At.String(),
"lastNTP", r.srNewest.NTPTimestamp.Time().String(),
"lastRTP", r.srNewest.RTPTimestamp,
"lastAt", r.srNewest.At.String(),
"last", r.srNewest.ToString(),
"current", srData.ToString(),
)
return
}
@@ -310,22 +322,47 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp, r.timestamp.GetStart())
if r.srNewest != nil {
timeSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds()
rtpDiffSinceLast := srDataCopy.RTPTimestampExt - r.srNewest.RTPTimestampExt
calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast
timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds()
rtpDiffSinceFirst := srDataCopy.RTPTimestampExt - r.srFirst.RTPTimestampExt
calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst
if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) ||
(timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) {
if r.clockSkewCount%10 == 0 {
r.logger.Infow(
"clock rate skew",
"first", r.srFirst.ToString(),
"last", r.srNewest.ToString(),
"current", srDataCopy.ToString(),
"calculatedFirst", calculatedClockRateFromFirst,
"calculatedLast", calculatedClockRateFromLast,
"count", r.clockSkewCount,
)
}
r.clockSkewCount++
}
}
if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Under such a condition reset the sender reports to start from this point.
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
r.logger.Infow(
"received sender report, out-of-order, resetting",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"prevAt", r.srNewest.At.String(),
"currTSExt", srDataCopy.RTPTimestampExt,
"currRTP", srDataCopy.RTPTimestamp,
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
"currentAt", srDataCopy.At.String(),
)
if r.outOfOrderSsenderReportCount%10 == 0 {
r.logger.Infow(
"received sender report, out-of-order, resetting",
"last", r.srNewest.ToString(),
"current", srDataCopy.ToString(),
"count", r.outOfOrderSsenderReportCount,
)
}
r.outOfOrderSsenderReportCount++
r.srFirst = nil
}
+96 -41
View File
@@ -17,6 +17,7 @@ package buffer
import (
"errors"
"fmt"
"math"
"time"
"github.com/pion/rtcp"
@@ -155,6 +156,10 @@ type RTPStatsSender struct {
nextSenderSnapshotID uint32
senderSnapshots []senderSnapshot
clockSkewCount int
outOfOrderSenderReportCount int
metadataCacheOverflowCount int
}
func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender {
@@ -264,7 +269,7 @@ func (r *RTPStatsSender) Update(
r.senderSnapshots[i] = r.initSenderSnapshot(r.startTime, r.extStartSN)
}
r.logger.Infow(
r.logger.Debugw(
"rtp sender stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
@@ -284,6 +289,12 @@ func (r *RTPStatsSender) Update(
if -gapSN >= cNumSequenceNumbers {
r.logger.Warnw(
"large sequence number gap negative", nil,
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prev", r.extHighestSN,
"curr", extSequenceNumber,
"gap", gapSN,
@@ -344,6 +355,12 @@ func (r *RTPStatsSender) Update(
if gapSN >= cNumSequenceNumbers {
r.logger.Warnw(
"large sequence number gap", nil,
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prev", r.extHighestSN,
"curr", extSequenceNumber,
"gap", gapSN,
@@ -509,22 +526,26 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
eis := &s.intervalStats
eis.aggregate(&is)
if is.packetsNotFound != 0 {
r.logger.Warnw(
"potential sequence number de-sync", nil,
"lastRRTime", r.lastRRTime.String(),
"lastRR", r.lastRR,
"sinceLastRR", time.Since(r.lastRRTime).String(),
"receivedRR", rr,
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
"extLastRRSN", s.extLastRRSN,
"extReceivedRRSN", extReceivedRRSN,
"packetsInInterval", extReceivedRRSN-s.extLastRRSN,
"intervalStats", is.ToString(),
"aggregateIntervalStats", eis.ToString(),
"extHighestSNFromRR", r.extHighestSNFromRR,
"packetsLostFromRR", r.packetsLostFromRR,
)
if r.metadataCacheOverflowCount%10 == 0 {
r.logger.Infow(
"metadata cache overflow",
"lastRRTime", r.lastRRTime.String(),
"lastRR", r.lastRR,
"sinceLastRR", time.Since(r.lastRRTime).String(),
"receivedRR", rr,
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
"extLastRRSN", s.extLastRRSN,
"extReceivedRRSN", extReceivedRRSN,
"packetsInInterval", extReceivedRRSN-s.extLastRRSN,
"intervalStats", is.ToString(),
"aggregateIntervalStats", eis.ToString(),
"extHighestSNFromRR", r.extHighestSNFromRR,
"packetsLostFromRR", r.packetsLostFromRR,
"count", r.metadataCacheOverflowCount,
)
}
r.metadataCacheOverflowCount++
}
s.extLastRRSN = extReceivedRRSN
}
@@ -593,6 +614,42 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
}
}
srData := &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: nowRTP,
RTPTimestampExt: nowRTPExt,
At: now,
}
if r.srNewest != nil {
timeSinceLastReport := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds()
rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RTPTimestampExt
windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport
if timeSinceLastReport > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) {
if r.clockSkewCount%10 == 0 {
r.logger.Infow(
"sending sender report, clock skew",
"last", r.srNewest.ToString(),
"curr", srData.ToString(),
"timeNow", time.Now().String(),
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest.String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst.String(),
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
"timeSinceLastReport", timeSinceLastReport,
"rtpDiffSinceLastReport", rtpDiffSinceLastReport,
"windowClockRate", windowClockRate,
"count", r.clockSkewCount,
)
}
r.clockSkewCount++
}
}
if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt {
// If report being generated is behind, use the time difference and
// clock rate of codec to produce next report.
@@ -605,35 +662,32 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
// result in this module not having calculated clock rate of publisher side.
// - When the above happens, current will be generated using highestTS which could be behind.
// That could end up behind the last report's timestamp in extreme cases
r.logger.Infow(
"sending sender report, out-of-order, repairing",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"extHighestTS", r.extHighestTS,
"currTSExt", nowRTPExt,
"currRTP", nowRTP,
"currNTP", nowNTP.Time().String(),
"timeNow", time.Now().String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst.String(),
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest.String(),
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
if r.outOfOrderSenderReportCount%10 == 0 {
r.logger.Infow(
"sending sender report, out-of-order, repairing",
"last", r.srNewest.ToString(),
"curr", srData.ToString(),
"timeNow", time.Now().String(),
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest.String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst.String(),
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
"count", r.outOfOrderSenderReportCount,
)
}
r.outOfOrderSenderReportCount++
ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate))
nowRTP = uint32(nowRTPExt)
}
r.srNewest = &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: nowRTP,
RTPTimestampExt: nowRTPExt,
At: now,
}
r.srNewest = srData
if r.srFirst == nil {
r.srFirst = r.srNewest
}
@@ -844,6 +898,7 @@ func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrS
snInfo := &r.snInfos[slot]
snInfo.pktSize = pktSize
snInfo.hdrSize = hdrSize
snInfo.flags = 0
if marker {
snInfo.flags |= snInfoFlagMarker
}
+5 -5
View File
@@ -716,7 +716,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
tp, err := d.forwarder.GetTranslationParams(extPkt, layer)
if tp.shouldDrop {
if err != nil {
d.params.Logger.Errorw("write rtp packet failed", err)
d.params.Logger.Errorw("could not get translation params", err)
}
return err
}
@@ -741,7 +741,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
hdr, err := d.getTranslatedRTPHeader(extPkt, tp)
if err != nil {
d.params.Logger.Errorw("write rtp packet failed", err)
d.params.Logger.Errorw("could not get translated RTP header", err)
if poolEntity != nil {
PacketFactory.Put(poolEntity)
}
@@ -1496,7 +1496,7 @@ func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) {
func (d *DownTrack) handleRTCP(bytes []byte) {
pkts, err := rtcp.Unmarshal(bytes)
if err != nil {
d.params.Logger.Errorw("unmarshal rtcp receiver packets err", err)
d.params.Logger.Errorw("could not unmarshal rtcp receiver packets", err)
return
}
@@ -1660,7 +1660,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
var pkt rtp.Packet
if err = pkt.Unmarshal(pktBuff[:n]); err != nil {
d.params.Logger.Errorw("unmarshalling rtp packet failed in retransmit", err)
d.params.Logger.Errorw("could not unmarshal rtp packet in retransmit", err)
continue
}
pkt.Header.Marker = epm.marker
@@ -1674,7 +1674,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
if d.mime == "video/vp8" && len(pkt.Payload) > 0 && len(epm.codecBytes) != 0 {
var incomingVP8 buffer.VP8
if err = incomingVP8.Unmarshal(pkt.Payload); err != nil {
d.params.Logger.Errorw("unmarshalling VP8 packet err", err)
d.params.Logger.Errorw("could not unmarshal VP8 packet", err)
PacketFactory.Put(poolEntity)
continue
}
+15 -21
View File
@@ -38,7 +38,8 @@ import (
// Forwarder
const (
FlagPauseOnDowngrade = true
FlagFilterRTX = true
FlagFilterRTX = false
FlagFilterRTXLayers = true
TransitionCostSpatial = 10
ResumeBehindThresholdSeconds = float64(0.2) // 200ms
@@ -1400,15 +1401,14 @@ func (f *Forwarder) CheckSync() (locked bool, layer int32) {
}
func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) {
if !FlagFilterRTX {
filtered = nacks
return
}
f.lock.RLock()
defer f.lock.RUnlock()
filtered = f.rtpMunger.FilterRTX(nacks)
if !FlagFilterRTX {
filtered = nacks
} else {
filtered = f.rtpMunger.FilterRTX(nacks)
}
//
// Curb RTX when deficient for two cases
@@ -1418,14 +1418,15 @@ func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLaye
//
// Without the curb, when congestion hits, RTX rate could be so high that it further congests the channel.
//
currentLayer := f.vls.GetCurrent()
targetLayer := f.vls.GetTarget()
for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ {
if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) {
disallowedLayers[layer] = true
if FlagFilterRTXLayers {
currentLayer := f.vls.GetCurrent()
targetLayer := f.vls.GetTarget()
for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ {
if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) {
disallowedLayers[layer] = true
}
}
}
return
}
@@ -1581,14 +1582,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
extNextTS = extExpectedTS
} else if diffSeconds > ResumeBehindHighTresholdSeconds {
// could be due to incorrect reference calculation
f.logger.Infow(
"resume, reference very far behind",
"layer", layer,
"extExpectedTS", extExpectedTS,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", diffSeconds,
)
logTransition("resume, reference very far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extExpectedTS
} else {
extNextTS = extRefTS
+2 -3
View File
@@ -210,9 +210,6 @@ func NewWebRTCReceiver(
isRED: IsRedCodec(track.Codec().MimeType),
}
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig)
w.streamTrackerManager.SetListener(w)
for _, opt := range opts {
w = opt(w)
}
@@ -235,6 +232,8 @@ func NewWebRTCReceiver(
})
w.connectionStats.Start(w.trackInfo)
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig)
w.streamTrackerManager.SetListener(w)
// SVC-TODO: Handle DD for non-SVC cases???
if w.isSVC {
for _, ext := range receiver.GetParameters().HeaderExtensions {
+1 -1
View File
@@ -220,7 +220,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
r.logger.Errorw(
"unexpected packet ordering", nil,
"extIncomingSN", extPkt.ExtSequenceNumber,
"extHighestIncominSN", r.extHighestIncomingSN,
"extHighestIncomingSN", r.extHighestIncomingSN,
"extLastSN", r.extLastSN,
"snOffsetIncoming", snOffset,
"snOffsetHighest", r.snOffset,
+1 -1
View File
@@ -286,7 +286,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
meta.nacked++
meta.lastNack = refTime
extTS := uint64(meta.timestamp) + (s.extHighestTS & 0xFFFF_FFFF_FFFF_0000)
extTS := uint64(meta.timestamp) + (s.extHighestTS & 0xFFFF_FFFF_0000_0000)
if meta.timestamp > highestTS {
extTS -= (1 << 32)
}