mirror of
https://github.com/livekit/livekit.git
synced 2026-06-06 15:31:51 +00:00
Seed snapshots (#1128)
* Seed snapshots - For one cycle after seeding, delta snap shot can get a huge gap because of snapshot iitializing from start if not present. Not a huge deal sa it should not affect functionality, but saving/restoring (at least with down track) snap shot is a big deal. So just do it. - Have been seeing a bunch of cases of delta stats getting a lot of packets due to out-of-order (what seems like) receiver report. So, save the receiver report and log it when out-of-order is detected to understand if they are closely spaced or something else could be happening. * Remove comment that does not apply anymore * log current time and RR
This commit is contained in:
+31
-17
@@ -103,8 +103,9 @@ type RTPStats struct {
|
||||
highestSN uint16
|
||||
cycles uint16
|
||||
|
||||
isRRSeen bool
|
||||
extHighestSNOverridden uint32
|
||||
lastRRTime time.Time
|
||||
lastRR rtcp.ReceptionReport
|
||||
|
||||
highestTS uint32
|
||||
highestTime int64
|
||||
@@ -175,6 +176,9 @@ func NewRTPStats(params RTPStatsParams) *RTPStats {
|
||||
}
|
||||
|
||||
func (r *RTPStats) Seed(from *RTPStats) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
if from == nil {
|
||||
return
|
||||
}
|
||||
@@ -189,8 +193,9 @@ func (r *RTPStats) Seed(from *RTPStats) {
|
||||
r.highestSN = from.highestSN
|
||||
r.cycles = from.cycles
|
||||
|
||||
r.isRRSeen = from.isRRSeen
|
||||
r.extHighestSNOverridden = from.extHighestSNOverridden
|
||||
r.lastRRTime = from.lastRRTime
|
||||
r.lastRR = from.lastRR
|
||||
|
||||
r.highestTS = from.highestTS
|
||||
r.highestTime = from.highestTime
|
||||
@@ -247,7 +252,10 @@ func (r *RTPStats) Seed(from *RTPStats) {
|
||||
r.ntpSR = from.ntpSR
|
||||
r.arrivalSR = from.arrivalSR
|
||||
|
||||
// snapshots are not cloned and should be recreated
|
||||
r.nextSnapshotId = from.nextSnapshotId
|
||||
for id, ss := range from.snapshots {
|
||||
r.snapshots[id] = ss
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RTPStats) SetLogger(logger logger.Logger) {
|
||||
@@ -442,7 +450,7 @@ func (r *RTPStats) getTotalPacketsPrimary() uint32 {
|
||||
return packetsSeen - r.packetsPadding
|
||||
}
|
||||
|
||||
func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uint32, rtt uint32, jitter float64) {
|
||||
func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport, rtt uint32) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
@@ -450,18 +458,18 @@ func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uin
|
||||
return
|
||||
}
|
||||
|
||||
if !r.isRRSeen || r.extHighestSNOverridden <= extHighestSN {
|
||||
r.extHighestSNOverridden = extHighestSN
|
||||
r.packetsLostOverridden = packetsLost
|
||||
if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= rr.LastSequenceNumber {
|
||||
r.extHighestSNOverridden = rr.LastSequenceNumber
|
||||
r.packetsLostOverridden = rr.TotalLost
|
||||
|
||||
r.rtt = rtt
|
||||
if rtt > r.maxRtt {
|
||||
r.maxRtt = rtt
|
||||
}
|
||||
|
||||
r.jitterOverridden = jitter
|
||||
if jitter > r.maxJitterOverridden {
|
||||
r.maxJitterOverridden = jitter
|
||||
r.jitterOverridden = float64(rr.Jitter)
|
||||
if r.jitterOverridden > r.maxJitterOverridden {
|
||||
r.maxJitterOverridden = r.jitterOverridden
|
||||
}
|
||||
|
||||
// update snapshots
|
||||
@@ -470,17 +478,23 @@ func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uin
|
||||
s.maxRtt = rtt
|
||||
}
|
||||
|
||||
if jitter > s.maxJitterOverridden {
|
||||
s.maxJitterOverridden = jitter
|
||||
if r.jitterOverridden > s.maxJitterOverridden {
|
||||
s.maxJitterOverridden = r.jitterOverridden
|
||||
}
|
||||
}
|
||||
|
||||
r.lastRRTime = time.Now()
|
||||
r.lastRR = rr
|
||||
} else {
|
||||
r.logger.Warnw(
|
||||
"receiver report potentially out of order",
|
||||
fmt.Errorf("highestSN: existing: %d, received: %d", r.extHighestSNOverridden, extHighestSN),
|
||||
fmt.Errorf("highestSN: existing: %d, received: %d", r.extHighestSNOverridden, rr.LastSequenceNumber),
|
||||
"lastRRTime", r.lastRRTime,
|
||||
"lastRR", r.lastRR,
|
||||
"sinceLastRR", time.Since(r.lastRRTime),
|
||||
"receivedRR", rr,
|
||||
)
|
||||
}
|
||||
r.isRRSeen = true
|
||||
}
|
||||
|
||||
func (r *RTPStats) UpdateNack(nackCount uint32) {
|
||||
@@ -977,7 +991,7 @@ func (r *RTPStats) getExtHighestSN() uint32 {
|
||||
}
|
||||
|
||||
func (r *RTPStats) getExtHighestSNAdjusted() uint32 {
|
||||
if r.params.IsReceiverReportDriven && r.isRRSeen {
|
||||
if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() {
|
||||
return r.extHighestSNOverridden
|
||||
}
|
||||
|
||||
@@ -985,7 +999,7 @@ func (r *RTPStats) getExtHighestSNAdjusted() uint32 {
|
||||
}
|
||||
|
||||
func (r *RTPStats) getPacketsLost() uint32 {
|
||||
if r.params.IsReceiverReportDriven && r.isRRSeen {
|
||||
if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() {
|
||||
return r.packetsLostOverridden
|
||||
}
|
||||
|
||||
@@ -1137,7 +1151,7 @@ func (r *RTPStats) updateGapHistogram(gap int) {
|
||||
}
|
||||
|
||||
func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) {
|
||||
if !r.initialized || (r.params.IsReceiverReportDriven && !r.isRRSeen) {
|
||||
if !r.initialized || (r.params.IsReceiverReportDriven && r.lastRRTime.IsZero()) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
||||
+10
-7
@@ -105,13 +105,14 @@ var (
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
type DownTrackState struct {
|
||||
RTPStats *buffer.RTPStats
|
||||
ForwarderState ForwarderState
|
||||
RTPStats *buffer.RTPStats
|
||||
DeltaStatsSnapshotId uint32
|
||||
ForwarderState ForwarderState
|
||||
}
|
||||
|
||||
func (d DownTrackState) String() string {
|
||||
return fmt.Sprintf("DownTrackState{rtpStats: %s, forwarder: %s}",
|
||||
d.RTPStats.ToString(), d.ForwarderState.String())
|
||||
return fmt.Sprintf("DownTrackState{rtpStats: %s, delta: %d, forwarder: %s}",
|
||||
d.RTPStats.ToString(), d.DeltaStatsSnapshotId, d.ForwarderState.String())
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
@@ -769,13 +770,15 @@ func (d *DownTrack) MaxLayers() VideoLayers {
|
||||
|
||||
func (d *DownTrack) GetState() DownTrackState {
|
||||
return DownTrackState{
|
||||
RTPStats: d.rtpStats,
|
||||
ForwarderState: d.forwarder.GetState(),
|
||||
RTPStats: d.rtpStats,
|
||||
DeltaStatsSnapshotId: d.deltaStatsSnapshotId,
|
||||
ForwarderState: d.forwarder.GetState(),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DownTrack) SeedState(state DownTrackState) {
|
||||
d.rtpStats.Seed(state.RTPStats)
|
||||
d.deltaStatsSnapshotId = state.DeltaStatsSnapshotId
|
||||
d.forwarder.SeedState(state.ForwarderState)
|
||||
}
|
||||
|
||||
@@ -1162,7 +1165,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
|
||||
rttToReport = rtt
|
||||
}
|
||||
|
||||
d.rtpStats.UpdateFromReceiverReport(r.LastSequenceNumber, r.TotalLost, rtt, float64(r.Jitter))
|
||||
d.rtpStats.UpdateFromReceiverReport(r, rtt)
|
||||
}
|
||||
if len(rr.Reports) > 0 {
|
||||
d.listenerLock.RLock()
|
||||
|
||||
Reference in New Issue
Block a user