Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-08-31 11:55:49 +05:30
3 changed files with 117 additions and 29 deletions
+1 -1
View File
@@ -228,7 +228,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
case *rtcp.SourceDescription:
// do nothing for now
case *rtcp.SenderReport:
buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime)
buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime, pkt.PacketCount)
}
}
})
+18 -8
View File
@@ -83,7 +83,8 @@ type Buffer struct {
closed atomic.Bool
mime string
snRangeMap *utils.RangeMap[uint64, uint64]
snRangeMap *utils.RangeMap[uint64, uint64]
paddingOnlyDrops uint64
latestTSForAudioLevelInitialized bool
latestTSForAudioLevel uint32
@@ -416,7 +417,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
}
flowState := b.updateStreamState(&rtpPacket, arrivalTime)
// process header extensions always as padding packets could be used for probing
b.processHeaderExtensions(&rtpPacket, arrivalTime)
if flowState.IsNotHandled {
return
}
if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) {
// drop padding only in-order or duplicate packet
if !flowState.IsOutOfOrder {
@@ -436,8 +442,9 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
// 44 - padding only - out-of-order + duplicate - dropped as duplicate
//
if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil {
b.logger.Errorw("could not exclude range", err, "sn", flowState.ExtSequenceNumber)
b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber)
}
b.paddingOnlyDrops++
}
return
}
@@ -448,7 +455,8 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
b.logger.Errorw("could not get sequence number adjustment", err, "sn", flowState.ExtSequenceNumber, "payloadSize", len(rtpPacket.Payload))
return
}
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment)
flowState.ExtSequenceNumber -= snAdjustment
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber)
_, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber)
if err != nil {
if err != bucket.ErrRTXPacket {
@@ -687,14 +695,16 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
return b.rtpStats.SnapshotRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId)
}
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packetCount uint32) {
b.RLock()
srData := &RTCPSenderReportData{
RTPTimestamp: rtpTime,
NTPTimestamp: mediatransportutil.NtpTime(ntpTime),
At: time.Now(),
RTPTimestamp: rtpTime,
NTPTimestamp: mediatransportutil.NtpTime(ntpTime),
PacketCount: packetCount,
PaddingOnlyDrops: b.paddingOnlyDrops,
At: time.Now(),
}
b.RLock()
if b.rtpStats != nil {
b.rtpStats.SetRtcpSenderReportData(srData)
}
+98 -20
View File
@@ -58,6 +58,8 @@ func RTPDriftToString(r *livekit.RTPDrift) string {
// -------------------------------------------------------
type RTPFlowState struct {
IsNotHandled bool
HasLoss bool
LossStartInclusive uint64
LossEndExclusive uint64
@@ -129,10 +131,13 @@ type SnInfo struct {
}
type RTCPSenderReportData struct {
RTPTimestamp uint32
RTPTimestampExt uint64
NTPTimestamp mediatransportutil.NtpTime
At time.Time
RTPTimestamp uint32
RTPTimestampExt uint64
NTPTimestamp mediatransportutil.NtpTime
PacketCount uint32
PacketCountExt uint64
PaddingOnlyDrops uint64
At time.Time
}
type RTPStatsParams struct {
@@ -147,7 +152,9 @@ type RTPStats struct {
lock sync.RWMutex
initialized bool
initialized bool
resyncOnNextPacket bool
shouldDiscountPaddingOnlyDrops bool
startTime time.Time
endTime time.Time
@@ -239,6 +246,8 @@ func (r *RTPStats) Seed(from *RTPStats) {
}
r.initialized = from.initialized
r.resyncOnNextPacket = from.resyncOnNextPacket
r.shouldDiscountPaddingOnlyDrops = from.shouldDiscountPaddingOnlyDrops
r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
@@ -365,14 +374,85 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
defer r.lock.Unlock()
if !r.endTime.IsZero() {
flowState.IsNotHandled = true
return
}
if r.resyncOnNextPacket {
r.resyncOnNextPacket = false
if r.initialized {
extHighestSN := r.sequenceNumber.GetExtendedHighest()
var newestPacketCount uint64
var paddingOnlyDrops uint64
var extExpectedHighestSN uint64
var expectedHighestSN uint16
var snCycles uint64
extHighestTS := r.timestamp.GetExtendedHighest()
var newestTS uint64
var extExpectedHighestTS uint64
var expectedHighestTS uint32
var tsCycles uint64
if r.srNewest != nil {
newestPacketCount = r.srNewest.PacketCountExt
paddingOnlyDrops = r.srNewest.PaddingOnlyDrops
if newestPacketCount != 0 {
extExpectedHighestSN = r.sequenceNumber.GetExtendedStart() + newestPacketCount
if r.shouldDiscountPaddingOnlyDrops {
extExpectedHighestSN -= paddingOnlyDrops
}
expectedHighestSN = uint16(extExpectedHighestSN & 0xFFFF)
snCycles = extExpectedHighestSN & 0xFFFF_FFFF_FFFF_0000
if rtph.SequenceNumber-expectedHighestSN < (1<<15) && rtph.SequenceNumber < expectedHighestSN {
snCycles += (1 << 16)
}
if snCycles != 0 && expectedHighestSN-rtph.SequenceNumber < (1<<15) && expectedHighestSN < rtph.SequenceNumber {
snCycles -= (1 << 16)
}
}
newestTS = r.srNewest.RTPTimestampExt
extExpectedHighestTS = newestTS
expectedHighestTS = uint32(extExpectedHighestTS & 0xFFFF_FFFF)
tsCycles = extExpectedHighestTS & 0xFFFF_FFFF_0000_0000
if rtph.Timestamp-expectedHighestTS < (1<<31) && rtph.Timestamp < expectedHighestTS {
tsCycles += (1 << 32)
}
if tsCycles != 0 && expectedHighestTS-rtph.Timestamp < (1<<31) && expectedHighestTS < rtph.Timestamp {
tsCycles -= (1 << 32)
}
}
r.sequenceNumber.ResetHighest(snCycles + uint64(rtph.SequenceNumber) - 1)
r.timestamp.ResetHighest(tsCycles + uint64(rtph.Timestamp))
r.highestTime = packetTime
r.logger.Debugw(
"resync",
"newestPacketCount", newestPacketCount,
"paddingOnlyDrops", paddingOnlyDrops,
"extExpectedHighestSN", extExpectedHighestSN,
"expectedHighestSN", expectedHighestSN,
"snCycles", snCycles,
"rtpSN", rtph.SequenceNumber,
"beforeExtHighestSN", extHighestSN,
"afterExtHighestSN", r.sequenceNumber.GetExtendedHighest(),
"newestTS", newestTS,
"extExpectedHighestTS", extExpectedHighestTS,
"expectedHighestTS", expectedHighestTS,
"tsCycles", tsCycles,
"rtpTS", rtph.Timestamp,
"beforeExtHighestTS", extHighestTS,
"afterExtHighestTS", r.timestamp.GetExtendedHighest(),
)
}
}
var resSN utils.WrapAroundUpdateResult[uint64]
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
if payloadSize == 0 {
// do not start on a padding only packet
flowState.IsNotHandled = true
return
}
@@ -511,16 +591,12 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}
func (r *RTPStats) Resync(esn uint64, ets uint64, at time.Time) {
func (r *RTPStats) ResyncOnNextPacket(shouldDiscountPaddingOnlyDrops bool) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized {
return
}
r.sequenceNumber.ResetHighest(esn - 1)
r.timestamp.ResetHighest(ets)
r.highestTime = at
r.resyncOnNextPacket = true
r.shouldDiscountPaddingOnlyDrops = shouldDiscountPaddingOnlyDrops
}
func (r *RTPStats) getPacketsExpected() uint64 {
@@ -868,21 +944,23 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
return
}
cycles := uint64(0)
tsCycles := uint64(0)
pcCycles := uint64(0)
if r.srNewest != nil {
cycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000
tsCycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
cycles += (1 << 32)
tsCycles += (1 << 32)
}
ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate))
goArounds := rtpDiff / (1 << 32)
cycles += goArounds * (1 << 32)
pcCycles = r.srNewest.PacketCountExt & 0xFFFF_FFFF_0000_0000
if (srData.PacketCount-r.srNewest.PacketCount) < (1<<31) && srData.PacketCount < r.srNewest.PacketCount {
pcCycles += (1 << 32)
}
}
srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + tsCycles
srDataCopy.PacketCountExt = uint64(srDataCopy.PacketCount) + pcCycles
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)