From df1e71aa3a07f57d028f039a42a0992f4fbdf7cf Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 16 Oct 2023 14:01:10 -0700 Subject: [PATCH 1/5] Update pion deps (#2100) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 6 +++--- go.sum | 11 ++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 0f430e9f5..9e5fe644b 100644 --- a/go.mod +++ b/go.mod @@ -27,14 +27,14 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/pion/dtls/v2 v2.2.7 github.com/pion/ice/v2 v2.3.11 - github.com/pion/interceptor v0.1.19 + github.com/pion/interceptor v0.1.23 github.com/pion/rtcp v1.2.10 - github.com/pion/rtp v1.8.1 + github.com/pion/rtp v1.8.2 github.com/pion/sctp v1.8.9 github.com/pion/sdp/v3 v3.0.6 github.com/pion/transport/v2 v2.2.4 github.com/pion/turn/v2 v2.1.4 - github.com/pion/webrtc/v3 v3.2.20 + github.com/pion/webrtc/v3 v3.2.21 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 github.com/redis/go-redis/v9 v9.2.1 diff --git a/go.sum b/go.sum index c2a682ef9..a2b5d93bb 100644 --- a/go.sum +++ b/go.sum @@ -189,8 +189,8 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I= -github.com/pion/interceptor v0.1.19 h1:tq0TGBzuZQqipyBhaC1mVUCfCh8XjDKUuibq9rIl5t4= -github.com/pion/interceptor v0.1.19/go.mod h1:VANhFxdJezB8mwToMMmrmyHyP9gym6xLqIUch31xryg= +github.com/pion/interceptor v0.1.23 h1:BZmayeasUYVDam891RtvE5rs6syqmSK3Wzy+xu+UNw0= +github.com/pion/interceptor v0.1.23/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.8 h1:HhicWIg7OX5PVilyBO6plhMetInbzkVJAhbdJiAeVaI= @@ -199,8 +199,9 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= -github.com/pion/rtp v1.8.1 h1:26OxTc6lKg/qLSGir5agLyj0QKaOv8OP5wps2SFnVNQ= github.com/pion/rtp v1.8.1/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.2 h1:oKMM0K1/QYQ5b5qH+ikqDSZRipP5mIxPJcgcvw5sH0w= +github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0= github.com/pion/sctp v1.8.8/go.mod h1:igF9nZBrjh5AtmKc7U30jXltsFHicFCXSmWA2GWRaWs= github.com/pion/sctp v1.8.9 h1:TP5ZVxV5J7rz7uZmbyvnUvsn7EJ2x/5q9uhsTtXbI3g= @@ -223,8 +224,8 @@ github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9 github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= github.com/pion/turn/v2 v2.1.4 h1:2xn8rduI5W6sCZQkEnIUDAkrBQNl2eYIBCHMZ3QMmP8= github.com/pion/turn/v2 v2.1.4/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= -github.com/pion/webrtc/v3 v3.2.20 h1:BQJiXQsJq9LgLp3op7rLy1y8d2WD+LtiS9cpY0uQ22A= -github.com/pion/webrtc/v3 v3.2.20/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg= +github.com/pion/webrtc/v3 v3.2.21 h1:c8fy5JcqJkAQBwwy3Sk9huQLTBUSqaggyRlv9Lnh2zY= +github.com/pion/webrtc/v3 v3.2.21/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= From 53e757fd2c04655014f9721c88e8e052cbb00405 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 17 Oct 2023 10:37:11 +0800 Subject: [PATCH 2/5] Fix panic on streamtracker_dd (#2147) --- pkg/sfu/streamtracker/streamtracker_dd.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/sfu/streamtracker/streamtracker_dd.go b/pkg/sfu/streamtracker/streamtracker_dd.go index c19b3fe85..b0f74ef44 100644 --- a/pkg/sfu/streamtracker/streamtracker_dd.go +++ b/pkg/sfu/streamtracker/streamtracker_dd.go @@ -192,6 +192,10 @@ func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize dtis := ddVal.Descriptor.FrameDependencies.DecodeTargetIndications for _, dt := range ddVal.DecodeTargets { + if len(dtis) <= dt.Target { + s.params.Logger.Errorw("len(dtis) less than target", nil, "target", dt.Target, "dtls", dtis) + continue + } // we are not dropping discardable frames now, so only ingore not present frames if dtis[dt.Target] == dd.DecodeTargetNotPresent { continue From 11c9c56a4d0d5e18ca8072cb61acac81209cddfa Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 17 Oct 2023 11:37:34 +0530 Subject: [PATCH 3/5] Defer close of source and sink to prevent error logs. (#2149) When a room is created via room service, when `StartSession` runs, it sees a closed request source and returns an error and that gets logged. It is not a real error. Defer the sink and source close so that room creation can finish without errors. --- pkg/service/roomservice.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index c81274093..d59d3ce82 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -84,8 +84,8 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq if err != nil { return nil, err } - sink.Close() - source.Close() + defer sink.Close() + defer source.Close() // ensure it's created correctly err = s.confirmExecution(func() error { From 3e4cd3a1614f04c399039f816cca1c6a81b13f4b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 17 Oct 2023 23:52:14 +0530 Subject: [PATCH 4/5] Accept more range for first packet time adjustment. (#2150) --- go.mod | 2 +- go.sum | 4 ++-- pkg/rtc/mediatrack.go | 2 +- pkg/sfu/buffer/buffer.go | 14 +++++--------- pkg/sfu/buffer/rtpstats_base.go | 13 +++++-------- pkg/sfu/buffer/rtpstats_receiver.go | 7 ++++++- pkg/sfu/buffer/rtpstats_sender.go | 19 ++++++++++--------- 7 files changed, 30 insertions(+), 31 deletions(-) diff --git a/go.mod b/go.mod index 9e5fe644b..cf175a44f 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c + github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e github.com/livekit/protocol v1.8.0 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 diff --git a/go.sum b/go.sum index a2b5d93bb..6626aafb1 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c h1:eTghhsCfx2ltyzArXZ7wiNoFFzbfLXJ4uI/IsLXFZQc= -github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= +github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= +github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= github.com/livekit/protocol v1.8.0 h1:0z2eRmEXFFXiJ7WPAxRLMNCyUu55w41iikbbeT8dvlQ= github.com/livekit/protocol v1.8.0/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 856527635..9b7f1c311 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -228,7 +228,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra case *rtcp.SourceDescription: // do nothing for now case *rtcp.SenderReport: - buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime, pkt.PacketCount) + buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime) } } }) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 2b62b3427..77c6c58ab 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -84,8 +84,7 @@ type Buffer struct { closed atomic.Bool mime string - snRangeMap *utils.RangeMap[uint64, uint64] - paddingOnlyDrops uint64 + snRangeMap *utils.RangeMap[uint64, uint64] latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -455,7 +454,6 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } - b.paddingOnlyDrops++ } return } @@ -717,14 +715,12 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { return b.rtpStats.GetRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId) } -func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packetCount uint32) { +func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { b.RLock() srData := &RTCPSenderReportData{ - RTPTimestamp: rtpTime, - NTPTimestamp: mediatransportutil.NtpTime(ntpTime), - PacketCount: packetCount, - PaddingOnlyDrops: b.paddingOnlyDrops, - At: time.Now(), + RTPTimestamp: rtpTime, + NTPTimestamp: mediatransportutil.NtpTime(ntpTime), + At: time.Now(), } if b.rtpStats != nil { diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 6cf0fe6eb..9f8404e77 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -32,7 +32,7 @@ const ( cFirstSnapshotID = 1 cFirstPacketTimeAdjustWindow = 2 * time.Minute - cFirstPacketTimeAdjustThreshold = 5 * time.Second + cFirstPacketTimeAdjustThreshold = 5 * time.Minute ) // ------------------------------------------------------- @@ -109,10 +109,7 @@ type RTCPSenderReportData struct { RTPTimestamp uint32 RTPTimestampExt uint64 NTPTimestamp mediatransportutil.NtpTime - PacketCount uint32 - // RAJA-REMOVE PacketCountExt uint64 - PaddingOnlyDrops uint64 - At time.Time + At time.Time } type RTPStatsParams struct { @@ -484,7 +481,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) "nowTime", now.String(), "before", r.firstTime.String(), "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime), + "adjustment", r.firstTime.Sub(firstTime).String(), "extNowTS", ets, "extStartTS", extStartTS, ) @@ -494,7 +491,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) "nowTime", now.String(), "before", r.firstTime.String(), "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime), + "adjustment", r.firstTime.Sub(firstTime).String(), "extNowTS", ets, "extStartTS", extStartTS, ) @@ -537,7 +534,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes "packetsExpected", packetsExpected, "startTime", startTime, "endTime", endTime, - "duration", endTime.Sub(startTime), + "duration", endTime.Sub(startTime).String(), ) return nil } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 10709cf50..0a2ccf0a9 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -359,6 +359,11 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin fracLost = proxyFracLost } + totalLost := r.packetsLost + if totalLost > 0xffffff { // 24-bits max + totalLost = 0xffffff + } + lastSR := uint32(0) dlsr := uint32(0) if r.srNewest != nil { @@ -372,7 +377,7 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin return &rtcp.ReceptionReport{ SSRC: ssrc, FractionLost: fracLost, - TotalLost: uint32(r.packetsLost), + TotalLost: uint32(totalLost), LastSequenceNumber: uint32(now.extStartSN), Jitter: uint32(r.jitter), LastSenderReport: lastSR, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 62538bdf7..d3ea160a5 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -416,9 +416,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR { r.logger.Debugw( fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR), - "lastRRTime", r.lastRRTime, + "lastRRTime", r.lastRRTime.String(), "lastRR", r.lastRR, - "sinceLastRR", time.Since(r.lastRRTime), + "sinceLastRR", time.Since(r.lastRRTime).String(), "receivedRR", rr, ) return @@ -482,9 +482,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if is.packetsNotFound != 0 { r.logger.Warnw( "potential sequence number de-sync", nil, - "lastRRTime", r.lastRRTime, + "lastRRTime", r.lastRRTime.String(), "lastRR", r.lastRR, - "sinceLastRR", time.Since(r.lastRRTime), + "sinceLastRR", time.Since(r.lastRRTime).String(), "receivedRR", rr, "extStartSN", r.extStartSN, "extHighestSN", r.extHighestSN, @@ -581,14 +581,15 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui "prevTSExt", r.srNewest.RTPTimestampExt, "prevRTP", r.srNewest.RTPTimestamp, "prevNTP", r.srNewest.NTPTimestamp.Time().String(), + "extHighestTS", r.extHighestTS, "currTSExt", nowRTPExt, "currRTP", nowRTP, "currNTP", nowNTP.Time().String(), "timeNow", time.Now().String(), "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst, + "timeSinceFirst", timeSinceFirst.String(), "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest, + "timeSinceHighest", timeSinceHighest.String(), "nowRTPExtUsingTime", nowRTPExtUsingTime, "calculatedClockRate", calculatedClockRate, "nowRTPExtUsingRate", nowRTPExtUsingRate, @@ -646,9 +647,9 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo "startSN", then.extStartSN, "endSN", now.extStartSN, "packetsExpected", packetsExpected, - "startTime", startTime, - "endTime", endTime, - "duration", endTime.Sub(startTime), + "startTime", startTime.String(), + "endTime", endTime.String(), + "duration", endTime.Sub(startTime).String(), ) return nil } From f97242c8ba1a440a417e49fbe107ac46a5deb486 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 18 Oct 2023 21:48:41 +0530 Subject: [PATCH 5/5] Use 32-bit time stamp to get reference time stamp on a switch. (#2153) * Use 32-bit time stamp to get reference time stamp on a switch. With relay and dyncast and migration, it is possible that different layers of a simulcast get out of sync in terms of extended type, i. e. layer 0 could keep running and its timestamp could have wrapped around and bumped the extended timestamp. But, another layer could start and stop. One possible solution is sending the extended timestamp across relay. But, that breaks down during migration if publisher has started afresh. Subscriber could still be using extended range. So, use 32-bit timestamp to infer reference timestamp and patch it with expected extended time stamp to derive the extended reference. * use calculated value * make it test friendly --- pkg/rtc/wrappedreceiver.go | 4 ++-- pkg/sfu/buffer/rtpstats_base.go | 12 +++++------ pkg/sfu/buffer/rtpstats_receiver.go | 30 +++++++++++++++++++++++--- pkg/sfu/buffer/rtpstats_sender.go | 33 +++++++++++++++++++++++++---- pkg/sfu/downtrack.go | 2 +- pkg/sfu/forwarder.go | 30 +++++++++++++++++++++----- pkg/sfu/receiver.go | 6 +++--- pkg/sfu/rtpmunger.go | 18 +++++++++++++++- pkg/sfu/streamtrackermanager.go | 10 ++++----- 9 files changed, 115 insertions(+), 30 deletions(-) diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 853d2c4f4..593a62487 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -317,9 +317,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 { return 0 } -func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { +func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) + return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } return 0, errors.New("receiver not available") } diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 9f8404e77..0c77299ae 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -453,7 +453,7 @@ func (r *rtpStatsBase) GetRtt() uint32 { return r.rtt } -func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) { +func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) { if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow { return } @@ -464,7 +464,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - samplesDiff := int64(ets - extStartTS) + samplesDiff := int32(ts - startTS) if samplesDiff < 0 { // out-of-order, skip return @@ -482,8 +482,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) "before", r.firstTime.String(), "after", firstTime.String(), "adjustment", r.firstTime.Sub(firstTime).String(), - "extNowTS", ets, - "extStartTS", extStartTS, + "nowTS", ts, + "startTS", startTS, ) if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold { r.logger.Infow("first packet time adjustment too big, ignoring", @@ -492,8 +492,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) "before", r.firstTime.String(), "after", firstTime.String(), "adjustment", r.firstTime.Sub(firstTime).String(), - "extNowTS", ets, - "extStartTS", extStartTS, + "nowTS", ts, + "startTS", startTS, ) } else { r.firstTime = firstTime diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 0a2ccf0a9..5902f57e8 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -151,7 +151,19 @@ func (r *RTPStatsReceiver) Update( } } if -gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap negative", nil, "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap negative", nil, + "prev", resSN.PreExtendedHighest, + "curr", resSN.ExtendedVal, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", sequenceNumber, + "timestamp", timestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } if gapSN != 0 { @@ -205,7 +217,19 @@ func (r *RTPStatsReceiver) Update( flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order if gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap", nil, "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap", nil, + "prev", resSN.PreExtendedHighest, + "curr", resSN.ExtendedVal, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", sequenceNumber, + "timestamp", timestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } // update gap histogram @@ -284,7 +308,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + tsCycles - r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt, r.timestamp.GetExtendedStart()) + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp, r.timestamp.GetStart()) if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { // This can happen when a track is replaced with a null and then restored - diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index d3ea160a5..2ac196904 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -282,7 +282,19 @@ func (r *RTPStatsSender) Update( return } if -gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap negative", nil, "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap negative", nil, + "prev", r.extHighestSN, + "curr", extSequenceNumber, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", extSequenceNumber, + "timestamp", extTimestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } if extSequenceNumber < r.extStartSN { @@ -341,7 +353,19 @@ func (r *RTPStatsSender) Update( } } else { // in-order if gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap", nil, "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap", nil, + "prev", r.extHighestSN, + "curr", extSequenceNumber, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", extSequenceNumber, + "timestamp", extTimestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } // update gap histogram @@ -438,6 +462,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt } } + // This is 24-bit max in the protocol. So, technically doesn't need extended type. But, done for consistency. packetsLostFromRR := r.packetsLostFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost) if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost { packetsLostFromRR += (1 << 32) @@ -512,11 +537,11 @@ func (r *RTPStatsSender) LastReceiverReportTime() time.Time { return r.lastRRTime } -func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(ets uint64) { +func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(ts uint32) { r.lock.Lock() defer r.lock.Unlock() - r.maybeAdjustFirstPacketTime(ets, r.extStartTS) + r.maybeAdjustFirstPacketTime(ts, uint32(r.extStartTS)) } func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64, err error) { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 929386f02..b26080526 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1883,7 +1883,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error { if layer == d.forwarder.GetReferenceLayerSpatial() && srData != nil { - d.rtpStats.MaybeAdjustFirstPacketTime(srData.RTPTimestampExt + d.forwarder.GetReferenceTimestampOffset()) + d.rtpStats.MaybeAdjustFirstPacketTime(srData.RTPTimestamp + uint32(d.forwarder.GetReferenceTimestampOffset())) } return nil } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 503d13a7f..cebb5704b 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -42,6 +42,7 @@ const ( TransitionCostSpatial = 10 ResumeBehindThresholdSeconds = float64(0.2) // 200ms + ResumeBehindHighTresholdSeconds = float64(2.0) // 2 seconds LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms SwitchAheadThresholdSeconds = float64(0.025) // 25ms ) @@ -187,7 +188,7 @@ type Forwarder struct { codec webrtc.RTPCodecCapability kind webrtc.RTPCodecType logger logger.Logger - getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error) + getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) getExpectedRTPTimestamp func(at time.Time) (uint64, error) muted bool @@ -215,7 +216,7 @@ type Forwarder struct { func NewForwarder( kind webrtc.RTPCodecType, logger logger.Logger, - getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error), + getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error), getExpectedRTPTimestamp func(at time.Time) (uint64, error), ) *Forwarder { f := &Forwarder{ @@ -1499,11 +1500,11 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always. rtpMungerState := f.rtpMunger.GetLast() extLastTS := rtpMungerState.ExtLastTS - extRefTS := extLastTS extExpectedTS := extLastTS + extRefTS := extExpectedTS switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { - ets, err := f.getReferenceLayerRTPTimestamp(extPkt.ExtTimestamp, layer, f.referenceLayerSpatial) + ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) if err != nil { // error out if extRefTS is not available. It can happen when there is no sender report // for the layer being switched to. Can especially happen at the start of the track when layer switches are @@ -1513,7 +1514,15 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e return err } - extRefTS = ets + extRefTS = (extRefTS & 0xFFFF_FFFF_0000_0000) + uint64(ts) + + expectedTS32 := uint32(extExpectedTS) + if (ts-expectedTS32) < 1<<31 && ts < expectedTS32 { + extRefTS += (1 << 32) + } + if (expectedTS32-ts) < 1<<31 && expectedTS32 < ts && extRefTS >= 1<<32 { + extRefTS -= (1 << 32) + } } if f.getExpectedRTPTimestamp != nil { @@ -1569,6 +1578,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold { logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) extNextTS = extExpectedTS + } else if diffSeconds > ResumeBehindHighTresholdSeconds { + // could be due to incorrect reference calculation + f.logger.Infow( + "resume, reference very far behind", + "layer", layer, + "extExpectedTS", extExpectedTS, + "extRefTS", extRefTS, + "extLastTS", extLastTS, + "diffSeconds", diffSeconds, + ) + extNextTS = extExpectedTS } else { extNextTS = extRefTS } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 434222078..e7ba1fb93 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -81,7 +81,7 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) []float32 GetCalculatedClockRate(layer int32) uint32 - GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) + GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) } // WebRTCReceiver receives a media track @@ -777,8 +777,8 @@ func (w *WebRTCReceiver) GetCalculatedClockRate(layer int32) uint32 { return w.streamTrackerManager.GetCalculatedClockRate(layer) } -func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { - return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) +func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { + return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } // closes all track senders in parallel, returns when all are closed diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index b162158fe..8cfb4be27 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -202,9 +202,25 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara }, ErrOutOfOrderSequenceNumberCacheMiss } + extSequenceNumber := extPkt.ExtSequenceNumber - snOffset + if extSequenceNumber >= r.extLastSN { + // should not happen, just being paranoid + r.logger.Errorw( + "unexpected packet ordering", nil, + "extIncomingSN", extPkt.ExtSequenceNumber, + "extHighestIncominSN", r.extHighestIncomingSN, + "extLastSN", r.extLastSN, + "snOffsetIncoming", snOffset, + "snOffsetHighest", r.snOffset, + ) + return &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingOutOfOrder, + }, ErrOutOfOrderSequenceNumberCacheMiss + } + return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: extPkt.ExtSequenceNumber - snOffset, + extSequenceNumber: extSequenceNumber, extTimestamp: extPkt.ExtTimestamp - r.tsOffset, }, nil } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 19fd266e9..9aa82f91e 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -76,7 +76,7 @@ type StreamTrackerManager struct { senderReportMu sync.RWMutex senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport - layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint64 + layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint32 closed core.Fuse @@ -563,10 +563,10 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) { rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 // calculate other layer's time stamp at the same time as ref layer's NTP time - normalizedOtherTS := srOther.RTPTimestampExt + uint64(rtpDiff) + normalizedOtherTS := srOther.RTPTimestamp + uint32(rtpDiff) // now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers - offset := srRef.RTPTimestampExt - normalizedOtherTS + offset := srRef.RTPTimestamp - normalizedOtherTS // use minimal offset to indicate value availability in the extremely unlikely case of // both layers using the same timestamp @@ -643,7 +643,7 @@ func (s *StreamTrackerManager) GetCalculatedClockRate(layer int32) uint32 { return uint32(float64(rdsf) / tsf.Seconds()) } -func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { +func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { s.senderReportMu.RLock() defer s.senderReportMu.RUnlock() @@ -655,7 +655,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer i return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer) } - return ets + s.layerOffsets[referenceLayer][layer], nil + return ts + s.layerOffsets[referenceLayer][layer], nil } func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 {