mirror of
https://github.com/livekit/livekit.git
synced 2026-05-15 03:05:26 +00:00
Discount out-of-order packets in downstream score. (#1831)
* Discount out-of-order packets in downstream score. More notes inline. * correct comment * clean up comment
This commit is contained in:
@@ -60,6 +60,7 @@ type IntervalStats struct {
|
||||
bytesPadding uint64
|
||||
headerBytesPadding uint64
|
||||
packetsLost uint32
|
||||
packetsOutOfOrder uint32
|
||||
frames uint32
|
||||
}
|
||||
|
||||
@@ -77,6 +78,7 @@ type RTPDeltaInfo struct {
|
||||
HeaderBytesPadding uint64
|
||||
PacketsLost uint32
|
||||
PacketsMissing uint32
|
||||
PacketsOutOfOrder uint32
|
||||
Frames uint32
|
||||
RttMax uint32
|
||||
JitterMax float64
|
||||
@@ -106,6 +108,7 @@ type SnInfo struct {
|
||||
pktSize uint16
|
||||
isPaddingOnly bool
|
||||
marker bool
|
||||
isOutOfOrder bool
|
||||
}
|
||||
|
||||
type RTCPSenderReportData struct {
|
||||
@@ -412,7 +415,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
|
||||
isDuplicate = true
|
||||
} else {
|
||||
r.packetsLost--
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -431,7 +434,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
|
||||
r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber)
|
||||
r.packetsLost += uint32(diff - 1)
|
||||
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false)
|
||||
|
||||
if rtph.SequenceNumber < r.highestSN && !first {
|
||||
r.cycles++
|
||||
@@ -490,7 +493,7 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize
|
||||
beforeAdjust := r.extStartSN
|
||||
r.extStartSN = uint32(rtph.SequenceNumber)
|
||||
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true)
|
||||
|
||||
for _, s := range r.snapshots {
|
||||
if s.extStartSN == beforeAdjust {
|
||||
@@ -1132,7 +1135,6 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo {
|
||||
}
|
||||
|
||||
intervalStats := r.getIntervalStats(uint16(then.extStartSNOverridden), uint16(now.extStartSNOverridden))
|
||||
packetsMissing := intervalStats.packetsLost
|
||||
packetsLost := now.packetsLostOverridden - then.packetsLostOverridden
|
||||
if int32(packetsLost) < 0 {
|
||||
packetsLost = 0
|
||||
@@ -1173,7 +1175,8 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo {
|
||||
BytesPadding: intervalStats.bytesPadding,
|
||||
HeaderBytesPadding: intervalStats.headerBytesPadding,
|
||||
PacketsLost: packetsLost,
|
||||
PacketsMissing: packetsMissing,
|
||||
PacketsMissing: intervalStats.packetsLost,
|
||||
PacketsOutOfOrder: intervalStats.packetsOutOfOrder,
|
||||
Frames: intervalStats.frames,
|
||||
RttMax: then.maxRtt,
|
||||
JitterMax: maxJitterTime,
|
||||
@@ -1409,7 +1412,7 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int {
|
||||
return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask
|
||||
}
|
||||
|
||||
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) {
|
||||
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) {
|
||||
writePtr := 0
|
||||
ooo := (sn - r.highestSN) > (1 << 15)
|
||||
if !ooo {
|
||||
@@ -1427,6 +1430,7 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadS
|
||||
snInfo.hdrSize = hdrSize
|
||||
snInfo.isPaddingOnly = payloadSize == 0
|
||||
snInfo.marker = marker
|
||||
snInfo.isOutOfOrder = isOutOfOrder
|
||||
}
|
||||
|
||||
func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) {
|
||||
@@ -1474,6 +1478,9 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16)
|
||||
intervalStats.packets++
|
||||
intervalStats.bytes += uint64(snInfo.pktSize)
|
||||
intervalStats.headerBytes += uint64(snInfo.hdrSize)
|
||||
if snInfo.isOutOfOrder {
|
||||
intervalStats.packetsOutOfOrder++
|
||||
}
|
||||
}
|
||||
|
||||
if snInfo.marker {
|
||||
@@ -1822,6 +1829,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
|
||||
|
||||
packetsLost := uint32(0)
|
||||
packetsMissing := uint32(0)
|
||||
packetsOutOfOrder := uint32(0)
|
||||
|
||||
frames := uint32(0)
|
||||
|
||||
@@ -1860,6 +1868,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
|
||||
|
||||
packetsLost += deltaInfo.PacketsLost
|
||||
packetsMissing += deltaInfo.PacketsMissing
|
||||
packetsOutOfOrder += deltaInfo.PacketsOutOfOrder
|
||||
|
||||
frames += deltaInfo.Frames
|
||||
|
||||
@@ -1893,6 +1902,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
|
||||
HeaderBytesPadding: headerBytesPadding,
|
||||
PacketsLost: packetsLost,
|
||||
PacketsMissing: packetsMissing,
|
||||
PacketsOutOfOrder: packetsOutOfOrder,
|
||||
Frames: frames,
|
||||
RttMax: maxRtt,
|
||||
JitterMax: maxJitter,
|
||||
|
||||
@@ -108,6 +108,7 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at
|
||||
stat.packetsExpected = agg.Packets + agg.PacketsPadding
|
||||
stat.packetsLost = agg.PacketsLost
|
||||
stat.packetsMissing = agg.PacketsMissing
|
||||
stat.packetsOutOfOrder = agg.PacketsOutOfOrder
|
||||
stat.bytes = agg.Bytes - agg.HeaderBytes // only use media payload size
|
||||
stat.rttMax = agg.RttMax
|
||||
stat.jitterMax = agg.JitterMax
|
||||
|
||||
@@ -29,14 +29,15 @@ const (
|
||||
// ------------------------------------------
|
||||
|
||||
type windowStat struct {
|
||||
startedAt time.Time
|
||||
duration time.Duration
|
||||
packetsExpected uint32
|
||||
packetsLost uint32
|
||||
packetsMissing uint32
|
||||
bytes uint64
|
||||
rttMax uint32
|
||||
jitterMax float64
|
||||
startedAt time.Time
|
||||
duration time.Duration
|
||||
packetsExpected uint32
|
||||
packetsLost uint32
|
||||
packetsMissing uint32
|
||||
packetsOutOfOrder uint32
|
||||
bytes uint64
|
||||
rttMax uint32
|
||||
jitterMax float64
|
||||
}
|
||||
|
||||
func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJitter bool) float64 {
|
||||
@@ -59,7 +60,28 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ
|
||||
delayEffect = (effectiveDelay - 120.0) / 10.0
|
||||
}
|
||||
|
||||
actualLost := w.packetsLost - w.packetsMissing
|
||||
// discount out-of-order packets from loss to deal with a scenario like
|
||||
// 1. up stream has loss
|
||||
// 2. down stream forwards with loss/hole in sequence number
|
||||
// 3. down stream client reports a certain number of loss
|
||||
// 4. while processing that, up stream could have retransmitted missing packets
|
||||
// 5. those retransmitted packets are forwarded,
|
||||
// - server's view: it has forwarded those packets
|
||||
// - client's view: it had not seen those packets when sending RTCP RR
|
||||
// so those retransmitted packets appear like down stream loss to server.
|
||||
//
|
||||
// retransmitted packets would have arrived out-of-order. So, discounting them
|
||||
// will account for it.
|
||||
//
|
||||
// Note that packets can arrive out-of-order in the upstream during regular
|
||||
// streaming as well, i. e. without loss + NACK + retransmit. Those will be
|
||||
// discounted too. And that will skew the real loss. For example, let
|
||||
// us say that 40 out of 100 packets were reported lost by down stream.
|
||||
// These could be real losses. In the same window, 40 packets could have been
|
||||
// delivered out-of-order by the up stream, thus cancelling out the real loss.
|
||||
// But, those situations should be rare and is a compromise for not letting
|
||||
// up stream loss penalise down stream.
|
||||
actualLost := w.packetsLost - w.packetsMissing - w.packetsOutOfOrder
|
||||
if int32(actualLost) < 0 {
|
||||
actualLost = 0
|
||||
}
|
||||
@@ -102,12 +124,13 @@ func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 {
|
||||
}
|
||||
|
||||
func (w *windowStat) String() string {
|
||||
return fmt.Sprintf("start: %+v, dur: %+v, pe: %d, pl: %d, pm: %d, b: %d, rtt: %d, jitter: %0.2f",
|
||||
return fmt.Sprintf("start: %+v, dur: %+v, pe: %d, pl: %d, pm: %d, pooo: %d, b: %d, rtt: %d, jitter: %0.2f",
|
||||
w.startedAt,
|
||||
w.duration,
|
||||
w.packetsExpected,
|
||||
w.packetsLost,
|
||||
w.packetsMissing,
|
||||
w.packetsOutOfOrder,
|
||||
w.bytes,
|
||||
w.rttMax,
|
||||
w.jitterMax,
|
||||
|
||||
Reference in New Issue
Block a user