diff --git a/pkg/sfu/streamtracker/streamtracker_frame.go b/pkg/sfu/streamtracker/streamtracker_frame.go index b809cb3a6..ee80cd326 100644 --- a/pkg/sfu/streamtracker/streamtracker_frame.go +++ b/pkg/sfu/streamtracker/streamtracker_frame.go @@ -9,9 +9,10 @@ import ( ) const ( - checkInterval = 500 * time.Millisecond - staleWindowFactor = 5 - frameRateResolution = float64(0.01) // 1 frame every 100 seconds + checkInterval = 500 * time.Millisecond + frameRateResolution = float64(0.01) // 1 frame every 100 seconds + frameRateIncreaseFactor = 0.6 // slow increase + frameRateDecreaseFactor = 0.9 // fast decrease ) type StreamTrackerFrameParams struct { @@ -30,9 +31,9 @@ type StreamTrackerFrame struct { newestTS uint32 numFrames int - lowestFrameRate float64 - evalInterval time.Duration - lastStatusCheckAt time.Time + estimatedFrameRate float64 + evalInterval time.Duration + lastStatusCheckAt time.Time } func NewStreamTrackerFrame(params StreamTrackerFrameParams) StreamTrackerImpl { @@ -52,14 +53,19 @@ func (s *StreamTrackerFrame) Stop() { func (s *StreamTrackerFrame) Reset() { s.initialized = false + s.resetFPSCalculator() + + s.lastStatusCheckAt = time.Time{} +} + +func (s *StreamTrackerFrame) resetFPSCalculator() { s.tsInitialized = false s.oldestTS = 0 s.newestTS = 0 s.numFrames = 0 - s.lowestFrameRate = 0.0 + s.estimatedFrameRate = 0.0 s.updateEvalInterval() - s.lastStatusCheckAt = time.Time{} } func (s *StreamTrackerFrame) GetCheckInterval() time.Duration { @@ -67,17 +73,6 @@ func (s *StreamTrackerFrame) GetCheckInterval() time.Duration { } func (s *StreamTrackerFrame) Observe(hasMarker bool, ts uint32) StreamStatusChange { - if !s.initialized { - s.initialized = true - if hasMarker { - s.tsInitialized = true - s.oldestTS = ts - s.newestTS = ts - s.numFrames = 1 - } - return StreamStatusChangeActive - } - if hasMarker { if !s.tsInitialized { s.tsInitialized = true @@ -96,6 +91,18 @@ func (s *StreamTrackerFrame) Observe(hasMarker bool, ts uint32) StreamStatusChan s.numFrames++ } } + + // When starting up, check for first packet and declare active. + // Happens under following conditions + // 1. Start up + // 2. Unmute (stream restarting) + // 3. Layer starting after dynacast pause + if !s.initialized { + s.initialized = true + s.lastStatusCheckAt = time.Now() + return StreamStatusChangeActive + } + return StreamStatusChangeNone } @@ -105,70 +112,79 @@ func (s *StreamTrackerFrame) CheckStatus() StreamStatusChange { return StreamStatusChangeNone } - // calculate frame rate since last check - frameRate := float64(0.0) - diff := s.newestTS - s.oldestTS - if diff > 0 || s.numFrames > 1 { - if diff > s.params.ClockRate*staleWindowFactor { - s.params.Logger.Infow("eval window might be stale", "numFrames", s.numFrames, "timeElapsed", float64(diff)/float64(s.params.ClockRate)) - // STREAM-TRACKER-FRAME-TODO: might need to protect against one frame, long pause and then one or more frames, i. e. window getting stale. - // One possible option is to reset the fps measurement variables (tsInitialized, oldestTS, newestTS, numFrames, lowestFrameRate, evelInterval) - // and restart the lowest frame rate calulation process. - } - frameRate = float64(s.params.ClockRate) / float64(diff) * float64(s.numFrames-1) - frameRate = math.Round(frameRate/frameRateResolution) * frameRateResolution + if !s.updateStatusCheckTime() { + return StreamStatusChangeNone } - if s.lowestFrameRate == 0.0 { - if frameRate == 0.0 { - // need at least two frames to kick things off - return StreamStatusChangeNone - } - - s.lowestFrameRate = frameRate - s.updateEvalInterval() - s.params.Logger.Infow("initializing lowest frame rate", "lowestFPS", s.lowestFrameRate, "evalInterval", s.evalInterval) - } else { - // check only at intervals based on lowest seen frame rate - if s.lastStatusCheckAt.IsZero() { - s.lastStatusCheckAt = time.Now() - } - if time.Since(s.lastStatusCheckAt) < s.evalInterval { - return StreamStatusChangeNone - } - s.lastStatusCheckAt = time.Now() - } - - // reset for next evaluation interval - s.oldestTS = s.newestTS - s.numFrames = 1 - - // STREAM-TRACKER-FRAME-TODO: this will run into challenges for frame rate falling steeply, how to address that - // look at some referential rules (between layers) for possibilities to solve it. Currently, this is addressed - // by setting a source aware min FPS to ensure evaluation window in long enough - // update lowest seen frame rate - if frameRate > 0.0 && s.lowestFrameRate > frameRate { - s.lowestFrameRate = frameRate - s.updateEvalInterval() - s.params.Logger.Infow("updating lowest frame rate", "lowestFPS", s.lowestFrameRate, "evalInterval", s.evalInterval) - } - - if frameRate == 0.0 { + if s.updateEstimatedFrameRate() == 0.0 { + // when stream is stopped, reset FPS calculator to ensure re-start is not done until at least two frames are available, + // i. e. enough frames available to be able to calculate FPS + s.resetFPSCalculator() return StreamStatusChangeStopped } return StreamStatusChangeActive } +func (s *StreamTrackerFrame) updateStatusCheckTime() bool { + // check only at intervals based on estimated frame rate + if s.lastStatusCheckAt.IsZero() { + s.lastStatusCheckAt = time.Now() + } + if time.Since(s.lastStatusCheckAt) < s.evalInterval { + return false + } + s.lastStatusCheckAt = time.Now() + return true +} + +func (s *StreamTrackerFrame) updateEstimatedFrameRate() float64 { + frameRate := float64(0.0) + diff := s.newestTS - s.oldestTS + if diff == 0 || s.numFrames < 2 { + return 0.0 + } + + frameRate = float64(s.params.ClockRate) / float64(diff) * float64(s.numFrames-1) + frameRate = math.Round(frameRate/frameRateResolution) * frameRateResolution + + // reset for next evaluation interval + s.oldestTS = s.newestTS + s.numFrames = 1 + + factor := float64(1.0) + switch { + case s.estimatedFrameRate < frameRate: + // slow increase, prevents shortening eval interval too quickly on frame rate going up + factor = frameRateIncreaseFactor + case s.estimatedFrameRate > frameRate: + // fast decrease, prevents declaring stream stop too quickly on frame rate going down + factor = frameRateDecreaseFactor + } + + estimatedFrameRate := frameRate*factor + s.estimatedFrameRate*(1.0-factor) + estimatedFrameRate = math.Round(estimatedFrameRate/frameRateResolution) * frameRateResolution + if s.estimatedFrameRate != estimatedFrameRate { + s.estimatedFrameRate = estimatedFrameRate + s.updateEvalInterval() + s.params.Logger.Debugw("updating estimated frame rate", "estimatedFPS", estimatedFrameRate, "evalInterval", s.evalInterval) + } + + return frameRate +} + func (s *StreamTrackerFrame) updateEvalInterval() { + // STREAM-TRACKER-FRAME-TODO: This will run into challenges for frame rate falling steeply, How to address that? + // Maybe, look at some referential rules (between layers) for possibilities to solve it. Currently, this is addressed + // by setting a source aware min FPS to ensure evaluation window is long enough to avoid declaring stop too quickly. s.evalInterval = checkInterval - if s.lowestFrameRate > 0 { - lowestFrameRateInterval := time.Duration(float64(time.Second) / s.lowestFrameRate) - if lowestFrameRateInterval > s.evalInterval { - s.evalInterval = lowestFrameRateInterval + if s.estimatedFrameRate > 0.0 { + estimatedFrameRateInterval := time.Duration(float64(time.Second) / s.estimatedFrameRate) + if estimatedFrameRateInterval > s.evalInterval { + s.evalInterval = estimatedFrameRateInterval } } - if s.params.Config.MinFPS > 0 { + if s.params.Config.MinFPS > 0.0 { minFPSInterval := time.Duration(float64(time.Second) / s.params.Config.MinFPS) if minFPSInterval > s.evalInterval { s.evalInterval = minFPSInterval