From 7dc8a7f80c7dd3bc69a48bda10ff8e3eca2c590b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 29 Aug 2023 00:30:24 +0530 Subject: [PATCH 1/2] Remove sender report warp logs. (#2007) * Remove sender report warp logs. They are not useful. Also replacing drift report with proper protocol and reporting both packet ad report drift. Need to dig more into out-of-order sender report sending. That requires some digging and understanding. * record time of anachronous report * more logging around out-of-order repair * log time of out-of-order received sender report * Update deps and place holder StartParticipantEgress --- go.mod | 2 +- go.sum | 4 +- pkg/service/egress.go | 5 + pkg/sfu/buffer/rtpstats.go | 210 +++++++++++++------------------------ 4 files changed, 83 insertions(+), 138 deletions(-) diff --git a/go.mod b/go.mod index 24d5bf8fb..021fba3cc 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 - github.com/livekit/protocol v1.6.1 + github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 31050de5f..602f95ac6 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I= github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= -github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk= -github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= +github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ= +github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 1ae4b5a69..ffe16f839 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -17,6 +17,7 @@ package service import ( "context" "encoding/json" + "errors" "fmt" "reflect" @@ -160,6 +161,10 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre return ei, err } +func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) { + return nil, errors.New("under development") +} + func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index e070a6fb9..2fdcbb4c5 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -17,7 +17,6 @@ package buffer import ( "errors" "fmt" - "math" "sync" "time" @@ -44,22 +43,16 @@ const ( // ------------------------------------------------------- -type driftResult struct { - timeSinceFirst time.Duration - rtpDiffSinceFirst uint64 - driftSamples int64 - driftMs float64 - sampleRate float64 -} +func RTPDriftToString(r *livekit.RTPDrift) string { + if r == nil { + return "-" + } -func (d driftResult) String() string { - return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f", - d.timeSinceFirst, - d.rtpDiffSinceFirst, - d.driftSamples, - d.driftMs, - d.sampleRate, - ) + str := fmt.Sprintf("t: %+v|%+v|%.2fs", r.StartTime.AsTime().Format(time.UnixDate), r.EndTime.AsTime().Format(time.UnixDate), r.Duration) + str += fmt.Sprintf(", ts: %d|%d|%d", r.StartTimestamp, r.EndTimestamp, r.RtpClockTicks) + str += fmt.Sprintf(", d: %d|%.2fms", r.DriftSamples, r.DriftMs) + str += fmt.Sprintf(", cr: %.2f", r.ClockRate) + return str } // ------------------------------------------------------- @@ -867,8 +860,10 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { "received anachronous sender report", "currentNTP", srData.NTPTimestamp.Time().String(), "currentRTP", srData.RTPTimestamp, + "currentAt", srData.At.String(), "lastNTP", r.srNewest.NTPTimestamp.Time().String(), "lastRTP", r.srNewest.RTPTimestamp, + "lastAt", r.srNewest.At.String(), ) return } @@ -891,63 +886,29 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) - // monitor and log RTP timestamp anomalies - var ntpDiffSinceLast time.Duration - var rtpDiffSinceLast uint32 - var arrivalDiffSinceLast time.Duration - var expectedTimeDiffSinceLast float64 - var isWarped bool - if r.srNewest != nil { - if srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { - // This can happen when a track is replaced with a null and then restored - - // i. e. muting replacing with null and unmute restoring the original track. - // Under such a condition reset the sender reports to start from this point. - // Resetting will ensure sample rate calculations do not go haywire due to negative time. - r.logger.Infow( - "received sender report, out-of-order, resetting", - "prevTSExt", r.srNewest.RTPTimestampExt, - "prevRTP", r.srNewest.RTPTimestamp, - "prevNTP", r.srNewest.NTPTimestamp.Time().String(), - "currTSExt", srDataCopy.RTPTimestampExt, - "currRTP", srDataCopy.RTPTimestamp, - "currNTP", srDataCopy.NTPTimestamp.Time().String(), - ) - r.srFirst = &srDataCopy - r.srNewest = &srDataCopy - } - - ntpDiffSinceLast = srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiffSinceLast = srDataCopy.RTPTimestamp - r.srNewest.RTPTimestamp - arrivalDiffSinceLast = srDataCopy.At.Sub(r.srNewest.At) - expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) - if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { - // more than 200 ms away from expected delta - isWarped = true - } + if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { + // This can happen when a track is replaced with a null and then restored - + // i. e. muting replacing with null and unmute restoring the original track. + // Under such a condition reset the sender reports to start from this point. + // Resetting will ensure sample rate calculations do not go haywire due to negative time. + r.logger.Infow( + "received sender report, out-of-order, resetting", + "prevTSExt", r.srNewest.RTPTimestampExt, + "prevRTP", r.srNewest.RTPTimestamp, + "prevNTP", r.srNewest.NTPTimestamp.Time().String(), + "prevAt", r.srNewest.At.String(), + "currTSExt", srDataCopy.RTPTimestampExt, + "currRTP", srDataCopy.RTPTimestamp, + "currNTP", srDataCopy.NTPTimestamp.Time().String(), + "currentAt", srDataCopy.At.String(), + ) + r.srFirst = nil } r.srNewest = &srDataCopy if r.srFirst == nil { r.srFirst = &srDataCopy } - - if isWarped { - packetDriftResult, reportDriftResult := r.getDrift() - r.logger.Infow( - "received sender report, time warp", - "ntp", srData.NTPTimestamp.Time().String(), - "rtp", srData.RTPTimestamp, - "arrival", srData.At.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", int32(rtpDiffSinceLast), - "arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "packetDrift", packetDriftResult.String(), - "reportDrift", reportDriftResult.String(), - "highestTS", r.timestamp.GetExtendedHighest(), - "highestTime", r.highestTime.String(), - ) - } } func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) { @@ -996,6 +957,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) timeSinceHighest := now.Sub(r.highestTime) nowRTPExt := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) + nowRTPExtUsingTime := nowRTPExt nowRTP := uint32(nowRTPExt) // It is possible that publisher is pacing at a slower rate. @@ -1030,30 +992,20 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) "currTSExt", nowRTPExt, "currRTP", nowRTP, "currNTP", nowNTP.Time().String(), + "timeNow", time.Now().String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst, + "highestTime", r.highestTime.String(), + "timeSinceHighest", timeSinceHighest, + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, ) ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate)) nowRTP = uint32(nowRTPExt) } - // monitor and log RTP timestamp anomalies - var ntpDiffSinceLast time.Duration - var rtpDiffSinceLast uint64 - var departureDiffSinceLast time.Duration - var expectedTimeDiffSinceLast float64 - var isWarped bool - if r.srNewest != nil { - ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt - departureDiffSinceLast = now.Sub(r.srNewest.At) - - expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) - if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { - // more than 200 ms away from expected delta - isWarped = true - } - } - r.srNewest = &RTCPSenderReportData{ NTPTimestamp: nowNTP, RTPTimestamp: nowRTP, @@ -1064,27 +1016,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) r.srFirst = r.srNewest } - if isWarped { - packetDriftResult, reportDriftResult := r.getDrift() - r.logger.Infow( - "sending sender report, time warp", - "ntp", nowNTP.Time().String(), - "rtp", nowRTP, - "departure", now.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", int32(rtpDiffSinceLast), - "departureDiffSinceLast", departureDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "packetDrift", packetDriftResult.String(), - "reportDrift", reportDriftResult.String(), - "highestTS", r.timestamp.GetExtendedHighest(), - "highestTime", r.highestTime.String(), - "calculatedClockRate", calculatedClockRate, - "nowRTPExt", nowRTPExt, - "nowRTPExtUsingRate", nowRTPExtUsingRate, - ) - } - return &rtcp.SenderReport{ SSRC: ssrc, NTPTime: uint64(nowNTP), @@ -1352,12 +1283,7 @@ func (r *RTPStats) ToString() string { str += ", rtt(ms):" str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax) - str += ", drift(ms):" - str += fmt.Sprintf("%.2f", p.DriftMs) - - str += ", sr(Hz):" - str += fmt.Sprintf("%.2f", p.SampleRate) - + str += fmt.Sprintf(", pd: %s, rd: %s", RTPDriftToString(p.PacketDrift), RTPDriftToString(p.ReportDrift)) return str } @@ -1405,7 +1331,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { jitterTime := jitter / float64(r.params.ClockRate) * 1e6 maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 - packetDrift, _ := r.getDrift() + packetDrift, reportDrift := r.getDrift() p := &livekit.RTPStats{ StartTime: timestamppb.New(r.startTime), @@ -1448,8 +1374,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { LastFir: timestamppb.New(r.lastFir), RttCurrent: r.rtt, RttMax: r.maxRtt, - DriftMs: packetDrift.driftMs, - SampleRate: packetDrift.sampleRate, + PacketDrift: packetDrift, + ReportDrift: reportDrift, } gapsPresent := false @@ -1636,22 +1562,42 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) { r.lastJitterRTP = rtph.Timestamp } -func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) { - packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime) - packetDrift.rtpDiffSinceFirst = r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart() - packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate) - if packetDrift.timeSinceFirst.Seconds() != 0 { - packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds() +func (r *RTPStats) getDrift() (packetDrift *livekit.RTPDrift, reportDrift *livekit.RTPDrift) { + if !r.firstTime.IsZero() { + elapsed := r.highestTime.Sub(r.firstTime) + rtpClockTicks := r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart() + driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + if elapsed.Seconds() > 0.0 { + packetDrift = &livekit.RTPDrift{ + StartTime: timestamppb.New(r.firstTime), + EndTime: timestamppb.New(r.highestTime), + Duration: elapsed.Seconds(), + StartTimestamp: r.timestamp.GetExtendedStart(), + EndTimestamp: r.timestamp.GetExtendedHighest(), + RtpClockTicks: rtpClockTicks, + DriftSamples: driftSamples, + DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), + ClockRate: float64(rtpClockTicks) / elapsed.Seconds(), + } + } } if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp { - reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()) - reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt - reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate) - if reportDrift.timeSinceFirst.Seconds() != 0 { - reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds() + elapsed := r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()) + rtpClockTicks := r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt + driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + if elapsed.Seconds() > 0.0 { + reportDrift = &livekit.RTPDrift{ + StartTime: timestamppb.New(r.srFirst.NTPTimestamp.Time()), + EndTime: timestamppb.New(r.srNewest.NTPTimestamp.Time()), + Duration: elapsed.Seconds(), + StartTimestamp: r.timestamp.GetExtendedStart(), + EndTimestamp: r.timestamp.GetExtendedHighest(), + RtpClockTicks: rtpClockTicks, + DriftSamples: driftSamples, + DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), + ClockRate: float64(rtpClockTicks) / elapsed.Seconds(), + } } } return @@ -1754,8 +1700,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { lastFir := time.Time{} rtt := uint32(0) maxRtt := uint32(0) - driftMs := float64(0.0) - sampleRate := float64(0.0) for _, stats := range statsList { if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) { @@ -1822,9 +1766,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { if stats.RttMax > maxRtt { maxRtt = stats.RttMax } - - driftMs += stats.DriftMs - sampleRate += stats.SampleRate } if endTime.IsZero() { @@ -1887,8 +1828,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { LastFir: timestamppb.New(lastFir), RttCurrent: rtt / uint32(len(statsList)), RttMax: maxRtt, - DriftMs: driftMs / float64(len(statsList)), - SampleRate: sampleRate / float64(len(statsList)), + // no aggregation for drift calculations } } From f0ca262bcf987f9e89f0e32d1f06ffdf747be5a4 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 29 Aug 2023 13:21:57 +0530 Subject: [PATCH 2/2] Prevent erroneous stream pause. (#2008) --- pkg/sfu/downtrack.go | 2 +- pkg/sfu/forwarder.go | 12 +-- pkg/sfu/forwarder_test.go | 51 +++++++---- pkg/sfu/streamallocator/streamallocator.go | 98 ++++++++++++++++------ pkg/sfu/streamallocator/track.go | 2 +- 5 files changed, 114 insertions(+), 51 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 707640bd2..44414d7c8 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1135,7 +1135,7 @@ func (d *DownTrack) ProvisionalAllocateReset() { d.forwarder.ProvisionalAllocateReset() } -func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 { +func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) { return d.forwarder.ProvisionalAllocate(availableChannelCapacity, layers, allowPause, allowOvershoot) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index eab995bb8..9421f238c 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -728,7 +728,7 @@ func (f *Forwarder) ProvisionalAllocateReset() { f.provisional.allocatedLayer = buffer.InvalidLayer } -func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 { +func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) { f.lock.Lock() defer f.lock.Unlock() @@ -737,12 +737,12 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu f.provisional.maxSeenLayer.Spatial == buffer.InvalidLayerSpatial || !f.provisional.maxLayer.IsValid() || ((!allowOvershoot || !f.vls.IsOvershootOkay()) && layer.GreaterThan(f.provisional.maxLayer)) { - return 0 + return false, 0 } requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal] if requiredBitrate == 0 { - return 0 + return false, 0 } alreadyAllocatedBitrate := int64(0) @@ -753,7 +753,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu // a layer under maximum fits, take it if !layer.GreaterThan(f.provisional.maxLayer) && requiredBitrate <= (availableChannelCapacity+alreadyAllocatedBitrate) { f.provisional.allocatedLayer = layer - return requiredBitrate - alreadyAllocatedBitrate + return true, requiredBitrate - alreadyAllocatedBitrate } // @@ -766,10 +766,10 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu // if !allowPause && (!f.provisional.allocatedLayer.IsValid() || !layer.GreaterThan(f.provisional.allocatedLayer)) { f.provisional.allocatedLayer = layer - return requiredBitrate - alreadyAllocatedBitrate + return true, requiredBitrate - alreadyAllocatedBitrate } - return 0 + return false, 0 } func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition { diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 9c3be7c7a..b0e9bc2ea 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -400,20 +400,25 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[0][0], usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[2][3]-bitrates[0][0], usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[0][3]-bitrates[2][3], usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[1][2]-bitrates[0][3], usedBitrate) // available not enough to reach (2, 2), allocating at (2, 2) should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // committing should set target to (1, 2) @@ -440,7 +445,8 @@ func TestForwarderProvisionalAllocate(t *testing.T) { // when nothing fits and pausing disallowed, should allocate (0, 0) f.vls.SetTarget(buffer.InvalidLayer) f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false) + require.True(t, isCandidate) require.Equal(t, int64(1), usedBitrate) // committing should set target to (0, 0) @@ -477,15 +483,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + require.True(t, isCandidate) require.Equal(t, bitrates[2][3], usedBitrate) // overshoot should succeed - this should win as this is lesser overshoot - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + require.True(t, isCandidate) require.Equal(t, bitrates[1][3]-bitrates[2][3], usedBitrate) // committing should set target to (1, 3) @@ -524,15 +533,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) // all the provisional allocations should not succeed because the feed is dry - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // committing should set target to (0, 2), i. e. leave it at current for opportunistic forwarding @@ -562,15 +574,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) // all the provisional allocations below should not succeed because the feed is dry - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) expectedResult = VideoAllocation{ @@ -604,10 +619,12 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) { f.Mute(true) f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // committing should set target to buffer.InvalidLayer as track is muted diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 6751f29cb..f9baf7e5c 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -883,9 +883,15 @@ func (s *StreamAllocator) allocateTrack(track *Track) { return } - // this track is currently not streaming and needs bits to start. - // first try an allocation using available headroom - availableChannelCapacity := s.getAvailableHeadroom(false) + // already streaming at some layer and transition is not requesting any change, i. e. BandwidthDelta == 0 + if transition.From.IsValid() && transition.BandwidthDelta == 0 { + return + } + + // this track is currently not streaming and needs bits to start OR streaming at some layer and wants more bits. + // NOTE: With co-operative transition, tracks should not be asking for more if already streaming, but handle that case any way. + // first try an allocation using available headroom, current consumption of this track is discounted to calculate headroom. + availableChannelCapacity := s.getAvailableHeadroomWithoutTracks(false, []*Track{track}) if availableChannelCapacity > 0 { track.ProvisionalAllocateReset() // to reset allocation from co-operative transition above and try fresh @@ -899,21 +905,30 @@ func (s *StreamAllocator) allocateTrack(track *Track) { Temporal: temporal, } - usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) + isCandidate, usedChannelCapacity := track.ProvisionalAllocate( + availableChannelCapacity, + layer, + s.allowPause, + FlagAllowOvershootWhileDeficient, + ) if availableChannelCapacity < usedChannelCapacity { break alloc_loop } - bestLayer = layer + if isCandidate { + bestLayer = layer + } } } if bestLayer.IsValid() { - // found layer that can fit in available headroom - update := NewStreamStateUpdate() - allocation := track.ProvisionalAllocateCommit() - updateStreamStateChange(track, allocation, update) - s.maybeSendUpdate(update) + if bestLayer.GreaterThan(transition.From) { + // found layer that can fit in available headroom, take it if it is better than existing + update := NewStreamStateUpdate() + allocation := track.ProvisionalAllocateCommit() + updateStreamStateChange(track, allocation, update) + s.maybeSendUpdate(update) + } s.adjustState() return @@ -923,11 +938,6 @@ func (s *StreamAllocator) allocateTrack(track *Track) { transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom } - // track is currently streaming at minimum - if transition.BandwidthDelta == 0 { - return - } - // if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired. bandwidthAcquired := int64(0) var contributingTracks []*Track @@ -1018,17 +1028,31 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() { update := NewStreamStateUpdate() - for _, track := range s.getMaxDistanceSortedDeficient() { - allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup) - if !boosted { - continue + sortedTracks := s.getMaxDistanceSortedDeficient() +boost_loop: + for { + for idx, track := range sortedTracks { + allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup) + if !boosted { + if idx == len(sortedTracks)-1 { + // all tracks tried + break boost_loop + } + continue + } + + updateStreamStateChange(track, allocation, update) + + availableChannelCapacity -= allocation.BandwidthDelta + if availableChannelCapacity <= 0 { + break boost_loop + } + + break // sort again below as the track that was just boosted could still be farthest from its desired } - - updateStreamStateChange(track, allocation, update) - - availableChannelCapacity -= allocation.BandwidthDelta - if availableChannelCapacity <= 0 { - break + sortedTracks = s.getMaxDistanceSortedDeficient() + if len(sortedTracks) == 0 { + break // nothing available to boost } } @@ -1103,7 +1127,7 @@ func (s *StreamAllocator) allocateAllTracks() { } for _, track := range sorted { - usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) + _, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) availableChannelCapacity -= usedChannelCapacity if availableChannelCapacity < 0 { availableChannelCapacity = 0 @@ -1174,10 +1198,32 @@ func (s *StreamAllocator) getExpectedBandwidthUsage() int64 { return expected } +func (s *StreamAllocator) getExpectedBandwidthUsageWithoutTracks(filteredTracks []*Track) int64 { + expected := int64(0) + for _, track := range s.getTracks() { + filtered := false + for _, ft := range filteredTracks { + if ft == track { + filtered = true + break + } + } + if !filtered { + expected += track.BandwidthRequested() + } + } + + return expected +} + func (s *StreamAllocator) getAvailableHeadroom(allowOverride bool) int64 { return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsage() } +func (s *StreamAllocator) getAvailableHeadroomWithoutTracks(allowOverride bool, filteredTracks []*Track) int64 { + return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsageWithoutTracks(filteredTracks) +} + func (s *StreamAllocator) getNackDelta() (uint32, uint32) { aggPacketDelta := uint32(0) aggRepeatedNackDelta := uint32(0) diff --git a/pkg/sfu/streamallocator/track.go b/pkg/sfu/streamallocator/track.go index 6ccae215b..528792928 100644 --- a/pkg/sfu/streamallocator/track.go +++ b/pkg/sfu/streamallocator/track.go @@ -164,7 +164,7 @@ func (t *Track) ProvisionalAllocateReset() { t.downTrack.ProvisionalAllocateReset() } -func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 { +func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) { return t.downTrack.ProvisionalAllocate(availableChannelCapacity, layer, allowPause, allowOvershoot) }