From 7dc8a7f80c7dd3bc69a48bda10ff8e3eca2c590b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 29 Aug 2023 00:30:24 +0530 Subject: [PATCH] Remove sender report warp logs. (#2007) * Remove sender report warp logs. They are not useful. Also replacing drift report with proper protocol and reporting both packet ad report drift. Need to dig more into out-of-order sender report sending. That requires some digging and understanding. * record time of anachronous report * more logging around out-of-order repair * log time of out-of-order received sender report * Update deps and place holder StartParticipantEgress --- go.mod | 2 +- go.sum | 4 +- pkg/service/egress.go | 5 + pkg/sfu/buffer/rtpstats.go | 210 +++++++++++++------------------------ 4 files changed, 83 insertions(+), 138 deletions(-) diff --git a/go.mod b/go.mod index 24d5bf8fb..021fba3cc 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 - github.com/livekit/protocol v1.6.1 + github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 31050de5f..602f95ac6 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I= github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= -github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk= -github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= +github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ= +github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 1ae4b5a69..ffe16f839 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -17,6 +17,7 @@ package service import ( "context" "encoding/json" + "errors" "fmt" "reflect" @@ -160,6 +161,10 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre return ei, err } +func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) { + return nil, errors.New("under development") +} + func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index e070a6fb9..2fdcbb4c5 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -17,7 +17,6 @@ package buffer import ( "errors" "fmt" - "math" "sync" "time" @@ -44,22 +43,16 @@ const ( // ------------------------------------------------------- -type driftResult struct { - timeSinceFirst time.Duration - rtpDiffSinceFirst uint64 - driftSamples int64 - driftMs float64 - sampleRate float64 -} +func RTPDriftToString(r *livekit.RTPDrift) string { + if r == nil { + return "-" + } -func (d driftResult) String() string { - return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f", - d.timeSinceFirst, - d.rtpDiffSinceFirst, - d.driftSamples, - d.driftMs, - d.sampleRate, - ) + str := fmt.Sprintf("t: %+v|%+v|%.2fs", r.StartTime.AsTime().Format(time.UnixDate), r.EndTime.AsTime().Format(time.UnixDate), r.Duration) + str += fmt.Sprintf(", ts: %d|%d|%d", r.StartTimestamp, r.EndTimestamp, r.RtpClockTicks) + str += fmt.Sprintf(", d: %d|%.2fms", r.DriftSamples, r.DriftMs) + str += fmt.Sprintf(", cr: %.2f", r.ClockRate) + return str } // ------------------------------------------------------- @@ -867,8 +860,10 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { "received anachronous sender report", "currentNTP", srData.NTPTimestamp.Time().String(), "currentRTP", srData.RTPTimestamp, + "currentAt", srData.At.String(), "lastNTP", r.srNewest.NTPTimestamp.Time().String(), "lastRTP", r.srNewest.RTPTimestamp, + "lastAt", r.srNewest.At.String(), ) return } @@ -891,63 +886,29 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) - // monitor and log RTP timestamp anomalies - var ntpDiffSinceLast time.Duration - var rtpDiffSinceLast uint32 - var arrivalDiffSinceLast time.Duration - var expectedTimeDiffSinceLast float64 - var isWarped bool - if r.srNewest != nil { - if srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { - // This can happen when a track is replaced with a null and then restored - - // i. e. muting replacing with null and unmute restoring the original track. - // Under such a condition reset the sender reports to start from this point. - // Resetting will ensure sample rate calculations do not go haywire due to negative time. - r.logger.Infow( - "received sender report, out-of-order, resetting", - "prevTSExt", r.srNewest.RTPTimestampExt, - "prevRTP", r.srNewest.RTPTimestamp, - "prevNTP", r.srNewest.NTPTimestamp.Time().String(), - "currTSExt", srDataCopy.RTPTimestampExt, - "currRTP", srDataCopy.RTPTimestamp, - "currNTP", srDataCopy.NTPTimestamp.Time().String(), - ) - r.srFirst = &srDataCopy - r.srNewest = &srDataCopy - } - - ntpDiffSinceLast = srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiffSinceLast = srDataCopy.RTPTimestamp - r.srNewest.RTPTimestamp - arrivalDiffSinceLast = srDataCopy.At.Sub(r.srNewest.At) - expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) - if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { - // more than 200 ms away from expected delta - isWarped = true - } + if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { + // This can happen when a track is replaced with a null and then restored - + // i. e. muting replacing with null and unmute restoring the original track. + // Under such a condition reset the sender reports to start from this point. + // Resetting will ensure sample rate calculations do not go haywire due to negative time. + r.logger.Infow( + "received sender report, out-of-order, resetting", + "prevTSExt", r.srNewest.RTPTimestampExt, + "prevRTP", r.srNewest.RTPTimestamp, + "prevNTP", r.srNewest.NTPTimestamp.Time().String(), + "prevAt", r.srNewest.At.String(), + "currTSExt", srDataCopy.RTPTimestampExt, + "currRTP", srDataCopy.RTPTimestamp, + "currNTP", srDataCopy.NTPTimestamp.Time().String(), + "currentAt", srDataCopy.At.String(), + ) + r.srFirst = nil } r.srNewest = &srDataCopy if r.srFirst == nil { r.srFirst = &srDataCopy } - - if isWarped { - packetDriftResult, reportDriftResult := r.getDrift() - r.logger.Infow( - "received sender report, time warp", - "ntp", srData.NTPTimestamp.Time().String(), - "rtp", srData.RTPTimestamp, - "arrival", srData.At.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", int32(rtpDiffSinceLast), - "arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "packetDrift", packetDriftResult.String(), - "reportDrift", reportDriftResult.String(), - "highestTS", r.timestamp.GetExtendedHighest(), - "highestTime", r.highestTime.String(), - ) - } } func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) { @@ -996,6 +957,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) timeSinceHighest := now.Sub(r.highestTime) nowRTPExt := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) + nowRTPExtUsingTime := nowRTPExt nowRTP := uint32(nowRTPExt) // It is possible that publisher is pacing at a slower rate. @@ -1030,30 +992,20 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) "currTSExt", nowRTPExt, "currRTP", nowRTP, "currNTP", nowNTP.Time().String(), + "timeNow", time.Now().String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst, + "highestTime", r.highestTime.String(), + "timeSinceHighest", timeSinceHighest, + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, ) ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate)) nowRTP = uint32(nowRTPExt) } - // monitor and log RTP timestamp anomalies - var ntpDiffSinceLast time.Duration - var rtpDiffSinceLast uint64 - var departureDiffSinceLast time.Duration - var expectedTimeDiffSinceLast float64 - var isWarped bool - if r.srNewest != nil { - ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt - departureDiffSinceLast = now.Sub(r.srNewest.At) - - expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) - if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { - // more than 200 ms away from expected delta - isWarped = true - } - } - r.srNewest = &RTCPSenderReportData{ NTPTimestamp: nowNTP, RTPTimestamp: nowRTP, @@ -1064,27 +1016,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) r.srFirst = r.srNewest } - if isWarped { - packetDriftResult, reportDriftResult := r.getDrift() - r.logger.Infow( - "sending sender report, time warp", - "ntp", nowNTP.Time().String(), - "rtp", nowRTP, - "departure", now.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", int32(rtpDiffSinceLast), - "departureDiffSinceLast", departureDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "packetDrift", packetDriftResult.String(), - "reportDrift", reportDriftResult.String(), - "highestTS", r.timestamp.GetExtendedHighest(), - "highestTime", r.highestTime.String(), - "calculatedClockRate", calculatedClockRate, - "nowRTPExt", nowRTPExt, - "nowRTPExtUsingRate", nowRTPExtUsingRate, - ) - } - return &rtcp.SenderReport{ SSRC: ssrc, NTPTime: uint64(nowNTP), @@ -1352,12 +1283,7 @@ func (r *RTPStats) ToString() string { str += ", rtt(ms):" str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax) - str += ", drift(ms):" - str += fmt.Sprintf("%.2f", p.DriftMs) - - str += ", sr(Hz):" - str += fmt.Sprintf("%.2f", p.SampleRate) - + str += fmt.Sprintf(", pd: %s, rd: %s", RTPDriftToString(p.PacketDrift), RTPDriftToString(p.ReportDrift)) return str } @@ -1405,7 +1331,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { jitterTime := jitter / float64(r.params.ClockRate) * 1e6 maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 - packetDrift, _ := r.getDrift() + packetDrift, reportDrift := r.getDrift() p := &livekit.RTPStats{ StartTime: timestamppb.New(r.startTime), @@ -1448,8 +1374,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { LastFir: timestamppb.New(r.lastFir), RttCurrent: r.rtt, RttMax: r.maxRtt, - DriftMs: packetDrift.driftMs, - SampleRate: packetDrift.sampleRate, + PacketDrift: packetDrift, + ReportDrift: reportDrift, } gapsPresent := false @@ -1636,22 +1562,42 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) { r.lastJitterRTP = rtph.Timestamp } -func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) { - packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime) - packetDrift.rtpDiffSinceFirst = r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart() - packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate) - if packetDrift.timeSinceFirst.Seconds() != 0 { - packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds() +func (r *RTPStats) getDrift() (packetDrift *livekit.RTPDrift, reportDrift *livekit.RTPDrift) { + if !r.firstTime.IsZero() { + elapsed := r.highestTime.Sub(r.firstTime) + rtpClockTicks := r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart() + driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + if elapsed.Seconds() > 0.0 { + packetDrift = &livekit.RTPDrift{ + StartTime: timestamppb.New(r.firstTime), + EndTime: timestamppb.New(r.highestTime), + Duration: elapsed.Seconds(), + StartTimestamp: r.timestamp.GetExtendedStart(), + EndTimestamp: r.timestamp.GetExtendedHighest(), + RtpClockTicks: rtpClockTicks, + DriftSamples: driftSamples, + DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), + ClockRate: float64(rtpClockTicks) / elapsed.Seconds(), + } + } } if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp { - reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()) - reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt - reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate) - if reportDrift.timeSinceFirst.Seconds() != 0 { - reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds() + elapsed := r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()) + rtpClockTicks := r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt + driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + if elapsed.Seconds() > 0.0 { + reportDrift = &livekit.RTPDrift{ + StartTime: timestamppb.New(r.srFirst.NTPTimestamp.Time()), + EndTime: timestamppb.New(r.srNewest.NTPTimestamp.Time()), + Duration: elapsed.Seconds(), + StartTimestamp: r.timestamp.GetExtendedStart(), + EndTimestamp: r.timestamp.GetExtendedHighest(), + RtpClockTicks: rtpClockTicks, + DriftSamples: driftSamples, + DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), + ClockRate: float64(rtpClockTicks) / elapsed.Seconds(), + } } } return @@ -1754,8 +1700,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { lastFir := time.Time{} rtt := uint32(0) maxRtt := uint32(0) - driftMs := float64(0.0) - sampleRate := float64(0.0) for _, stats := range statsList { if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) { @@ -1822,9 +1766,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { if stats.RttMax > maxRtt { maxRtt = stats.RttMax } - - driftMs += stats.DriftMs - sampleRate += stats.SampleRate } if endTime.IsZero() { @@ -1887,8 +1828,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { LastFir: timestamppb.New(lastFir), RttCurrent: rtt / uint32(len(statsList)), RttMax: maxRtt, - DriftMs: driftMs / float64(len(statsList)), - SampleRate: sampleRate / float64(len(statsList)), + // no aggregation for drift calculations } }