From b58307f1449c6e9ed55bc4de03000e98c94d9c19 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Sun, 3 Mar 2024 12:34:49 +0800 Subject: [PATCH] Support XR request/response for rtt calculation (#2536) * Support XR request/response for rtt calculation * Update pkg/sfu/downtrack.go Co-authored-by: David Zhao --------- Co-authored-by: David Zhao --- go.mod | 4 +- go.sum | 8 +- pkg/rtc/mediatrack.go | 32 +++++++- pkg/rtc/mediatracksubscriptions.go | 1 + pkg/rtc/transport.go | 10 ++- pkg/rtc/types/interfaces.go | 2 + .../typesfakes/fake_local_participant.go | 79 +++++++++++++++++++ pkg/sfu/downtrack.go | 29 +++++++ 8 files changed, 155 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index f803a07a2..669c5ea85 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20240228075855-6fbf3be6f6ef + github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 github.com/livekit/protocol v1.10.1 github.com/livekit/psrpc v0.5.3-0.20240227154351-b7f99eaaf7b3 github.com/mackerelio/go-osstat v0.2.4 @@ -29,7 +29,7 @@ require ( github.com/pion/dtls/v2 v2.2.10 github.com/pion/ice/v2 v2.3.14 github.com/pion/interceptor v0.1.25 - github.com/pion/rtcp v1.2.13 + github.com/pion/rtcp v1.2.14 github.com/pion/rtp v1.8.3 github.com/pion/sctp v1.8.12 github.com/pion/sdp/v3 v3.0.6 diff --git a/go.sum b/go.sum index 80604eeea..dc3e827a7 100644 --- a/go.sum +++ b/go.sum @@ -128,8 +128,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20240228075855-6fbf3be6f6ef h1:Db/UItb+Cvm1trBRJiEZOdRSyss+LDY4e8gU9aE4GRc= -github.com/livekit/mediatransportutil v0.0.0-20240228075855-6fbf3be6f6ef/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= +github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 h1:xawydPEACNO5Ncs2LgioTjWghXQ0eUN1q1RnVUUyVnI= +github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= github.com/livekit/protocol v1.10.1 h1:upe6pKRqH8wpsMuR2OLtgizEm94iia3pDYm3O4/2PRY= github.com/livekit/protocol v1.10.1/go.mod h1:eWPz45pnxwpCwB84qqhHxG0bCRgasa2itN6GAHCDddc= github.com/livekit/psrpc v0.5.3-0.20240227154351-b7f99eaaf7b3 h1:bvjzDR+Rvdf3JgzQMtLiGVHBQ8KoOWM7x7sHj79jevQ= @@ -200,8 +200,8 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= -github.com/pion/rtcp v1.2.13 h1:+EQijuisKwm/8VBs8nWllr0bIndR7Lf7cZG200mpbNo= -github.com/pion/rtcp v1.2.13/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE= +github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/rtp v1.8.3 h1:VEHxqzSVQxCkKDSHro5/4IUUG1ea+MFdqR2R3xSpNU8= github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 57141fb62..870c8c15c 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -16,8 +16,10 @@ package rtc import ( "context" + "math" "strings" "sync" + "time" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" @@ -32,6 +34,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "github.com/livekit/livekit-server/pkg/telemetry" + util "github.com/livekit/mediatransportutil" ) // MediaTrack represents a WebRTC track that needs to be forwarded @@ -47,6 +50,8 @@ type MediaTrack struct { dynacastManager *DynacastManager lock sync.RWMutex + + rttFromXR atomic.Bool } type MediaTrackParams struct { @@ -183,12 +188,14 @@ func (t *MediaTrack) UpdateCodecCid(codecs []*livekit.SimulcastCodec) { // AddReceiver adds a new RTP receiver to the track, returns true when receiver represents a new codec func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, mid string) bool { var newCodec bool - buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC())) + ssrc := uint32(track.SSRC()) + buff, rtcpReader := t.params.BufferFactory.GetBufferPair(ssrc) if buff == nil || rtcpReader == nil { t.params.Logger.Errorw("could not retrieve buffer pair", nil) return newCodec } + var lastRR uint32 rtcpReader.OnPacket(func(bytes []byte) { pkts, err := rtcp.Unmarshal(bytes) if err != nil { @@ -202,6 +209,25 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra // do nothing for now case *rtcp.SenderReport: buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime) + case *rtcp.ExtendedReport: + rttFromXR: + for _, report := range pkt.Reports { + if rr, ok := report.(*rtcp.DLRRReportBlock); ok { + for _, dlrrReport := range rr.Reports { + if dlrrReport.LastRR <= lastRR { + continue + } + nowNTP := util.ToNtpTime(time.Now()) + nowNTP32 := uint32(nowNTP >> 16) + ntpDiff := nowNTP32 - dlrrReport.LastRR - dlrrReport.DLRR + rtt := uint32(math.Ceil(float64(ntpDiff) * 1000.0 / 65536.0)) + buff.SetRTT(rtt) + t.rttFromXR.Store(true) + lastRR = dlrrReport.LastRR + break rttFromXR + } + } + } } } }) @@ -359,7 +385,9 @@ func (t *MediaTrack) GetConnectionScoreAndQuality() (float32, livekit.Connection } func (t *MediaTrack) SetRTT(rtt uint32) { - t.MediaTrackReceiver.SetRTT(rtt) + if !t.rttFromXR.Load() { + t.MediaTrackReceiver.SetRTT(rtt) + } } func (t *MediaTrack) HasPendingCodec() bool { diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 652e2fcfb..ee67987e4 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -138,6 +138,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * Pacer: sub.GetPacer(), Trailer: trailer, Logger: LoggerWithTrack(sub.GetLogger().WithComponent(sutils.ComponentSub), trackID, t.params.IsRelayed), + RTCPWriter: sub.WriteSubscriberRTCP, }) if err != nil { return nil, err diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 1765f865b..77b8de20b 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -38,13 +38,14 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/transport" "github.com/livekit/livekit-server/pkg/rtc/types" - lkinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor" + sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor" "github.com/livekit/livekit-server/pkg/sfu/pacer" "github.com/livekit/livekit-server/pkg/sfu/rtpextension" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" sutils "github.com/livekit/livekit-server/pkg/utils" + lkinterceptor "github.com/livekit/mediatransportutil/pkg/interceptor" lktwcc "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -322,6 +323,11 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat } } } + } else { + // sfu only use interceptor to send XR but don't read response from it (use buffer instead), + // so use a empty callback here + ir.Add(lkinterceptor.NewRTTFromXRFactory(func(rtt uint32) { + })) } if len(params.SimTracks) > 0 { f, err := NewUnhandleSimulcastInterceptorFactory(UnhandleSimulcastTracks(params.SimTracks)) @@ -361,7 +367,7 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat } } // put rtx interceptor behind unhandle simulcast interceptor so it can get the correct mid & rid - ir.Add(lkinterceptor.NewRTXInfoExtractorFactory(setTWCCForVideo, func(repair, base uint32) { + ir.Add(sfuinterceptor.NewRTXInfoExtractorFactory(setTWCCForVideo, func(repair, base uint32) { params.Logger.Debugw("rtx pair found from extension", "repair", repair, "base", base) params.Config.BufferFactory.SetRTXPair(repair, base) }, params.Logger)) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 8ad80ef24..88f624396 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -344,6 +344,8 @@ type LocalParticipant interface { AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error + WriteSubscriberRTCP(pkts []rtcp.Packet) error + // subscriptions SubscribeToTrack(trackID livekit.TrackID) UnsubscribeFromTrack(trackID livekit.TrackID) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 131e2f87c..4cc2d1db4 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -979,6 +979,17 @@ type FakeLocalParticipant struct { waitUntilSubscribedReturnsOnCall map[int]struct { result1 error } + WriteSubscriberRTCPStub func([]rtcp.Packet) error + writeSubscriberRTCPMutex sync.RWMutex + writeSubscriberRTCPArgsForCall []struct { + arg1 []rtcp.Packet + } + writeSubscriberRTCPReturns struct { + result1 error + } + writeSubscriberRTCPReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -6207,6 +6218,72 @@ func (fake *FakeLocalParticipant) WaitUntilSubscribedReturnsOnCall(i int, result }{result1} } +func (fake *FakeLocalParticipant) WriteSubscriberRTCP(arg1 []rtcp.Packet) error { + var arg1Copy []rtcp.Packet + if arg1 != nil { + arg1Copy = make([]rtcp.Packet, len(arg1)) + copy(arg1Copy, arg1) + } + fake.writeSubscriberRTCPMutex.Lock() + ret, specificReturn := fake.writeSubscriberRTCPReturnsOnCall[len(fake.writeSubscriberRTCPArgsForCall)] + fake.writeSubscriberRTCPArgsForCall = append(fake.writeSubscriberRTCPArgsForCall, struct { + arg1 []rtcp.Packet + }{arg1Copy}) + stub := fake.WriteSubscriberRTCPStub + fakeReturns := fake.writeSubscriberRTCPReturns + fake.recordInvocation("WriteSubscriberRTCP", []interface{}{arg1Copy}) + fake.writeSubscriberRTCPMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) WriteSubscriberRTCPCallCount() int { + fake.writeSubscriberRTCPMutex.RLock() + defer fake.writeSubscriberRTCPMutex.RUnlock() + return len(fake.writeSubscriberRTCPArgsForCall) +} + +func (fake *FakeLocalParticipant) WriteSubscriberRTCPCalls(stub func([]rtcp.Packet) error) { + fake.writeSubscriberRTCPMutex.Lock() + defer fake.writeSubscriberRTCPMutex.Unlock() + fake.WriteSubscriberRTCPStub = stub +} + +func (fake *FakeLocalParticipant) WriteSubscriberRTCPArgsForCall(i int) []rtcp.Packet { + fake.writeSubscriberRTCPMutex.RLock() + defer fake.writeSubscriberRTCPMutex.RUnlock() + argsForCall := fake.writeSubscriberRTCPArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) WriteSubscriberRTCPReturns(result1 error) { + fake.writeSubscriberRTCPMutex.Lock() + defer fake.writeSubscriberRTCPMutex.Unlock() + fake.WriteSubscriberRTCPStub = nil + fake.writeSubscriberRTCPReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) WriteSubscriberRTCPReturnsOnCall(i int, result1 error) { + fake.writeSubscriberRTCPMutex.Lock() + defer fake.writeSubscriberRTCPMutex.Unlock() + fake.WriteSubscriberRTCPStub = nil + if fake.writeSubscriberRTCPReturnsOnCall == nil { + fake.writeSubscriberRTCPReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeSubscriberRTCPReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -6430,6 +6507,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.verifySubscribeParticipantInfoMutex.RUnlock() fake.waitUntilSubscribedMutex.RLock() defer fake.waitUntilSubscribedMutex.RUnlock() + fake.writeSubscriberRTCPMutex.RLock() + defer fake.writeSubscriberRTCPMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 79abf6242..bf6d9970e 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -206,6 +206,7 @@ type DowntrackParams struct { Pacer pacer.Pacer Logger logger.Logger Trailer []byte + RTCPWriter func([]rtcp.Packet) error } // DownTrack implements TrackLocal, is the track used to write packets @@ -1567,6 +1568,34 @@ func (d *DownTrack) handleRTCP(bytes []byte) { sal.OnTransportCCFeedback(d, p) } } + + case *rtcp.ExtendedReport: + // SFU only responds with the DLRRReport for the track has the sender SSRC, the behavior is different with + // browser's implementation, which includes all sent tracks. It is ok since all the tracks + // use the same connection, and server-sdk-go can get the rtt from the first DLRRReport + // (libwebrtc/browsers don't send XR to calculate rtt, it only responds) + var lastRR uint32 + for _, report := range p.Reports { + if rr, ok := report.(*rtcp.ReceiverReferenceTimeReportBlock); ok { + lastRR = uint32(rr.NTPTimestamp >> 16) + break + } + } + + if lastRR > 0 { + d.params.RTCPWriter([]rtcp.Packet{&rtcp.ExtendedReport{ + SenderSSRC: d.ssrc, + Reports: []rtcp.ReportBlock{ + &rtcp.DLRRReportBlock{ + Reports: []rtcp.DLRRReport{{ + SSRC: p.SenderSSRC, + LastRR: lastRR, + DLRR: 0, // no delay + }}, + }, + }, + }}) + } } }