Handle the case of no packets in down stream tracks better. (#1500)

This commit is contained in:
Raja Subramanian
2023-03-07 14:32:43 +05:30
committed by GitHub
parent 38deab8991
commit 99601e6d41
3 changed files with 77 additions and 65 deletions
+43 -25
View File
@@ -31,6 +31,8 @@ type RTPFlowState struct {
}
type IntervalStats struct {
earliestPktTime int64
latestPktTime int64
packets uint32
bytes uint64
headerBytes uint64
@@ -54,6 +56,7 @@ type RTPDeltaInfo struct {
BytesPadding uint64
HeaderBytesPadding uint64
PacketsLost uint32
PacketsMissing uint32
Frames uint32
RttMax uint32
JitterMax float64
@@ -78,6 +81,7 @@ type Snapshot struct {
}
type SnInfo struct {
pktTime int64
hdrSize uint16
pktSize uint16
isPaddingOnly bool
@@ -365,7 +369,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
isDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
r.setSnInfo(rtph.SequenceNumber, packetTime, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
}
}
@@ -384,7 +388,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber)
r.packetsLost += uint32(diff - 1)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
r.setSnInfo(rtph.SequenceNumber, packetTime, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
if rtph.SequenceNumber < r.highestSN && !first {
r.cycles++
@@ -434,7 +438,7 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSiz
beforeAdjust := r.extStartSN
r.extStartSN = uint32(rtph.SequenceNumber)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
r.setSnInfo(rtph.SequenceNumber, packetTime, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
for _, s := range r.snapshots {
if s.extStartSN == beforeAdjust {
@@ -855,6 +859,9 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
r.lock.RLock()
defer r.lock.RUnlock()
startTime := then.startTime
endTime := now.startTime
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > NumSequenceNumbers {
r.logger.Warnw(
@@ -864,28 +871,27 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
return nil
}
if packetsExpected == 0 {
r.logger.Debugw(
"no expected packets",
"info", fmt.Sprintf("start: %d @ %+v, end: %d @ %+v", then.extStartSN, then.startTime, now.extStartSN, now.startTime),
)
return nil
}
packetsLost := uint32(0)
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
if r.params.IsReceiverReportDriven {
// by taking number of packets from interval report, packets not sent (because of missing packets in feed) will be accounted for
packetsExpected = intervalStats.packets + intervalStats.packetsPadding
if packetsExpected == 0 {
r.logger.Debugw(
"no expected packets in interval",
"info", fmt.Sprintf("start: %d @ %+v, end: %d @ %+v", then.extStartSN, then.startTime, now.extStartSN, now.startTime),
)
if r.params.IsReceiverReportDriven {
// not received RTCP RR
return nil
}
// discount loss in the interval as those are packets not sent at all
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden - intervalStats.packetsLost
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
}
}
packetsLost := uint32(0)
packetsMissing := uint32(0)
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
if r.params.IsReceiverReportDriven {
startTime = time.Unix(0, intervalStats.earliestPktTime)
endTime = time.Unix(0, intervalStats.latestPktTime)
packetsMissing = intervalStats.packetsLost
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
if int32(packetsLost) < 0 {
packetsLost = 0
}
@@ -915,8 +921,8 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPDeltaInfo{
StartTime: then.startTime,
Duration: now.startTime.Sub(then.startTime),
StartTime: startTime,
Duration: endTime.Sub(startTime),
Packets: packetsExpected - intervalStats.packetsPadding,
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
@@ -927,6 +933,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: packetsLost,
PacketsMissing: packetsMissing,
Frames: intervalStats.frames,
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
@@ -1150,7 +1157,7 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int {
return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask
}
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) {
func (r *RTPStats) setSnInfo(sn uint16, pktTime int64, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) {
writePtr := 0
ooo := (sn - r.highestSN) > (1 << 15)
if !ooo {
@@ -1164,6 +1171,7 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadS
}
snInfo := &r.snInfos[writePtr]
snInfo.pktTime = pktTime
snInfo.pktSize = pktSize
snInfo.hdrSize = hdrSize
snInfo.isPaddingOnly = payloadSize == 0
@@ -1173,7 +1181,9 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadS
func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) {
for sn := startInclusive; sn != endExclusive; sn++ {
snInfo := &r.snInfos[r.snInfoWritePtr]
snInfo.pktTime = 0
snInfo.pktSize = 0
snInfo.hdrSize = 0
snInfo.isPaddingOnly = false
snInfo.marker = false
@@ -1219,6 +1229,14 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16)
if snInfo.marker {
intervalStats.frames++
}
if intervalStats.earliestPktTime == 0 || snInfo.pktTime < intervalStats.earliestPktTime {
intervalStats.earliestPktTime = snInfo.pktTime
}
if intervalStats.latestPktTime == 0 || snInfo.pktTime > intervalStats.latestPktTime {
intervalStats.latestPktTime = snInfo.pktTime
}
}
if startInclusive == endExclusive {
+18 -22
View File
@@ -77,29 +77,26 @@ func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQual
}
func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWithLayers, at time.Time) float32 {
if len(streams) == 0 {
cs.scorer.Update(nil, at)
} else {
var stat windowStat
for _, s := range streams {
if stat.startedAt.IsZero() || stat.startedAt.After(s.RTPStats.StartTime) {
stat.startedAt = s.RTPStats.StartTime
}
if stat.duration < s.RTPStats.Duration {
stat.duration = s.RTPStats.Duration
}
stat.packetsExpected += s.RTPStats.Packets + s.RTPStats.PacketsPadding
stat.packetsLost += s.RTPStats.PacketsLost
if stat.rttMax < s.RTPStats.RttMax {
stat.rttMax = s.RTPStats.RttMax
}
if stat.jitterMax < s.RTPStats.JitterMax {
stat.jitterMax = s.RTPStats.JitterMax
}
stat.bytes += s.RTPStats.Bytes - s.RTPStats.HeaderBytes // only use media payload size
var stat windowStat
for _, s := range streams {
if stat.startedAt.IsZero() || stat.startedAt.After(s.RTPStats.StartTime) {
stat.startedAt = s.RTPStats.StartTime
}
cs.scorer.Update(&stat, at)
if stat.duration < s.RTPStats.Duration {
stat.duration = s.RTPStats.Duration
}
stat.packetsExpected += s.RTPStats.Packets + s.RTPStats.PacketsPadding
stat.packetsLost += s.RTPStats.PacketsLost
stat.packetsMissing += s.RTPStats.PacketsMissing
if stat.rttMax < s.RTPStats.RttMax {
stat.rttMax = s.RTPStats.RttMax
}
if stat.jitterMax < s.RTPStats.JitterMax {
stat.jitterMax = s.RTPStats.JitterMax
}
stat.bytes += s.RTPStats.Bytes - s.RTPStats.HeaderBytes // only use media payload size
}
cs.scorer.Update(&stat, at)
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
@@ -112,7 +109,6 @@ func (cs *ConnectionStats) getStat(at time.Time) *livekit.AnalyticsStat {
streams := cs.params.GetDeltaStats()
if len(streams) == 0 {
cs.updateScore(streams, at)
return nil
}
+16 -18
View File
@@ -30,6 +30,7 @@ type windowStat struct {
duration time.Duration
packetsExpected uint32
packetsLost uint32
packetsMissing uint32
bytes uint64
rttMax uint32
jitterMax float64
@@ -44,9 +45,14 @@ func (w *windowStat) calculatePacketScore(plw float64) float64 {
delayEffect = (effectiveDelay - 120.0) / 10.0
}
actualLost := w.packetsLost - w.packetsMissing
if int32(actualLost) < 0 {
actualLost = 0
}
lossEffect := float64(0.0)
if w.packetsExpected > 0 {
lossEffect = float64(w.packetsLost) * 100.0 / float64(w.packetsExpected)
lossEffect = float64(actualLost) * 100.0 / float64(w.packetsExpected)
}
lossEffect *= plw
@@ -256,26 +262,18 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) {
reason := "none"
var ws *windowScore
if stat == nil {
if stat.packetsExpected == 0 {
reason = "dry"
ws = newWindowScoreWithScore(&windowStat{
startedAt: q.lastUpdateAt,
duration: at.Sub(q.lastUpdateAt),
}, poorScore)
ws = newWindowScoreWithScore(stat, poorScore)
} else {
if stat.packetsExpected == 0 {
reason = "dry"
ws = newWindowScoreWithScore(stat, poorScore)
wsPacket := newWindowScorePacket(stat, q.getPacketLossWeight(stat))
wsByte := newWindowScoreByte(stat, expectedBitrate)
if wsPacket.getScore() < wsByte.getScore() {
reason = "packet"
ws = wsPacket
} else {
wsPacket := newWindowScorePacket(stat, q.getPacketLossWeight(stat))
wsByte := newWindowScoreByte(stat, expectedBitrate)
if wsPacket.getScore() < wsByte.getScore() {
reason = "packet"
ws = wsPacket
} else {
reason = "bitrate"
ws = wsByte
}
reason = "bitrate"
ws = wsByte
}
}
score := ws.getScore()