diff --git a/go.mod b/go.mod index 0f430e9f5..cf175a44f 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c + github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e github.com/livekit/protocol v1.8.0 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 @@ -27,14 +27,14 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/pion/dtls/v2 v2.2.7 github.com/pion/ice/v2 v2.3.11 - github.com/pion/interceptor v0.1.19 + github.com/pion/interceptor v0.1.23 github.com/pion/rtcp v1.2.10 - github.com/pion/rtp v1.8.1 + github.com/pion/rtp v1.8.2 github.com/pion/sctp v1.8.9 github.com/pion/sdp/v3 v3.0.6 github.com/pion/transport/v2 v2.2.4 github.com/pion/turn/v2 v2.1.4 - github.com/pion/webrtc/v3 v3.2.20 + github.com/pion/webrtc/v3 v3.2.21 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 github.com/redis/go-redis/v9 v9.2.1 diff --git a/go.sum b/go.sum index c2a682ef9..6626aafb1 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c h1:eTghhsCfx2ltyzArXZ7wiNoFFzbfLXJ4uI/IsLXFZQc= -github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= +github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= +github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= github.com/livekit/protocol v1.8.0 h1:0z2eRmEXFFXiJ7WPAxRLMNCyUu55w41iikbbeT8dvlQ= github.com/livekit/protocol v1.8.0/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= @@ -189,8 +189,8 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I= -github.com/pion/interceptor v0.1.19 h1:tq0TGBzuZQqipyBhaC1mVUCfCh8XjDKUuibq9rIl5t4= -github.com/pion/interceptor v0.1.19/go.mod h1:VANhFxdJezB8mwToMMmrmyHyP9gym6xLqIUch31xryg= +github.com/pion/interceptor v0.1.23 h1:BZmayeasUYVDam891RtvE5rs6syqmSK3Wzy+xu+UNw0= +github.com/pion/interceptor v0.1.23/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.8 h1:HhicWIg7OX5PVilyBO6plhMetInbzkVJAhbdJiAeVaI= @@ -199,8 +199,9 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= -github.com/pion/rtp v1.8.1 h1:26OxTc6lKg/qLSGir5agLyj0QKaOv8OP5wps2SFnVNQ= github.com/pion/rtp v1.8.1/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.2 h1:oKMM0K1/QYQ5b5qH+ikqDSZRipP5mIxPJcgcvw5sH0w= +github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0= github.com/pion/sctp v1.8.8/go.mod h1:igF9nZBrjh5AtmKc7U30jXltsFHicFCXSmWA2GWRaWs= github.com/pion/sctp v1.8.9 h1:TP5ZVxV5J7rz7uZmbyvnUvsn7EJ2x/5q9uhsTtXbI3g= @@ -223,8 +224,8 @@ github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9 github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= github.com/pion/turn/v2 v2.1.4 h1:2xn8rduI5W6sCZQkEnIUDAkrBQNl2eYIBCHMZ3QMmP8= github.com/pion/turn/v2 v2.1.4/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= -github.com/pion/webrtc/v3 v3.2.20 h1:BQJiXQsJq9LgLp3op7rLy1y8d2WD+LtiS9cpY0uQ22A= -github.com/pion/webrtc/v3 v3.2.20/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg= +github.com/pion/webrtc/v3 v3.2.21 h1:c8fy5JcqJkAQBwwy3Sk9huQLTBUSqaggyRlv9Lnh2zY= +github.com/pion/webrtc/v3 v3.2.21/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 856527635..9b7f1c311 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -228,7 +228,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra case *rtcp.SourceDescription: // do nothing for now case *rtcp.SenderReport: - buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime, pkt.PacketCount) + buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime) } } }) diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index a899909fd..69c4e391c 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -324,9 +324,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 { return 0 } -func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { +func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) + return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } return 0, errors.New("receiver not available") } diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index c81274093..d59d3ce82 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -84,8 +84,8 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq if err != nil { return nil, err } - sink.Close() - source.Close() + defer sink.Close() + defer source.Close() // ensure it's created correctly err = s.confirmExecution(func() error { diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index f4a57d252..c6edd25b4 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -84,8 +84,7 @@ type Buffer struct { closed atomic.Bool mime string - snRangeMap *utils.RangeMap[uint64, uint64] - paddingOnlyDrops uint64 + snRangeMap *utils.RangeMap[uint64, uint64] latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -455,7 +454,6 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } - b.paddingOnlyDrops++ } return } @@ -726,14 +724,12 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { return b.rtpStats.GetRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId) } -func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packetCount uint32) { +func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { b.RLock() srData := &RTCPSenderReportData{ - RTPTimestamp: rtpTime, - NTPTimestamp: mediatransportutil.NtpTime(ntpTime), - PacketCount: packetCount, - PaddingOnlyDrops: b.paddingOnlyDrops, - At: time.Now(), + RTPTimestamp: rtpTime, + NTPTimestamp: mediatransportutil.NtpTime(ntpTime), + At: time.Now(), } if b.rtpStats != nil { diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 6cf0fe6eb..0c77299ae 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -32,7 +32,7 @@ const ( cFirstSnapshotID = 1 cFirstPacketTimeAdjustWindow = 2 * time.Minute - cFirstPacketTimeAdjustThreshold = 5 * time.Second + cFirstPacketTimeAdjustThreshold = 5 * time.Minute ) // ------------------------------------------------------- @@ -109,10 +109,7 @@ type RTCPSenderReportData struct { RTPTimestamp uint32 RTPTimestampExt uint64 NTPTimestamp mediatransportutil.NtpTime - PacketCount uint32 - // RAJA-REMOVE PacketCountExt uint64 - PaddingOnlyDrops uint64 - At time.Time + At time.Time } type RTPStatsParams struct { @@ -456,7 +453,7 @@ func (r *rtpStatsBase) GetRtt() uint32 { return r.rtt } -func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) { +func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) { if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow { return } @@ -467,7 +464,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - samplesDiff := int64(ets - extStartTS) + samplesDiff := int32(ts - startTS) if samplesDiff < 0 { // out-of-order, skip return @@ -484,9 +481,9 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) "nowTime", now.String(), "before", r.firstTime.String(), "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime), - "extNowTS", ets, - "extStartTS", extStartTS, + "adjustment", r.firstTime.Sub(firstTime).String(), + "nowTS", ts, + "startTS", startTS, ) if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold { r.logger.Infow("first packet time adjustment too big, ignoring", @@ -494,9 +491,9 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) "nowTime", now.String(), "before", r.firstTime.String(), "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime), - "extNowTS", ets, - "extStartTS", extStartTS, + "adjustment", r.firstTime.Sub(firstTime).String(), + "nowTS", ts, + "startTS", startTS, ) } else { r.firstTime = firstTime @@ -537,7 +534,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes "packetsExpected", packetsExpected, "startTime", startTime, "endTime", endTime, - "duration", endTime.Sub(startTime), + "duration", endTime.Sub(startTime).String(), ) return nil } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 10709cf50..5902f57e8 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -151,7 +151,19 @@ func (r *RTPStatsReceiver) Update( } } if -gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap negative", nil, "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap negative", nil, + "prev", resSN.PreExtendedHighest, + "curr", resSN.ExtendedVal, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", sequenceNumber, + "timestamp", timestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } if gapSN != 0 { @@ -205,7 +217,19 @@ func (r *RTPStatsReceiver) Update( flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order if gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap", nil, "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap", nil, + "prev", resSN.PreExtendedHighest, + "curr", resSN.ExtendedVal, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", sequenceNumber, + "timestamp", timestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } // update gap histogram @@ -284,7 +308,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + tsCycles - r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt, r.timestamp.GetExtendedStart()) + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp, r.timestamp.GetStart()) if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { // This can happen when a track is replaced with a null and then restored - @@ -359,6 +383,11 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin fracLost = proxyFracLost } + totalLost := r.packetsLost + if totalLost > 0xffffff { // 24-bits max + totalLost = 0xffffff + } + lastSR := uint32(0) dlsr := uint32(0) if r.srNewest != nil { @@ -372,7 +401,7 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin return &rtcp.ReceptionReport{ SSRC: ssrc, FractionLost: fracLost, - TotalLost: uint32(r.packetsLost), + TotalLost: uint32(totalLost), LastSequenceNumber: uint32(now.extStartSN), Jitter: uint32(r.jitter), LastSenderReport: lastSR, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 62538bdf7..2ac196904 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -282,7 +282,19 @@ func (r *RTPStatsSender) Update( return } if -gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap negative", nil, "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap negative", nil, + "prev", r.extHighestSN, + "curr", extSequenceNumber, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", extSequenceNumber, + "timestamp", extTimestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } if extSequenceNumber < r.extStartSN { @@ -341,7 +353,19 @@ func (r *RTPStatsSender) Update( } } else { // in-order if gapSN >= cNumSequenceNumbers { - r.logger.Warnw("large sequence number gap", nil, "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN) + r.logger.Warnw( + "large sequence number gap", nil, + "prev", r.extHighestSN, + "curr", extSequenceNumber, + "gap", gapSN, + "packetTime", packetTime.String(), + "sequenceNumber", extSequenceNumber, + "timestamp", extTimestamp, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + ) } // update gap histogram @@ -416,9 +440,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR { r.logger.Debugw( fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR), - "lastRRTime", r.lastRRTime, + "lastRRTime", r.lastRRTime.String(), "lastRR", r.lastRR, - "sinceLastRR", time.Since(r.lastRRTime), + "sinceLastRR", time.Since(r.lastRRTime).String(), "receivedRR", rr, ) return @@ -438,6 +462,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt } } + // This is 24-bit max in the protocol. So, technically doesn't need extended type. But, done for consistency. packetsLostFromRR := r.packetsLostFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost) if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost { packetsLostFromRR += (1 << 32) @@ -482,9 +507,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if is.packetsNotFound != 0 { r.logger.Warnw( "potential sequence number de-sync", nil, - "lastRRTime", r.lastRRTime, + "lastRRTime", r.lastRRTime.String(), "lastRR", r.lastRR, - "sinceLastRR", time.Since(r.lastRRTime), + "sinceLastRR", time.Since(r.lastRRTime).String(), "receivedRR", rr, "extStartSN", r.extStartSN, "extHighestSN", r.extHighestSN, @@ -512,11 +537,11 @@ func (r *RTPStatsSender) LastReceiverReportTime() time.Time { return r.lastRRTime } -func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(ets uint64) { +func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(ts uint32) { r.lock.Lock() defer r.lock.Unlock() - r.maybeAdjustFirstPacketTime(ets, r.extStartTS) + r.maybeAdjustFirstPacketTime(ts, uint32(r.extStartTS)) } func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64, err error) { @@ -581,14 +606,15 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui "prevTSExt", r.srNewest.RTPTimestampExt, "prevRTP", r.srNewest.RTPTimestamp, "prevNTP", r.srNewest.NTPTimestamp.Time().String(), + "extHighestTS", r.extHighestTS, "currTSExt", nowRTPExt, "currRTP", nowRTP, "currNTP", nowNTP.Time().String(), "timeNow", time.Now().String(), "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst, + "timeSinceFirst", timeSinceFirst.String(), "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest, + "timeSinceHighest", timeSinceHighest.String(), "nowRTPExtUsingTime", nowRTPExtUsingTime, "calculatedClockRate", calculatedClockRate, "nowRTPExtUsingRate", nowRTPExtUsingRate, @@ -646,9 +672,9 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo "startSN", then.extStartSN, "endSN", now.extStartSN, "packetsExpected", packetsExpected, - "startTime", startTime, - "endTime", endTime, - "duration", endTime.Sub(startTime), + "startTime", startTime.String(), + "endTime", endTime.String(), + "duration", endTime.Sub(startTime).String(), ) return nil } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index bc217888d..15884f34b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1884,7 +1884,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error { if layer == d.forwarder.GetReferenceLayerSpatial() && srData != nil { - d.rtpStats.MaybeAdjustFirstPacketTime(srData.RTPTimestampExt + d.forwarder.GetReferenceTimestampOffset()) + d.rtpStats.MaybeAdjustFirstPacketTime(srData.RTPTimestamp + uint32(d.forwarder.GetReferenceTimestampOffset())) } return nil } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 503d13a7f..cebb5704b 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -42,6 +42,7 @@ const ( TransitionCostSpatial = 10 ResumeBehindThresholdSeconds = float64(0.2) // 200ms + ResumeBehindHighTresholdSeconds = float64(2.0) // 2 seconds LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms SwitchAheadThresholdSeconds = float64(0.025) // 25ms ) @@ -187,7 +188,7 @@ type Forwarder struct { codec webrtc.RTPCodecCapability kind webrtc.RTPCodecType logger logger.Logger - getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error) + getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) getExpectedRTPTimestamp func(at time.Time) (uint64, error) muted bool @@ -215,7 +216,7 @@ type Forwarder struct { func NewForwarder( kind webrtc.RTPCodecType, logger logger.Logger, - getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error), + getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error), getExpectedRTPTimestamp func(at time.Time) (uint64, error), ) *Forwarder { f := &Forwarder{ @@ -1499,11 +1500,11 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always. rtpMungerState := f.rtpMunger.GetLast() extLastTS := rtpMungerState.ExtLastTS - extRefTS := extLastTS extExpectedTS := extLastTS + extRefTS := extExpectedTS switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { - ets, err := f.getReferenceLayerRTPTimestamp(extPkt.ExtTimestamp, layer, f.referenceLayerSpatial) + ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) if err != nil { // error out if extRefTS is not available. It can happen when there is no sender report // for the layer being switched to. Can especially happen at the start of the track when layer switches are @@ -1513,7 +1514,15 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e return err } - extRefTS = ets + extRefTS = (extRefTS & 0xFFFF_FFFF_0000_0000) + uint64(ts) + + expectedTS32 := uint32(extExpectedTS) + if (ts-expectedTS32) < 1<<31 && ts < expectedTS32 { + extRefTS += (1 << 32) + } + if (expectedTS32-ts) < 1<<31 && expectedTS32 < ts && extRefTS >= 1<<32 { + extRefTS -= (1 << 32) + } } if f.getExpectedRTPTimestamp != nil { @@ -1569,6 +1578,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold { logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) extNextTS = extExpectedTS + } else if diffSeconds > ResumeBehindHighTresholdSeconds { + // could be due to incorrect reference calculation + f.logger.Infow( + "resume, reference very far behind", + "layer", layer, + "extExpectedTS", extExpectedTS, + "extRefTS", extRefTS, + "extLastTS", extLastTS, + "diffSeconds", diffSeconds, + ) + extNextTS = extExpectedTS } else { extNextTS = extRefTS } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 8c20c1300..2294ea9c4 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -82,7 +82,7 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) (bool, []float32) GetCalculatedClockRate(layer int32) uint32 - GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) + GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) } // WebRTCReceiver receives a media track @@ -824,8 +824,8 @@ func (w *WebRTCReceiver) GetCalculatedClockRate(layer int32) uint32 { return w.streamTrackerManager.GetCalculatedClockRate(layer) } -func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { - return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) +func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { + return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } // closes all track senders in parallel, returns when all are closed diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index b162158fe..8cfb4be27 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -202,9 +202,25 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara }, ErrOutOfOrderSequenceNumberCacheMiss } + extSequenceNumber := extPkt.ExtSequenceNumber - snOffset + if extSequenceNumber >= r.extLastSN { + // should not happen, just being paranoid + r.logger.Errorw( + "unexpected packet ordering", nil, + "extIncomingSN", extPkt.ExtSequenceNumber, + "extHighestIncominSN", r.extHighestIncomingSN, + "extLastSN", r.extLastSN, + "snOffsetIncoming", snOffset, + "snOffsetHighest", r.snOffset, + ) + return &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingOutOfOrder, + }, ErrOutOfOrderSequenceNumberCacheMiss + } + return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: extPkt.ExtSequenceNumber - snOffset, + extSequenceNumber: extSequenceNumber, extTimestamp: extPkt.ExtTimestamp - r.tsOffset, }, nil } diff --git a/pkg/sfu/streamtracker/streamtracker_dd.go b/pkg/sfu/streamtracker/streamtracker_dd.go index c19b3fe85..b0f74ef44 100644 --- a/pkg/sfu/streamtracker/streamtracker_dd.go +++ b/pkg/sfu/streamtracker/streamtracker_dd.go @@ -192,6 +192,10 @@ func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize dtis := ddVal.Descriptor.FrameDependencies.DecodeTargetIndications for _, dt := range ddVal.DecodeTargets { + if len(dtis) <= dt.Target { + s.params.Logger.Errorw("len(dtis) less than target", nil, "target", dt.Target, "dtls", dtis) + continue + } // we are not dropping discardable frames now, so only ingore not present frames if dtis[dt.Target] == dd.DecodeTargetNotPresent { continue diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 19fd266e9..9aa82f91e 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -76,7 +76,7 @@ type StreamTrackerManager struct { senderReportMu sync.RWMutex senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport - layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint64 + layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint32 closed core.Fuse @@ -563,10 +563,10 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) { rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 // calculate other layer's time stamp at the same time as ref layer's NTP time - normalizedOtherTS := srOther.RTPTimestampExt + uint64(rtpDiff) + normalizedOtherTS := srOther.RTPTimestamp + uint32(rtpDiff) // now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers - offset := srRef.RTPTimestampExt - normalizedOtherTS + offset := srRef.RTPTimestamp - normalizedOtherTS // use minimal offset to indicate value availability in the extremely unlikely case of // both layers using the same timestamp @@ -643,7 +643,7 @@ func (s *StreamTrackerManager) GetCalculatedClockRate(layer int32) uint32 { return uint32(float64(rdsf) / tsf.Seconds()) } -func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { +func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { s.senderReportMu.RLock() defer s.senderReportMu.RUnlock() @@ -655,7 +655,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer i return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer) } - return ets + s.layerOffsets[referenceLayer][layer], nil + return ts + s.layerOffsets[referenceLayer][layer], nil } func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 {