mirror of
https://github.com/livekit/livekit.git
synced 2026-06-06 11:31:36 +00:00
Use measurement in window instead of since start. (#1695)
This captues chnages within a measurement window.
This commit is contained in:
+66
-37
@@ -177,6 +177,7 @@ type RTPStats struct {
|
||||
srData *RTCPSenderReportData
|
||||
lastSRTime time.Time
|
||||
lastSRNTP mediatransportutil.NtpTime
|
||||
lastSRRTP uint32
|
||||
pidController *PIDController
|
||||
|
||||
nextSnapshotId uint32
|
||||
@@ -189,12 +190,12 @@ func NewRTPStats(params RTPStatsParams) *RTPStats {
|
||||
logger: params.Logger,
|
||||
nextSnapshotId: FirstSnapshotId,
|
||||
snapshots: make(map[uint32]*Snapshot),
|
||||
pidController: NewPIDController(),
|
||||
pidController: NewPIDController(params.Logger),
|
||||
}
|
||||
|
||||
r.pidController.SetGains(2.0, 0.5, 0.25)
|
||||
r.pidController.SetDerivativeLPF(0.02)
|
||||
outMin, outMax := -0.025*float64(params.ClockRate), 0.025*float64(params.ClockRate)
|
||||
outMin, outMax := -0.025*float64(r.params.ClockRate), 0.025*float64(r.params.ClockRate)
|
||||
r.pidController.SetOutputLimits(outMin, outMax)
|
||||
r.pidController.SetIntegralLimits(outMin/2.0, outMax/2.0)
|
||||
return r
|
||||
@@ -286,6 +287,7 @@ func (r *RTPStats) Seed(from *RTPStats) {
|
||||
}
|
||||
r.lastSRTime = from.lastSRTime
|
||||
r.lastSRNTP = from.lastSRNTP
|
||||
r.lastSRRTP = from.lastSRRTP
|
||||
|
||||
r.nextSnapshotId = from.nextSnapshotId
|
||||
for id, ss := range from.snapshots {
|
||||
@@ -296,6 +298,7 @@ func (r *RTPStats) Seed(from *RTPStats) {
|
||||
|
||||
func (r *RTPStats) SetLogger(logger logger.Logger) {
|
||||
r.logger = logger
|
||||
r.pidController.SetLogger(logger)
|
||||
}
|
||||
|
||||
func (r *RTPStats) Stop() {
|
||||
@@ -732,7 +735,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
|
||||
// prevent against extreme case of anachronous sender reports
|
||||
if r.srData != nil && r.srData.NTPTimestamp > srData.NTPTimestamp {
|
||||
r.logger.Debugw(
|
||||
"anachronous RTCP sender report",
|
||||
"received anachronous sender report",
|
||||
"current", srData.NTPTimestamp.Time(),
|
||||
"last", r.srData.NTPTimestamp.Time(),
|
||||
)
|
||||
@@ -752,7 +755,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
|
||||
|
||||
timeSinceFirst := time.Since(r.firstTime) // ideally should use NTP time from SR, but that is a different time base, now is a resonable approximation
|
||||
rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS
|
||||
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
|
||||
drift := int64(rtpDiffSinceFirst - uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
|
||||
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
|
||||
|
||||
r.logger.Debugw(
|
||||
@@ -768,6 +771,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
|
||||
"rtpDiffSinceFirst", rtpDiffSinceFirst,
|
||||
"drift", drift,
|
||||
"driftMs", driftMs,
|
||||
"rate", float64(rtpDiffSinceFirst)/timeSinceFirst.Seconds(),
|
||||
)
|
||||
// TODO-REMOVE-AFTER-DEBUG-END
|
||||
|
||||
@@ -829,7 +833,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64
|
||||
expectedExtRTP := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)
|
||||
if getExtTS(r.highestTS, r.tsCycles) > expectedExtRTP || now.Before(r.highestTime) {
|
||||
r.logger.Debugw(
|
||||
"anachronous sender report",
|
||||
"sending anachronous sender report",
|
||||
"firstTime", r.firstTime.String(),
|
||||
"currentTime", now.String(),
|
||||
"highestTime", r.highestTime.String(),
|
||||
@@ -844,20 +848,17 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64
|
||||
nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
|
||||
|
||||
// TODO-REMOVE-AFTER-DEBUG-START
|
||||
timeSinceFirst = nowNTP.Time().Sub(r.firstTime)
|
||||
rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS
|
||||
measurement := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds()
|
||||
pidOutput := r.pidController.Update(
|
||||
float64(r.params.ClockRate),
|
||||
measurement,
|
||||
now,
|
||||
)
|
||||
r.logger.Debugw(
|
||||
"pid controller output",
|
||||
"measurement", measurement,
|
||||
"errorTerm", float64(r.params.ClockRate)-measurement,
|
||||
"pidOutput", pidOutput,
|
||||
)
|
||||
pidOutput := float64(0.0)
|
||||
if !r.lastSRTime.IsZero() {
|
||||
timeSinceLast := now.Sub(r.lastSRTime)
|
||||
rtpDiffSinceLast := nowRTP - r.lastSRRTP
|
||||
rate := float64(rtpDiffSinceLast) / timeSinceLast.Seconds()
|
||||
pidOutput = r.pidController.Update(
|
||||
float64(r.params.ClockRate),
|
||||
rate,
|
||||
now,
|
||||
)
|
||||
}
|
||||
// TODO-REMOVE-AFTER-DEBUG-STOP
|
||||
|
||||
// TODO-REMOVE-AFTER-DEBUG-START
|
||||
@@ -867,9 +868,9 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64
|
||||
rtpDiffLocal := int32(nowRTP - r.highestTS)
|
||||
rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9))
|
||||
|
||||
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
|
||||
rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS
|
||||
drift := int64(rtpDiffSinceFirst - uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
|
||||
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
|
||||
|
||||
r.logger.Debugw(
|
||||
"sending sender report",
|
||||
"highestTS", r.highestTS,
|
||||
@@ -884,12 +885,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64
|
||||
"rtpDiffSinceFirst", rtpDiffSinceFirst,
|
||||
"drift", drift,
|
||||
"driftMs", driftMs,
|
||||
"rate", measurement,
|
||||
"rate", float64(rtpDiffSinceFirst)/timeSinceFirst.Seconds(),
|
||||
)
|
||||
// TODO-REMOVE-AFTER-DEBUG-END
|
||||
|
||||
r.lastSRTime = now
|
||||
r.lastSRNTP = nowNTP
|
||||
r.lastSRRTP = nowRTP
|
||||
|
||||
return &rtcp.SenderReport{
|
||||
SSRC: ssrc,
|
||||
@@ -1775,6 +1777,8 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
type PIDController struct {
|
||||
logger logger.Logger
|
||||
|
||||
kp, ki, kd float64
|
||||
|
||||
tau float64 // low pass filter of D, time constant
|
||||
@@ -1791,8 +1795,14 @@ type PIDController struct {
|
||||
prevMeasurementTime time.Time
|
||||
}
|
||||
|
||||
func NewPIDController() *PIDController {
|
||||
return &PIDController{}
|
||||
func NewPIDController(logger logger.Logger) *PIDController {
|
||||
return &PIDController{
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PIDController) SetLogger(logger logger.Logger) {
|
||||
p.logger = logger
|
||||
}
|
||||
|
||||
func (p *PIDController) SetGains(kp, ki, kd float64) {
|
||||
@@ -1818,43 +1828,62 @@ func (p *PIDController) SetIntegralLimits(min, max float64) {
|
||||
}
|
||||
|
||||
func (p *PIDController) Update(setpoint, measurement float64, at time.Time) float64 {
|
||||
diff := setpoint - measurement
|
||||
errorTerm := setpoint - measurement
|
||||
if p.prevMeasurementTime.IsZero() {
|
||||
p.prevError = diff
|
||||
p.prevError = errorTerm
|
||||
p.prevMeasurement = measurement
|
||||
p.prevMeasurementTime = at
|
||||
return 0
|
||||
}
|
||||
|
||||
proportional := p.kp * diff
|
||||
|
||||
duration := at.Sub(p.prevMeasurementTime).Seconds()
|
||||
p.iVal = p.iVal + (0.5 * p.ki * duration * (diff + p.prevError))
|
||||
if duration == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
proportional := p.kp * errorTerm
|
||||
|
||||
iVal := p.iVal + (0.5 * p.ki * duration * (errorTerm + p.prevError))
|
||||
boundIVal := iVal
|
||||
if p.isILimitsSet {
|
||||
if p.iVal > p.iMax {
|
||||
p.iVal = p.iMax
|
||||
if iVal > p.iMax {
|
||||
boundIVal = p.iMax
|
||||
}
|
||||
if p.iVal < p.iMin {
|
||||
p.iVal = p.iMin
|
||||
if iVal < p.iMin {
|
||||
boundIVal = p.iMin
|
||||
}
|
||||
}
|
||||
p.iVal = boundIVal
|
||||
|
||||
p.dVal = (-2.0*p.kd*(measurement-p.prevMeasurement) + (2.0*p.tau-duration)*p.dVal) / (2.0*p.tau + duration)
|
||||
|
||||
output := proportional + p.iVal + p.dVal
|
||||
boundOutput := output
|
||||
if p.isOutLimitsSet {
|
||||
if output > p.outMax {
|
||||
output = p.outMax
|
||||
boundOutput = p.outMax
|
||||
}
|
||||
if output < p.outMin {
|
||||
output = p.outMin
|
||||
boundOutput = p.outMin
|
||||
}
|
||||
}
|
||||
|
||||
p.prevError = diff
|
||||
p.prevError = errorTerm
|
||||
p.prevMeasurement = measurement
|
||||
p.prevMeasurementTime = at
|
||||
return output
|
||||
p.logger.Debugw(
|
||||
"pid controller",
|
||||
"setpoint", setpoint,
|
||||
"measurement", measurement,
|
||||
"errorTerm", errorTerm,
|
||||
"proportional", proportional,
|
||||
"integral", iVal,
|
||||
"integralLimited", boundIVal,
|
||||
"derivative", p.dVal,
|
||||
"output", output,
|
||||
"outputLimited", boundOutput,
|
||||
)
|
||||
return boundOutput
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
@@ -1634,7 +1634,7 @@ func (f *Forwarder) AdjustTimestamp(tsAdjust float64) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
f.rtpMunger.UpdateTsOffset(uint32(tsAdjust))
|
||||
f.rtpMunger.UpdateTsOffset(uint32(tsAdjust + 0.5))
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
@@ -524,7 +524,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
|
||||
rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9
|
||||
normalizedTS := srLayer.RTPTimestamp + uint32(rtpDiff)
|
||||
s.logger.Debugw(
|
||||
"getting reference timestaml",
|
||||
"getting reference timestamp",
|
||||
"layer", layer,
|
||||
"referenceLayer", referenceLayer,
|
||||
"incomingTS", ts,
|
||||
|
||||
Reference in New Issue
Block a user