mirror of
https://github.com/livekit/livekit.git
synced 2026-05-13 07:55:28 +00:00
Merge remote-tracking branch 'origin/master' into raja_fr
This commit is contained in:
@@ -18,7 +18,7 @@ require (
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0
|
||||
github.com/livekit/protocol v1.6.1
|
||||
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86
|
||||
github.com/livekit/psrpc v0.3.3
|
||||
github.com/mackerelio/go-osstat v0.2.4
|
||||
github.com/magefile/mage v1.15.0
|
||||
|
||||
@@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
|
||||
github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk=
|
||||
github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
|
||||
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ=
|
||||
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
|
||||
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
|
||||
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
|
||||
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
|
||||
|
||||
@@ -17,6 +17,7 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
@@ -160,6 +161,10 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre
|
||||
return ei, err
|
||||
}
|
||||
|
||||
func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) {
|
||||
return nil, errors.New("under development")
|
||||
}
|
||||
|
||||
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
|
||||
if err := EnsureRecordPermission(ctx); err != nil {
|
||||
return nil, twirpAuthError(err)
|
||||
|
||||
+75
-135
@@ -17,7 +17,6 @@ package buffer
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -44,22 +43,16 @@ const (
|
||||
|
||||
// -------------------------------------------------------
|
||||
|
||||
type driftResult struct {
|
||||
timeSinceFirst time.Duration
|
||||
rtpDiffSinceFirst uint64
|
||||
driftSamples int64
|
||||
driftMs float64
|
||||
sampleRate float64
|
||||
}
|
||||
func RTPDriftToString(r *livekit.RTPDrift) string {
|
||||
if r == nil {
|
||||
return "-"
|
||||
}
|
||||
|
||||
func (d driftResult) String() string {
|
||||
return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f",
|
||||
d.timeSinceFirst,
|
||||
d.rtpDiffSinceFirst,
|
||||
d.driftSamples,
|
||||
d.driftMs,
|
||||
d.sampleRate,
|
||||
)
|
||||
str := fmt.Sprintf("t: %+v|%+v|%.2fs", r.StartTime.AsTime().Format(time.UnixDate), r.EndTime.AsTime().Format(time.UnixDate), r.Duration)
|
||||
str += fmt.Sprintf(", ts: %d|%d|%d", r.StartTimestamp, r.EndTimestamp, r.RtpClockTicks)
|
||||
str += fmt.Sprintf(", d: %d|%.2fms", r.DriftSamples, r.DriftMs)
|
||||
str += fmt.Sprintf(", cr: %.2f", r.ClockRate)
|
||||
return str
|
||||
}
|
||||
|
||||
// -------------------------------------------------------
|
||||
@@ -867,8 +860,10 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
|
||||
"received anachronous sender report",
|
||||
"currentNTP", srData.NTPTimestamp.Time().String(),
|
||||
"currentRTP", srData.RTPTimestamp,
|
||||
"currentAt", srData.At.String(),
|
||||
"lastNTP", r.srNewest.NTPTimestamp.Time().String(),
|
||||
"lastRTP", r.srNewest.RTPTimestamp,
|
||||
"lastAt", r.srNewest.At.String(),
|
||||
)
|
||||
return
|
||||
}
|
||||
@@ -891,63 +886,29 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
|
||||
|
||||
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
|
||||
|
||||
// monitor and log RTP timestamp anomalies
|
||||
var ntpDiffSinceLast time.Duration
|
||||
var rtpDiffSinceLast uint32
|
||||
var arrivalDiffSinceLast time.Duration
|
||||
var expectedTimeDiffSinceLast float64
|
||||
var isWarped bool
|
||||
if r.srNewest != nil {
|
||||
if srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
|
||||
// This can happen when a track is replaced with a null and then restored -
|
||||
// i. e. muting replacing with null and unmute restoring the original track.
|
||||
// Under such a condition reset the sender reports to start from this point.
|
||||
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
|
||||
r.logger.Infow(
|
||||
"received sender report, out-of-order, resetting",
|
||||
"prevTSExt", r.srNewest.RTPTimestampExt,
|
||||
"prevRTP", r.srNewest.RTPTimestamp,
|
||||
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
|
||||
"currTSExt", srDataCopy.RTPTimestampExt,
|
||||
"currRTP", srDataCopy.RTPTimestamp,
|
||||
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
|
||||
)
|
||||
r.srFirst = &srDataCopy
|
||||
r.srNewest = &srDataCopy
|
||||
}
|
||||
|
||||
ntpDiffSinceLast = srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
|
||||
rtpDiffSinceLast = srDataCopy.RTPTimestamp - r.srNewest.RTPTimestamp
|
||||
arrivalDiffSinceLast = srDataCopy.At.Sub(r.srNewest.At)
|
||||
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
|
||||
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
|
||||
// more than 200 ms away from expected delta
|
||||
isWarped = true
|
||||
}
|
||||
if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
|
||||
// This can happen when a track is replaced with a null and then restored -
|
||||
// i. e. muting replacing with null and unmute restoring the original track.
|
||||
// Under such a condition reset the sender reports to start from this point.
|
||||
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
|
||||
r.logger.Infow(
|
||||
"received sender report, out-of-order, resetting",
|
||||
"prevTSExt", r.srNewest.RTPTimestampExt,
|
||||
"prevRTP", r.srNewest.RTPTimestamp,
|
||||
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
|
||||
"prevAt", r.srNewest.At.String(),
|
||||
"currTSExt", srDataCopy.RTPTimestampExt,
|
||||
"currRTP", srDataCopy.RTPTimestamp,
|
||||
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
|
||||
"currentAt", srDataCopy.At.String(),
|
||||
)
|
||||
r.srFirst = nil
|
||||
}
|
||||
|
||||
r.srNewest = &srDataCopy
|
||||
if r.srFirst == nil {
|
||||
r.srFirst = &srDataCopy
|
||||
}
|
||||
|
||||
if isWarped {
|
||||
packetDriftResult, reportDriftResult := r.getDrift()
|
||||
r.logger.Infow(
|
||||
"received sender report, time warp",
|
||||
"ntp", srData.NTPTimestamp.Time().String(),
|
||||
"rtp", srData.RTPTimestamp,
|
||||
"arrival", srData.At.String(),
|
||||
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
|
||||
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
|
||||
"arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(),
|
||||
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
|
||||
"packetDrift", packetDriftResult.String(),
|
||||
"reportDrift", reportDriftResult.String(),
|
||||
"highestTS", r.timestamp.GetExtendedHighest(),
|
||||
"highestTime", r.highestTime.String(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) {
|
||||
@@ -996,6 +957,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
|
||||
|
||||
timeSinceHighest := now.Sub(r.highestTime)
|
||||
nowRTPExt := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
|
||||
nowRTPExtUsingTime := nowRTPExt
|
||||
nowRTP := uint32(nowRTPExt)
|
||||
|
||||
// It is possible that publisher is pacing at a slower rate.
|
||||
@@ -1030,30 +992,20 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
|
||||
"currTSExt", nowRTPExt,
|
||||
"currRTP", nowRTP,
|
||||
"currNTP", nowNTP.Time().String(),
|
||||
"timeNow", time.Now().String(),
|
||||
"firstTime", r.firstTime.String(),
|
||||
"timeSinceFirst", timeSinceFirst,
|
||||
"highestTime", r.highestTime.String(),
|
||||
"timeSinceHighest", timeSinceHighest,
|
||||
"nowRTPExtUsingTime", nowRTPExtUsingTime,
|
||||
"calculatedClockRate", calculatedClockRate,
|
||||
"nowRTPExtUsingRate", nowRTPExtUsingRate,
|
||||
)
|
||||
ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
|
||||
nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate))
|
||||
nowRTP = uint32(nowRTPExt)
|
||||
}
|
||||
|
||||
// monitor and log RTP timestamp anomalies
|
||||
var ntpDiffSinceLast time.Duration
|
||||
var rtpDiffSinceLast uint64
|
||||
var departureDiffSinceLast time.Duration
|
||||
var expectedTimeDiffSinceLast float64
|
||||
var isWarped bool
|
||||
if r.srNewest != nil {
|
||||
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
|
||||
rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt
|
||||
departureDiffSinceLast = now.Sub(r.srNewest.At)
|
||||
|
||||
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
|
||||
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
|
||||
// more than 200 ms away from expected delta
|
||||
isWarped = true
|
||||
}
|
||||
}
|
||||
|
||||
r.srNewest = &RTCPSenderReportData{
|
||||
NTPTimestamp: nowNTP,
|
||||
RTPTimestamp: nowRTP,
|
||||
@@ -1064,27 +1016,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
|
||||
r.srFirst = r.srNewest
|
||||
}
|
||||
|
||||
if isWarped {
|
||||
packetDriftResult, reportDriftResult := r.getDrift()
|
||||
r.logger.Infow(
|
||||
"sending sender report, time warp",
|
||||
"ntp", nowNTP.Time().String(),
|
||||
"rtp", nowRTP,
|
||||
"departure", now.String(),
|
||||
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
|
||||
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
|
||||
"departureDiffSinceLast", departureDiffSinceLast.Seconds(),
|
||||
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
|
||||
"packetDrift", packetDriftResult.String(),
|
||||
"reportDrift", reportDriftResult.String(),
|
||||
"highestTS", r.timestamp.GetExtendedHighest(),
|
||||
"highestTime", r.highestTime.String(),
|
||||
"calculatedClockRate", calculatedClockRate,
|
||||
"nowRTPExt", nowRTPExt,
|
||||
"nowRTPExtUsingRate", nowRTPExtUsingRate,
|
||||
)
|
||||
}
|
||||
|
||||
return &rtcp.SenderReport{
|
||||
SSRC: ssrc,
|
||||
NTPTime: uint64(nowNTP),
|
||||
@@ -1352,12 +1283,7 @@ func (r *RTPStats) ToString() string {
|
||||
str += ", rtt(ms):"
|
||||
str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax)
|
||||
|
||||
str += ", drift(ms):"
|
||||
str += fmt.Sprintf("%.2f", p.DriftMs)
|
||||
|
||||
str += ", sr(Hz):"
|
||||
str += fmt.Sprintf("%.2f", p.SampleRate)
|
||||
|
||||
str += fmt.Sprintf(", pd: %s, rd: %s", RTPDriftToString(p.PacketDrift), RTPDriftToString(p.ReportDrift))
|
||||
return str
|
||||
}
|
||||
|
||||
@@ -1405,7 +1331,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
|
||||
jitterTime := jitter / float64(r.params.ClockRate) * 1e6
|
||||
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
|
||||
|
||||
packetDrift, _ := r.getDrift()
|
||||
packetDrift, reportDrift := r.getDrift()
|
||||
|
||||
p := &livekit.RTPStats{
|
||||
StartTime: timestamppb.New(r.startTime),
|
||||
@@ -1448,8 +1374,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
|
||||
LastFir: timestamppb.New(r.lastFir),
|
||||
RttCurrent: r.rtt,
|
||||
RttMax: r.maxRtt,
|
||||
DriftMs: packetDrift.driftMs,
|
||||
SampleRate: packetDrift.sampleRate,
|
||||
PacketDrift: packetDrift,
|
||||
ReportDrift: reportDrift,
|
||||
}
|
||||
|
||||
gapsPresent := false
|
||||
@@ -1636,22 +1562,42 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) {
|
||||
r.lastJitterRTP = rtph.Timestamp
|
||||
}
|
||||
|
||||
func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) {
|
||||
packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime)
|
||||
packetDrift.rtpDiffSinceFirst = r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart()
|
||||
packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
|
||||
packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
|
||||
if packetDrift.timeSinceFirst.Seconds() != 0 {
|
||||
packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds()
|
||||
func (r *RTPStats) getDrift() (packetDrift *livekit.RTPDrift, reportDrift *livekit.RTPDrift) {
|
||||
if !r.firstTime.IsZero() {
|
||||
elapsed := r.highestTime.Sub(r.firstTime)
|
||||
rtpClockTicks := r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart()
|
||||
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
|
||||
if elapsed.Seconds() > 0.0 {
|
||||
packetDrift = &livekit.RTPDrift{
|
||||
StartTime: timestamppb.New(r.firstTime),
|
||||
EndTime: timestamppb.New(r.highestTime),
|
||||
Duration: elapsed.Seconds(),
|
||||
StartTimestamp: r.timestamp.GetExtendedStart(),
|
||||
EndTimestamp: r.timestamp.GetExtendedHighest(),
|
||||
RtpClockTicks: rtpClockTicks,
|
||||
DriftSamples: driftSamples,
|
||||
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
|
||||
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp {
|
||||
reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
|
||||
reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
|
||||
reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
|
||||
reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
|
||||
if reportDrift.timeSinceFirst.Seconds() != 0 {
|
||||
reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds()
|
||||
elapsed := r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
|
||||
rtpClockTicks := r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
|
||||
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
|
||||
if elapsed.Seconds() > 0.0 {
|
||||
reportDrift = &livekit.RTPDrift{
|
||||
StartTime: timestamppb.New(r.srFirst.NTPTimestamp.Time()),
|
||||
EndTime: timestamppb.New(r.srNewest.NTPTimestamp.Time()),
|
||||
Duration: elapsed.Seconds(),
|
||||
StartTimestamp: r.timestamp.GetExtendedStart(),
|
||||
EndTimestamp: r.timestamp.GetExtendedHighest(),
|
||||
RtpClockTicks: rtpClockTicks,
|
||||
DriftSamples: driftSamples,
|
||||
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
|
||||
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -1754,8 +1700,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
|
||||
lastFir := time.Time{}
|
||||
rtt := uint32(0)
|
||||
maxRtt := uint32(0)
|
||||
driftMs := float64(0.0)
|
||||
sampleRate := float64(0.0)
|
||||
|
||||
for _, stats := range statsList {
|
||||
if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) {
|
||||
@@ -1822,9 +1766,6 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
|
||||
if stats.RttMax > maxRtt {
|
||||
maxRtt = stats.RttMax
|
||||
}
|
||||
|
||||
driftMs += stats.DriftMs
|
||||
sampleRate += stats.SampleRate
|
||||
}
|
||||
|
||||
if endTime.IsZero() {
|
||||
@@ -1887,8 +1828,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
|
||||
LastFir: timestamppb.New(lastFir),
|
||||
RttCurrent: rtt / uint32(len(statsList)),
|
||||
RttMax: maxRtt,
|
||||
DriftMs: driftMs / float64(len(statsList)),
|
||||
SampleRate: sampleRate / float64(len(statsList)),
|
||||
// no aggregation for drift calculations
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1136,7 +1136,7 @@ func (d *DownTrack) ProvisionalAllocateReset() {
|
||||
d.forwarder.ProvisionalAllocateReset()
|
||||
}
|
||||
|
||||
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
|
||||
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
|
||||
return d.forwarder.ProvisionalAllocate(availableChannelCapacity, layers, allowPause, allowOvershoot)
|
||||
}
|
||||
|
||||
|
||||
@@ -728,7 +728,7 @@ func (f *Forwarder) ProvisionalAllocateReset() {
|
||||
f.provisional.allocatedLayer = buffer.InvalidLayer
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
|
||||
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
@@ -737,12 +737,12 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
f.provisional.maxSeenLayer.Spatial == buffer.InvalidLayerSpatial ||
|
||||
!f.provisional.maxLayer.IsValid() ||
|
||||
((!allowOvershoot || !f.vls.IsOvershootOkay()) && layer.GreaterThan(f.provisional.maxLayer)) {
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal]
|
||||
if requiredBitrate == 0 {
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
alreadyAllocatedBitrate := int64(0)
|
||||
@@ -753,7 +753,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
// a layer under maximum fits, take it
|
||||
if !layer.GreaterThan(f.provisional.maxLayer) && requiredBitrate <= (availableChannelCapacity+alreadyAllocatedBitrate) {
|
||||
f.provisional.allocatedLayer = layer
|
||||
return requiredBitrate - alreadyAllocatedBitrate
|
||||
return true, requiredBitrate - alreadyAllocatedBitrate
|
||||
}
|
||||
|
||||
//
|
||||
@@ -766,10 +766,10 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
//
|
||||
if !allowPause && (!f.provisional.allocatedLayer.IsValid() || !layer.GreaterThan(f.provisional.allocatedLayer)) {
|
||||
f.provisional.allocatedLayer = layer
|
||||
return requiredBitrate - alreadyAllocatedBitrate
|
||||
return true, requiredBitrate - alreadyAllocatedBitrate
|
||||
}
|
||||
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition {
|
||||
|
||||
+34
-17
@@ -400,20 +400,25 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[0][0], usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[2][3]-bitrates[0][0], usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[0][3]-bitrates[2][3], usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[1][2]-bitrates[0][3], usedBitrate)
|
||||
|
||||
// available not enough to reach (2, 2), allocating at (2, 2) should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// committing should set target to (1, 2)
|
||||
@@ -440,7 +445,8 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
// when nothing fits and pausing disallowed, should allocate (0, 0)
|
||||
f.vls.SetTarget(buffer.InvalidLayer)
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, int64(1), usedBitrate)
|
||||
|
||||
// committing should set target to (0, 0)
|
||||
@@ -477,15 +483,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[2][3], usedBitrate)
|
||||
|
||||
// overshoot should succeed - this should win as this is lesser overshoot
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[1][3]-bitrates[2][3], usedBitrate)
|
||||
|
||||
// committing should set target to (1, 3)
|
||||
@@ -524,15 +533,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
// all the provisional allocations should not succeed because the feed is dry
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// committing should set target to (0, 2), i. e. leave it at current for opportunistic forwarding
|
||||
@@ -562,15 +574,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
// all the provisional allocations below should not succeed because the feed is dry
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
expectedResult = VideoAllocation{
|
||||
@@ -604,10 +619,12 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) {
|
||||
f.Mute(true)
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// committing should set target to buffer.InvalidLayer as track is muted
|
||||
|
||||
@@ -883,9 +883,15 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
return
|
||||
}
|
||||
|
||||
// this track is currently not streaming and needs bits to start.
|
||||
// first try an allocation using available headroom
|
||||
availableChannelCapacity := s.getAvailableHeadroom(false)
|
||||
// already streaming at some layer and transition is not requesting any change, i. e. BandwidthDelta == 0
|
||||
if transition.From.IsValid() && transition.BandwidthDelta == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// this track is currently not streaming and needs bits to start OR streaming at some layer and wants more bits.
|
||||
// NOTE: With co-operative transition, tracks should not be asking for more if already streaming, but handle that case any way.
|
||||
// first try an allocation using available headroom, current consumption of this track is discounted to calculate headroom.
|
||||
availableChannelCapacity := s.getAvailableHeadroomWithoutTracks(false, []*Track{track})
|
||||
if availableChannelCapacity > 0 {
|
||||
track.ProvisionalAllocateReset() // to reset allocation from co-operative transition above and try fresh
|
||||
|
||||
@@ -899,21 +905,30 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
Temporal: temporal,
|
||||
}
|
||||
|
||||
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
|
||||
isCandidate, usedChannelCapacity := track.ProvisionalAllocate(
|
||||
availableChannelCapacity,
|
||||
layer,
|
||||
s.allowPause,
|
||||
FlagAllowOvershootWhileDeficient,
|
||||
)
|
||||
if availableChannelCapacity < usedChannelCapacity {
|
||||
break alloc_loop
|
||||
}
|
||||
|
||||
bestLayer = layer
|
||||
if isCandidate {
|
||||
bestLayer = layer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if bestLayer.IsValid() {
|
||||
// found layer that can fit in available headroom
|
||||
update := NewStreamStateUpdate()
|
||||
allocation := track.ProvisionalAllocateCommit()
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
s.maybeSendUpdate(update)
|
||||
if bestLayer.GreaterThan(transition.From) {
|
||||
// found layer that can fit in available headroom, take it if it is better than existing
|
||||
update := NewStreamStateUpdate()
|
||||
allocation := track.ProvisionalAllocateCommit()
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
s.maybeSendUpdate(update)
|
||||
}
|
||||
|
||||
s.adjustState()
|
||||
return
|
||||
@@ -923,11 +938,6 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom
|
||||
}
|
||||
|
||||
// track is currently streaming at minimum
|
||||
if transition.BandwidthDelta == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired.
|
||||
bandwidthAcquired := int64(0)
|
||||
var contributingTracks []*Track
|
||||
@@ -1018,17 +1028,31 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() {
|
||||
|
||||
update := NewStreamStateUpdate()
|
||||
|
||||
for _, track := range s.getMaxDistanceSortedDeficient() {
|
||||
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
|
||||
if !boosted {
|
||||
continue
|
||||
sortedTracks := s.getMaxDistanceSortedDeficient()
|
||||
boost_loop:
|
||||
for {
|
||||
for idx, track := range sortedTracks {
|
||||
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
|
||||
if !boosted {
|
||||
if idx == len(sortedTracks)-1 {
|
||||
// all tracks tried
|
||||
break boost_loop
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
|
||||
availableChannelCapacity -= allocation.BandwidthDelta
|
||||
if availableChannelCapacity <= 0 {
|
||||
break boost_loop
|
||||
}
|
||||
|
||||
break // sort again below as the track that was just boosted could still be farthest from its desired
|
||||
}
|
||||
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
|
||||
availableChannelCapacity -= allocation.BandwidthDelta
|
||||
if availableChannelCapacity <= 0 {
|
||||
break
|
||||
sortedTracks = s.getMaxDistanceSortedDeficient()
|
||||
if len(sortedTracks) == 0 {
|
||||
break // nothing available to boost
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1103,7 +1127,7 @@ func (s *StreamAllocator) allocateAllTracks() {
|
||||
}
|
||||
|
||||
for _, track := range sorted {
|
||||
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
|
||||
_, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
|
||||
availableChannelCapacity -= usedChannelCapacity
|
||||
if availableChannelCapacity < 0 {
|
||||
availableChannelCapacity = 0
|
||||
@@ -1174,10 +1198,32 @@ func (s *StreamAllocator) getExpectedBandwidthUsage() int64 {
|
||||
return expected
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getExpectedBandwidthUsageWithoutTracks(filteredTracks []*Track) int64 {
|
||||
expected := int64(0)
|
||||
for _, track := range s.getTracks() {
|
||||
filtered := false
|
||||
for _, ft := range filteredTracks {
|
||||
if ft == track {
|
||||
filtered = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !filtered {
|
||||
expected += track.BandwidthRequested()
|
||||
}
|
||||
}
|
||||
|
||||
return expected
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getAvailableHeadroom(allowOverride bool) int64 {
|
||||
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsage()
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getAvailableHeadroomWithoutTracks(allowOverride bool, filteredTracks []*Track) int64 {
|
||||
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsageWithoutTracks(filteredTracks)
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
|
||||
aggPacketDelta := uint32(0)
|
||||
aggRepeatedNackDelta := uint32(0)
|
||||
|
||||
@@ -164,7 +164,7 @@ func (t *Track) ProvisionalAllocateReset() {
|
||||
t.downTrack.ProvisionalAllocateReset()
|
||||
}
|
||||
|
||||
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
|
||||
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
|
||||
return t.downTrack.ProvisionalAllocate(availableChannelCapacity, layer, allowPause, allowOvershoot)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user