Support XR request/response for rtt calculation (#2536)

* Support XR request/response for rtt calculation

* Update pkg/sfu/downtrack.go

Co-authored-by: David Zhao <dz@livekit.io>

---------

Co-authored-by: David Zhao <dz@livekit.io>
This commit is contained in:
cnderrauber
2024-03-03 12:34:49 +08:00
committed by GitHub
parent cbc1d1b1ce
commit b58307f144
8 changed files with 155 additions and 10 deletions

4
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240228075855-6fbf3be6f6ef
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8
github.com/livekit/protocol v1.10.1
github.com/livekit/psrpc v0.5.3-0.20240227154351-b7f99eaaf7b3
github.com/mackerelio/go-osstat v0.2.4
@@ -29,7 +29,7 @@ require (
github.com/pion/dtls/v2 v2.2.10
github.com/pion/ice/v2 v2.3.14
github.com/pion/interceptor v0.1.25
github.com/pion/rtcp v1.2.13
github.com/pion/rtcp v1.2.14
github.com/pion/rtp v1.8.3
github.com/pion/sctp v1.8.12
github.com/pion/sdp/v3 v3.0.6

8
go.sum
View File

@@ -128,8 +128,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240228075855-6fbf3be6f6ef h1:Db/UItb+Cvm1trBRJiEZOdRSyss+LDY4e8gU9aE4GRc=
github.com/livekit/mediatransportutil v0.0.0-20240228075855-6fbf3be6f6ef/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 h1:xawydPEACNO5Ncs2LgioTjWghXQ0eUN1q1RnVUUyVnI=
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.10.1 h1:upe6pKRqH8wpsMuR2OLtgizEm94iia3pDYm3O4/2PRY=
github.com/livekit/protocol v1.10.1/go.mod h1:eWPz45pnxwpCwB84qqhHxG0bCRgasa2itN6GAHCDddc=
github.com/livekit/psrpc v0.5.3-0.20240227154351-b7f99eaaf7b3 h1:bvjzDR+Rvdf3JgzQMtLiGVHBQ8KoOWM7x7sHj79jevQ=
@@ -200,8 +200,8 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I=
github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtcp v1.2.13 h1:+EQijuisKwm/8VBs8nWllr0bIndR7Lf7cZG200mpbNo=
github.com/pion/rtcp v1.2.13/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.3 h1:VEHxqzSVQxCkKDSHro5/4IUUG1ea+MFdqR2R3xSpNU8=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=

View File

@@ -16,8 +16,10 @@ package rtc
import (
"context"
"math"
"strings"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
@@ -32,6 +34,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"github.com/livekit/livekit-server/pkg/telemetry"
util "github.com/livekit/mediatransportutil"
)
// MediaTrack represents a WebRTC track that needs to be forwarded
@@ -47,6 +50,8 @@ type MediaTrack struct {
dynacastManager *DynacastManager
lock sync.RWMutex
rttFromXR atomic.Bool
}
type MediaTrackParams struct {
@@ -183,12 +188,14 @@ func (t *MediaTrack) UpdateCodecCid(codecs []*livekit.SimulcastCodec) {
// AddReceiver adds a new RTP receiver to the track, returns true when receiver represents a new codec
func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, mid string) bool {
var newCodec bool
buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC()))
ssrc := uint32(track.SSRC())
buff, rtcpReader := t.params.BufferFactory.GetBufferPair(ssrc)
if buff == nil || rtcpReader == nil {
t.params.Logger.Errorw("could not retrieve buffer pair", nil)
return newCodec
}
var lastRR uint32
rtcpReader.OnPacket(func(bytes []byte) {
pkts, err := rtcp.Unmarshal(bytes)
if err != nil {
@@ -202,6 +209,25 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
// do nothing for now
case *rtcp.SenderReport:
buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime)
case *rtcp.ExtendedReport:
rttFromXR:
for _, report := range pkt.Reports {
if rr, ok := report.(*rtcp.DLRRReportBlock); ok {
for _, dlrrReport := range rr.Reports {
if dlrrReport.LastRR <= lastRR {
continue
}
nowNTP := util.ToNtpTime(time.Now())
nowNTP32 := uint32(nowNTP >> 16)
ntpDiff := nowNTP32 - dlrrReport.LastRR - dlrrReport.DLRR
rtt := uint32(math.Ceil(float64(ntpDiff) * 1000.0 / 65536.0))
buff.SetRTT(rtt)
t.rttFromXR.Store(true)
lastRR = dlrrReport.LastRR
break rttFromXR
}
}
}
}
}
})
@@ -359,7 +385,9 @@ func (t *MediaTrack) GetConnectionScoreAndQuality() (float32, livekit.Connection
}
func (t *MediaTrack) SetRTT(rtt uint32) {
t.MediaTrackReceiver.SetRTT(rtt)
if !t.rttFromXR.Load() {
t.MediaTrackReceiver.SetRTT(rtt)
}
}
func (t *MediaTrack) HasPendingCodec() bool {

View File

@@ -138,6 +138,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
Pacer: sub.GetPacer(),
Trailer: trailer,
Logger: LoggerWithTrack(sub.GetLogger().WithComponent(sutils.ComponentSub), trackID, t.params.IsRelayed),
RTCPWriter: sub.WriteSubscriberRTCP,
})
if err != nil {
return nil, err

View File

@@ -38,13 +38,14 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/transport"
"github.com/livekit/livekit-server/pkg/rtc/types"
lkinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor"
sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/sfu/rtpextension"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
sutils "github.com/livekit/livekit-server/pkg/utils"
lkinterceptor "github.com/livekit/mediatransportutil/pkg/interceptor"
lktwcc "github.com/livekit/mediatransportutil/pkg/twcc"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -322,6 +323,11 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat
}
}
}
} else {
// sfu only use interceptor to send XR but don't read response from it (use buffer instead),
// so use a empty callback here
ir.Add(lkinterceptor.NewRTTFromXRFactory(func(rtt uint32) {
}))
}
if len(params.SimTracks) > 0 {
f, err := NewUnhandleSimulcastInterceptorFactory(UnhandleSimulcastTracks(params.SimTracks))
@@ -361,7 +367,7 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat
}
}
// put rtx interceptor behind unhandle simulcast interceptor so it can get the correct mid & rid
ir.Add(lkinterceptor.NewRTXInfoExtractorFactory(setTWCCForVideo, func(repair, base uint32) {
ir.Add(sfuinterceptor.NewRTXInfoExtractorFactory(setTWCCForVideo, func(repair, base uint32) {
params.Logger.Debugw("rtx pair found from extension", "repair", repair, "base", base)
params.Config.BufferFactory.SetRTXPair(repair, base)
}, params.Logger))

View File

@@ -344,6 +344,8 @@ type LocalParticipant interface {
AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error
WriteSubscriberRTCP(pkts []rtcp.Packet) error
// subscriptions
SubscribeToTrack(trackID livekit.TrackID)
UnsubscribeFromTrack(trackID livekit.TrackID)

View File

@@ -979,6 +979,17 @@ type FakeLocalParticipant struct {
waitUntilSubscribedReturnsOnCall map[int]struct {
result1 error
}
WriteSubscriberRTCPStub func([]rtcp.Packet) error
writeSubscriberRTCPMutex sync.RWMutex
writeSubscriberRTCPArgsForCall []struct {
arg1 []rtcp.Packet
}
writeSubscriberRTCPReturns struct {
result1 error
}
writeSubscriberRTCPReturnsOnCall map[int]struct {
result1 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
@@ -6207,6 +6218,72 @@ func (fake *FakeLocalParticipant) WaitUntilSubscribedReturnsOnCall(i int, result
}{result1}
}
func (fake *FakeLocalParticipant) WriteSubscriberRTCP(arg1 []rtcp.Packet) error {
var arg1Copy []rtcp.Packet
if arg1 != nil {
arg1Copy = make([]rtcp.Packet, len(arg1))
copy(arg1Copy, arg1)
}
fake.writeSubscriberRTCPMutex.Lock()
ret, specificReturn := fake.writeSubscriberRTCPReturnsOnCall[len(fake.writeSubscriberRTCPArgsForCall)]
fake.writeSubscriberRTCPArgsForCall = append(fake.writeSubscriberRTCPArgsForCall, struct {
arg1 []rtcp.Packet
}{arg1Copy})
stub := fake.WriteSubscriberRTCPStub
fakeReturns := fake.writeSubscriberRTCPReturns
fake.recordInvocation("WriteSubscriberRTCP", []interface{}{arg1Copy})
fake.writeSubscriberRTCPMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) WriteSubscriberRTCPCallCount() int {
fake.writeSubscriberRTCPMutex.RLock()
defer fake.writeSubscriberRTCPMutex.RUnlock()
return len(fake.writeSubscriberRTCPArgsForCall)
}
func (fake *FakeLocalParticipant) WriteSubscriberRTCPCalls(stub func([]rtcp.Packet) error) {
fake.writeSubscriberRTCPMutex.Lock()
defer fake.writeSubscriberRTCPMutex.Unlock()
fake.WriteSubscriberRTCPStub = stub
}
func (fake *FakeLocalParticipant) WriteSubscriberRTCPArgsForCall(i int) []rtcp.Packet {
fake.writeSubscriberRTCPMutex.RLock()
defer fake.writeSubscriberRTCPMutex.RUnlock()
argsForCall := fake.writeSubscriberRTCPArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) WriteSubscriberRTCPReturns(result1 error) {
fake.writeSubscriberRTCPMutex.Lock()
defer fake.writeSubscriberRTCPMutex.Unlock()
fake.WriteSubscriberRTCPStub = nil
fake.writeSubscriberRTCPReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) WriteSubscriberRTCPReturnsOnCall(i int, result1 error) {
fake.writeSubscriberRTCPMutex.Lock()
defer fake.writeSubscriberRTCPMutex.Unlock()
fake.WriteSubscriberRTCPStub = nil
if fake.writeSubscriberRTCPReturnsOnCall == nil {
fake.writeSubscriberRTCPReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.writeSubscriberRTCPReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
@@ -6430,6 +6507,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.verifySubscribeParticipantInfoMutex.RUnlock()
fake.waitUntilSubscribedMutex.RLock()
defer fake.waitUntilSubscribedMutex.RUnlock()
fake.writeSubscriberRTCPMutex.RLock()
defer fake.writeSubscriberRTCPMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value

View File

@@ -206,6 +206,7 @@ type DowntrackParams struct {
Pacer pacer.Pacer
Logger logger.Logger
Trailer []byte
RTCPWriter func([]rtcp.Packet) error
}
// DownTrack implements TrackLocal, is the track used to write packets
@@ -1567,6 +1568,34 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
sal.OnTransportCCFeedback(d, p)
}
}
case *rtcp.ExtendedReport:
// SFU only responds with the DLRRReport for the track has the sender SSRC, the behavior is different with
// browser's implementation, which includes all sent tracks. It is ok since all the tracks
// use the same connection, and server-sdk-go can get the rtt from the first DLRRReport
// (libwebrtc/browsers don't send XR to calculate rtt, it only responds)
var lastRR uint32
for _, report := range p.Reports {
if rr, ok := report.(*rtcp.ReceiverReferenceTimeReportBlock); ok {
lastRR = uint32(rr.NTPTimestamp >> 16)
break
}
}
if lastRR > 0 {
d.params.RTCPWriter([]rtcp.Packet{&rtcp.ExtendedReport{
SenderSSRC: d.ssrc,
Reports: []rtcp.ReportBlock{
&rtcp.DLRRReportBlock{
Reports: []rtcp.DLRRReport{{
SSRC: p.SenderSSRC,
LastRR: lastRR,
DLRR: 0, // no delay
}},
},
},
}})
}
}
}