Remove sender report warp logs. (#2007)

* Remove sender report warp logs.

They are not useful.
Also replacing drift report with proper protocol and reporting
both packet ad report drift.

Need to dig more into out-of-order sender report sending.
That requires some digging and understanding.

* record time of anachronous report

* more logging around out-of-order repair

* log time of out-of-order received sender report

* Update deps and place holder StartParticipantEgress
This commit is contained in:
Raja Subramanian
2023-08-29 00:30:24 +05:30
committed by GitHub
parent 64bcef28aa
commit 7dc8a7f80c
4 changed files with 83 additions and 138 deletions

2
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0
github.com/livekit/protocol v1.6.1
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0

4
go.sum
View File

@@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I=
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk=
github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ=
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=

View File

@@ -17,6 +17,7 @@ package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -160,6 +161,10 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre
return ei, err
}
func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) {
return nil, errors.New("under development")
}
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)

View File

@@ -17,7 +17,6 @@ package buffer
import (
"errors"
"fmt"
"math"
"sync"
"time"
@@ -44,22 +43,16 @@ const (
// -------------------------------------------------------
type driftResult struct {
timeSinceFirst time.Duration
rtpDiffSinceFirst uint64
driftSamples int64
driftMs float64
sampleRate float64
}
func RTPDriftToString(r *livekit.RTPDrift) string {
if r == nil {
return "-"
}
func (d driftResult) String() string {
return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f",
d.timeSinceFirst,
d.rtpDiffSinceFirst,
d.driftSamples,
d.driftMs,
d.sampleRate,
)
str := fmt.Sprintf("t: %+v|%+v|%.2fs", r.StartTime.AsTime().Format(time.UnixDate), r.EndTime.AsTime().Format(time.UnixDate), r.Duration)
str += fmt.Sprintf(", ts: %d|%d|%d", r.StartTimestamp, r.EndTimestamp, r.RtpClockTicks)
str += fmt.Sprintf(", d: %d|%.2fms", r.DriftSamples, r.DriftMs)
str += fmt.Sprintf(", cr: %.2f", r.ClockRate)
return str
}
// -------------------------------------------------------
@@ -867,8 +860,10 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
"received anachronous sender report",
"currentNTP", srData.NTPTimestamp.Time().String(),
"currentRTP", srData.RTPTimestamp,
"currentAt", srData.At.String(),
"lastNTP", r.srNewest.NTPTimestamp.Time().String(),
"lastRTP", r.srNewest.RTPTimestamp,
"lastAt", r.srNewest.At.String(),
)
return
}
@@ -891,63 +886,29 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var arrivalDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
if srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Under such a condition reset the sender reports to start from this point.
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
r.logger.Infow(
"received sender report, out-of-order, resetting",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"currTSExt", srDataCopy.RTPTimestampExt,
"currRTP", srDataCopy.RTPTimestamp,
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
)
r.srFirst = &srDataCopy
r.srNewest = &srDataCopy
}
ntpDiffSinceLast = srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = srDataCopy.RTPTimestamp - r.srNewest.RTPTimestamp
arrivalDiffSinceLast = srDataCopy.At.Sub(r.srNewest.At)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
// more than 200 ms away from expected delta
isWarped = true
}
if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Under such a condition reset the sender reports to start from this point.
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
r.logger.Infow(
"received sender report, out-of-order, resetting",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"prevAt", r.srNewest.At.String(),
"currTSExt", srDataCopy.RTPTimestampExt,
"currRTP", srDataCopy.RTPTimestamp,
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
"currentAt", srDataCopy.At.String(),
)
r.srFirst = nil
}
r.srNewest = &srDataCopy
if r.srFirst == nil {
r.srFirst = &srDataCopy
}
if isWarped {
packetDriftResult, reportDriftResult := r.getDrift()
r.logger.Infow(
"received sender report, time warp",
"ntp", srData.NTPTimestamp.Time().String(),
"rtp", srData.RTPTimestamp,
"arrival", srData.At.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
"arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.timestamp.GetExtendedHighest(),
"highestTime", r.highestTime.String(),
)
}
}
func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) {
@@ -996,6 +957,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
timeSinceHighest := now.Sub(r.highestTime)
nowRTPExt := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowRTPExtUsingTime := nowRTPExt
nowRTP := uint32(nowRTPExt)
// It is possible that publisher is pacing at a slower rate.
@@ -1030,30 +992,20 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
"currTSExt", nowRTPExt,
"currRTP", nowRTP,
"currNTP", nowNTP.Time().String(),
"timeNow", time.Now().String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst,
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest,
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate))
nowRTP = uint32(nowRTPExt)
}
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint64
var departureDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt
departureDiffSinceLast = now.Sub(r.srNewest.At)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
// more than 200 ms away from expected delta
isWarped = true
}
}
r.srNewest = &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: nowRTP,
@@ -1064,27 +1016,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
r.srFirst = r.srNewest
}
if isWarped {
packetDriftResult, reportDriftResult := r.getDrift()
r.logger.Infow(
"sending sender report, time warp",
"ntp", nowNTP.Time().String(),
"rtp", nowRTP,
"departure", now.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
"departureDiffSinceLast", departureDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.timestamp.GetExtendedHighest(),
"highestTime", r.highestTime.String(),
"calculatedClockRate", calculatedClockRate,
"nowRTPExt", nowRTPExt,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
}
return &rtcp.SenderReport{
SSRC: ssrc,
NTPTime: uint64(nowNTP),
@@ -1352,12 +1283,7 @@ func (r *RTPStats) ToString() string {
str += ", rtt(ms):"
str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax)
str += ", drift(ms):"
str += fmt.Sprintf("%.2f", p.DriftMs)
str += ", sr(Hz):"
str += fmt.Sprintf("%.2f", p.SampleRate)
str += fmt.Sprintf(", pd: %s, rd: %s", RTPDriftToString(p.PacketDrift), RTPDriftToString(p.ReportDrift))
return str
}
@@ -1405,7 +1331,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
jitterTime := jitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
packetDrift, _ := r.getDrift()
packetDrift, reportDrift := r.getDrift()
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
@@ -1448,8 +1374,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
LastFir: timestamppb.New(r.lastFir),
RttCurrent: r.rtt,
RttMax: r.maxRtt,
DriftMs: packetDrift.driftMs,
SampleRate: packetDrift.sampleRate,
PacketDrift: packetDrift,
ReportDrift: reportDrift,
}
gapsPresent := false
@@ -1636,22 +1562,42 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) {
r.lastJitterRTP = rtph.Timestamp
}
func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) {
packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime)
packetDrift.rtpDiffSinceFirst = r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart()
packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
if packetDrift.timeSinceFirst.Seconds() != 0 {
packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds()
func (r *RTPStats) getDrift() (packetDrift *livekit.RTPDrift, reportDrift *livekit.RTPDrift) {
if !r.firstTime.IsZero() {
elapsed := r.highestTime.Sub(r.firstTime)
rtpClockTicks := r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart()
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
if elapsed.Seconds() > 0.0 {
packetDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(r.firstTime),
EndTime: timestamppb.New(r.highestTime),
Duration: elapsed.Seconds(),
StartTimestamp: r.timestamp.GetExtendedStart(),
EndTimestamp: r.timestamp.GetExtendedHighest(),
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
}
if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp {
reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
if reportDrift.timeSinceFirst.Seconds() != 0 {
reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds()
elapsed := r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
rtpClockTicks := r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
if elapsed.Seconds() > 0.0 {
reportDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(r.srFirst.NTPTimestamp.Time()),
EndTime: timestamppb.New(r.srNewest.NTPTimestamp.Time()),
Duration: elapsed.Seconds(),
StartTimestamp: r.timestamp.GetExtendedStart(),
EndTimestamp: r.timestamp.GetExtendedHighest(),
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
}
return
@@ -1754,8 +1700,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
lastFir := time.Time{}
rtt := uint32(0)
maxRtt := uint32(0)
driftMs := float64(0.0)
sampleRate := float64(0.0)
for _, stats := range statsList {
if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) {
@@ -1822,9 +1766,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
if stats.RttMax > maxRtt {
maxRtt = stats.RttMax
}
driftMs += stats.DriftMs
sampleRate += stats.SampleRate
}
if endTime.IsZero() {
@@ -1887,8 +1828,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
LastFir: timestamppb.New(lastFir),
RttCurrent: rtt / uint32(len(statsList)),
RttMax: maxRtt,
DriftMs: driftMs / float64(len(statsList)),
SampleRate: sampleRate / float64(len(statsList)),
// no aggregation for drift calculations
}
}