diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 72f0a6f3a..365b7434a 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -228,7 +228,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra case *rtcp.SourceDescription: // do nothing for now case *rtcp.SenderReport: - buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime) + buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime, pkt.PacketCount) } } }) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 93471804e..aaeb08925 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -83,7 +83,8 @@ type Buffer struct { closed atomic.Bool mime string - snRangeMap *utils.RangeMap[uint64, uint64] + snRangeMap *utils.RangeMap[uint64, uint64] + paddingOnlyDrops uint64 latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -416,7 +417,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { } flowState := b.updateStreamState(&rtpPacket, arrivalTime) + // process header extensions always as padding packets could be used for probing b.processHeaderExtensions(&rtpPacket, arrivalTime) + if flowState.IsNotHandled { + return + } + if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) { // drop padding only in-order or duplicate packet if !flowState.IsOutOfOrder { @@ -436,8 +442,9 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { // 44 - padding only - out-of-order + duplicate - dropped as duplicate // if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { - b.logger.Errorw("could not exclude range", err, "sn", flowState.ExtSequenceNumber) + b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } + b.paddingOnlyDrops++ } return } @@ -448,7 +455,8 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { b.logger.Errorw("could not get sequence number adjustment", err, "sn", flowState.ExtSequenceNumber, "payloadSize", len(rtpPacket.Payload)) return } - rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment) + flowState.ExtSequenceNumber -= snAdjustment + rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) if err != nil { if err != bucket.ErrRTXPacket { @@ -687,14 +695,16 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { return b.rtpStats.SnapshotRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId) } -func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { +func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packetCount uint32) { + b.RLock() srData := &RTCPSenderReportData{ - RTPTimestamp: rtpTime, - NTPTimestamp: mediatransportutil.NtpTime(ntpTime), - At: time.Now(), + RTPTimestamp: rtpTime, + NTPTimestamp: mediatransportutil.NtpTime(ntpTime), + PacketCount: packetCount, + PaddingOnlyDrops: b.paddingOnlyDrops, + At: time.Now(), } - b.RLock() if b.rtpStats != nil { b.rtpStats.SetRtcpSenderReportData(srData) } diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d9f58ba67..357b50902 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -58,6 +58,8 @@ func RTPDriftToString(r *livekit.RTPDrift) string { // ------------------------------------------------------- type RTPFlowState struct { + IsNotHandled bool + HasLoss bool LossStartInclusive uint64 LossEndExclusive uint64 @@ -129,10 +131,13 @@ type SnInfo struct { } type RTCPSenderReportData struct { - RTPTimestamp uint32 - RTPTimestampExt uint64 - NTPTimestamp mediatransportutil.NtpTime - At time.Time + RTPTimestamp uint32 + RTPTimestampExt uint64 + NTPTimestamp mediatransportutil.NtpTime + PacketCount uint32 + PacketCountExt uint64 + PaddingOnlyDrops uint64 + At time.Time } type RTPStatsParams struct { @@ -147,7 +152,9 @@ type RTPStats struct { lock sync.RWMutex - initialized bool + initialized bool + resyncOnNextPacket bool + shouldDiscountPaddingOnlyDrops bool startTime time.Time endTime time.Time @@ -239,6 +246,8 @@ func (r *RTPStats) Seed(from *RTPStats) { } r.initialized = from.initialized + r.resyncOnNextPacket = from.resyncOnNextPacket + r.shouldDiscountPaddingOnlyDrops = from.shouldDiscountPaddingOnlyDrops r.startTime = from.startTime // do not clone endTime as a non-zero endTime indicates an ended object @@ -365,14 +374,85 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa defer r.lock.Unlock() if !r.endTime.IsZero() { + flowState.IsNotHandled = true return } + if r.resyncOnNextPacket { + r.resyncOnNextPacket = false + + if r.initialized { + extHighestSN := r.sequenceNumber.GetExtendedHighest() + var newestPacketCount uint64 + var paddingOnlyDrops uint64 + var extExpectedHighestSN uint64 + var expectedHighestSN uint16 + var snCycles uint64 + + extHighestTS := r.timestamp.GetExtendedHighest() + var newestTS uint64 + var extExpectedHighestTS uint64 + var expectedHighestTS uint32 + var tsCycles uint64 + if r.srNewest != nil { + newestPacketCount = r.srNewest.PacketCountExt + paddingOnlyDrops = r.srNewest.PaddingOnlyDrops + if newestPacketCount != 0 { + extExpectedHighestSN = r.sequenceNumber.GetExtendedStart() + newestPacketCount + if r.shouldDiscountPaddingOnlyDrops { + extExpectedHighestSN -= paddingOnlyDrops + } + expectedHighestSN = uint16(extExpectedHighestSN & 0xFFFF) + snCycles = extExpectedHighestSN & 0xFFFF_FFFF_FFFF_0000 + if rtph.SequenceNumber-expectedHighestSN < (1<<15) && rtph.SequenceNumber < expectedHighestSN { + snCycles += (1 << 16) + } + if snCycles != 0 && expectedHighestSN-rtph.SequenceNumber < (1<<15) && expectedHighestSN < rtph.SequenceNumber { + snCycles -= (1 << 16) + } + } + + newestTS = r.srNewest.RTPTimestampExt + extExpectedHighestTS = newestTS + expectedHighestTS = uint32(extExpectedHighestTS & 0xFFFF_FFFF) + tsCycles = extExpectedHighestTS & 0xFFFF_FFFF_0000_0000 + if rtph.Timestamp-expectedHighestTS < (1<<31) && rtph.Timestamp < expectedHighestTS { + tsCycles += (1 << 32) + } + if tsCycles != 0 && expectedHighestTS-rtph.Timestamp < (1<<31) && expectedHighestTS < rtph.Timestamp { + tsCycles -= (1 << 32) + } + } + r.sequenceNumber.ResetHighest(snCycles + uint64(rtph.SequenceNumber) - 1) + r.timestamp.ResetHighest(tsCycles + uint64(rtph.Timestamp)) + r.highestTime = packetTime + r.logger.Debugw( + "resync", + "newestPacketCount", newestPacketCount, + "paddingOnlyDrops", paddingOnlyDrops, + "extExpectedHighestSN", extExpectedHighestSN, + "expectedHighestSN", expectedHighestSN, + "snCycles", snCycles, + "rtpSN", rtph.SequenceNumber, + "beforeExtHighestSN", extHighestSN, + "afterExtHighestSN", r.sequenceNumber.GetExtendedHighest(), + "newestTS", newestTS, + "extExpectedHighestTS", extExpectedHighestTS, + "expectedHighestTS", expectedHighestTS, + "tsCycles", tsCycles, + "rtpTS", rtph.Timestamp, + "beforeExtHighestTS", extHighestTS, + "afterExtHighestTS", r.timestamp.GetExtendedHighest(), + ) + } + } + var resSN utils.WrapAroundUpdateResult[uint64] var resTS utils.WrapAroundUpdateResult[uint64] if !r.initialized { if payloadSize == 0 { // do not start on a padding only packet + flowState.IsNotHandled = true return } @@ -511,16 +591,12 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } -func (r *RTPStats) Resync(esn uint64, ets uint64, at time.Time) { +func (r *RTPStats) ResyncOnNextPacket(shouldDiscountPaddingOnlyDrops bool) { r.lock.Lock() defer r.lock.Unlock() - if !r.initialized { - return - } - r.sequenceNumber.ResetHighest(esn - 1) - r.timestamp.ResetHighest(ets) - r.highestTime = at + r.resyncOnNextPacket = true + r.shouldDiscountPaddingOnlyDrops = shouldDiscountPaddingOnlyDrops } func (r *RTPStats) getPacketsExpected() uint64 { @@ -868,21 +944,23 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { return } - cycles := uint64(0) + tsCycles := uint64(0) + pcCycles := uint64(0) if r.srNewest != nil { - cycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000 + tsCycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000 if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp { - cycles += (1 << 32) + tsCycles += (1 << 32) } - ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate)) - goArounds := rtpDiff / (1 << 32) - cycles += goArounds * (1 << 32) + pcCycles = r.srNewest.PacketCountExt & 0xFFFF_FFFF_0000_0000 + if (srData.PacketCount-r.srNewest.PacketCount) < (1<<31) && srData.PacketCount < r.srNewest.PacketCount { + pcCycles += (1 << 32) + } } srDataCopy := *srData - srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles + srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + tsCycles + srDataCopy.PacketCountExt = uint64(srDataCopy.PacketCount) + pcCycles r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)