From b678ccdd66c4dfa3f256ad0da2b5f340ae5ea176 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 10 Sep 2024 20:50:50 +0530 Subject: [PATCH] Cache RTCP sender report in forwarder state. (#2994) * Cache RTCP sender report in forwarder state. To be used in migration. TODO: need to check more places to operate pure in unix nano rather than converting. * match name --- go.mod | 10 +- go.sum | 20 ++-- pkg/sfu/buffer/buffer.go | 17 +-- pkg/sfu/buffer/rtpstats_base.go | 170 +++++++++++++++------------- pkg/sfu/buffer/rtpstats_receiver.go | 90 ++++++++------- pkg/sfu/buffer/rtpstats_sender.go | 52 +++++---- pkg/sfu/downtrack.go | 8 +- pkg/sfu/forwarder.go | 129 +++++++++++---------- 8 files changed, 262 insertions(+), 234 deletions(-) diff --git a/go.mod b/go.mod index 3c3801f19..10323e327 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.21.1-0.20240906182921-f13661b200eb + github.com/livekit/protocol v1.21.1-0.20240910150033-0614c86dc888 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -50,7 +50,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 golang.org/x/sync v0.8.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 @@ -131,12 +131,12 @@ require ( go.uber.org/zap/exp v0.2.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.28.0 // indirect + golang.org/x/net v0.29.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect - golang.org/x/tools v0.24.0 // indirect + golang.org/x/tools v0.25.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect - google.golang.org/grpc v1.66.0 // indirect + google.golang.org/grpc v1.66.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 8557f7005..3597ef4e2 100644 --- a/go.sum +++ b/go.sum @@ -169,8 +169,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.21.1-0.20240906182921-f13661b200eb h1:NHzzBd2qiu855Sgu/0LRah76pN+JFP5t7aoA/8n0cdw= -github.com/livekit/protocol v1.21.1-0.20240906182921-f13661b200eb/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI= +github.com/livekit/protocol v1.21.1-0.20240910150033-0614c86dc888 h1:976vtgdLdIaeoTMq9ABiNPr8pnelxFh9twTku2cd8tQ= +github.com/livekit/protocol v1.21.1-0.20240910150033-0614c86dc888/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -366,8 +366,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -400,8 +400,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -478,8 +478,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= -golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= -golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= +golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -488,8 +488,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1: google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= -google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.66.1 h1:hO5qAXR19+/Z44hmvIM4dQFMSYX9XcWsByfoxutBpAM= +google.golang.org/grpc v1.66.1/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 706478ba4..f467ac627 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -35,7 +35,6 @@ import ( dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/livekit-server/pkg/sfu/utils" sutils "github.com/livekit/livekit-server/pkg/utils" - "github.com/livekit/mediatransportutil" "github.com/livekit/mediatransportutil/pkg/bucket" "github.com/livekit/mediatransportutil/pkg/nack" "github.com/livekit/mediatransportutil/pkg/twcc" @@ -935,12 +934,12 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packets uint32, octets uint32) { b.RLock() - srData := &RTCPSenderReportData{ - RTPTimestamp: rtpTime, - NTPTimestamp: mediatransportutil.NtpTime(ntpTime), - At: b.getMonotonicNow(), + srData := &livekit.RTCPSenderReportState{ + RtpTimestamp: rtpTime, + NtpTimestamp: ntpTime, + At: b.getMonotonicNowUnixNano(), Packets: packets, - Octets: octets, + Octets: uint64(octets), } didSet := false @@ -956,7 +955,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packets uin } } -func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { +func (b *Buffer) GetSenderReportData() *livekit.RTCPSenderReportState { b.RLock() defer b.RUnlock() @@ -1126,10 +1125,6 @@ func (b *Buffer) getMonotonicNowUnixNano() int64 { return b.baseTime.Add(time.Since(b.baseTime)).UnixNano() } -func (b *Buffer) getMonotonicNow() time.Time { - return b.baseTime.Add(time.Since(b.baseTime)) -} - // --------------------------------------------------------------- // SVC-TODO: Have to use more conditions to differentiate between diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 07f1e0b0d..6142f10c7 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -21,6 +21,7 @@ import ( "time" "go.uber.org/zap/zapcore" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/livekit/mediatransportutil" @@ -112,53 +113,56 @@ type snapshot struct { // ------------------------------------------------------------------ -type RTCPSenderReportData struct { - RTPTimestamp uint32 - RTPTimestampExt uint64 - NTPTimestamp mediatransportutil.NtpTime - At time.Time - AtAdjusted time.Time - Packets uint32 - Octets uint32 +type wrappedRTPDriftLogger struct { + *livekit.RTPDrift } -func (r *RTCPSenderReportData) PropagationDelay(passThrough bool) time.Duration { +func (w wrappedRTPDriftLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { + rd := w.RTPDrift + if rd == nil { + return nil + } + + e.AddTime("StartTime", rd.StartTime.AsTime()) + e.AddTime("EndTime", rd.EndTime.AsTime()) + e.AddFloat64("Duration", rd.Duration) + e.AddUint64("StartTimestamp", rd.StartTimestamp) + e.AddUint64("EndTimestamp", rd.EndTimestamp) + e.AddUint64("RtpClockTicks", rd.RtpClockTicks) + e.AddInt64("DriftSamples", rd.DriftSamples) + e.AddFloat64("DriftMs", rd.DriftMs) + e.AddFloat64("ClockRate", rd.ClockRate) + return nil +} + +// ------------------------------------------------------------------ + +type WrappedRTCPSenderReportStateLogger struct { + *livekit.RTCPSenderReportState +} + +func (w WrappedRTCPSenderReportStateLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { + rsrs := w.RTCPSenderReportState + if rsrs == nil { + return nil + } + + e.AddUint32("RtpTimestamp", rsrs.RtpTimestamp) + e.AddUint64("RtpTimestampExt", rsrs.RtpTimestampExt) + e.AddTime("NtpTimestamp", mediatransportutil.NtpTime(rsrs.NtpTimestamp).Time()) + e.AddTime("At", time.Unix(0, rsrs.At)) + e.AddTime("AtAdjusted", time.Unix(0, rsrs.AtAdjusted)) + e.AddUint32("Packets", rsrs.Packets) + e.AddUint64("Octets", rsrs.Octets) + return nil +} + +func RTCPSenderReportPropagationDelay(rsrs *livekit.RTCPSenderReportState, passThrough bool) time.Duration { if passThrough { return 0 } - return r.AtAdjusted.Sub(r.NTPTimestamp.Time()) -} - -func (r *RTCPSenderReportData) ToString() string { - if r == nil { - return "" - } - - return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s, atAdj: %s, p: %d, o: %d", - r.NTPTimestamp.Time().String(), - r.RTPTimestamp, - r.RTPTimestampExt, - r.At.String(), - r.AtAdjusted.String(), - r.Packets, - r.Octets, - ) -} - -func (r *RTCPSenderReportData) MarshalLogObject(e zapcore.ObjectEncoder) error { - if r == nil { - return nil - } - - e.AddTime("NTPTimestamp", r.NTPTimestamp.Time()) - e.AddUint32("RTPTimestamp", r.RTPTimestamp) - e.AddUint64("RTPTimestampExt", r.RTPTimestampExt) - e.AddTime("At", r.At) - e.AddTime("AtAdjusted", r.AtAdjusted) - e.AddUint32("Packets", r.Packets) - e.AddUint32("Octets", r.Octets) - return nil + return time.Unix(0, rsrs.AtAdjusted).Sub(mediatransportutil.NtpTime(rsrs.NtpTimestamp).Time()) } // ------------------------------------------------------------------ @@ -226,8 +230,8 @@ type rtpStatsBase struct { rtt uint32 maxRtt uint32 - srFirst *RTCPSenderReportData - srNewest *RTCPSenderReportData + srFirst *livekit.RTCPSenderReportState + srNewest *livekit.RTCPSenderReportState nextSnapshotID uint32 snapshots []snapshot @@ -298,18 +302,8 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool { r.rtt = from.rtt r.maxRtt = from.maxRtt - if from.srFirst != nil { - srFirst := *from.srFirst - r.srFirst = &srFirst - } else { - r.srFirst = nil - } - if from.srNewest != nil { - srNewest := *from.srNewest - r.srNewest = &srNewest - } else { - r.srNewest = nil - } + r.srFirst = proto.Clone(from.srFirst).(*livekit.RTCPSenderReportState) + r.srNewest = proto.Clone(from.srNewest).(*livekit.RTCPSenderReportState) r.nextSnapshotID = from.nextSnapshotID r.snapshots = make([]snapshot, cap(from.snapshots)) @@ -510,7 +504,7 @@ func (r *rtpStatsBase) GetRtt() uint32 { return r.rtt } -func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) { +func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderReportState, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) { if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow { return } @@ -521,8 +515,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - timeSinceReceive := time.Since(srData.AtAdjusted) - extNowTS := srData.RTPTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9) + timeSinceReceive := time.Since(time.Unix(0, srData.AtAdjusted)) + extNowTS := srData.RtpTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9) samplesDiff := int64(extNowTS - extStartTS) if samplesDiff < 0 { // out-of-order, skip @@ -543,7 +537,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, "adjustment", time.Duration(r.firstTime - firstTime).String(), "extNowTS", extNowTS, "extStartTS", extStartTS, - "srData", srData, + "srData", WrappedRTCPSenderReportStateLogger{srData}, "tsOffset", tsOffset, "timeSinceReceive", timeSinceReceive.String(), "timeSinceFirst", timeSinceFirst.String(), @@ -730,8 +724,8 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddUint32("rtt", r.rtt) e.AddUint32("maxRtt", r.maxRtt) - e.AddObject("srFirst", r.srFirst) - e.AddObject("srNewest", r.srNewest) + e.AddObject("srFirst", WrappedRTCPSenderReportStateLogger{r.srFirst}) + e.AddObject("srNewest", WrappedRTCPSenderReportStateLogger{r.srNewest}) return nil } @@ -800,7 +794,12 @@ func (r *rtpStatsBase) toString( str += ", rtt(ms):" str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax) - str += fmt.Sprintf(", pd: %s, nrd: %s, rrd: %s", RTPDriftToString(p.PacketDrift), RTPDriftToString(p.ReportDrift), RTPDriftToString(p.RebasedReportDrift)) + str += fmt.Sprintf(", pd: %s, nrd: %s, rxrd: %s, rbrd: %s", + RTPDriftToString(p.PacketDrift), + RTPDriftToString(p.NtpReportDrift), + RTPDriftToString(p.ReceivedReportDrift), + RTPDriftToString(p.RebasedReportDrift), + ) return str } @@ -841,7 +840,7 @@ func (r *rtpStatsBase) toProto( jitterTime := jitter / float64(r.params.ClockRate) * 1e6 maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 - packetDrift, ntpReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS) + packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS) p := &livekit.RTPStats{ StartTime: timestamppb.New(r.startTime), @@ -885,8 +884,9 @@ func (r *rtpStatsBase) toProto( RttCurrent: r.rtt, RttMax: r.maxRtt, PacketDrift: packetDrift, - ReportDrift: ntpReportDrift, + NtpReportDrift: ntpReportDrift, RebasedReportDrift: rebasedReportDrift, + ReceivedReportDrift: receivedReportDrift, } gapsPresent := false @@ -968,7 +968,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, return &then, &now } -func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) { +func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, receivedReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) { if r.firstTime != 0 { elapsed := r.highestTime - r.firstTime rtpClockTicks := extHighestTS - extStartTS @@ -989,18 +989,18 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *l } } - if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp { - rtpClockTicks := r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt + if r.srFirst != nil && r.srNewest != nil && r.srFirst.RtpTimestamp != r.srNewest.RtpTimestamp { + rtpClockTicks := r.srNewest.RtpTimestampExt - r.srFirst.RtpTimestampExt - elapsed := r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()) + elapsed := mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time()) if elapsed.Seconds() > 0.0 { driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) ntpReportDrift = &livekit.RTPDrift{ - StartTime: timestamppb.New(r.srFirst.NTPTimestamp.Time()), - EndTime: timestamppb.New(r.srNewest.NTPTimestamp.Time()), + StartTime: timestamppb.New(mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time()), + EndTime: timestamppb.New(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()), Duration: elapsed.Seconds(), - StartTimestamp: r.srFirst.RTPTimestampExt, - EndTimestamp: r.srNewest.RTPTimestampExt, + StartTimestamp: r.srFirst.RtpTimestampExt, + EndTimestamp: r.srNewest.RtpTimestampExt, RtpClockTicks: rtpClockTicks, DriftSamples: driftSamples, DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), @@ -1008,15 +1008,31 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *l } } - elapsed = r.srNewest.AtAdjusted.Sub(r.srFirst.AtAdjusted) + elapsed = time.Duration(r.srNewest.At - r.srFirst.At) + if elapsed.Seconds() > 0.0 { + driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + receivedReportDrift = &livekit.RTPDrift{ + StartTime: timestamppb.New(time.Unix(0, r.srFirst.At)), + EndTime: timestamppb.New(time.Unix(0, r.srNewest.At)), + Duration: elapsed.Seconds(), + StartTimestamp: r.srFirst.RtpTimestampExt, + EndTimestamp: r.srNewest.RtpTimestampExt, + RtpClockTicks: rtpClockTicks, + DriftSamples: driftSamples, + DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), + ClockRate: float64(rtpClockTicks) / elapsed.Seconds(), + } + } + + elapsed = time.Duration(r.srNewest.AtAdjusted - r.srFirst.AtAdjusted) if elapsed.Seconds() > 0.0 { driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) rebasedReportDrift = &livekit.RTPDrift{ - StartTime: timestamppb.New(r.srFirst.AtAdjusted), - EndTime: timestamppb.New(r.srNewest.AtAdjusted), + StartTime: timestamppb.New(time.Unix(0, r.srFirst.AtAdjusted)), + EndTime: timestamppb.New(time.Unix(0, r.srNewest.AtAdjusted)), Duration: elapsed.Seconds(), - StartTimestamp: r.srFirst.RTPTimestampExt, - EndTimestamp: r.srNewest.RTPTimestampExt, + StartTimestamp: r.srFirst.RtpTimestampExt, + EndTimestamp: r.srNewest.RtpTimestampExt, RtpClockTicks: rtpClockTicks, DriftSamples: driftSamples, DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 71360ea8e..81d3f1178 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -21,8 +21,10 @@ import ( "github.com/pion/rtcp" "go.uber.org/zap/zapcore" + "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/sfu/utils" + "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" protoutils "github.com/livekit/protocol/utils" ) @@ -372,23 +374,23 @@ func (r *RTPStatsReceiver) Update( return } -func (r *RTPStatsReceiver) getExtendedSenderReport(srData *RTCPSenderReportData) *RTCPSenderReportData { +func (r *RTPStatsReceiver) getExtendedSenderReport(srData *livekit.RTCPSenderReportState) *livekit.RTCPSenderReportState { tsCycles := uint64(0) if r.srNewest != nil { // use time since last sender report to ensure long gaps where the time stamp might // jump more than half the range - timeSinceLastReport := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) - expectedRTPTimestampExt := r.srNewest.RTPTimestampExt + uint64(timeSinceLastReport.Nanoseconds()*int64(r.params.ClockRate)/1e9) + timeSinceLastReport := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()) + expectedRTPTimestampExt := r.srNewest.RtpTimestampExt + uint64(timeSinceLastReport.Nanoseconds()*int64(r.params.ClockRate)/1e9) lbound := expectedRTPTimestampExt - uint64(cReportSlack*float64(r.params.ClockRate)) ubound := expectedRTPTimestampExt + uint64(cReportSlack*float64(r.params.ClockRate)) - isInRange := (srData.RTPTimestamp-uint32(lbound) < (1 << 31)) && (uint32(ubound)-srData.RTPTimestamp < (1 << 31)) + isInRange := (srData.RtpTimestamp-uint32(lbound) < (1 << 31)) && (uint32(ubound)-srData.RtpTimestamp < (1 << 31)) if isInRange { lbTSCycles := lbound & 0xFFFF_FFFF_0000_0000 ubTSCycles := ubound & 0xFFFF_FFFF_0000_0000 if lbTSCycles == ubTSCycles { tsCycles = lbTSCycles } else { - if srData.RTPTimestamp < (1 << 31) { + if srData.RtpTimestamp < (1 << 31) { // rolled over tsCycles = ubTSCycles } else { @@ -398,26 +400,26 @@ func (r *RTPStatsReceiver) getExtendedSenderReport(srData *RTCPSenderReportData) } else { // ideally this method should not be required, but there are clients // negotiating one clock rate, but actually send media at a different rate. - tsCycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000 - if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp { + tsCycles = r.srNewest.RtpTimestampExt & 0xFFFF_FFFF_0000_0000 + if (srData.RtpTimestamp-r.srNewest.RtpTimestamp) < (1<<31) && srData.RtpTimestamp < r.srNewest.RtpTimestamp { tsCycles += (1 << 32) } if tsCycles >= (1 << 32) { - if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) >= (1<<31) && srData.RTPTimestamp > r.srNewest.RTPTimestamp { + if (srData.RtpTimestamp-r.srNewest.RtpTimestamp) >= (1<<31) && srData.RtpTimestamp > r.srNewest.RtpTimestamp { tsCycles -= (1 << 32) } } } } - srDataExt := *srData - srDataExt.RTPTimestampExt = uint64(srDataExt.RTPTimestamp) + tsCycles - return &srDataExt + srDataExt := proto.Clone(srData).(*livekit.RTCPSenderReportState) + srDataExt.RtpTimestampExt = uint64(srDataExt.RtpTimestamp) + tsCycles + return srDataExt } -func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *RTCPSenderReportData) bool { - if r.srNewest != nil && srData.RTPTimestampExt < r.srNewest.RTPTimestampExt { +func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *livekit.RTCPSenderReportState) bool { + if r.srNewest != nil && srData.RtpTimestampExt < r.srNewest.RtpTimestampExt { // This can happen when a track is replaced with a null and then restored - // i. e. muting replacing with null and unmute restoring the original track. // Or it could be due bad report generation. @@ -426,7 +428,7 @@ func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *RTCPSenderReportD if (r.outOfOrderSenderReportCount-1)%10 == 0 { r.logger.Infow( "received sender report, out-of-order, skipping", - "current", srData, + "current", WrappedRTCPSenderReportStateLogger{srData}, "count", r.outOfOrderSenderReportCount, "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) @@ -437,17 +439,17 @@ func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *RTCPSenderReportD return false } -func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *RTCPSenderReportData) { +func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCPSenderReportState) { if r.srNewest == nil { return } - timeSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds() - rtpDiffSinceLast := srData.RTPTimestampExt - r.srNewest.RTPTimestampExt + timeSinceLast := time.Duration(srData.NtpTimestamp - r.srNewest.NtpTimestamp).Seconds() + rtpDiffSinceLast := srData.RtpTimestampExt - r.srNewest.RtpTimestampExt calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast - timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds() - rtpDiffSinceFirst := srData.RTPTimestampExt - r.srFirst.RTPTimestampExt + timeSinceFirst := time.Duration(srData.NtpTimestamp - r.srFirst.NtpTimestamp).Seconds() + rtpDiffSinceFirst := srData.RtpTimestampExt - r.srFirst.RtpTimestampExt calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) || @@ -456,7 +458,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *RTCPSenderRe if (r.clockSkewCount-1)%100 == 0 { r.logger.Infow( "received sender report, clock skew", - "current", srData, + "current", WrappedRTCPSenderReportStateLogger{srData}, "timeSinceFirst", timeSinceFirst, "rtpDiffSinceFirst", rtpDiffSinceFirst, "calculatedFirst", calculatedClockRateFromFirst, @@ -470,13 +472,13 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *RTCPSenderRe } } -func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srData *RTCPSenderReportData) { +func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srData *livekit.RTCPSenderReportState) { if r.highestTime == 0 { return } - timeSinceSR := time.Since(srData.AtAdjusted) - extNowTSSR := srData.RTPTimestampExt + uint64(timeSinceSR.Nanoseconds()*int64(r.params.ClockRate)/1e9) + timeSinceSR := time.Since(time.Unix(0, srData.AtAdjusted)) + extNowTSSR := srData.RtpTimestampExt + uint64(timeSinceSR.Nanoseconds()*int64(r.params.ClockRate)/1e9) timeSinceHighest := time.Since(time.Unix(0, r.highestTime)) extNowTSHighest := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) @@ -492,7 +494,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa if (r.clockSkewMediaPathCount-1)%100 == 0 { r.logger.Infow( "received sender report, clock skew against media path", - "current", srData, + "current", WrappedRTCPSenderReportStateLogger{srData}, "timeSinceSR", timeSinceSR, "extNowTSSR", extNowTSSR, "timeSinceHighest", timeSinceHighest, @@ -508,9 +510,9 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa } } -func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *RTCPSenderReportData) { - senderClockTime := srData.NTPTimestamp.Time() - estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At) +func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *livekit.RTCPSenderReportState) { + senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time() + estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, time.Unix(0, srData.At)) if stepChange { r.logger.Debugw( "propagation delay step change", @@ -523,11 +525,11 @@ func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *R r.srFirst = srData } // adjust receive time to estimated propagation delay - srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay) + srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay).UnixNano() r.srNewest = srData } -func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) bool { +func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *livekit.RTCPSenderReportState) bool { r.lock.Lock() defer r.lock.Unlock() @@ -536,7 +538,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) } // prevent against extreme case of anachronous sender reports - if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { + if r.srNewest != nil && r.srNewest.NtpTimestamp > srData.NtpTimestamp { r.logger.Infow( "received sender report, anachronous, dropping", "current", srData, @@ -561,16 +563,11 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) return true } -func (r *RTPStatsReceiver) GetRtcpSenderReportData() *RTCPSenderReportData { +func (r *RTPStatsReceiver) GetRtcpSenderReportData() *livekit.RTCPSenderReportState { r.lock.RLock() defer r.lock.RUnlock() - if r.srNewest == nil { - return nil - } - - srNewestCopy := *r.srNewest - return &srNewestCopy + return proto.Clone(r.srNewest).(*livekit.RTCPSenderReportState) } func (r *RTPStatsReceiver) LastSenderReportTime() time.Time { @@ -578,7 +575,7 @@ func (r *RTPStatsReceiver) LastSenderReportTime() time.Time { defer r.lock.RUnlock() if r.srNewest != nil { - return r.srNewest.At + return time.Unix(0, r.srNewest.At) } return time.Time{} @@ -625,9 +622,9 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin lastSR := uint32(0) dlsr := uint32(0) if r.srNewest != nil { - lastSR = uint32(r.srNewest.NTPTimestamp >> 16) - if !r.srNewest.At.IsZero() { - delayUS := time.Since(r.srNewest.At).Microseconds() + lastSR = uint32(r.srNewest.NtpTimestamp >> 16) + if r.srNewest.At != 0 { + delayUS := time.Since(time.Unix(0, r.srNewest.At)).Microseconds() dlsr = uint32(delayUS * 65536 / 1e6) } } @@ -719,10 +716,17 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart()) e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) - e.AddUint64("extStartTS", r.timestamp.GetExtendedStart()) - e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest()) + extStartTS, extHighestTS := r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest() + e.AddUint64("extStartTS", extStartTS) + e.AddUint64("extHighestTS", extHighestTS) e.AddObject("propagationDelayEstimator", r.propagationDelayEstimator) + + packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS) + e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift}) + e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift}) + e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift}) + e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift}) return nil } diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 588378f02..b46e976e9 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -483,7 +483,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if r.srNewest != nil { var err error - rtt, err = mediatransportutil.GetRttMs(&rr, r.srNewest.NTPTimestamp, r.srNewest.At) + rtt, err = mediatransportutil.GetRttMs(&rr, mediatransportutil.NtpTime(r.srNewest.NtpTimestamp), time.Unix(0, r.srNewest.At)) if err == nil { isRttChanged = rtt != r.rtt } else { @@ -584,7 +584,7 @@ func (r *RTPStatsSender) LastReceiverReportTime() time.Time { return r.lastRRTime } -func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *RTCPSenderReportData, tsOffset uint64) { +func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *livekit.RTCPSenderReportState, tsOffset uint64) { r.lock.Lock() defer r.lock.Unlock() @@ -612,7 +612,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui return } -func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPSenderReportData, tsOffset uint64, passThrough bool) *rtcp.SenderReport { +func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livekit.RTCPSenderReportState, tsOffset uint64, passThrough bool) *rtcp.SenderReport { r.lock.Lock() defer r.lock.Unlock() @@ -620,26 +620,26 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS return nil } - timeSincePublisherSRAdjusted := time.Since(publisherSRData.AtAdjusted) - now := publisherSRData.AtAdjusted.Add(timeSincePublisherSRAdjusted) + timeSincePublisherSRAdjusted := time.Since(time.Unix(0, publisherSRData.AtAdjusted)) + now := publisherSRData.AtAdjusted + timeSincePublisherSRAdjusted.Nanoseconds() var ( nowNTP mediatransportutil.NtpTime nowRTPExt uint64 ) if passThrough { - nowNTP = publisherSRData.NTPTimestamp - nowRTPExt = publisherSRData.RTPTimestampExt - tsOffset + nowNTP = mediatransportutil.NtpTime(publisherSRData.NtpTimestamp) + nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset } else { - nowNTP = mediatransportutil.ToNtpTime(now) - nowRTPExt = publisherSRData.RTPTimestampExt - tsOffset + uint64(timeSincePublisherSRAdjusted.Nanoseconds()*int64(r.params.ClockRate)/1e9) + nowNTP = mediatransportutil.ToNtpTime(time.Unix(0, now)) + nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + uint64(timeSincePublisherSRAdjusted.Nanoseconds()*int64(r.params.ClockRate)/1e9) } packetCount := uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding) - octetCount := uint32(r.bytes + r.bytesDuplicate + r.bytesPadding) - srData := &RTCPSenderReportData{ - NTPTimestamp: nowNTP, - RTPTimestamp: uint32(nowRTPExt), - RTPTimestampExt: nowRTPExt, + octetCount := r.bytes + r.bytesDuplicate + r.bytesPadding + srData := &livekit.RTCPSenderReportState{ + NtpTimestamp: uint64(nowNTP), + RtpTimestamp: uint32(nowRTPExt), + RtpTimestampExt: nowRTPExt, At: now, AtAdjusted: now, Packets: packetCount, @@ -652,18 +652,18 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS "feed", publisherSRData, "tsOffset", tsOffset, "timeNow", time.Now().String(), - "now", now.String(), - "timeSinceHighest", now.Sub(time.Unix(0, r.highestTime)).String(), - "timeSinceFirst", now.Sub(time.Unix(0, r.firstTime)).String(), + "now", time.Unix(0, now).String(), + "timeSinceHighest", time.Unix(0, now).Sub(time.Unix(0, r.highestTime)).String(), + "timeSinceFirst", time.Unix(0, now).Sub(time.Unix(0, r.firstTime)).String(), "timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(), - "timeSincePublisherSR", time.Since(publisherSRData.At).String(), + "timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)).String(), "nowRTPExt", nowRTPExt, "rtpStats", lockedRTPStatsSenderLogEncoder{r}, } } - if r.srNewest != nil && nowRTPExt >= r.srNewest.RTPTimestampExt { - timeSinceLastReport := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RTPTimestampExt + if r.srNewest != nil && nowRTPExt >= r.srNewest.RtpTimestampExt { + timeSinceLastReport := nowNTP.Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()) + rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RtpTimestampExt windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport.Seconds() if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) { r.clockSkewCount++ @@ -680,7 +680,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS } } - if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt { + if r.srNewest != nil && nowRTPExt < r.srNewest.RtpTimestampExt { // If report being generated is behind the last report, skip it. // Should not happen. r.logger.Infow("sending sender report, out-of-order, skipping", getFields()...) @@ -697,7 +697,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS NTPTime: uint64(nowNTP), RTPTime: uint32(nowRTPExt), PacketCount: packetCount, - OctetCount: octetCount, + OctetCount: uint32(octetCount), } } @@ -1025,5 +1025,11 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) e.AddFloat64("jitterFromRR", r.jitterFromRR) e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) + + packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(r.extStartTS, r.extHighestTS) + e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift}) + e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift}) + e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift}) + e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift}) return nil } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 06f2ab3a7..08edc1e5b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -61,7 +61,7 @@ type TrackSender interface { payloadType webrtc.PayloadType, isSVC bool, layer int32, - publisherSRData *buffer.RTCPSenderReportData, + publisherSRData *livekit.RTCPSenderReportState, ) error Resync() } @@ -905,7 +905,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { _, _, refSenderReport := d.forwarder.GetSenderReportParams() if refSenderReport != nil { actExtCopy := *extPkt.AbsCaptureTimeExt - if err = actExtCopy.Rewrite(refSenderReport.PropagationDelay(!d.params.DisableSenderReportPassThrough)); err == nil { + if err = actExtCopy.Rewrite(buffer.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil { actBytes, err = actExtCopy.Marshal() if err == nil { extensions = append( @@ -2191,7 +2191,7 @@ func (d *DownTrack) HandleRTCPSenderReportData( _payloadType webrtc.PayloadType, isSVC bool, layer int32, - publisherSRData *buffer.RTCPSenderReportData, + publisherSRData *livekit.RTCPSenderReportState, ) error { d.forwarder.SetRefSenderReport(isSVC, layer, publisherSRData) @@ -2202,7 +2202,7 @@ func (d *DownTrack) HandleRTCPSenderReportData( return nil } -func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *buffer.RTCPSenderReportData, tsOffset uint64) { +func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *livekit.RTCPSenderReportState, tsOffset uint64) { d.rtpStats.MaybeAdjustFirstPacketTime(publisherSRData, tsOffset) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 7f7f06d0f..4df4c91cc 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -26,7 +26,9 @@ import ( "github.com/pion/rtp" "github.com/pion/webrtc/v3" "go.uber.org/zap/zapcore" + "google.golang.org/protobuf/proto" + "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -188,13 +190,13 @@ type TranslationParams struct { // ------------------------------------------------------------------- type refInfo struct { - senderReport *buffer.RTCPSenderReportData + senderReport *livekit.RTCPSenderReportState tsOffset uint64 isTSOffsetValid bool } func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error { - e.AddObject("senderReport", r.senderReport) + e.AddObject("senderReport", buffer.WrappedRTCPSenderReportStateLogger{r.senderReport}) e.AddUint64("tsOffset", r.tsOffset) e.AddBool("isTSOffsetValid", r.isTSOffsetValid) return nil @@ -202,11 +204,11 @@ func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error { // ------------------------------------------------------------------- -type wrappedRefInfoLogger struct { +type wrappedRefInfosLogger struct { *Forwarder } -func (w wrappedRefInfoLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { +func (w wrappedRefInfosLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { for i, refInfo := range w.Forwarder.refInfos { e.AddObject(fmt.Sprintf("%d", i), refInfo) } @@ -399,6 +401,11 @@ func (f *Forwarder) GetState() *livekit.RTPForwarderState { Vp8Munger: vp8MungerState, } } + + state.SenderReportState = make([]*livekit.RTCPSenderReportState, len(f.refInfos)) + for layer, refInfo := range f.refInfos { + state.SenderReportState[layer] = proto.Clone(refInfo.senderReport).(*livekit.RTCPSenderReportState) + } return state } @@ -410,6 +417,12 @@ func (f *Forwarder) SeedState(state *livekit.RTPForwarderState) { f.lock.Lock() defer f.lock.Unlock() + for layer, rtcpSenderReportState := range state.SenderReportState { + f.refInfos[layer] = refInfo{ + senderReport: proto.Clone(rtcpSenderReportState).(*livekit.RTCPSenderReportState), + } + } + f.rtpMunger.SeedState(state.RtpMunger) f.codecMunger.SeedState(state.CodecMunger) @@ -570,6 +583,14 @@ func (f *Forwarder) GetMaxSubscribedSpatial() int32 { if layer < f.vls.GetCurrent().Spatial { layer = f.vls.GetCurrent().Spatial } + + // if reference layer is higher, hold there until an RTCP Sender Report from + // publisher is available as that is used for reference time stamp between layers. + if f.referenceLayerSpatial != buffer.InvalidLayerSpatial && + layer < f.referenceLayerSpatial && + f.refInfos[f.referenceLayerSpatial].senderReport == nil { + layer = f.referenceLayerSpatial + } } return layer @@ -596,62 +617,57 @@ func (f *Forwarder) getRefLayer() (int32, int32) { return currentLayerSpatial, currentLayerSpatial } -func (f *Forwarder) SetRefSenderReport(isSVC bool, layer int32, srData *buffer.RTCPSenderReportData) { +func (f *Forwarder) SetRefSenderReport(isSVC bool, layer int32, srData *livekit.RTCPSenderReportState) { f.lock.Lock() defer f.lock.Unlock() f.refIsSVC = isSVC - refLayer, _ := f.getRefLayer() if layer >= 0 && int(layer) < len(f.refInfos) { f.refInfos[layer] = refInfo{srData, 0, false} - if layer == refLayer && srData.RTPTimestampExt >= f.lastSwitchExtIncomingTS { + + // Mark validity of time stamp offset. + // + // It is possible to implement mute using pause/unpause + // which can be implemented using replaceTrack(null)/replaceTrack(track). + // In those cases, the RTP time stamp may not jump across + // the mute/pause valley (for the time it is replaced with null track). + // So, relying on a report that happened before unmute/unpause + // could result in incorrect RTCP sender report on subscriber side. + // + // It could happen like this + // 1. Normal operation: publisher sending sender reports and + // suscribers use reports from publisher to calculate and send + // RTCP sender report. + // 2. Publisher pauses: there are no more reports. + // 3. When paused, subscriber can still use the publisher side sender + // report to send reports. Although the time since last publisher + // sender report is increasing, the reports would still be correct + // as they referencing a previous (albeit older) correct report. + // 4. Publisher unpauses after 20 seconds. But, it may not have advanced + // RTP Timestamp by that much. Let us say, it advances only by 5 seconds. + // 5. When subscriber starts forwarding packets, it will calculate + // a new time stamp offset to adjust to the new time stamp of publisher. + // 6. But, when that same offset is used on an old publisher sender report + // (i. e. a report from before the pause), the subscriber side sender + // reports jumps ahead in time by 15 seconds. + // + // So, mark valid for reports after last switch. + refLayer, _ := f.getRefLayer() + if layer == refLayer && srData.RtpTimestampExt >= f.lastSwitchExtIncomingTS { f.refInfos[layer].tsOffset = f.rtpMunger.GetTSOffset() f.refInfos[layer].isTSOffsetValid = true } } } -func (f *Forwarder) clearRefSenderReportsLocked() { - // On (re)start of fowarding, clear any old publisher sender reports. - // This is done to prevent use of potentially stale publisher sender reports. - // - // It is possible to implement mute using pause/unpause - // which can be implemented using replaceTrack(null)/replaceTrack(track). - // In those cases, the RTP time stamp may not jump across - // the mute/pause valley (for the time it is replaced with null track). - // So, relying on a report that happened before unmute/unpause - // could result in incorrect RTCP sender report on subscriber side. - // - // It could happen like this - // 1. Normal operation: publisher sending sender reports and - // suscribers use reports from publisher to calculate and send - // RTCP sender report. - // 2. Publisher pauses: there are no more reports. - // 3. When paused, subscriber can still use the publisher side sender - // report to send reports. Although the time since last publisher - // sender report is increasing, the reports would still be correct - // as they referencing a previous (albeit older) correct report. - // 4. Publisher unpauses after 20 seconds. But, it may not have advanced - // RTP Timestamp by that much. Let us say, it advances only by 5 seconds. - // 5. When subscriber starts forwarding packets, it will calculate - // a new time stamp offset to adjust to the new time stamp of publisher. - // 6. But, when that same offset is used on an old publisher sender report - // (i. e. a report from before the pause), the subscriber side sender - // reports jumps ahead in time by 15 seconds. - // - // By clearing sender report on (re)start of a stream, subscribers will wait for a fresh report - // after unmute to send sender report. - for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ { - f.refInfos[layer] = refInfo{nil, 0, false} - } -} - -func (f *Forwarder) GetSenderReportParams() (int32, uint64, *buffer.RTCPSenderReportData) { +func (f *Forwarder) GetSenderReportParams() (int32, uint64, *livekit.RTCPSenderReportState) { f.lock.RLock() defer f.lock.RUnlock() refLayer, currentLayerSpatial := f.getRefLayer() - if refLayer == buffer.InvalidLayerSpatial || !f.refInfos[refLayer].isTSOffsetValid { + if refLayer == buffer.InvalidLayerSpatial || + f.refInfos[refLayer].senderReport == nil || + !f.refInfos[refLayer].isTSOffsetValid { return buffer.InvalidLayerSpatial, 0, nil } @@ -1584,18 +1600,18 @@ func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int srRef := f.refInfos[refLayer].senderReport srTarget := f.refInfos[targetLayer].senderReport - if srRef == nil || srRef.NTPTimestamp == 0 || srTarget == nil || srTarget.NTPTimestamp == 0 { + if srRef == nil || srRef.NtpTimestamp == 0 || srTarget == nil || srTarget.NtpTimestamp == 0 { return 0, fmt.Errorf("unavailable layer(s), refLayer: %d, targetLayer: %d", refLayer, targetLayer) } - ntpDiff := srRef.NTPTimestamp.Time().Sub(srTarget.NTPTimestamp.Time()) + ntpDiff := mediatransportutil.NtpTime(srRef.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(srTarget.NtpTimestamp).Time()) rtpDiff := ntpDiff.Nanoseconds() * int64(f.codec.ClockRate) / 1e9 // calculate other layer's time stamp at the same time as ref layer's NTP time - normalizedOtherTS := srTarget.RTPTimestamp + uint32(rtpDiff) + normalizedOtherTS := srTarget.RtpTimestamp + uint32(rtpDiff) // now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers - offset := srRef.RTPTimestamp - normalizedOtherTS + offset := srRef.RtpTimestamp - normalizedOtherTS return ts + offset, nil } @@ -1607,8 +1623,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.rtpMunger.SetLastSnTs(extPkt) f.codecMunger.SetLast(extPkt) - f.clearRefSenderReportsLocked() - f.logger.Debugw( "starting forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, @@ -1644,7 +1658,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "extRefTS", extRefTS, "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), - "refInfos", wrappedRefInfoLogger{f}, + "refInfos", wrappedRefInfosLogger{f}, ) } // TODO-REMOVE-AFTER-DATA-COLLECTION @@ -1659,7 +1673,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "extRefTS", extRefTS, "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), - "refInfos", wrappedRefInfoLogger{f}, + "refInfos", wrappedRefInfosLogger{f}, ) } @@ -1775,13 +1789,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e extNextTS = extRefTS } f.resumeBehindThreshold = 0.0 - - // sender reports are cleared after calculating switch time stamp - // as relative differences between layers should remain the same. - // TODO: If the relative difference changes a lot, probably have to - // abandon the checks above and just use the expected timestamp - // as the next time stamp. - f.clearRefSenderReportsLocked() } else { // switching between layers, check if extRefTS is too far behind the last sent diffSeconds := float64(int64(extRefTS-extLastTS)) / float64(f.codec.ClockRate) @@ -1858,12 +1865,12 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) error { if f.lastSSRC != extPkt.Packet.SSRC { if err := f.processSourceSwitch(extPkt, layer); err != nil { - f.logger.Debugw("could not switch feed", "error", err, "refInfos", wrappedRefInfoLogger{f}) + f.logger.Debugw("could not switch feed", "error", err, "refInfos", wrappedRefInfosLogger{f}) tp.shouldDrop = true f.vls.Rollback() return nil } - f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC, "refInfos", wrappedRefInfoLogger{f}) + f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC, "refInfos", wrappedRefInfosLogger{f}) f.lastSSRC = extPkt.Packet.SSRC f.lastSwitchExtIncomingTS = extPkt.ExtTimestamp }