Cache RTCP sender report in forwarder state. (#2994)

* Cache RTCP sender report in forwarder state.

To be used in migration.

TODO: need to check more places to operate pure in unix nano rather than
converting.

* match name
This commit is contained in:
Raja Subramanian
2024-09-10 20:50:50 +05:30
committed by GitHub
parent 3cf70b2198
commit b678ccdd66
8 changed files with 262 additions and 234 deletions
+5 -5
View File
@@ -19,7 +19,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.21.1-0.20240906182921-f13661b200eb
github.com/livekit/protocol v1.21.1-0.20240910150033-0614c86dc888
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -50,7 +50,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
@@ -131,12 +131,12 @@ require (
go.uber.org/zap/exp v0.2.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.org/x/tools v0.25.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.0 // indirect
google.golang.org/grpc v1.66.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+10 -10
View File
@@ -169,8 +169,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.21.1-0.20240906182921-f13661b200eb h1:NHzzBd2qiu855Sgu/0LRah76pN+JFP5t7aoA/8n0cdw=
github.com/livekit/protocol v1.21.1-0.20240906182921-f13661b200eb/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI=
github.com/livekit/protocol v1.21.1-0.20240910150033-0614c86dc888 h1:976vtgdLdIaeoTMq9ABiNPr8pnelxFh9twTku2cd8tQ=
github.com/livekit/protocol v1.21.1-0.20240910150033-0614c86dc888/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -366,8 +366,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk=
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@@ -400,8 +400,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -478,8 +478,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -488,8 +488,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc v1.66.1 h1:hO5qAXR19+/Z44hmvIM4dQFMSYX9XcWsByfoxutBpAM=
google.golang.org/grpc v1.66.1/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
+6 -11
View File
@@ -35,7 +35,6 @@ import (
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/utils"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/mediatransportutil"
"github.com/livekit/mediatransportutil/pkg/bucket"
"github.com/livekit/mediatransportutil/pkg/nack"
"github.com/livekit/mediatransportutil/pkg/twcc"
@@ -935,12 +934,12 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packets uint32, octets uint32) {
b.RLock()
srData := &RTCPSenderReportData{
RTPTimestamp: rtpTime,
NTPTimestamp: mediatransportutil.NtpTime(ntpTime),
At: b.getMonotonicNow(),
srData := &livekit.RTCPSenderReportState{
RtpTimestamp: rtpTime,
NtpTimestamp: ntpTime,
At: b.getMonotonicNowUnixNano(),
Packets: packets,
Octets: octets,
Octets: uint64(octets),
}
didSet := false
@@ -956,7 +955,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packets uin
}
}
func (b *Buffer) GetSenderReportData() *RTCPSenderReportData {
func (b *Buffer) GetSenderReportData() *livekit.RTCPSenderReportState {
b.RLock()
defer b.RUnlock()
@@ -1126,10 +1125,6 @@ func (b *Buffer) getMonotonicNowUnixNano() int64 {
return b.baseTime.Add(time.Since(b.baseTime)).UnixNano()
}
func (b *Buffer) getMonotonicNow() time.Time {
return b.baseTime.Add(time.Since(b.baseTime))
}
// ---------------------------------------------------------------
// SVC-TODO: Have to use more conditions to differentiate between
+93 -77
View File
@@ -21,6 +21,7 @@ import (
"time"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/mediatransportutil"
@@ -112,53 +113,56 @@ type snapshot struct {
// ------------------------------------------------------------------
type RTCPSenderReportData struct {
RTPTimestamp uint32
RTPTimestampExt uint64
NTPTimestamp mediatransportutil.NtpTime
At time.Time
AtAdjusted time.Time
Packets uint32
Octets uint32
type wrappedRTPDriftLogger struct {
*livekit.RTPDrift
}
func (r *RTCPSenderReportData) PropagationDelay(passThrough bool) time.Duration {
func (w wrappedRTPDriftLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
rd := w.RTPDrift
if rd == nil {
return nil
}
e.AddTime("StartTime", rd.StartTime.AsTime())
e.AddTime("EndTime", rd.EndTime.AsTime())
e.AddFloat64("Duration", rd.Duration)
e.AddUint64("StartTimestamp", rd.StartTimestamp)
e.AddUint64("EndTimestamp", rd.EndTimestamp)
e.AddUint64("RtpClockTicks", rd.RtpClockTicks)
e.AddInt64("DriftSamples", rd.DriftSamples)
e.AddFloat64("DriftMs", rd.DriftMs)
e.AddFloat64("ClockRate", rd.ClockRate)
return nil
}
// ------------------------------------------------------------------
type WrappedRTCPSenderReportStateLogger struct {
*livekit.RTCPSenderReportState
}
func (w WrappedRTCPSenderReportStateLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
rsrs := w.RTCPSenderReportState
if rsrs == nil {
return nil
}
e.AddUint32("RtpTimestamp", rsrs.RtpTimestamp)
e.AddUint64("RtpTimestampExt", rsrs.RtpTimestampExt)
e.AddTime("NtpTimestamp", mediatransportutil.NtpTime(rsrs.NtpTimestamp).Time())
e.AddTime("At", time.Unix(0, rsrs.At))
e.AddTime("AtAdjusted", time.Unix(0, rsrs.AtAdjusted))
e.AddUint32("Packets", rsrs.Packets)
e.AddUint64("Octets", rsrs.Octets)
return nil
}
func RTCPSenderReportPropagationDelay(rsrs *livekit.RTCPSenderReportState, passThrough bool) time.Duration {
if passThrough {
return 0
}
return r.AtAdjusted.Sub(r.NTPTimestamp.Time())
}
func (r *RTCPSenderReportData) ToString() string {
if r == nil {
return ""
}
return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s, atAdj: %s, p: %d, o: %d",
r.NTPTimestamp.Time().String(),
r.RTPTimestamp,
r.RTPTimestampExt,
r.At.String(),
r.AtAdjusted.String(),
r.Packets,
r.Octets,
)
}
func (r *RTCPSenderReportData) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
e.AddTime("NTPTimestamp", r.NTPTimestamp.Time())
e.AddUint32("RTPTimestamp", r.RTPTimestamp)
e.AddUint64("RTPTimestampExt", r.RTPTimestampExt)
e.AddTime("At", r.At)
e.AddTime("AtAdjusted", r.AtAdjusted)
e.AddUint32("Packets", r.Packets)
e.AddUint32("Octets", r.Octets)
return nil
return time.Unix(0, rsrs.AtAdjusted).Sub(mediatransportutil.NtpTime(rsrs.NtpTimestamp).Time())
}
// ------------------------------------------------------------------
@@ -226,8 +230,8 @@ type rtpStatsBase struct {
rtt uint32
maxRtt uint32
srFirst *RTCPSenderReportData
srNewest *RTCPSenderReportData
srFirst *livekit.RTCPSenderReportState
srNewest *livekit.RTCPSenderReportState
nextSnapshotID uint32
snapshots []snapshot
@@ -298,18 +302,8 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool {
r.rtt = from.rtt
r.maxRtt = from.maxRtt
if from.srFirst != nil {
srFirst := *from.srFirst
r.srFirst = &srFirst
} else {
r.srFirst = nil
}
if from.srNewest != nil {
srNewest := *from.srNewest
r.srNewest = &srNewest
} else {
r.srNewest = nil
}
r.srFirst = proto.Clone(from.srFirst).(*livekit.RTCPSenderReportState)
r.srNewest = proto.Clone(from.srNewest).(*livekit.RTCPSenderReportState)
r.nextSnapshotID = from.nextSnapshotID
r.snapshots = make([]snapshot, cap(from.snapshots))
@@ -510,7 +504,7 @@ func (r *rtpStatsBase) GetRtt() uint32 {
return r.rtt
}
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) {
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderReportState, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) {
if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow {
return
}
@@ -521,8 +515,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData,
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
timeSinceReceive := time.Since(srData.AtAdjusted)
extNowTS := srData.RTPTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9)
timeSinceReceive := time.Since(time.Unix(0, srData.AtAdjusted))
extNowTS := srData.RtpTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9)
samplesDiff := int64(extNowTS - extStartTS)
if samplesDiff < 0 {
// out-of-order, skip
@@ -543,7 +537,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData,
"adjustment", time.Duration(r.firstTime - firstTime).String(),
"extNowTS", extNowTS,
"extStartTS", extStartTS,
"srData", srData,
"srData", WrappedRTCPSenderReportStateLogger{srData},
"tsOffset", tsOffset,
"timeSinceReceive", timeSinceReceive.String(),
"timeSinceFirst", timeSinceFirst.String(),
@@ -730,8 +724,8 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddUint32("rtt", r.rtt)
e.AddUint32("maxRtt", r.maxRtt)
e.AddObject("srFirst", r.srFirst)
e.AddObject("srNewest", r.srNewest)
e.AddObject("srFirst", WrappedRTCPSenderReportStateLogger{r.srFirst})
e.AddObject("srNewest", WrappedRTCPSenderReportStateLogger{r.srNewest})
return nil
}
@@ -800,7 +794,12 @@ func (r *rtpStatsBase) toString(
str += ", rtt(ms):"
str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax)
str += fmt.Sprintf(", pd: %s, nrd: %s, rrd: %s", RTPDriftToString(p.PacketDrift), RTPDriftToString(p.ReportDrift), RTPDriftToString(p.RebasedReportDrift))
str += fmt.Sprintf(", pd: %s, nrd: %s, rxrd: %s, rbrd: %s",
RTPDriftToString(p.PacketDrift),
RTPDriftToString(p.NtpReportDrift),
RTPDriftToString(p.ReceivedReportDrift),
RTPDriftToString(p.RebasedReportDrift),
)
return str
}
@@ -841,7 +840,7 @@ func (r *rtpStatsBase) toProto(
jitterTime := jitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
packetDrift, ntpReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS)
packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS)
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
@@ -885,8 +884,9 @@ func (r *rtpStatsBase) toProto(
RttCurrent: r.rtt,
RttMax: r.maxRtt,
PacketDrift: packetDrift,
ReportDrift: ntpReportDrift,
NtpReportDrift: ntpReportDrift,
RebasedReportDrift: rebasedReportDrift,
ReceivedReportDrift: receivedReportDrift,
}
gapsPresent := false
@@ -968,7 +968,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64,
return &then, &now
}
func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) {
func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, receivedReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) {
if r.firstTime != 0 {
elapsed := r.highestTime - r.firstTime
rtpClockTicks := extHighestTS - extStartTS
@@ -989,18 +989,18 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *l
}
}
if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp {
rtpClockTicks := r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
if r.srFirst != nil && r.srNewest != nil && r.srFirst.RtpTimestamp != r.srNewest.RtpTimestamp {
rtpClockTicks := r.srNewest.RtpTimestampExt - r.srFirst.RtpTimestampExt
elapsed := r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
elapsed := mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time())
if elapsed.Seconds() > 0.0 {
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
ntpReportDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(r.srFirst.NTPTimestamp.Time()),
EndTime: timestamppb.New(r.srNewest.NTPTimestamp.Time()),
StartTime: timestamppb.New(mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time()),
EndTime: timestamppb.New(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()),
Duration: elapsed.Seconds(),
StartTimestamp: r.srFirst.RTPTimestampExt,
EndTimestamp: r.srNewest.RTPTimestampExt,
StartTimestamp: r.srFirst.RtpTimestampExt,
EndTimestamp: r.srNewest.RtpTimestampExt,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
@@ -1008,15 +1008,31 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *l
}
}
elapsed = r.srNewest.AtAdjusted.Sub(r.srFirst.AtAdjusted)
elapsed = time.Duration(r.srNewest.At - r.srFirst.At)
if elapsed.Seconds() > 0.0 {
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
receivedReportDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(time.Unix(0, r.srFirst.At)),
EndTime: timestamppb.New(time.Unix(0, r.srNewest.At)),
Duration: elapsed.Seconds(),
StartTimestamp: r.srFirst.RtpTimestampExt,
EndTimestamp: r.srNewest.RtpTimestampExt,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
elapsed = time.Duration(r.srNewest.AtAdjusted - r.srFirst.AtAdjusted)
if elapsed.Seconds() > 0.0 {
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
rebasedReportDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(r.srFirst.AtAdjusted),
EndTime: timestamppb.New(r.srNewest.AtAdjusted),
StartTime: timestamppb.New(time.Unix(0, r.srFirst.AtAdjusted)),
EndTime: timestamppb.New(time.Unix(0, r.srNewest.AtAdjusted)),
Duration: elapsed.Seconds(),
StartTimestamp: r.srFirst.RTPTimestampExt,
EndTimestamp: r.srNewest.RTPTimestampExt,
StartTimestamp: r.srFirst.RtpTimestampExt,
EndTimestamp: r.srNewest.RtpTimestampExt,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
+47 -43
View File
@@ -21,8 +21,10 @@ import (
"github.com/pion/rtcp"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
protoutils "github.com/livekit/protocol/utils"
)
@@ -372,23 +374,23 @@ func (r *RTPStatsReceiver) Update(
return
}
func (r *RTPStatsReceiver) getExtendedSenderReport(srData *RTCPSenderReportData) *RTCPSenderReportData {
func (r *RTPStatsReceiver) getExtendedSenderReport(srData *livekit.RTCPSenderReportState) *livekit.RTCPSenderReportState {
tsCycles := uint64(0)
if r.srNewest != nil {
// use time since last sender report to ensure long gaps where the time stamp might
// jump more than half the range
timeSinceLastReport := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
expectedRTPTimestampExt := r.srNewest.RTPTimestampExt + uint64(timeSinceLastReport.Nanoseconds()*int64(r.params.ClockRate)/1e9)
timeSinceLastReport := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time())
expectedRTPTimestampExt := r.srNewest.RtpTimestampExt + uint64(timeSinceLastReport.Nanoseconds()*int64(r.params.ClockRate)/1e9)
lbound := expectedRTPTimestampExt - uint64(cReportSlack*float64(r.params.ClockRate))
ubound := expectedRTPTimestampExt + uint64(cReportSlack*float64(r.params.ClockRate))
isInRange := (srData.RTPTimestamp-uint32(lbound) < (1 << 31)) && (uint32(ubound)-srData.RTPTimestamp < (1 << 31))
isInRange := (srData.RtpTimestamp-uint32(lbound) < (1 << 31)) && (uint32(ubound)-srData.RtpTimestamp < (1 << 31))
if isInRange {
lbTSCycles := lbound & 0xFFFF_FFFF_0000_0000
ubTSCycles := ubound & 0xFFFF_FFFF_0000_0000
if lbTSCycles == ubTSCycles {
tsCycles = lbTSCycles
} else {
if srData.RTPTimestamp < (1 << 31) {
if srData.RtpTimestamp < (1 << 31) {
// rolled over
tsCycles = ubTSCycles
} else {
@@ -398,26 +400,26 @@ func (r *RTPStatsReceiver) getExtendedSenderReport(srData *RTCPSenderReportData)
} else {
// ideally this method should not be required, but there are clients
// negotiating one clock rate, but actually send media at a different rate.
tsCycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
tsCycles = r.srNewest.RtpTimestampExt & 0xFFFF_FFFF_0000_0000
if (srData.RtpTimestamp-r.srNewest.RtpTimestamp) < (1<<31) && srData.RtpTimestamp < r.srNewest.RtpTimestamp {
tsCycles += (1 << 32)
}
if tsCycles >= (1 << 32) {
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) >= (1<<31) && srData.RTPTimestamp > r.srNewest.RTPTimestamp {
if (srData.RtpTimestamp-r.srNewest.RtpTimestamp) >= (1<<31) && srData.RtpTimestamp > r.srNewest.RtpTimestamp {
tsCycles -= (1 << 32)
}
}
}
}
srDataExt := *srData
srDataExt.RTPTimestampExt = uint64(srDataExt.RTPTimestamp) + tsCycles
return &srDataExt
srDataExt := proto.Clone(srData).(*livekit.RTCPSenderReportState)
srDataExt.RtpTimestampExt = uint64(srDataExt.RtpTimestamp) + tsCycles
return srDataExt
}
func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *RTCPSenderReportData) bool {
if r.srNewest != nil && srData.RTPTimestampExt < r.srNewest.RTPTimestampExt {
func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *livekit.RTCPSenderReportState) bool {
if r.srNewest != nil && srData.RtpTimestampExt < r.srNewest.RtpTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Or it could be due bad report generation.
@@ -426,7 +428,7 @@ func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *RTCPSenderReportD
if (r.outOfOrderSenderReportCount-1)%10 == 0 {
r.logger.Infow(
"received sender report, out-of-order, skipping",
"current", srData,
"current", WrappedRTCPSenderReportStateLogger{srData},
"count", r.outOfOrderSenderReportCount,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
@@ -437,17 +439,17 @@ func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *RTCPSenderReportD
return false
}
func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *RTCPSenderReportData) {
func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCPSenderReportState) {
if r.srNewest == nil {
return
}
timeSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds()
rtpDiffSinceLast := srData.RTPTimestampExt - r.srNewest.RTPTimestampExt
timeSinceLast := time.Duration(srData.NtpTimestamp - r.srNewest.NtpTimestamp).Seconds()
rtpDiffSinceLast := srData.RtpTimestampExt - r.srNewest.RtpTimestampExt
calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast
timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds()
rtpDiffSinceFirst := srData.RTPTimestampExt - r.srFirst.RTPTimestampExt
timeSinceFirst := time.Duration(srData.NtpTimestamp - r.srFirst.NtpTimestamp).Seconds()
rtpDiffSinceFirst := srData.RtpTimestampExt - r.srFirst.RtpTimestampExt
calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst
if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) ||
@@ -456,7 +458,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *RTCPSenderRe
if (r.clockSkewCount-1)%100 == 0 {
r.logger.Infow(
"received sender report, clock skew",
"current", srData,
"current", WrappedRTCPSenderReportStateLogger{srData},
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"calculatedFirst", calculatedClockRateFromFirst,
@@ -470,13 +472,13 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *RTCPSenderRe
}
}
func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srData *RTCPSenderReportData) {
func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srData *livekit.RTCPSenderReportState) {
if r.highestTime == 0 {
return
}
timeSinceSR := time.Since(srData.AtAdjusted)
extNowTSSR := srData.RTPTimestampExt + uint64(timeSinceSR.Nanoseconds()*int64(r.params.ClockRate)/1e9)
timeSinceSR := time.Since(time.Unix(0, srData.AtAdjusted))
extNowTSSR := srData.RtpTimestampExt + uint64(timeSinceSR.Nanoseconds()*int64(r.params.ClockRate)/1e9)
timeSinceHighest := time.Since(time.Unix(0, r.highestTime))
extNowTSHighest := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
@@ -492,7 +494,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
if (r.clockSkewMediaPathCount-1)%100 == 0 {
r.logger.Infow(
"received sender report, clock skew against media path",
"current", srData,
"current", WrappedRTCPSenderReportStateLogger{srData},
"timeSinceSR", timeSinceSR,
"extNowTSSR", extNowTSSR,
"timeSinceHighest", timeSinceHighest,
@@ -508,9 +510,9 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
}
}
func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *RTCPSenderReportData) {
senderClockTime := srData.NTPTimestamp.Time()
estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At)
func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *livekit.RTCPSenderReportState) {
senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time()
estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, time.Unix(0, srData.At))
if stepChange {
r.logger.Debugw(
"propagation delay step change",
@@ -523,11 +525,11 @@ func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *R
r.srFirst = srData
}
// adjust receive time to estimated propagation delay
srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay)
srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay).UnixNano()
r.srNewest = srData
}
func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) bool {
func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *livekit.RTCPSenderReportState) bool {
r.lock.Lock()
defer r.lock.Unlock()
@@ -536,7 +538,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
}
// prevent against extreme case of anachronous sender reports
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
if r.srNewest != nil && r.srNewest.NtpTimestamp > srData.NtpTimestamp {
r.logger.Infow(
"received sender report, anachronous, dropping",
"current", srData,
@@ -561,16 +563,11 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
return true
}
func (r *RTPStatsReceiver) GetRtcpSenderReportData() *RTCPSenderReportData {
func (r *RTPStatsReceiver) GetRtcpSenderReportData() *livekit.RTCPSenderReportState {
r.lock.RLock()
defer r.lock.RUnlock()
if r.srNewest == nil {
return nil
}
srNewestCopy := *r.srNewest
return &srNewestCopy
return proto.Clone(r.srNewest).(*livekit.RTCPSenderReportState)
}
func (r *RTPStatsReceiver) LastSenderReportTime() time.Time {
@@ -578,7 +575,7 @@ func (r *RTPStatsReceiver) LastSenderReportTime() time.Time {
defer r.lock.RUnlock()
if r.srNewest != nil {
return r.srNewest.At
return time.Unix(0, r.srNewest.At)
}
return time.Time{}
@@ -625,9 +622,9 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin
lastSR := uint32(0)
dlsr := uint32(0)
if r.srNewest != nil {
lastSR = uint32(r.srNewest.NTPTimestamp >> 16)
if !r.srNewest.At.IsZero() {
delayUS := time.Since(r.srNewest.At).Microseconds()
lastSR = uint32(r.srNewest.NtpTimestamp >> 16)
if r.srNewest.At != 0 {
delayUS := time.Since(time.Unix(0, r.srNewest.At)).Microseconds()
dlsr = uint32(delayUS * 65536 / 1e6)
}
}
@@ -719,10 +716,17 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod
e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart())
e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest())
e.AddUint64("extStartTS", r.timestamp.GetExtendedStart())
e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest())
extStartTS, extHighestTS := r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest()
e.AddUint64("extStartTS", extStartTS)
e.AddUint64("extHighestTS", extHighestTS)
e.AddObject("propagationDelayEstimator", r.propagationDelayEstimator)
packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS)
e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift})
e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift})
e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift})
e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift})
return nil
}
+29 -23
View File
@@ -483,7 +483,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
if r.srNewest != nil {
var err error
rtt, err = mediatransportutil.GetRttMs(&rr, r.srNewest.NTPTimestamp, r.srNewest.At)
rtt, err = mediatransportutil.GetRttMs(&rr, mediatransportutil.NtpTime(r.srNewest.NtpTimestamp), time.Unix(0, r.srNewest.At))
if err == nil {
isRttChanged = rtt != r.rtt
} else {
@@ -584,7 +584,7 @@ func (r *RTPStatsSender) LastReceiverReportTime() time.Time {
return r.lastRRTime
}
func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *RTCPSenderReportData, tsOffset uint64) {
func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *livekit.RTCPSenderReportState, tsOffset uint64) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -612,7 +612,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui
return
}
func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPSenderReportData, tsOffset uint64, passThrough bool) *rtcp.SenderReport {
func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livekit.RTCPSenderReportState, tsOffset uint64, passThrough bool) *rtcp.SenderReport {
r.lock.Lock()
defer r.lock.Unlock()
@@ -620,26 +620,26 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
return nil
}
timeSincePublisherSRAdjusted := time.Since(publisherSRData.AtAdjusted)
now := publisherSRData.AtAdjusted.Add(timeSincePublisherSRAdjusted)
timeSincePublisherSRAdjusted := time.Since(time.Unix(0, publisherSRData.AtAdjusted))
now := publisherSRData.AtAdjusted + timeSincePublisherSRAdjusted.Nanoseconds()
var (
nowNTP mediatransportutil.NtpTime
nowRTPExt uint64
)
if passThrough {
nowNTP = publisherSRData.NTPTimestamp
nowRTPExt = publisherSRData.RTPTimestampExt - tsOffset
nowNTP = mediatransportutil.NtpTime(publisherSRData.NtpTimestamp)
nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset
} else {
nowNTP = mediatransportutil.ToNtpTime(now)
nowRTPExt = publisherSRData.RTPTimestampExt - tsOffset + uint64(timeSincePublisherSRAdjusted.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowNTP = mediatransportutil.ToNtpTime(time.Unix(0, now))
nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + uint64(timeSincePublisherSRAdjusted.Nanoseconds()*int64(r.params.ClockRate)/1e9)
}
packetCount := uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding)
octetCount := uint32(r.bytes + r.bytesDuplicate + r.bytesPadding)
srData := &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: uint32(nowRTPExt),
RTPTimestampExt: nowRTPExt,
octetCount := r.bytes + r.bytesDuplicate + r.bytesPadding
srData := &livekit.RTCPSenderReportState{
NtpTimestamp: uint64(nowNTP),
RtpTimestamp: uint32(nowRTPExt),
RtpTimestampExt: nowRTPExt,
At: now,
AtAdjusted: now,
Packets: packetCount,
@@ -652,18 +652,18 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
"feed", publisherSRData,
"tsOffset", tsOffset,
"timeNow", time.Now().String(),
"now", now.String(),
"timeSinceHighest", now.Sub(time.Unix(0, r.highestTime)).String(),
"timeSinceFirst", now.Sub(time.Unix(0, r.firstTime)).String(),
"now", time.Unix(0, now).String(),
"timeSinceHighest", time.Unix(0, now).Sub(time.Unix(0, r.highestTime)).String(),
"timeSinceFirst", time.Unix(0, now).Sub(time.Unix(0, r.firstTime)).String(),
"timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(),
"timeSincePublisherSR", time.Since(publisherSRData.At).String(),
"timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)).String(),
"nowRTPExt", nowRTPExt,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
}
}
if r.srNewest != nil && nowRTPExt >= r.srNewest.RTPTimestampExt {
timeSinceLastReport := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RTPTimestampExt
if r.srNewest != nil && nowRTPExt >= r.srNewest.RtpTimestampExt {
timeSinceLastReport := nowNTP.Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time())
rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RtpTimestampExt
windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport.Seconds()
if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) {
r.clockSkewCount++
@@ -680,7 +680,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
}
}
if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt {
if r.srNewest != nil && nowRTPExt < r.srNewest.RtpTimestampExt {
// If report being generated is behind the last report, skip it.
// Should not happen.
r.logger.Infow("sending sender report, out-of-order, skipping", getFields()...)
@@ -697,7 +697,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
NTPTime: uint64(nowNTP),
RTPTime: uint32(nowRTPExt),
PacketCount: packetCount,
OctetCount: octetCount,
OctetCount: uint32(octetCount),
}
}
@@ -1025,5 +1025,11 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder
e.AddUint64("packetsLostFromRR", r.packetsLostFromRR)
e.AddFloat64("jitterFromRR", r.jitterFromRR)
e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR)
packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(r.extStartTS, r.extHighestTS)
e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift})
e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift})
e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift})
e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift})
return nil
}
+4 -4
View File
@@ -61,7 +61,7 @@ type TrackSender interface {
payloadType webrtc.PayloadType,
isSVC bool,
layer int32,
publisherSRData *buffer.RTCPSenderReportData,
publisherSRData *livekit.RTCPSenderReportState,
) error
Resync()
}
@@ -905,7 +905,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
_, _, refSenderReport := d.forwarder.GetSenderReportParams()
if refSenderReport != nil {
actExtCopy := *extPkt.AbsCaptureTimeExt
if err = actExtCopy.Rewrite(refSenderReport.PropagationDelay(!d.params.DisableSenderReportPassThrough)); err == nil {
if err = actExtCopy.Rewrite(buffer.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil {
actBytes, err = actExtCopy.Marshal()
if err == nil {
extensions = append(
@@ -2191,7 +2191,7 @@ func (d *DownTrack) HandleRTCPSenderReportData(
_payloadType webrtc.PayloadType,
isSVC bool,
layer int32,
publisherSRData *buffer.RTCPSenderReportData,
publisherSRData *livekit.RTCPSenderReportState,
) error {
d.forwarder.SetRefSenderReport(isSVC, layer, publisherSRData)
@@ -2202,7 +2202,7 @@ func (d *DownTrack) HandleRTCPSenderReportData(
return nil
}
func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *buffer.RTCPSenderReportData, tsOffset uint64) {
func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *livekit.RTCPSenderReportState, tsOffset uint64) {
d.rtpStats.MaybeAdjustFirstPacketTime(publisherSRData, tsOffset)
}
+68 -61
View File
@@ -26,7 +26,9 @@ import (
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -188,13 +190,13 @@ type TranslationParams struct {
// -------------------------------------------------------------------
type refInfo struct {
senderReport *buffer.RTCPSenderReportData
senderReport *livekit.RTCPSenderReportState
tsOffset uint64
isTSOffsetValid bool
}
func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddObject("senderReport", r.senderReport)
e.AddObject("senderReport", buffer.WrappedRTCPSenderReportStateLogger{r.senderReport})
e.AddUint64("tsOffset", r.tsOffset)
e.AddBool("isTSOffsetValid", r.isTSOffsetValid)
return nil
@@ -202,11 +204,11 @@ func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error {
// -------------------------------------------------------------------
type wrappedRefInfoLogger struct {
type wrappedRefInfosLogger struct {
*Forwarder
}
func (w wrappedRefInfoLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
func (w wrappedRefInfosLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
for i, refInfo := range w.Forwarder.refInfos {
e.AddObject(fmt.Sprintf("%d", i), refInfo)
}
@@ -399,6 +401,11 @@ func (f *Forwarder) GetState() *livekit.RTPForwarderState {
Vp8Munger: vp8MungerState,
}
}
state.SenderReportState = make([]*livekit.RTCPSenderReportState, len(f.refInfos))
for layer, refInfo := range f.refInfos {
state.SenderReportState[layer] = proto.Clone(refInfo.senderReport).(*livekit.RTCPSenderReportState)
}
return state
}
@@ -410,6 +417,12 @@ func (f *Forwarder) SeedState(state *livekit.RTPForwarderState) {
f.lock.Lock()
defer f.lock.Unlock()
for layer, rtcpSenderReportState := range state.SenderReportState {
f.refInfos[layer] = refInfo{
senderReport: proto.Clone(rtcpSenderReportState).(*livekit.RTCPSenderReportState),
}
}
f.rtpMunger.SeedState(state.RtpMunger)
f.codecMunger.SeedState(state.CodecMunger)
@@ -570,6 +583,14 @@ func (f *Forwarder) GetMaxSubscribedSpatial() int32 {
if layer < f.vls.GetCurrent().Spatial {
layer = f.vls.GetCurrent().Spatial
}
// if reference layer is higher, hold there until an RTCP Sender Report from
// publisher is available as that is used for reference time stamp between layers.
if f.referenceLayerSpatial != buffer.InvalidLayerSpatial &&
layer < f.referenceLayerSpatial &&
f.refInfos[f.referenceLayerSpatial].senderReport == nil {
layer = f.referenceLayerSpatial
}
}
return layer
@@ -596,62 +617,57 @@ func (f *Forwarder) getRefLayer() (int32, int32) {
return currentLayerSpatial, currentLayerSpatial
}
func (f *Forwarder) SetRefSenderReport(isSVC bool, layer int32, srData *buffer.RTCPSenderReportData) {
func (f *Forwarder) SetRefSenderReport(isSVC bool, layer int32, srData *livekit.RTCPSenderReportState) {
f.lock.Lock()
defer f.lock.Unlock()
f.refIsSVC = isSVC
refLayer, _ := f.getRefLayer()
if layer >= 0 && int(layer) < len(f.refInfos) {
f.refInfos[layer] = refInfo{srData, 0, false}
if layer == refLayer && srData.RTPTimestampExt >= f.lastSwitchExtIncomingTS {
// Mark validity of time stamp offset.
//
// It is possible to implement mute using pause/unpause
// which can be implemented using replaceTrack(null)/replaceTrack(track).
// In those cases, the RTP time stamp may not jump across
// the mute/pause valley (for the time it is replaced with null track).
// So, relying on a report that happened before unmute/unpause
// could result in incorrect RTCP sender report on subscriber side.
//
// It could happen like this
// 1. Normal operation: publisher sending sender reports and
// suscribers use reports from publisher to calculate and send
// RTCP sender report.
// 2. Publisher pauses: there are no more reports.
// 3. When paused, subscriber can still use the publisher side sender
// report to send reports. Although the time since last publisher
// sender report is increasing, the reports would still be correct
// as they referencing a previous (albeit older) correct report.
// 4. Publisher unpauses after 20 seconds. But, it may not have advanced
// RTP Timestamp by that much. Let us say, it advances only by 5 seconds.
// 5. When subscriber starts forwarding packets, it will calculate
// a new time stamp offset to adjust to the new time stamp of publisher.
// 6. But, when that same offset is used on an old publisher sender report
// (i. e. a report from before the pause), the subscriber side sender
// reports jumps ahead in time by 15 seconds.
//
// So, mark valid for reports after last switch.
refLayer, _ := f.getRefLayer()
if layer == refLayer && srData.RtpTimestampExt >= f.lastSwitchExtIncomingTS {
f.refInfos[layer].tsOffset = f.rtpMunger.GetTSOffset()
f.refInfos[layer].isTSOffsetValid = true
}
}
}
func (f *Forwarder) clearRefSenderReportsLocked() {
// On (re)start of fowarding, clear any old publisher sender reports.
// This is done to prevent use of potentially stale publisher sender reports.
//
// It is possible to implement mute using pause/unpause
// which can be implemented using replaceTrack(null)/replaceTrack(track).
// In those cases, the RTP time stamp may not jump across
// the mute/pause valley (for the time it is replaced with null track).
// So, relying on a report that happened before unmute/unpause
// could result in incorrect RTCP sender report on subscriber side.
//
// It could happen like this
// 1. Normal operation: publisher sending sender reports and
// suscribers use reports from publisher to calculate and send
// RTCP sender report.
// 2. Publisher pauses: there are no more reports.
// 3. When paused, subscriber can still use the publisher side sender
// report to send reports. Although the time since last publisher
// sender report is increasing, the reports would still be correct
// as they referencing a previous (albeit older) correct report.
// 4. Publisher unpauses after 20 seconds. But, it may not have advanced
// RTP Timestamp by that much. Let us say, it advances only by 5 seconds.
// 5. When subscriber starts forwarding packets, it will calculate
// a new time stamp offset to adjust to the new time stamp of publisher.
// 6. But, when that same offset is used on an old publisher sender report
// (i. e. a report from before the pause), the subscriber side sender
// reports jumps ahead in time by 15 seconds.
//
// By clearing sender report on (re)start of a stream, subscribers will wait for a fresh report
// after unmute to send sender report.
for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ {
f.refInfos[layer] = refInfo{nil, 0, false}
}
}
func (f *Forwarder) GetSenderReportParams() (int32, uint64, *buffer.RTCPSenderReportData) {
func (f *Forwarder) GetSenderReportParams() (int32, uint64, *livekit.RTCPSenderReportState) {
f.lock.RLock()
defer f.lock.RUnlock()
refLayer, currentLayerSpatial := f.getRefLayer()
if refLayer == buffer.InvalidLayerSpatial || !f.refInfos[refLayer].isTSOffsetValid {
if refLayer == buffer.InvalidLayerSpatial ||
f.refInfos[refLayer].senderReport == nil ||
!f.refInfos[refLayer].isTSOffsetValid {
return buffer.InvalidLayerSpatial, 0, nil
}
@@ -1584,18 +1600,18 @@ func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int
srRef := f.refInfos[refLayer].senderReport
srTarget := f.refInfos[targetLayer].senderReport
if srRef == nil || srRef.NTPTimestamp == 0 || srTarget == nil || srTarget.NTPTimestamp == 0 {
if srRef == nil || srRef.NtpTimestamp == 0 || srTarget == nil || srTarget.NtpTimestamp == 0 {
return 0, fmt.Errorf("unavailable layer(s), refLayer: %d, targetLayer: %d", refLayer, targetLayer)
}
ntpDiff := srRef.NTPTimestamp.Time().Sub(srTarget.NTPTimestamp.Time())
ntpDiff := mediatransportutil.NtpTime(srRef.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(srTarget.NtpTimestamp).Time())
rtpDiff := ntpDiff.Nanoseconds() * int64(f.codec.ClockRate) / 1e9
// calculate other layer's time stamp at the same time as ref layer's NTP time
normalizedOtherTS := srTarget.RTPTimestamp + uint32(rtpDiff)
normalizedOtherTS := srTarget.RtpTimestamp + uint32(rtpDiff)
// now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers
offset := srRef.RTPTimestamp - normalizedOtherTS
offset := srRef.RtpTimestamp - normalizedOtherTS
return ts + offset, nil
}
@@ -1607,8 +1623,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
f.rtpMunger.SetLastSnTs(extPkt)
f.codecMunger.SetLast(extPkt)
f.clearRefSenderReportsLocked()
f.logger.Debugw(
"starting forwarding",
"sequenceNumber", extPkt.Packet.SequenceNumber,
@@ -1644,7 +1658,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
"refInfos", wrappedRefInfoLogger{f},
"refInfos", wrappedRefInfosLogger{f},
)
}
// TODO-REMOVE-AFTER-DATA-COLLECTION
@@ -1659,7 +1673,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
"refInfos", wrappedRefInfoLogger{f},
"refInfos", wrappedRefInfosLogger{f},
)
}
@@ -1775,13 +1789,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
extNextTS = extRefTS
}
f.resumeBehindThreshold = 0.0
// sender reports are cleared after calculating switch time stamp
// as relative differences between layers should remain the same.
// TODO: If the relative difference changes a lot, probably have to
// abandon the checks above and just use the expected timestamp
// as the next time stamp.
f.clearRefSenderReportsLocked()
} else {
// switching between layers, check if extRefTS is too far behind the last sent
diffSeconds := float64(int64(extRefTS-extLastTS)) / float64(f.codec.ClockRate)
@@ -1858,12 +1865,12 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) error {
if f.lastSSRC != extPkt.Packet.SSRC {
if err := f.processSourceSwitch(extPkt, layer); err != nil {
f.logger.Debugw("could not switch feed", "error", err, "refInfos", wrappedRefInfoLogger{f})
f.logger.Debugw("could not switch feed", "error", err, "refInfos", wrappedRefInfosLogger{f})
tp.shouldDrop = true
f.vls.Rollback()
return nil
}
f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC, "refInfos", wrappedRefInfoLogger{f})
f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC, "refInfos", wrappedRefInfosLogger{f})
f.lastSSRC = extPkt.Packet.SSRC
f.lastSwitchExtIncomingTS = extPkt.ExtTimestamp
}