diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 6ef8a60da..b10dbff76 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -177,6 +177,7 @@ type RTPStats struct { srData *RTCPSenderReportData lastSRTime time.Time lastSRNTP mediatransportutil.NtpTime + lastSRRTP uint32 pidController *PIDController nextSnapshotId uint32 @@ -189,12 +190,12 @@ func NewRTPStats(params RTPStatsParams) *RTPStats { logger: params.Logger, nextSnapshotId: FirstSnapshotId, snapshots: make(map[uint32]*Snapshot), - pidController: NewPIDController(), + pidController: NewPIDController(params.Logger), } r.pidController.SetGains(2.0, 0.5, 0.25) r.pidController.SetDerivativeLPF(0.02) - outMin, outMax := -0.025*float64(params.ClockRate), 0.025*float64(params.ClockRate) + outMin, outMax := -0.025*float64(r.params.ClockRate), 0.025*float64(r.params.ClockRate) r.pidController.SetOutputLimits(outMin, outMax) r.pidController.SetIntegralLimits(outMin/2.0, outMax/2.0) return r @@ -286,6 +287,7 @@ func (r *RTPStats) Seed(from *RTPStats) { } r.lastSRTime = from.lastSRTime r.lastSRNTP = from.lastSRNTP + r.lastSRRTP = from.lastSRRTP r.nextSnapshotId = from.nextSnapshotId for id, ss := range from.snapshots { @@ -296,6 +298,7 @@ func (r *RTPStats) Seed(from *RTPStats) { func (r *RTPStats) SetLogger(logger logger.Logger) { r.logger = logger + r.pidController.SetLogger(logger) } func (r *RTPStats) Stop() { @@ -732,7 +735,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { // prevent against extreme case of anachronous sender reports if r.srData != nil && r.srData.NTPTimestamp > srData.NTPTimestamp { r.logger.Debugw( - "anachronous RTCP sender report", + "received anachronous sender report", "current", srData.NTPTimestamp.Time(), "last", r.srData.NTPTimestamp.Time(), ) @@ -752,7 +755,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { timeSinceFirst := time.Since(r.firstTime) // ideally should use NTP time from SR, but that is a different time base, now is a resonable approximation rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS - drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) + drift := int64(rtpDiffSinceFirst - uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) r.logger.Debugw( @@ -768,6 +771,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { "rtpDiffSinceFirst", rtpDiffSinceFirst, "drift", drift, "driftMs", driftMs, + "rate", float64(rtpDiffSinceFirst)/timeSinceFirst.Seconds(), ) // TODO-REMOVE-AFTER-DEBUG-END @@ -829,7 +833,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 expectedExtRTP := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) if getExtTS(r.highestTS, r.tsCycles) > expectedExtRTP || now.Before(r.highestTime) { r.logger.Debugw( - "anachronous sender report", + "sending anachronous sender report", "firstTime", r.firstTime.String(), "currentTime", now.String(), "highestTime", r.highestTime.String(), @@ -844,20 +848,17 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) // TODO-REMOVE-AFTER-DEBUG-START - timeSinceFirst = nowNTP.Time().Sub(r.firstTime) - rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS - measurement := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() - pidOutput := r.pidController.Update( - float64(r.params.ClockRate), - measurement, - now, - ) - r.logger.Debugw( - "pid controller output", - "measurement", measurement, - "errorTerm", float64(r.params.ClockRate)-measurement, - "pidOutput", pidOutput, - ) + pidOutput := float64(0.0) + if !r.lastSRTime.IsZero() { + timeSinceLast := now.Sub(r.lastSRTime) + rtpDiffSinceLast := nowRTP - r.lastSRRTP + rate := float64(rtpDiffSinceLast) / timeSinceLast.Seconds() + pidOutput = r.pidController.Update( + float64(r.params.ClockRate), + rate, + now, + ) + } // TODO-REMOVE-AFTER-DEBUG-STOP // TODO-REMOVE-AFTER-DEBUG-START @@ -867,9 +868,9 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 rtpDiffLocal := int32(nowRTP - r.highestTS) rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) + rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS + drift := int64(rtpDiffSinceFirst - uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) - r.logger.Debugw( "sending sender report", "highestTS", r.highestTS, @@ -884,12 +885,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 "rtpDiffSinceFirst", rtpDiffSinceFirst, "drift", drift, "driftMs", driftMs, - "rate", measurement, + "rate", float64(rtpDiffSinceFirst)/timeSinceFirst.Seconds(), ) // TODO-REMOVE-AFTER-DEBUG-END r.lastSRTime = now r.lastSRNTP = nowNTP + r.lastSRRTP = nowRTP return &rtcp.SenderReport{ SSRC: ssrc, @@ -1775,6 +1777,8 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { // ------------------------------------------------------------------- type PIDController struct { + logger logger.Logger + kp, ki, kd float64 tau float64 // low pass filter of D, time constant @@ -1791,8 +1795,14 @@ type PIDController struct { prevMeasurementTime time.Time } -func NewPIDController() *PIDController { - return &PIDController{} +func NewPIDController(logger logger.Logger) *PIDController { + return &PIDController{ + logger: logger, + } +} + +func (p *PIDController) SetLogger(logger logger.Logger) { + p.logger = logger } func (p *PIDController) SetGains(kp, ki, kd float64) { @@ -1818,43 +1828,62 @@ func (p *PIDController) SetIntegralLimits(min, max float64) { } func (p *PIDController) Update(setpoint, measurement float64, at time.Time) float64 { - diff := setpoint - measurement + errorTerm := setpoint - measurement if p.prevMeasurementTime.IsZero() { - p.prevError = diff + p.prevError = errorTerm p.prevMeasurement = measurement p.prevMeasurementTime = at return 0 } - proportional := p.kp * diff - duration := at.Sub(p.prevMeasurementTime).Seconds() - p.iVal = p.iVal + (0.5 * p.ki * duration * (diff + p.prevError)) + if duration == 0 { + return 0 + } + + proportional := p.kp * errorTerm + + iVal := p.iVal + (0.5 * p.ki * duration * (errorTerm + p.prevError)) + boundIVal := iVal if p.isILimitsSet { - if p.iVal > p.iMax { - p.iVal = p.iMax + if iVal > p.iMax { + boundIVal = p.iMax } - if p.iVal < p.iMin { - p.iVal = p.iMin + if iVal < p.iMin { + boundIVal = p.iMin } } + p.iVal = boundIVal p.dVal = (-2.0*p.kd*(measurement-p.prevMeasurement) + (2.0*p.tau-duration)*p.dVal) / (2.0*p.tau + duration) output := proportional + p.iVal + p.dVal + boundOutput := output if p.isOutLimitsSet { if output > p.outMax { - output = p.outMax + boundOutput = p.outMax } if output < p.outMin { - output = p.outMin + boundOutput = p.outMin } } - p.prevError = diff + p.prevError = errorTerm p.prevMeasurement = measurement p.prevMeasurementTime = at - return output + p.logger.Debugw( + "pid controller", + "setpoint", setpoint, + "measurement", measurement, + "errorTerm", errorTerm, + "proportional", proportional, + "integral", iVal, + "integralLimited", boundIVal, + "derivative", p.dVal, + "output", output, + "outputLimited", boundOutput, + ) + return boundOutput } // ------------------------------------------------------------------- diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index b347be756..c7159d810 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1634,7 +1634,7 @@ func (f *Forwarder) AdjustTimestamp(tsAdjust float64) { f.lock.Lock() defer f.lock.Unlock() - f.rtpMunger.UpdateTsOffset(uint32(tsAdjust)) + f.rtpMunger.UpdateTsOffset(uint32(tsAdjust + 0.5)) } // ----------------------------------------------------------------------------- diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index ee5f039e7..6b79914f9 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -524,7 +524,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 normalizedTS := srLayer.RTPTimestamp + uint32(rtpDiff) s.logger.Debugw( - "getting reference timestaml", + "getting reference timestamp", "layer", layer, "referenceLayer", referenceLayer, "incomingTS", ts,