Extended type for RTP timestamp. (#2001)

This commit is contained in:
Raja Subramanian
2023-08-27 17:28:44 +05:30
committed by GitHub
parent 55d5edcf73
commit 3b30f49ad5
11 changed files with 177 additions and 157 deletions
+2 -2
View File
@@ -317,9 +317,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 {
return 0
}
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
}
return 0, errors.New("receiver not available")
}
+11 -9
View File
@@ -55,6 +55,7 @@ type ExtPacket struct {
VideoLayer
Arrival time.Time
ExtSequenceNumber uint32
ExtTimestamp uint64
Packet *rtp.Packet
Payload interface{}
KeyFrame bool
@@ -414,21 +415,21 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
return
}
extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime)
flowState := b.updateStreamState(&rtpPacket, arrivalTime)
b.processHeaderExtensions(&rtpPacket, arrivalTime)
if !isOutOfOrder && len(rtpPacket.Payload) == 0 {
if !flowState.IsOutOfOrder && len(rtpPacket.Payload) == 0 {
// drop padding only in-order packet
b.snRangeMap.IncValue(1)
return
}
// add to RTX buffer using sequence number after accounting for dropped padding only packets
snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber)
snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber)
if err != nil {
b.logger.Errorw("could not get sequence number adjustment", err)
return
}
rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment)
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment)
_, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber)
if err != nil {
if err != bucket.ErrRTXPacket {
@@ -441,7 +442,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
b.doReports(arrivalTime)
ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime)
ep := b.getExtPacket(&rtpPacket, arrivalTime, flowState)
if ep == nil {
return
}
@@ -499,7 +500,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
}
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) {
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState {
flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime)
if b.nacker != nil {
@@ -513,7 +514,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32
}
}
return flowState.ExtSeqNumber, flowState.IsOutOfOrder
return flowState
}
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
@@ -546,10 +547,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
}
}
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket {
ep := &ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: extSeqNumber,
ExtSequenceNumber: flowState.ExtSequenceNumber,
ExtTimestamp: flowState.ExtTimestamp,
Packet: rtpPacket,
VideoLayer: VideoLayer{
Spatial: InvalidLayerSpatial,
+29 -28
View File
@@ -71,7 +71,8 @@ type RTPFlowState struct {
IsOutOfOrder bool
ExtSeqNumber uint32
ExtSequenceNumber uint32
ExtTimestamp uint64
}
type IntervalStats struct {
@@ -152,8 +153,7 @@ type RTPStats struct {
lock sync.RWMutex
initialized bool
resyncOnNextPacket bool
initialized bool
startTime time.Time
endTime time.Time
@@ -245,7 +245,6 @@ func (r *RTPStats) Seed(from *RTPStats) {
}
r.initialized = from.initialized
r.resyncOnNextPacket = from.resyncOnNextPacket
r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
@@ -375,16 +374,6 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}
if r.resyncOnNextPacket {
r.resyncOnNextPacket = false
if r.initialized {
r.sequenceNumber.ResetHighest(rtph.SequenceNumber - 1)
r.timestamp.ResetHighest(rtph.Timestamp)
r.highestTime = packetTime
}
}
var resSN utils.WrapAroundUpdateResult[uint32]
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
@@ -417,8 +406,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
"rtp stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.sequenceNumber.GetExtendedHighest(),
"startTS", r.timestamp.GetExtendedHighest(),
"startSN", r.sequenceNumber.GetExtendedStart(),
"startTS", r.timestamp.GetExtendedStart(),
)
} else {
resSN = r.sequenceNumber.Update(rtph.SequenceNumber)
@@ -483,7 +472,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
flowState.IsOutOfOrder = true
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
// update gap histogram
r.updateGapHistogram(int(gapSN))
@@ -505,7 +495,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
}
if !isDuplicate {
@@ -527,11 +518,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}
func (r *RTPStats) ResyncOnNextPacket() {
func (r *RTPStats) Resync(esn uint32, ets uint64, at time.Time) {
r.lock.Lock()
defer r.lock.Unlock()
r.resyncOnNextPacket = true
if !r.initialized {
return
}
r.sequenceNumber.ResetHighest(esn - 1)
r.timestamp.ResetHighest(ets)
r.highestTime = at
}
func (r *RTPStats) getPacketsExpected() uint32 {
@@ -788,11 +784,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) {
defer r.lock.Unlock()
if srData != nil {
r.maybeAdjustFirstPacketTime(srData.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt)
}
}
func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
func (r *RTPStats) maybeAdjustFirstPacketTime(ets uint64) {
if time.Since(r.startTime) > firstPacketTimeAdjustWindow {
return
}
@@ -803,7 +799,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDiff := int32(ts - uint32(r.timestamp.GetExtendedStart()))
samplesDiff := int64(ets - r.timestamp.GetExtendedStart())
if samplesDiff < 0 {
// out-of-order, skip
return
@@ -819,7 +815,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
@@ -829,7 +825,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
} else {
@@ -864,12 +860,17 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
cycles += (1 << 32)
}
ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate))
goArounds := rtpDiff / (1 << 32)
cycles += goArounds * (1 << 32)
}
srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
@@ -1018,13 +1019,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var rtpDiffSinceLast uint64
var departureDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp
rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt
departureDiffSinceLast = now.Sub(r.srNewest.At)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
+67 -67
View File
@@ -157,8 +157,8 @@ type ForwarderState struct {
Started bool
ReferenceLayerSpatial int32
PreStartTime time.Time
FirstTS uint32
RefTSOffset uint32
ExtFirstTS uint64
RefTSOffset uint64
RTP RTPMungerState
Codec interface{}
}
@@ -169,11 +169,11 @@ func (f ForwarderState) String() string {
case codecmunger.VP8State:
codecString = codecState.String()
}
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}",
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}",
f.Started,
f.ReferenceLayerSpatial,
f.PreStartTime.String(),
f.FirstTS,
f.ExtFirstTS,
f.RefTSOffset,
f.RTP.String(),
codecString,
@@ -187,7 +187,7 @@ type Forwarder struct {
codec webrtc.RTPCodecCapability
kind webrtc.RTPCodecType
logger logger.Logger
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error)
getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error)
getExpectedRTPTimestamp func(at time.Time) (uint64, error)
muted bool
@@ -196,10 +196,10 @@ type Forwarder struct {
started bool
preStartTime time.Time
firstTS uint32
extFirstTS uint64
lastSSRC uint32
referenceLayerSpatial int32
refTSOffset uint32
refTSOffset uint64
provisional *VideoAllocationProvisional
@@ -215,7 +215,7 @@ type Forwarder struct {
func NewForwarder(
kind webrtc.RTPCodecType,
logger logger.Logger,
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error),
getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error),
getExpectedRTPTimestamp func(at time.Time) (uint64, error),
) *Forwarder {
f := &Forwarder{
@@ -335,7 +335,7 @@ func (f *Forwarder) GetState() ForwarderState {
Started: f.started,
ReferenceLayerSpatial: f.referenceLayerSpatial,
PreStartTime: f.preStartTime,
FirstTS: f.firstTS,
ExtFirstTS: f.extFirstTS,
RefTSOffset: f.refTSOffset,
RTP: f.rtpMunger.GetLast(),
Codec: f.codecMunger.GetState(),
@@ -356,7 +356,7 @@ func (f *Forwarder) SeedState(state ForwarderState) {
f.started = true
f.referenceLayerSpatial = state.ReferenceLayerSpatial
f.preStartTime = state.PreStartTime
f.firstTS = state.FirstTS
f.extFirstTS = state.ExtFirstTS
f.refTSOffset = state.RefTSOffset
}
@@ -1444,13 +1444,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
return nil
}
logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) {
logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) {
f.logger.Debugw(
message,
"layer", layer,
"expectedTS", expectedTS,
"refTS", refTS,
"lastTS", lastTS,
"extExpectedTS", extExpectedTS,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
)
}
@@ -1460,20 +1460,20 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// timestamp offset on source change.
//
// There are three timestamps to consider here
// 1. lastTS -> timestamp of last sent packet
// 2. refTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report
// 3. expectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet
// Ideally, refTS and expectedTS should be very close and lastTS should be before both of those.
// 1. extLastTS -> timestamp of last sent packet
// 2. extRefTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report
// 3. extExpectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet
// Ideally, extRefTS and extExpectedTS should be very close and extLastTS should be before both of those.
// But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always.
rtpMungerState := f.rtpMunger.GetLast()
lastTS := rtpMungerState.LastTS
refTS := lastTS
expectedTS := lastTS
extLastTS := rtpMungerState.ExtLastTS
extRefTS := extLastTS
extExpectedTS := extLastTS
switchingAt := time.Now()
if f.getReferenceLayerRTPTimestamp != nil {
ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial)
ets, err := f.getReferenceLayerRTPTimestamp(extPkt.ExtTimestamp, layer, f.referenceLayerSpatial)
if err != nil {
// error out if refTS is not available. It can happen when there is no sender report
// error out if extRefTS is not available. It can happen when there is no sender report
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
// potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been
// received will calculate a better offset, but may result in initial adaptation to take a bit longer depending
@@ -1481,35 +1481,35 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
return err
}
refTS = ts
extRefTS = ets
}
if f.getExpectedRTPTimestamp != nil {
tsExt, err := f.getExpectedRTPTimestamp(switchingAt)
if err == nil {
expectedTS = uint32(tsExt)
extExpectedTS = tsExt
} else {
rtpDiff := uint32(0)
rtpDiff := uint64(0)
if !f.preStartTime.IsZero() && f.refTSOffset == 0 {
timeSinceFirst := time.Since(f.preStartTime)
rtpDiff = uint32(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9)
f.refTSOffset = f.firstTS + rtpDiff - refTS
rtpDiff = uint64(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9)
f.refTSOffset = f.extFirstTS + rtpDiff - extRefTS
f.logger.Infow(
"calculating refTSOffset",
"preStartTime", f.preStartTime.String(),
"firstTS", f.firstTS,
"extFirstTS", f.extFirstTS,
"timeSinceFirst", timeSinceFirst,
"rtpDiff", rtpDiff,
"refTS", refTS,
"extRefTS", extRefTS,
"refTSOffset", f.refTSOffset,
)
}
expectedTS += rtpDiff
extExpectedTS += rtpDiff
}
}
refTS += f.refTSOffset
extRefTS += f.refTSOffset
var nextTS uint32
var extNextTS uint64
if f.lastSSRC == 0 {
// If resuming (e. g. on unmute), keep next timestamp close to expected timestamp.
//
@@ -1524,71 +1524,71 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// increases and pacer starts sending at faster rate.
//
// But, the challenege is distinguishing between the two cases. As a compromise, the difference
// between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2
// between extExpectedTS and extRefTS is thresholded. Difference below the threshold is treated as Case 2
// and above as Case 1.
//
// In the event of refTS > expectedTS, use refTS.
// Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's
// In the event of extRefTS > extExpectedTS, use extRefTS.
// Ideally, extRefTS should not be ahead of extExpectedTS, but extExpectedTS uses the first packet's
// wall clock time. So, if the first packet experienced abmormal latency, it is possible
// for refTS > expectedTS
diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
// for extRefTS > extExpectedTS
diffSeconds := float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate)
if diffSeconds >= 0.0 {
if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold {
logTransition("resume, reference too far behind", expectedTS, refTS, lastTS, diffSeconds)
nextTS = expectedTS
logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extExpectedTS
} else {
nextTS = refTS
extNextTS = extRefTS
}
} else {
if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
logTransition("resume, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds)
logTransition("resume, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds)
}
nextTS = refTS
extNextTS = extRefTS
}
f.resumeBehindThreshold = 0.0
} else {
// switching between layers, check if refTS is too far behind the last sent
diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate)
// switching between layers, check if extRefTS is too far behind the last sent
diffSeconds := float64(int64(extRefTS-extLastTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 {
if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds {
// this could be due to pacer trickling out this layer. Error out and wait for a more opportune time.
// AVSYNC-TODO: Consider some forcing function to do the switch
// (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition).
logTransition("layer switch, reference too far behind", expectedTS, refTS, lastTS, diffSeconds)
logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
return errors.New("switch point too far behind")
}
// use a nominal increase to ensure that timestamp is always moving forward
logTransition("layer switch, reference is slightly behind", expectedTS, refTS, lastTS, diffSeconds)
nextTS = lastTS + 1
logTransition("layer switch, reference is slightly behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extLastTS + 1
} else {
diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
diffSeconds = float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds)
logTransition("layer switch, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds)
}
nextTS = refTS
extNextTS = extRefTS
}
}
if nextTS-lastTS == 0 || nextTS-lastTS > (1<<31) {
f.logger.Debugw("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS)
if int64(extNextTS-extLastTS) <= 0 {
f.logger.Debugw("next timestamp is before last, adjusting", "extNextTS", extNextTS, "extLastTS", extLastTS)
// nominal increase
nextTS = lastTS + 1
extNextTS = extLastTS + 1
}
f.logger.Debugw(
"next timestamp on switch",
"switchingAt", switchingAt.String(),
"layer", layer,
"lastTS", lastTS,
"refTS", refTS,
"extLastTS", extLastTS,
"extRefTS", extRefTS,
"refTSOffset", f.refTSOffset,
"referenceLayerSpatial", f.referenceLayerSpatial,
"expectedTS", expectedTS,
"nextTS", nextTS,
"tsJump", nextTS-lastTS,
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
"nextSN", rtpMungerState.ExtLastSN+1,
)
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, nextTS-lastTS)
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, extNextTS-extLastTS)
f.codecMunger.UpdateOffsets(extPkt)
return nil
}
@@ -1733,7 +1733,7 @@ func (f *Forwarder) maybeStart() {
}
f.rtpMunger.SetLastSnTs(extPkt)
f.firstTS = extPkt.Packet.Timestamp
f.extFirstTS = uint64(extPkt.Packet.Timestamp)
f.logger.Debugw(
"starting with dummy forwarding",
"sequenceNumber", extPkt.Packet.SequenceNumber,
@@ -1770,18 +1770,18 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S
numPackets++
}
lastTS := f.rtpMunger.GetLast().LastTS
expectedTS := lastTS
extLastTS := f.rtpMunger.GetLast().ExtLastTS
extExpectedTS := extLastTS
if f.getExpectedRTPTimestamp != nil {
tsExt, err := f.getExpectedRTPTimestamp(time.Now())
if err == nil {
expectedTS = uint32(tsExt)
extExpectedTS = tsExt
}
}
if expectedTS-lastTS == 0 || expectedTS-lastTS > (1<<31) {
expectedTS = lastTS + 1
if int64(extExpectedTS-extLastTS) <= 0 {
extExpectedTS = extLastTS + 1
}
snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, expectedTS)
snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, extExpectedTS)
return snts, frameEndNeeded, err
}
+3 -3
View File
@@ -81,7 +81,7 @@ type TrackReceiver interface {
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetCalculatedClockRate(layer int32) uint32
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error)
}
// WebRTCReceiver receives a media track
@@ -777,8 +777,8 @@ func (w *WebRTCReceiver) GetCalculatedClockRate(layer int32) uint32 {
return w.streamTrackerManager.GetCalculatedClockRate(layer)
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
}
// closes all track senders in parallel, returns when all are closed
+26 -25
View File
@@ -54,11 +54,11 @@ type SnTs struct {
type RTPMungerState struct {
ExtLastSN uint32
LastTS uint32
ExtLastTS uint64
}
func (r RTPMungerState) String() string {
return fmt.Sprintf("RTPMungerState{extLastSN: %d, lastTS: %d)", r.ExtLastSN, r.LastTS)
return fmt.Sprintf("RTPMungerState{extLastSN: %d, extLastTS: %d)", r.ExtLastSN, r.ExtLastTS)
}
// ----------------------------------------------------------------------
@@ -70,8 +70,8 @@ type RTPMunger struct {
snRangeMap *utils.RangeMap[uint32, uint32]
extLastSN uint32
lastTS uint32
tsOffset uint32
extLastTS uint64
tsOffset uint64
lastMarker bool
extRtxGateSn uint32
@@ -91,7 +91,7 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} {
"ExtHighestIncomingSN": r.extHighestIncomingSN,
"ExtLastSN": r.extLastSN,
"SNOffset": snOffset,
"LastTS": r.lastTS,
"ExtLastTS": r.extLastTS,
"TSOffset": r.tsOffset,
"LastMarker": r.lastMarker,
}
@@ -100,25 +100,25 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} {
func (r *RTPMunger) GetLast() RTPMungerState {
return RTPMungerState{
ExtLastSN: r.extLastSN,
LastTS: r.lastTS,
ExtLastTS: r.extLastTS,
}
}
func (r *RTPMunger) SeedLast(state RTPMungerState) {
r.extLastSN = state.ExtLastSN
r.lastTS = state.LastTS
r.extLastTS = state.ExtLastTS
}
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.extLastSN = extPkt.ExtSequenceNumber
r.lastTS = extPkt.Packet.Timestamp
r.extLastTS = extPkt.ExtTimestamp
}
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint32) {
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint64) {
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust)
r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust
r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust
}
func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
@@ -151,7 +151,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset),
timestamp: extPkt.Packet.Timestamp - r.tsOffset,
timestamp: uint32(extPkt.ExtTimestamp - r.tsOffset),
}, nil
}
@@ -186,10 +186,10 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
}
extMungedSN := extPkt.ExtSequenceNumber - snOffset
mungedTS := extPkt.Packet.Timestamp - r.tsOffset
extMungedTS := extPkt.ExtTimestamp - r.tsOffset
r.extLastSN = extMungedSN
r.lastTS = mungedTS
r.extLastTS = extMungedTS
r.lastMarker = extPkt.Packet.Marker
if extPkt.KeyFrame {
@@ -204,7 +204,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
return &TranslationParamsRTP{
snOrdering: ordering,
sequenceNumber: uint16(extMungedSN),
timestamp: mungedTS,
timestamp: uint32(extMungedTS),
}, nil
}
@@ -223,7 +223,7 @@ func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 {
return filtered
}
func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, rtpTimestamp uint32) ([]SnTs, error) {
func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, extRtpTimestamp uint64) ([]SnTs, error) {
useLastTSForFirst := false
tsOffset := 0
if !r.lastMarker {
@@ -237,32 +237,33 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
}
extLastSN := r.extLastSN
lastTS := r.lastTS
extLastTS := r.extLastTS
vals := make([]SnTs, num)
for i := 0; i < num; i++ {
extLastSN++
vals[i].sequenceNumber = uint16(extLastSN)
if frameRate != 0 {
if useLastTSForFirst && i == 0 {
vals[i].timestamp = r.lastTS
vals[i].timestamp = uint32(r.extLastTS)
} else {
ts := rtpTimestamp + ((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate
if (ts-lastTS) == 0 || (ts-lastTS) > (1<<31) {
ts = lastTS + 1
lastTS = ts
ets := extRtpTimestamp + uint64(((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate)
if int64(ets-extLastTS) <= 0 {
ets = extLastTS + 1
}
vals[i].timestamp = ts
extLastTS = ets
vals[i].timestamp = uint32(ets)
}
} else {
vals[i].timestamp = r.lastTS
vals[i].timestamp = uint32(r.extLastTS)
}
}
r.extLastSN = extLastSN
r.snRangeMap.DecValue(uint32(num))
r.tsOffset -= vals[num-1].timestamp - r.lastTS
r.lastTS = vals[num-1].timestamp
r.tsOffset -= extLastTS - r.extLastTS
r.extLastTS = extLastTS
if forceMarker {
r.lastMarker = true
+8 -8
View File
@@ -43,11 +43,11 @@ func TestSetLastSnTs(t *testing.T) {
r.SetLastSnTs(extPkt)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
require.Equal(t, uint64(0), r.tsOffset)
}
func TestUpdateSnTsOffsets(t *testing.T) {
@@ -70,11 +70,11 @@ func TestUpdateSnTsOffsets(t *testing.T) {
r.UpdateSnTsOffsets(extPkt, 1, 1)
require.Equal(t, uint32(33332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(9999), snOffset)
require.Equal(t, uint32(0xffffffff), r.tsOffset)
require.Equal(t, uint64(0xffff_ffff_ffff_ffff), r.tsOffset)
}
func TestPacketDropped(t *testing.T) {
@@ -90,11 +90,11 @@ func TestPacketDropped(t *testing.T) {
r.SetLastSnTs(extPkt)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
require.Equal(t, uint64(0), r.tsOffset)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
@@ -493,7 +493,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) {
timestamp: params.Timestamp + ((uint32(i)*clockRate)+frameRate-1)/frameRate,
}
}
snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, params.Timestamp)
snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, extPkt.ExtTimestamp)
require.NoError(t, err)
require.Equal(t, sntsExpected, snts)
@@ -504,7 +504,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) {
timestamp: snts[len(snts)-1].timestamp + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate,
}
}
snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, snts[len(snts)-1].timestamp)
snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, uint64(snts[len(snts)-1].timestamp))
require.NoError(t, err)
require.Equal(t, sntsExpected, snts)
}
+5 -5
View File
@@ -76,7 +76,7 @@ type StreamTrackerManager struct {
senderReportMu sync.RWMutex
senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport
layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint32
layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint64
closed core.Fuse
@@ -563,10 +563,10 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) {
rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9
// calculate other layer's time stamp at the same time as ref layer's NTP time
normalizedOtherTS := srOther.RTPTimestamp + uint32(rtpDiff)
normalizedOtherTS := srOther.RTPTimestampExt + uint64(rtpDiff)
// now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers
offset := srRef.RTPTimestamp - normalizedOtherTS
offset := srRef.RTPTimestampExt - normalizedOtherTS
// use minimal offset to indicate value availability in the extremely unlikely case of
// both layers using the same timestamp
@@ -643,7 +643,7 @@ func (s *StreamTrackerManager) GetCalculatedClockRate(layer int32) uint32 {
return uint32(float64(rdsf) / tsf.Seconds())
}
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
@@ -655,7 +655,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer)
}
return ts + s.layerOffsets[referenceLayer][layer], nil
return ets + s.layerOffsets[referenceLayer][layer], nil
}
func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 {
+2
View File
@@ -32,6 +32,7 @@ type TestExtPacketParams struct {
SequenceNumber uint16
SNCycles int
Timestamp uint32
TSCycles int
SSRC uint32
PayloadSize int
PaddingSize byte
@@ -64,6 +65,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) {
ep := &buffer.ExtPacket{
VideoLayer: params.VideoLayer,
ExtSequenceNumber: uint32(params.SNCycles<<16) + uint32(params.SequenceNumber),
ExtTimestamp: uint64(params.TSCycles<<32) + uint64(params.Timestamp),
Arrival: params.ArrivalTime,
Packet: &packet,
KeyFrame: params.IsKeyFrame,
+11 -10
View File
@@ -32,7 +32,7 @@ type WrapAround[T number, ET extendedNumber] struct {
initialized bool
start T
highest T
cycles int
cycles ET
}
func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] {
@@ -78,7 +78,7 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
// in-order
if val < w.highest {
w.cycles++
w.cycles += w.fullRange
}
w.highest = val
@@ -88,13 +88,14 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
func (w *WrapAround[T, ET]) RollbackRestart(ev ET) {
if w.isWrapBack(w.start, T(ev)) {
w.cycles--
w.cycles -= w.fullRange
}
w.start = T(ev)
}
func (w *WrapAround[T, ET]) ResetHighest(val T) {
w.highest = val
func (w *WrapAround[T, ET]) ResetHighest(ev ET) {
w.highest = T(ev)
w.cycles = ev & ^(w.fullRange - 1)
}
func (w *WrapAround[T, ET]) GetStart() T {
@@ -122,7 +123,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1
if totalNum > (w.fullRange >> 1) {
if w.isWrapBack(val, w.highest) {
cycles--
cycles -= w.fullRange
}
extendedVal = w.getExtendedHighest(cycles, val)
return
@@ -134,13 +135,13 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
preExtendedStart = w.GetExtendedStart()
if w.isWrapBack(val, w.highest) {
w.cycles = 1
w.cycles = w.fullRange
cycles = 0
}
w.start = val
} else {
if w.isWrapBack(val, w.highest) {
cycles--
cycles -= w.fullRange
}
}
extendedVal = w.getExtendedHighest(cycles, val)
@@ -151,6 +152,6 @@ func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool {
return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1)
}
func (w *WrapAround[T, ET]) getExtendedHighest(cycles int, val T) ET {
return ET(cycles)*w.fullRange + ET(val)
func (w *WrapAround[T, ET]) getExtendedHighest(cycles ET, val T) ET {
return cycles + ET(val)
}
+13
View File
@@ -252,6 +252,19 @@ func TestWrapAroundUint16RollbackRestart(t *testing.T) {
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
// reset highest
w.ResetHighest(0x1234)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint16(0x1234), w.GetHighest())
require.Equal(t, uint32(0x1234), w.GetExtendedHighest())
w.ResetHighest(0x7f1234)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint16(0x1234), w.GetHighest())
require.Equal(t, uint32(0x7f1234), w.GetExtendedHighest())
}
func TestWrapAroundUint32(t *testing.T) {