No proof that this helps (#1772)

This commit is contained in:
Raja Subramanian
2023-06-06 11:28:13 +05:30
committed by GitHub
parent 076d8cad73
commit 7ed3af193a
11 changed files with 26 additions and 295 deletions
-3
View File
@@ -89,9 +89,6 @@ type RTCConfig struct {
// force a reconnect on a subscription error
ReconnectOnSubscriptionError *bool `yaml:"reconnect_on_subscription_error,omitempty"`
// allow time stamp adjust to keep drift low, this is experimental
AllowTimestampAdjustment *bool `yaml:"allow_timestamp_adjustment,omitempty"`
}
type TURNServer struct {
-1
View File
@@ -104,7 +104,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
sub.GetBufferFactory(),
subscriberID,
t.params.ReceiverConfig.PacketBufferSize,
sub.GetAllowTimestampAdjustment(),
LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed),
)
if err != nil {
-5
View File
@@ -93,7 +93,6 @@ type ParticipantParams struct {
SubscriberAllowPause bool
SubscriptionLimitAudio int32
SubscriptionLimitVideo int32
AllowTimestampAdjustment bool
}
type ParticipantImpl struct {
@@ -233,10 +232,6 @@ func (p *ParticipantImpl) GetAdaptiveStream() bool {
return p.params.AdaptiveStream
}
func (p *ParticipantImpl) GetAllowTimestampAdjustment() bool {
return p.params.AllowTimestampAdjustment
}
func (p *ParticipantImpl) ID() livekit.ParticipantID {
return p.params.SID
}
-2
View File
@@ -341,8 +341,6 @@ type LocalParticipant interface {
// down stream bandwidth management
SetSubscriberAllowPause(allowPause bool)
SetSubscriberChannelCapacity(channelCapacity int64)
GetAllowTimestampAdjustment() bool
}
// Room is a container of participants, and can provide room-level actions
@@ -165,16 +165,6 @@ type FakeLocalParticipant struct {
getAdaptiveStreamReturnsOnCall map[int]struct {
result1 bool
}
GetAllowTimestampAdjustmentStub func() bool
getAllowTimestampAdjustmentMutex sync.RWMutex
getAllowTimestampAdjustmentArgsForCall []struct {
}
getAllowTimestampAdjustmentReturns struct {
result1 bool
}
getAllowTimestampAdjustmentReturnsOnCall map[int]struct {
result1 bool
}
GetAudioLevelStub func() (float64, bool)
getAudioLevelMutex sync.RWMutex
getAudioLevelArgsForCall []struct {
@@ -1622,59 +1612,6 @@ func (fake *FakeLocalParticipant) GetAdaptiveStreamReturnsOnCall(i int, result1
}{result1}
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustment() bool {
fake.getAllowTimestampAdjustmentMutex.Lock()
ret, specificReturn := fake.getAllowTimestampAdjustmentReturnsOnCall[len(fake.getAllowTimestampAdjustmentArgsForCall)]
fake.getAllowTimestampAdjustmentArgsForCall = append(fake.getAllowTimestampAdjustmentArgsForCall, struct {
}{})
stub := fake.GetAllowTimestampAdjustmentStub
fakeReturns := fake.getAllowTimestampAdjustmentReturns
fake.recordInvocation("GetAllowTimestampAdjustment", []interface{}{})
fake.getAllowTimestampAdjustmentMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCallCount() int {
fake.getAllowTimestampAdjustmentMutex.RLock()
defer fake.getAllowTimestampAdjustmentMutex.RUnlock()
return len(fake.getAllowTimestampAdjustmentArgsForCall)
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCalls(stub func() bool) {
fake.getAllowTimestampAdjustmentMutex.Lock()
defer fake.getAllowTimestampAdjustmentMutex.Unlock()
fake.GetAllowTimestampAdjustmentStub = stub
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturns(result1 bool) {
fake.getAllowTimestampAdjustmentMutex.Lock()
defer fake.getAllowTimestampAdjustmentMutex.Unlock()
fake.GetAllowTimestampAdjustmentStub = nil
fake.getAllowTimestampAdjustmentReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturnsOnCall(i int, result1 bool) {
fake.getAllowTimestampAdjustmentMutex.Lock()
defer fake.getAllowTimestampAdjustmentMutex.Unlock()
fake.GetAllowTimestampAdjustmentStub = nil
if fake.getAllowTimestampAdjustmentReturnsOnCall == nil {
fake.getAllowTimestampAdjustmentReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.getAllowTimestampAdjustmentReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) GetAudioLevel() (float64, bool) {
fake.getAudioLevelMutex.Lock()
ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)]
@@ -5519,8 +5456,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.debugInfoMutex.RUnlock()
fake.getAdaptiveStreamMutex.RLock()
defer fake.getAdaptiveStreamMutex.RUnlock()
fake.getAllowTimestampAdjustmentMutex.RLock()
defer fake.getAllowTimestampAdjustmentMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
fake.getBufferFactoryMutex.RLock()
-6
View File
@@ -310,11 +310,6 @@ func (r *RoomManager) StartSession(
if pi.SubscriberAllowPause != nil {
subscriberAllowPause = *pi.SubscriberAllowPause
}
// default do not allow timestamp adjustment
allowTimestampAdjustment := false
if r.config.RTC.AllowTimestampAdjustment != nil {
allowTimestampAdjustment = *r.config.RTC.AllowTimestampAdjustment
}
participant, err = rtc.NewParticipant(rtc.ParticipantParams{
Identity: pi.Identity,
Name: pi.Name,
@@ -349,7 +344,6 @@ func (r *RoomManager) StartSession(
SubscriberAllowPause: subscriberAllowPause,
SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio,
SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo,
AllowTimestampAdjustment: allowTimestampAdjustment,
})
if err != nil {
return err
+9 -144
View File
@@ -17,12 +17,11 @@ import (
)
const (
GapHistogramNumBins = 101
NumSequenceNumbers = 65536
FirstSnapshotId = 1
SnInfoSize = 8192
SnInfoMask = SnInfoSize - 1
MeasurementWindowSecondsMin = float64(5.0)
GapHistogramNumBins = 101
NumSequenceNumbers = 65536
FirstSnapshotId = 1
SnInfoSize = 8192
SnInfoMask = SnInfoSize - 1
)
// -------------------------------------------------------
@@ -201,27 +200,17 @@ type RTPStats struct {
srFirst *RTCPSenderReportData
srNewest *RTCPSenderReportData
pidController *PIDController
nextSnapshotId uint32
snapshots map[uint32]*Snapshot
}
func NewRTPStats(params RTPStatsParams) *RTPStats {
r := &RTPStats{
return &RTPStats{
params: params,
logger: params.Logger,
nextSnapshotId: FirstSnapshotId,
snapshots: make(map[uint32]*Snapshot),
pidController: NewPIDController(params.Logger),
}
r.pidController.SetGains(2.0, 0.5, 0.25)
r.pidController.SetDerivativeLPF(0.02)
outMin, outMax := -0.025*float64(r.params.ClockRate), 0.025*float64(r.params.ClockRate)
r.pidController.SetOutputLimits(outMin, outMax)
r.pidController.SetIntegralLimits(outMin/2.0, outMax/2.0)
return r
}
func (r *RTPStats) Seed(from *RTPStats) {
@@ -324,7 +313,6 @@ func (r *RTPStats) Seed(from *RTPStats) {
func (r *RTPStats) SetLogger(logger logger.Logger) {
r.logger = logger
r.pidController.SetLogger(logger)
}
func (r *RTPStats) Stop() {
@@ -897,12 +885,12 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint64, error)
return uint32(expectedExtRTP), minTS, nil
}
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) (*rtcp.SenderReport, float64) {
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) *rtcp.SenderReport {
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized {
return nil, 0.0
return nil
}
// construct current time based on monotonic clock
@@ -933,17 +921,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportDat
}
}
pidOutput := float64(0.0)
if timeSinceFirst.Seconds() > MeasurementWindowSecondsMin {
rtpDiffSinceFirst := nowRTPExt - r.extStartTS
rate := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds()
pidOutput = r.pidController.Update(
float64(r.params.ClockRate),
rate,
now,
)
}
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
@@ -1012,7 +989,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportDat
RTPTime: nowRTP,
PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding,
OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding),
}, pidOutput
}
}
func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport {
@@ -1922,115 +1899,3 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
}
// -------------------------------------------------------------------
type PIDController struct {
logger logger.Logger
kp, ki, kd float64
tau float64 // low pass filter of D, time constant
outMin, outMax float64
isOutLimitsSet bool
iMin, iMax float64
isILimitsSet bool
iVal, dVal float64
prevError, prevMeasurement float64
prevMeasurementTime time.Time
}
func NewPIDController(logger logger.Logger) *PIDController {
return &PIDController{
logger: logger,
}
}
func (p *PIDController) SetLogger(logger logger.Logger) {
p.logger = logger
}
func (p *PIDController) SetGains(kp, ki, kd float64) {
p.kp = kp
p.ki = ki
p.kd = kd
}
func (p *PIDController) SetDerivativeLPF(tau float64) {
p.tau = tau
}
func (p *PIDController) SetOutputLimits(min, max float64) {
p.outMin = min
p.outMax = max
p.isOutLimitsSet = true
}
func (p *PIDController) SetIntegralLimits(min, max float64) {
p.iMin = min
p.iMax = max
p.isILimitsSet = true
}
func (p *PIDController) Update(setpoint, measurement float64, at time.Time) float64 {
errorTerm := setpoint - measurement
if p.prevMeasurementTime.IsZero() {
p.prevError = errorTerm
p.prevMeasurement = measurement
p.prevMeasurementTime = at
return 0
}
duration := at.Sub(p.prevMeasurementTime).Seconds()
if duration == 0 {
return 0
}
proportional := p.kp * errorTerm
iVal := p.iVal + (0.5 * p.ki * duration * (errorTerm + p.prevError))
boundIVal := iVal
if p.isILimitsSet {
if iVal > p.iMax {
boundIVal = p.iMax
}
if iVal < p.iMin {
boundIVal = p.iMin
}
}
p.iVal = boundIVal
p.dVal = -(2.0*p.kd*(measurement-p.prevMeasurement) + (2.0*p.tau-duration)*p.dVal) / (2.0*p.tau + duration)
output := proportional + p.iVal + p.dVal
boundOutput := output
if p.isOutLimitsSet {
if output > p.outMax {
boundOutput = p.outMax
}
if output < p.outMin {
boundOutput = p.outMin
}
}
p.prevError = errorTerm
p.prevMeasurement = measurement
p.prevMeasurementTime = at
p.logger.Debugw(
"pid controller",
"setpoint", setpoint,
"measurement", measurement,
"errorTerm", errorTerm,
"proportional", proportional,
"integral", iVal,
"integralLimited", boundIVal,
"derivative", p.dVal,
"output", output,
"outputLimited", boundOutput,
)
return boundOutput
}
// -------------------------------------------------------------------
+11 -19
View File
@@ -185,8 +185,6 @@ type DownTrack struct {
sequencer *sequencer
bufferFactory *buffer.Factory
allowTimestampAdjustment bool
forwarder *Forwarder
upstreamCodecs []webrtc.RTPCodecParameters
@@ -254,7 +252,6 @@ func NewDownTrack(
bf *buffer.Factory,
subID livekit.ParticipantID,
mt int,
allowTimestampAdjustment bool,
logger logger.Logger,
) (*DownTrack, error) {
var kind webrtc.RTPCodecType
@@ -268,17 +265,16 @@ func NewDownTrack(
}
d := &DownTrack{
logger: logger,
id: r.TrackID(),
subscriberID: subID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
allowTimestampAdjustment: allowTimestampAdjustment,
receiver: r,
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
logger: logger,
id: r.TrackID(),
subscriberID: subID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
receiver: r,
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
}
d.forwarder = NewForwarder(
d.kind,
@@ -1123,11 +1119,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
}
srFirst, srNewest := d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial())
sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc, srFirst, srNewest)
if d.allowTimestampAdjustment {
d.forwarder.AdjustTimestamp(tsAdjust)
}
return sr
return d.rtpStats.GetRtcpSenderReport(d.ssrc, srFirst, srNewest)
}
func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} {
-7
View File
@@ -1706,13 +1706,6 @@ func (f *Forwarder) GetRTPMungerParams() RTPMungerParams {
return f.rtpMunger.GetParams()
}
func (f *Forwarder) AdjustTimestamp(tsAdjust float64) {
f.lock.Lock()
defer f.lock.Unlock()
f.rtpMunger.UpdateTsOffset(uint32(tsAdjust + 0.5))
}
// -----------------------------------------------------------------------------
func getOptimalBandwidthNeeded(muted bool, pubMuted bool, maxPublishedLayer int32, brs Bitrates, maxLayer buffer.VideoLayer) int64 {
+6 -40
View File
@@ -52,14 +52,12 @@ func (r RTPMungerState) String() string {
// ----------------------------------------------------------------------
type RTPMungerParams struct {
highestIncomingSN uint16
lastSN uint16
snOffset uint16
highestIncomingTS uint32
lastTS uint32
tsOffset uint32
tsOffsetAdjustment uint32
lastMarker bool
highestIncomingSN uint16
lastSN uint16
snOffset uint16
lastTS uint32
tsOffset uint32
lastMarker bool
snOffsets [SnOffsetCacheSize]uint16
snOffsetsWritePtr int
@@ -86,7 +84,6 @@ func (r *RTPMunger) GetParams() RTPMungerParams {
highestIncomingSN: r.highestIncomingSN,
lastSN: r.lastSN,
snOffset: r.snOffset,
highestIncomingTS: r.highestIncomingTS,
lastTS: r.lastTS,
tsOffset: r.tsOffset,
lastMarker: r.lastMarker,
@@ -107,14 +104,12 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) {
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.highestIncomingTS = extPkt.Packet.Timestamp
r.lastSN = extPkt.Packet.SequenceNumber
r.lastTS = extPkt.Packet.Timestamp
}
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.highestIncomingTS = extPkt.Packet.Timestamp
r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust
r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust
@@ -131,10 +126,6 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset
}
func (r *RTPMunger) UpdateTsOffset(tsAdjust uint32) {
r.tsOffsetAdjustment = tsAdjust
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
// if out-of-order, look up sequence number offset cache
diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN
@@ -188,12 +179,6 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
}
}
// apply timestamp offset adjustment at the start of a frame only
if extPkt.Packet.Timestamp != r.highestIncomingTS && r.tsOffsetAdjustment != 0 {
r.tsOffset -= r.tsOffsetAdjustment
r.tsOffsetAdjustment = 0
}
// in-order incoming packet, may or may not be contiguous.
// In the case of loss (i.e. incoming sequence number is not contiguous),
// forward even if it is a padding only packet. With temporal scalability,
@@ -203,27 +188,8 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
mungedSN := extPkt.Packet.SequenceNumber - r.snOffset
mungedTS := extPkt.Packet.Timestamp - r.tsOffset
// with timestamp adjustment, it is possible that the adjustment causes munged timestamp to move backwards,
// detect that and adjust so that it does not move back
if extPkt.Packet.Timestamp != r.highestIncomingTS && (((mungedTS - r.lastTS) == 0) || (mungedTS-r.lastTS) > (1<<31)) {
adjustedMungedTS := r.lastTS + 1
adjustedTSOffset := extPkt.Packet.Timestamp - adjustedMungedTS
r.logger.Debugw(
"adjust out-of-order timestamp offset",
"mungedTS", mungedTS,
"lastTS", r.lastTS,
"incomingTS", extPkt.Packet.Timestamp,
"offset", r.tsOffset,
"adjustedMungedTS", adjustedMungedTS,
"adjustedTSOffset", adjustedTSOffset,
)
mungedTS = adjustedMungedTS
r.tsOffset = adjustedTSOffset
}
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.lastSN = mungedSN
r.highestIncomingTS = extPkt.Packet.Timestamp
r.lastTS = mungedTS
r.lastMarker = extPkt.Packet.Marker
-3
View File
@@ -28,7 +28,6 @@ func TestSetLastSnTs(t *testing.T) {
r.SetLastSnTs(extPkt)
require.Equal(t, uint16(23332), r.highestIncomingSN)
require.Equal(t, uint32(0xabcdef), r.highestIncomingTS)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(0), r.snOffset)
@@ -54,7 +53,6 @@ func TestUpdateSnTsOffsets(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateSnTsOffsets(extPkt, 1, 1)
require.Equal(t, uint16(33332), r.highestIncomingSN)
require.Equal(t, uint32(0xabcdef), r.highestIncomingTS)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(9999), r.snOffset)
@@ -73,7 +71,6 @@ func TestPacketDropped(t *testing.T) {
extPkt, _ := testutils.GetTestExtPacket(params)
r.SetLastSnTs(extPkt)
require.Equal(t, uint16(23332), r.highestIncomingSN)
require.Equal(t, uint32(0xabcdef), r.highestIncomingTS)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(0), r.snOffset)