Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-08-29 17:09:31 +05:30
9 changed files with 197 additions and 189 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0
github.com/livekit/protocol v1.6.1
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I=
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk=
github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ=
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
+5
View File
@@ -17,6 +17,7 @@ package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -160,6 +161,10 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre
return ei, err
}
func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) {
return nil, errors.New("under development")
}
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
+75 -135
View File
@@ -17,7 +17,6 @@ package buffer
import (
"errors"
"fmt"
"math"
"sync"
"time"
@@ -44,22 +43,16 @@ const (
// -------------------------------------------------------
type driftResult struct {
timeSinceFirst time.Duration
rtpDiffSinceFirst uint64
driftSamples int64
driftMs float64
sampleRate float64
}
func RTPDriftToString(r *livekit.RTPDrift) string {
if r == nil {
return "-"
}
func (d driftResult) String() string {
return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f",
d.timeSinceFirst,
d.rtpDiffSinceFirst,
d.driftSamples,
d.driftMs,
d.sampleRate,
)
str := fmt.Sprintf("t: %+v|%+v|%.2fs", r.StartTime.AsTime().Format(time.UnixDate), r.EndTime.AsTime().Format(time.UnixDate), r.Duration)
str += fmt.Sprintf(", ts: %d|%d|%d", r.StartTimestamp, r.EndTimestamp, r.RtpClockTicks)
str += fmt.Sprintf(", d: %d|%.2fms", r.DriftSamples, r.DriftMs)
str += fmt.Sprintf(", cr: %.2f", r.ClockRate)
return str
}
// -------------------------------------------------------
@@ -867,8 +860,10 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
"received anachronous sender report",
"currentNTP", srData.NTPTimestamp.Time().String(),
"currentRTP", srData.RTPTimestamp,
"currentAt", srData.At.String(),
"lastNTP", r.srNewest.NTPTimestamp.Time().String(),
"lastRTP", r.srNewest.RTPTimestamp,
"lastAt", r.srNewest.At.String(),
)
return
}
@@ -891,63 +886,29 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var arrivalDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
if srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Under such a condition reset the sender reports to start from this point.
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
r.logger.Infow(
"received sender report, out-of-order, resetting",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"currTSExt", srDataCopy.RTPTimestampExt,
"currRTP", srDataCopy.RTPTimestamp,
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
)
r.srFirst = &srDataCopy
r.srNewest = &srDataCopy
}
ntpDiffSinceLast = srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = srDataCopy.RTPTimestamp - r.srNewest.RTPTimestamp
arrivalDiffSinceLast = srDataCopy.At.Sub(r.srNewest.At)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
// more than 200 ms away from expected delta
isWarped = true
}
if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Under such a condition reset the sender reports to start from this point.
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
r.logger.Infow(
"received sender report, out-of-order, resetting",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"prevAt", r.srNewest.At.String(),
"currTSExt", srDataCopy.RTPTimestampExt,
"currRTP", srDataCopy.RTPTimestamp,
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
"currentAt", srDataCopy.At.String(),
)
r.srFirst = nil
}
r.srNewest = &srDataCopy
if r.srFirst == nil {
r.srFirst = &srDataCopy
}
if isWarped {
packetDriftResult, reportDriftResult := r.getDrift()
r.logger.Infow(
"received sender report, time warp",
"ntp", srData.NTPTimestamp.Time().String(),
"rtp", srData.RTPTimestamp,
"arrival", srData.At.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
"arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.timestamp.GetExtendedHighest(),
"highestTime", r.highestTime.String(),
)
}
}
func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) {
@@ -996,6 +957,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
timeSinceHighest := now.Sub(r.highestTime)
nowRTPExt := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowRTPExtUsingTime := nowRTPExt
nowRTP := uint32(nowRTPExt)
// It is possible that publisher is pacing at a slower rate.
@@ -1030,30 +992,20 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
"currTSExt", nowRTPExt,
"currRTP", nowRTP,
"currNTP", nowNTP.Time().String(),
"timeNow", time.Now().String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst,
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest,
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate))
nowRTP = uint32(nowRTPExt)
}
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint64
var departureDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt
departureDiffSinceLast = now.Sub(r.srNewest.At)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
// more than 200 ms away from expected delta
isWarped = true
}
}
r.srNewest = &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: nowRTP,
@@ -1064,27 +1016,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
r.srFirst = r.srNewest
}
if isWarped {
packetDriftResult, reportDriftResult := r.getDrift()
r.logger.Infow(
"sending sender report, time warp",
"ntp", nowNTP.Time().String(),
"rtp", nowRTP,
"departure", now.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
"departureDiffSinceLast", departureDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.timestamp.GetExtendedHighest(),
"highestTime", r.highestTime.String(),
"calculatedClockRate", calculatedClockRate,
"nowRTPExt", nowRTPExt,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
}
return &rtcp.SenderReport{
SSRC: ssrc,
NTPTime: uint64(nowNTP),
@@ -1352,12 +1283,7 @@ func (r *RTPStats) ToString() string {
str += ", rtt(ms):"
str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax)
str += ", drift(ms):"
str += fmt.Sprintf("%.2f", p.DriftMs)
str += ", sr(Hz):"
str += fmt.Sprintf("%.2f", p.SampleRate)
str += fmt.Sprintf(", pd: %s, rd: %s", RTPDriftToString(p.PacketDrift), RTPDriftToString(p.ReportDrift))
return str
}
@@ -1405,7 +1331,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
jitterTime := jitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
packetDrift, _ := r.getDrift()
packetDrift, reportDrift := r.getDrift()
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
@@ -1448,8 +1374,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
LastFir: timestamppb.New(r.lastFir),
RttCurrent: r.rtt,
RttMax: r.maxRtt,
DriftMs: packetDrift.driftMs,
SampleRate: packetDrift.sampleRate,
PacketDrift: packetDrift,
ReportDrift: reportDrift,
}
gapsPresent := false
@@ -1636,22 +1562,42 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) {
r.lastJitterRTP = rtph.Timestamp
}
func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) {
packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime)
packetDrift.rtpDiffSinceFirst = r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart()
packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
if packetDrift.timeSinceFirst.Seconds() != 0 {
packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds()
func (r *RTPStats) getDrift() (packetDrift *livekit.RTPDrift, reportDrift *livekit.RTPDrift) {
if !r.firstTime.IsZero() {
elapsed := r.highestTime.Sub(r.firstTime)
rtpClockTicks := r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart()
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
if elapsed.Seconds() > 0.0 {
packetDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(r.firstTime),
EndTime: timestamppb.New(r.highestTime),
Duration: elapsed.Seconds(),
StartTimestamp: r.timestamp.GetExtendedStart(),
EndTimestamp: r.timestamp.GetExtendedHighest(),
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
}
if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp {
reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
if reportDrift.timeSinceFirst.Seconds() != 0 {
reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds()
elapsed := r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
rtpClockTicks := r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
if elapsed.Seconds() > 0.0 {
reportDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(r.srFirst.NTPTimestamp.Time()),
EndTime: timestamppb.New(r.srNewest.NTPTimestamp.Time()),
Duration: elapsed.Seconds(),
StartTimestamp: r.timestamp.GetExtendedStart(),
EndTimestamp: r.timestamp.GetExtendedHighest(),
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
}
return
@@ -1754,8 +1700,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
lastFir := time.Time{}
rtt := uint32(0)
maxRtt := uint32(0)
driftMs := float64(0.0)
sampleRate := float64(0.0)
for _, stats := range statsList {
if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) {
@@ -1822,9 +1766,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
if stats.RttMax > maxRtt {
maxRtt = stats.RttMax
}
driftMs += stats.DriftMs
sampleRate += stats.SampleRate
}
if endTime.IsZero() {
@@ -1887,8 +1828,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
LastFir: timestamppb.New(lastFir),
RttCurrent: rtt / uint32(len(statsList)),
RttMax: maxRtt,
DriftMs: driftMs / float64(len(statsList)),
SampleRate: sampleRate / float64(len(statsList)),
// no aggregation for drift calculations
}
}
+1 -1
View File
@@ -1135,7 +1135,7 @@ func (d *DownTrack) ProvisionalAllocateReset() {
d.forwarder.ProvisionalAllocateReset()
}
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
return d.forwarder.ProvisionalAllocate(availableChannelCapacity, layers, allowPause, allowOvershoot)
}
+6 -6
View File
@@ -728,7 +728,7 @@ func (f *Forwarder) ProvisionalAllocateReset() {
f.provisional.allocatedLayer = buffer.InvalidLayer
}
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
f.lock.Lock()
defer f.lock.Unlock()
@@ -737,12 +737,12 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
f.provisional.maxSeenLayer.Spatial == buffer.InvalidLayerSpatial ||
!f.provisional.maxLayer.IsValid() ||
((!allowOvershoot || !f.vls.IsOvershootOkay()) && layer.GreaterThan(f.provisional.maxLayer)) {
return 0
return false, 0
}
requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal]
if requiredBitrate == 0 {
return 0
return false, 0
}
alreadyAllocatedBitrate := int64(0)
@@ -753,7 +753,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
// a layer under maximum fits, take it
if !layer.GreaterThan(f.provisional.maxLayer) && requiredBitrate <= (availableChannelCapacity+alreadyAllocatedBitrate) {
f.provisional.allocatedLayer = layer
return requiredBitrate - alreadyAllocatedBitrate
return true, requiredBitrate - alreadyAllocatedBitrate
}
//
@@ -766,10 +766,10 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
//
if !allowPause && (!f.provisional.allocatedLayer.IsValid() || !layer.GreaterThan(f.provisional.allocatedLayer)) {
f.provisional.allocatedLayer = layer
return requiredBitrate - alreadyAllocatedBitrate
return true, requiredBitrate - alreadyAllocatedBitrate
}
return 0
return false, 0
}
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition {
+34 -17
View File
@@ -400,20 +400,25 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
f.ProvisionalAllocatePrepare(nil, bitrates)
usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
require.True(t, isCandidate)
require.Equal(t, bitrates[0][0], usedBitrate)
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false)
require.True(t, isCandidate)
require.Equal(t, bitrates[2][3]-bitrates[0][0], usedBitrate)
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false)
require.True(t, isCandidate)
require.Equal(t, bitrates[0][3]-bitrates[2][3], usedBitrate)
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false)
require.True(t, isCandidate)
require.Equal(t, bitrates[1][2]-bitrates[0][3], usedBitrate)
// available not enough to reach (2, 2), allocating at (2, 2) should not succeed
usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// committing should set target to (1, 2)
@@ -440,7 +445,8 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
// when nothing fits and pausing disallowed, should allocate (0, 0)
f.vls.SetTarget(buffer.InvalidLayer)
f.ProvisionalAllocatePrepare(nil, bitrates)
usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false)
isCandidate, usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false)
require.True(t, isCandidate)
require.Equal(t, int64(1), usedBitrate)
// committing should set target to (0, 0)
@@ -477,15 +483,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
f.ProvisionalAllocatePrepare(nil, bitrates)
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// overshoot should succeed
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
require.True(t, isCandidate)
require.Equal(t, bitrates[2][3], usedBitrate)
// overshoot should succeed - this should win as this is lesser overshoot
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
require.True(t, isCandidate)
require.Equal(t, bitrates[1][3]-bitrates[2][3], usedBitrate)
// committing should set target to (1, 3)
@@ -524,15 +533,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
f.ProvisionalAllocatePrepare(nil, bitrates)
// all the provisional allocations should not succeed because the feed is dry
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// overshoot should not succeed
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// overshoot should not succeed
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// committing should set target to (0, 2), i. e. leave it at current for opportunistic forwarding
@@ -562,15 +574,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
f.ProvisionalAllocatePrepare(nil, bitrates)
// all the provisional allocations below should not succeed because the feed is dry
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// overshoot should not succeed
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// overshoot should not succeed
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
expectedResult = VideoAllocation{
@@ -604,10 +619,12 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) {
f.Mute(true)
f.ProvisionalAllocatePrepare(nil, bitrates)
usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true)
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true)
require.False(t, isCandidate)
require.Equal(t, int64(0), usedBitrate)
// committing should set target to buffer.InvalidLayer as track is muted
+72 -26
View File
@@ -883,9 +883,15 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
return
}
// this track is currently not streaming and needs bits to start.
// first try an allocation using available headroom
availableChannelCapacity := s.getAvailableHeadroom(false)
// already streaming at some layer and transition is not requesting any change, i. e. BandwidthDelta == 0
if transition.From.IsValid() && transition.BandwidthDelta == 0 {
return
}
// this track is currently not streaming and needs bits to start OR streaming at some layer and wants more bits.
// NOTE: With co-operative transition, tracks should not be asking for more if already streaming, but handle that case any way.
// first try an allocation using available headroom, current consumption of this track is discounted to calculate headroom.
availableChannelCapacity := s.getAvailableHeadroomWithoutTracks(false, []*Track{track})
if availableChannelCapacity > 0 {
track.ProvisionalAllocateReset() // to reset allocation from co-operative transition above and try fresh
@@ -899,21 +905,30 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
Temporal: temporal,
}
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
isCandidate, usedChannelCapacity := track.ProvisionalAllocate(
availableChannelCapacity,
layer,
s.allowPause,
FlagAllowOvershootWhileDeficient,
)
if availableChannelCapacity < usedChannelCapacity {
break alloc_loop
}
bestLayer = layer
if isCandidate {
bestLayer = layer
}
}
}
if bestLayer.IsValid() {
// found layer that can fit in available headroom
update := NewStreamStateUpdate()
allocation := track.ProvisionalAllocateCommit()
updateStreamStateChange(track, allocation, update)
s.maybeSendUpdate(update)
if bestLayer.GreaterThan(transition.From) {
// found layer that can fit in available headroom, take it if it is better than existing
update := NewStreamStateUpdate()
allocation := track.ProvisionalAllocateCommit()
updateStreamStateChange(track, allocation, update)
s.maybeSendUpdate(update)
}
s.adjustState()
return
@@ -923,11 +938,6 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom
}
// track is currently streaming at minimum
if transition.BandwidthDelta == 0 {
return
}
// if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired.
bandwidthAcquired := int64(0)
var contributingTracks []*Track
@@ -1018,17 +1028,31 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() {
update := NewStreamStateUpdate()
for _, track := range s.getMaxDistanceSortedDeficient() {
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
if !boosted {
continue
sortedTracks := s.getMaxDistanceSortedDeficient()
boost_loop:
for {
for idx, track := range sortedTracks {
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
if !boosted {
if idx == len(sortedTracks)-1 {
// all tracks tried
break boost_loop
}
continue
}
updateStreamStateChange(track, allocation, update)
availableChannelCapacity -= allocation.BandwidthDelta
if availableChannelCapacity <= 0 {
break boost_loop
}
break // sort again below as the track that was just boosted could still be farthest from its desired
}
updateStreamStateChange(track, allocation, update)
availableChannelCapacity -= allocation.BandwidthDelta
if availableChannelCapacity <= 0 {
break
sortedTracks = s.getMaxDistanceSortedDeficient()
if len(sortedTracks) == 0 {
break // nothing available to boost
}
}
@@ -1103,7 +1127,7 @@ func (s *StreamAllocator) allocateAllTracks() {
}
for _, track := range sorted {
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
_, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
availableChannelCapacity -= usedChannelCapacity
if availableChannelCapacity < 0 {
availableChannelCapacity = 0
@@ -1174,10 +1198,32 @@ func (s *StreamAllocator) getExpectedBandwidthUsage() int64 {
return expected
}
func (s *StreamAllocator) getExpectedBandwidthUsageWithoutTracks(filteredTracks []*Track) int64 {
expected := int64(0)
for _, track := range s.getTracks() {
filtered := false
for _, ft := range filteredTracks {
if ft == track {
filtered = true
break
}
}
if !filtered {
expected += track.BandwidthRequested()
}
}
return expected
}
func (s *StreamAllocator) getAvailableHeadroom(allowOverride bool) int64 {
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsage()
}
func (s *StreamAllocator) getAvailableHeadroomWithoutTracks(allowOverride bool, filteredTracks []*Track) int64 {
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsageWithoutTracks(filteredTracks)
}
func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
aggPacketDelta := uint32(0)
aggRepeatedNackDelta := uint32(0)
+1 -1
View File
@@ -164,7 +164,7 @@ func (t *Track) ProvisionalAllocateReset() {
t.downTrack.ProvisionalAllocateReset()
}
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
return t.downTrack.ProvisionalAllocate(availableChannelCapacity, layer, allowPause, allowOvershoot)
}