From 170d4b8629e042a05c2415d2bb97d31d0b34c3f1 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 28 Oct 2022 08:53:21 +0530 Subject: [PATCH] Seed snapshots (#1128) * Seed snapshots - For one cycle after seeding, delta snap shot can get a huge gap because of snapshot iitializing from start if not present. Not a huge deal sa it should not affect functionality, but saving/restoring (at least with down track) snap shot is a big deal. So just do it. - Have been seeing a bunch of cases of delta stats getting a lot of packets due to out-of-order (what seems like) receiver report. So, save the receiver report and log it when out-of-order is detected to understand if they are closely spaced or something else could be happening. * Remove comment that does not apply anymore * log current time and RR --- pkg/sfu/buffer/rtpstats.go | 48 ++++++++++++++++++++++++-------------- pkg/sfu/downtrack.go | 17 ++++++++------ 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 2b3193390..f4847acec 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -103,8 +103,9 @@ type RTPStats struct { highestSN uint16 cycles uint16 - isRRSeen bool extHighestSNOverridden uint32 + lastRRTime time.Time + lastRR rtcp.ReceptionReport highestTS uint32 highestTime int64 @@ -175,6 +176,9 @@ func NewRTPStats(params RTPStatsParams) *RTPStats { } func (r *RTPStats) Seed(from *RTPStats) { + r.lock.Lock() + defer r.lock.Unlock() + if from == nil { return } @@ -189,8 +193,9 @@ func (r *RTPStats) Seed(from *RTPStats) { r.highestSN = from.highestSN r.cycles = from.cycles - r.isRRSeen = from.isRRSeen r.extHighestSNOverridden = from.extHighestSNOverridden + r.lastRRTime = from.lastRRTime + r.lastRR = from.lastRR r.highestTS = from.highestTS r.highestTime = from.highestTime @@ -247,7 +252,10 @@ func (r *RTPStats) Seed(from *RTPStats) { r.ntpSR = from.ntpSR r.arrivalSR = from.arrivalSR - // snapshots are not cloned and should be recreated + r.nextSnapshotId = from.nextSnapshotId + for id, ss := range from.snapshots { + r.snapshots[id] = ss + } } func (r *RTPStats) SetLogger(logger logger.Logger) { @@ -442,7 +450,7 @@ func (r *RTPStats) getTotalPacketsPrimary() uint32 { return packetsSeen - r.packetsPadding } -func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uint32, rtt uint32, jitter float64) { +func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport, rtt uint32) { r.lock.Lock() defer r.lock.Unlock() @@ -450,18 +458,18 @@ func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uin return } - if !r.isRRSeen || r.extHighestSNOverridden <= extHighestSN { - r.extHighestSNOverridden = extHighestSN - r.packetsLostOverridden = packetsLost + if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= rr.LastSequenceNumber { + r.extHighestSNOverridden = rr.LastSequenceNumber + r.packetsLostOverridden = rr.TotalLost r.rtt = rtt if rtt > r.maxRtt { r.maxRtt = rtt } - r.jitterOverridden = jitter - if jitter > r.maxJitterOverridden { - r.maxJitterOverridden = jitter + r.jitterOverridden = float64(rr.Jitter) + if r.jitterOverridden > r.maxJitterOverridden { + r.maxJitterOverridden = r.jitterOverridden } // update snapshots @@ -470,17 +478,23 @@ func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uin s.maxRtt = rtt } - if jitter > s.maxJitterOverridden { - s.maxJitterOverridden = jitter + if r.jitterOverridden > s.maxJitterOverridden { + s.maxJitterOverridden = r.jitterOverridden } } + + r.lastRRTime = time.Now() + r.lastRR = rr } else { r.logger.Warnw( "receiver report potentially out of order", - fmt.Errorf("highestSN: existing: %d, received: %d", r.extHighestSNOverridden, extHighestSN), + fmt.Errorf("highestSN: existing: %d, received: %d", r.extHighestSNOverridden, rr.LastSequenceNumber), + "lastRRTime", r.lastRRTime, + "lastRR", r.lastRR, + "sinceLastRR", time.Since(r.lastRRTime), + "receivedRR", rr, ) } - r.isRRSeen = true } func (r *RTPStats) UpdateNack(nackCount uint32) { @@ -977,7 +991,7 @@ func (r *RTPStats) getExtHighestSN() uint32 { } func (r *RTPStats) getExtHighestSNAdjusted() uint32 { - if r.params.IsReceiverReportDriven && r.isRRSeen { + if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() { return r.extHighestSNOverridden } @@ -985,7 +999,7 @@ func (r *RTPStats) getExtHighestSNAdjusted() uint32 { } func (r *RTPStats) getPacketsLost() uint32 { - if r.params.IsReceiverReportDriven && r.isRRSeen { + if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() { return r.packetsLostOverridden } @@ -1137,7 +1151,7 @@ func (r *RTPStats) updateGapHistogram(gap int) { } func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) { - if !r.initialized || (r.params.IsReceiverReportDriven && !r.isRRSeen) { + if !r.initialized || (r.params.IsReceiverReportDriven && r.lastRRTime.IsZero()) { return nil, nil } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index bf253bb04..b50a278ef 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -105,13 +105,14 @@ var ( // ------------------------------------------------------------------- type DownTrackState struct { - RTPStats *buffer.RTPStats - ForwarderState ForwarderState + RTPStats *buffer.RTPStats + DeltaStatsSnapshotId uint32 + ForwarderState ForwarderState } func (d DownTrackState) String() string { - return fmt.Sprintf("DownTrackState{rtpStats: %s, forwarder: %s}", - d.RTPStats.ToString(), d.ForwarderState.String()) + return fmt.Sprintf("DownTrackState{rtpStats: %s, delta: %d, forwarder: %s}", + d.RTPStats.ToString(), d.DeltaStatsSnapshotId, d.ForwarderState.String()) } // ------------------------------------------------------------------- @@ -769,13 +770,15 @@ func (d *DownTrack) MaxLayers() VideoLayers { func (d *DownTrack) GetState() DownTrackState { return DownTrackState{ - RTPStats: d.rtpStats, - ForwarderState: d.forwarder.GetState(), + RTPStats: d.rtpStats, + DeltaStatsSnapshotId: d.deltaStatsSnapshotId, + ForwarderState: d.forwarder.GetState(), } } func (d *DownTrack) SeedState(state DownTrackState) { d.rtpStats.Seed(state.RTPStats) + d.deltaStatsSnapshotId = state.DeltaStatsSnapshotId d.forwarder.SeedState(state.ForwarderState) } @@ -1162,7 +1165,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { rttToReport = rtt } - d.rtpStats.UpdateFromReceiverReport(r.LastSequenceNumber, r.TotalLost, rtt, float64(r.Jitter)) + d.rtpStats.UpdateFromReceiverReport(r, rtt) } if len(rr.Reports) > 0 { d.listenerLock.RLock()