diff --git a/pkg/config/config.go b/pkg/config/config.go index 1fceaebc9..0c9d6bb18 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -28,6 +28,8 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/livekit-server/pkg/metric" + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/streamallocator" "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -35,23 +37,8 @@ import ( "github.com/livekit/protocol/rpc" ) -type ( - CongestionControlProbeMode string - StreamTrackerType string -) - const ( generatedCLIFlagUsage = "generated" - - CongestionControlProbeModePadding CongestionControlProbeMode = "padding" - CongestionControlProbeModeMedia CongestionControlProbeMode = "media" - - StreamTrackerTypePacket StreamTrackerType = "packet" - StreamTrackerTypeFrame StreamTrackerType = "frame" - - StatsUpdateInterval = time.Second * 10 - TelemetryStatsUpdateInterval = time.Second * 30 - TelemetryNonMediaStatsUpdateInterval = time.Minute * 5 ) var ( @@ -67,7 +54,7 @@ type Config struct { Prometheus PrometheusConfig `yaml:"prometheus,omitempty"` RTC RTCConfig `yaml:"rtc,omitempty"` Redis redisLiveKit.RedisConfig `yaml:"redis,omitempty"` - Audio AudioConfig `yaml:"audio,omitempty"` + Audio sfu.AudioConfig `yaml:"audio,omitempty"` Video VideoConfig `yaml:"video,omitempty"` Room RoomConfig `yaml:"room,omitempty"` TURN TURNConfig `yaml:"turn,omitempty"` @@ -105,7 +92,7 @@ type RTCConfig struct { PacketBufferSizeAudio int `yaml:"packet_buffer_size_audio,omitempty"` // Throttle periods for pli/fir rtcp packets - PLIThrottle PLIThrottleConfig `yaml:"pli_throttle,omitempty"` + PLIThrottle sfu.PLIThrottleConfig `yaml:"pli_throttle,omitempty"` CongestionControl CongestionControlConfig `yaml:"congestion_control,omitempty"` @@ -135,93 +122,11 @@ type TURNServer struct { Credential string `yaml:"credential,omitempty"` } -type PLIThrottleConfig struct { - LowQuality time.Duration `yaml:"low_quality,omitempty"` - MidQuality time.Duration `yaml:"mid_quality,omitempty"` - HighQuality time.Duration `yaml:"high_quality,omitempty"` -} - -type CongestionControlProbeConfig struct { - BaseInterval time.Duration `yaml:"base_interval,omitempty"` - BackoffFactor float64 `yaml:"backoff_factor,omitempty"` - MaxInterval time.Duration `yaml:"max_interval,omitempty"` - - SettleWait time.Duration `yaml:"settle_wait,omitempty"` - SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"` - - TrendWait time.Duration `yaml:"trend_wait,omitempty"` - - OveragePct int64 `yaml:"overage_pct,omitempty"` - MinBps int64 `yaml:"min_bps,omitempty"` - MinDuration time.Duration `yaml:"min_duration,omitempty"` - MaxDuration time.Duration `yaml:"max_duration,omitempty"` - DurationOverflowFactor float64 `yaml:"duration_overflow_factor,omitempty"` - DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"` -} - -type CongestionControlChannelObserverConfig struct { - EstimateRequiredSamples int `yaml:"estimate_required_samples,omitempty"` - EstimateRequiredSamplesMin int `yaml:"estimate_required_samples_min,omitempty"` - EstimateDownwardTrendThreshold float64 `yaml:"estimate_downward_trend_threshold,omitempty"` - EstimateDownwardTrendMaxWait time.Duration `yaml:"estimate_downward_trend_max_wait,omitempty"` - EstimateCollapseThreshold time.Duration `yaml:"estimate_collapse_threshold,omitempty"` - EstimateValidityWindow time.Duration `yaml:"estimate_validity_window,omitempty"` - NackWindowMinDuration time.Duration `yaml:"nack_window_min_duration,omitempty"` - NackWindowMaxDuration time.Duration `yaml:"nack_window_max_duration,omitempty"` - NackRatioThreshold float64 `yaml:"nack_ratio_threshold,omitempty"` -} - type CongestionControlConfig struct { - Enabled bool `yaml:"enabled,omitempty"` - AllowPause bool `yaml:"allow_pause,omitempty"` - NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` - ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` - UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"` - ProbeMode CongestionControlProbeMode `yaml:"probe_mode,omitempty"` - MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` - ProbeConfig CongestionControlProbeConfig `yaml:"probe_config,omitempty"` - ChannelObserverProbeConfig CongestionControlChannelObserverConfig `yaml:"channel_observer_probe_config,omitempty"` - ChannelObserverNonProbeConfig CongestionControlChannelObserverConfig `yaml:"channel_observer_non_probe_config,omitempty"` - DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"` -} - -type AudioConfig struct { - // minimum level to be considered active, 0-127, where 0 is loudest - ActiveLevel uint8 `yaml:"active_level,omitempty"` - // percentile to measure, a participant is considered active if it has exceeded the ActiveLevel more than - // MinPercentile% of the time - MinPercentile uint8 `yaml:"min_percentile,omitempty"` - // interval to update clients, in ms - UpdateInterval uint32 `yaml:"update_interval,omitempty"` - // smoothing for audioLevel values sent to the client. - // audioLevel will be an average of `smooth_intervals`, 0 to disable - SmoothIntervals uint32 `yaml:"smooth_intervals,omitempty"` - // enable red encoding downtrack for opus only audio up track - ActiveREDEncoding bool `yaml:"active_red_encoding,omitempty"` - // enable proxying weakest subscriber loss to publisher in RTCP Receiver Report - EnableLossProxying bool `yaml:"enable_loss_proxying,omitempty"` -} - -type StreamTrackerPacketConfig struct { - SamplesRequired uint32 `yaml:"samples_required,omitempty"` // number of samples needed per cycle - CyclesRequired uint32 `yaml:"cycles_required,omitempty"` // number of cycles needed to be active - CycleDuration time.Duration `yaml:"cycle_duration,omitempty"` -} - -type StreamTrackerFrameConfig struct { - MinFPS float64 `yaml:"min_fps,omitempty"` -} - -type StreamTrackerConfig struct { - StreamTrackerType StreamTrackerType `yaml:"stream_tracker_type,omitempty"` - BitrateReportInterval map[int32]time.Duration `yaml:"bitrate_report_interval,omitempty"` - PacketTracker map[int32]StreamTrackerPacketConfig `yaml:"packet_tracker,omitempty"` - FrameTracker map[int32]StreamTrackerFrameConfig `yaml:"frame_tracker,omitempty"` -} - -type StreamTrackersConfig struct { - Video StreamTrackerConfig `yaml:"video,omitempty"` - Screenshare StreamTrackerConfig `yaml:"screenshare,omitempty"` + Enabled bool `yaml:"enabled,omitempty"` + AllowPause bool `yaml:"allow_pause,omitempty"` + StreamAllocator streamallocator.StreamAllocatorConfig `yaml:"stream_allocator,omitempty"` + UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"` } type PlayoutDelayConfig struct { @@ -231,8 +136,8 @@ type PlayoutDelayConfig struct { } type VideoConfig struct { - DynacastPauseDelay time.Duration `yaml:"dynacast_pause_delay,omitempty"` - StreamTracker StreamTrackersConfig `yaml:"stream_tracker,omitempty"` + DynacastPauseDelay time.Duration `yaml:"dynacast_pause_delay,omitempty"` + StreamTrackerManager sfu.StreamTrackerManagerConfig `yaml:"stream_tracker_manager,omitempty"` } type RoomConfig struct { @@ -398,140 +303,15 @@ var DefaultConfig = Config{ PacketBufferSizeVideo: 500, PacketBufferSizeAudio: 200, StrictACKs: true, - PLIThrottle: PLIThrottleConfig{ - LowQuality: 500 * time.Millisecond, - MidQuality: time.Second, - HighQuality: time.Second, - }, + PLIThrottle: sfu.DefaultPLIThrottleConfig, CongestionControl: CongestionControlConfig{ - Enabled: true, - AllowPause: false, - NackRatioAttenuator: 0.4, - ExpectedUsageThreshold: 0.95, - ProbeMode: CongestionControlProbeModePadding, - ProbeConfig: CongestionControlProbeConfig{ - BaseInterval: 3 * time.Second, - BackoffFactor: 1.5, - MaxInterval: 2 * time.Minute, - - SettleWait: 250 * time.Millisecond, - SettleWaitMax: 10 * time.Second, - - TrendWait: 2 * time.Second, - - OveragePct: 120, - MinBps: 200_000, - MinDuration: 200 * time.Millisecond, - MaxDuration: 20 * time.Second, - DurationOverflowFactor: 1.25, - DurationIncreaseFactor: 1.5, - }, - ChannelObserverProbeConfig: CongestionControlChannelObserverConfig{ - EstimateRequiredSamples: 3, - EstimateRequiredSamplesMin: 3, - EstimateDownwardTrendThreshold: 0.0, - EstimateDownwardTrendMaxWait: 5 * time.Second, - EstimateCollapseThreshold: 0, - EstimateValidityWindow: 10 * time.Second, - NackWindowMinDuration: 500 * time.Millisecond, - NackWindowMaxDuration: 1 * time.Second, - NackRatioThreshold: 0.04, - }, - ChannelObserverNonProbeConfig: CongestionControlChannelObserverConfig{ - EstimateRequiredSamples: 12, - EstimateRequiredSamplesMin: 8, - EstimateDownwardTrendThreshold: -0.6, - EstimateDownwardTrendMaxWait: 5 * time.Second, - EstimateCollapseThreshold: 500 * time.Millisecond, - EstimateValidityWindow: 10 * time.Second, - NackWindowMinDuration: 2 * time.Second, - NackWindowMaxDuration: 3 * time.Second, - NackRatioThreshold: 0.08, - }, + StreamAllocator: streamallocator.DefaultStreamAllocatorConfig, }, }, - Audio: AudioConfig{ - ActiveLevel: 35, // -35dBov - MinPercentile: 40, - UpdateInterval: 400, - SmoothIntervals: 2, - }, + Audio: sfu.DefaultAudioConfig, Video: VideoConfig{ - DynacastPauseDelay: 5 * time.Second, - StreamTracker: StreamTrackersConfig{ - Video: StreamTrackerConfig{ - StreamTrackerType: StreamTrackerTypePacket, - BitrateReportInterval: map[int32]time.Duration{ - 0: 1 * time.Second, - 1: 1 * time.Second, - 2: 1 * time.Second, - }, - PacketTracker: map[int32]StreamTrackerPacketConfig{ - 0: { - SamplesRequired: 1, - CyclesRequired: 4, - CycleDuration: 500 * time.Millisecond, - }, - 1: { - SamplesRequired: 5, - CyclesRequired: 20, - CycleDuration: 500 * time.Millisecond, - }, - 2: { - SamplesRequired: 5, - CyclesRequired: 20, - CycleDuration: 500 * time.Millisecond, - }, - }, - FrameTracker: map[int32]StreamTrackerFrameConfig{ - 0: { - MinFPS: 5.0, - }, - 1: { - MinFPS: 5.0, - }, - 2: { - MinFPS: 5.0, - }, - }, - }, - Screenshare: StreamTrackerConfig{ - StreamTrackerType: StreamTrackerTypePacket, - BitrateReportInterval: map[int32]time.Duration{ - 0: 4 * time.Second, - 1: 4 * time.Second, - 2: 4 * time.Second, - }, - PacketTracker: map[int32]StreamTrackerPacketConfig{ - 0: { - SamplesRequired: 1, - CyclesRequired: 1, - CycleDuration: 2 * time.Second, - }, - 1: { - SamplesRequired: 1, - CyclesRequired: 1, - CycleDuration: 2 * time.Second, - }, - 2: { - SamplesRequired: 1, - CyclesRequired: 1, - CycleDuration: 2 * time.Second, - }, - }, - FrameTracker: map[int32]StreamTrackerFrameConfig{ - 0: { - MinFPS: 0.5, - }, - 1: { - MinFPS: 0.5, - }, - 2: { - MinFPS: 0.5, - }, - }, - }, - }, + DynacastPauseDelay: 5 * time.Second, + StreamTrackerManager: sfu.DefaultStreamTrackerManagerConfig, }, Redis: redisLiveKit.RedisConfig{}, Room: RoomConfig{ diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 70421f5b0..57dc3c02b 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -65,8 +65,8 @@ type MediaTrackParams struct { BufferFactory *buffer.Factory ReceiverConfig ReceiverConfig SubscriberConfig DirectionConfig - PLIThrottleConfig config.PLIThrottleConfig - AudioConfig config.AudioConfig + PLIThrottleConfig sfu.PLIThrottleConfig + AudioConfig sfu.AudioConfig VideoConfig config.VideoConfig Telemetry telemetry.TelemetryService Logger logger.Logger @@ -280,7 +280,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra ti, LoggerWithCodecMime(t.params.Logger, mime), t.params.OnRTCP, - t.params.VideoConfig.StreamTracker, + t.params.VideoConfig.StreamTrackerManager, sfu.WithPliThrottleConfig(t.params.PLIThrottleConfig), sfu.WithAudioConfig(t.params.AudioConfig), sfu.WithLoadBalanceThreshold(20), diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 6c52cb727..ffeff5f58 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -32,7 +32,6 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -91,7 +90,7 @@ type MediaTrackReceiverParams struct { ParticipantVersion uint32 ReceiverConfig ReceiverConfig SubscriberConfig DirectionConfig - AudioConfig config.AudioConfig + AudioConfig sfu.AudioConfig Telemetry telemetry.TelemetryService Logger logger.Logger } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 4a5a734d9..d5ea78462 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -113,14 +113,14 @@ type ParticipantParams struct { SID livekit.ParticipantID Config *WebRTCConfig Sink routing.MessageSink - AudioConfig config.AudioConfig + AudioConfig sfu.AudioConfig VideoConfig config.VideoConfig LimitConfig config.LimitConfig ProtocolVersion types.ProtocolVersion SessionStartTime time.Time Telemetry telemetry.TelemetryService Trailer []byte - PLIThrottleConfig config.PLIThrottleConfig + PLIThrottleConfig sfu.PLIThrottleConfig CongestionControlConfig config.CongestionControlConfig // codecs that are enabled for this room PublishEnabledCodecs []*livekit.Codec diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 306d7b04a..f94532bb5 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -39,6 +39,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "github.com/livekit/livekit-server/pkg/telemetry" @@ -107,7 +108,7 @@ type Room struct { Logger logger.Logger config WebRTCConfig - audioConfig *config.AudioConfig + audioConfig *sfu.AudioConfig serverInfo *livekit.ServerInfo telemetry telemetry.TelemetryService egressLauncher EgressLauncher @@ -234,7 +235,7 @@ func NewRoom( internal *livekit.RoomInternal, config WebRTCConfig, roomConfig config.RoomConfig, - audioConfig *config.AudioConfig, + audioConfig *sfu.AudioConfig, serverInfo *livekit.ServerInfo, telemetry telemetry.TelemetryService, agentClient agent.Client, diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 37b0df7a6..ff106f634 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -31,6 +31,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" + "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" @@ -802,7 +803,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { EmptyTimeout: 5 * 60, DepartureTimeout: 1, }, - &config.AudioConfig{ + &sfu.AudioConfig{ UpdateInterval: audioUpdateInterval, SmoothIntervals: opts.audioSmoothIntervals, }, diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index fba7f6c51..644096721 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -444,9 +444,9 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { } if params.IsSendSide { t.streamAllocator = streamallocator.NewStreamAllocator(streamallocator.StreamAllocatorParams{ - Config: params.CongestionControlConfig, + Config: params.CongestionControlConfig.StreamAllocator, Logger: params.Logger.WithComponent(utils.ComponentCongestionControl), - }) + }, params.CongestionControlConfig.Enabled, params.CongestionControlConfig.AllowPause) t.streamAllocator.OnStreamStateChange(params.Handler.OnStreamStateChange) t.streamAllocator.Start() t.pacer = pacer.NewPassThrough(params.Logger) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 207765033..c0b1f8ae1 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -30,7 +30,6 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" @@ -45,6 +44,52 @@ var ( ErrDuplicateLayer = errors.New("duplicate layer") ) +// -------------------------------------- + +type PLIThrottleConfig struct { + LowQuality time.Duration `yaml:"low_quality,omitempty"` + MidQuality time.Duration `yaml:"mid_quality,omitempty"` + HighQuality time.Duration `yaml:"high_quality,omitempty"` +} + +var ( + DefaultPLIThrottleConfig = PLIThrottleConfig{ + LowQuality: 500 * time.Millisecond, + MidQuality: time.Second, + HighQuality: time.Second, + } +) + +// -------------------------------------- + +type AudioConfig struct { + // minimum level to be considered active, 0-127, where 0 is loudest + ActiveLevel uint8 `yaml:"active_level,omitempty"` + // percentile to measure, a participant is considered active if it has exceeded the ActiveLevel more than + // MinPercentile% of the time + MinPercentile uint8 `yaml:"min_percentile,omitempty"` + // interval to update clients, in ms + UpdateInterval uint32 `yaml:"update_interval,omitempty"` + // smoothing for audioLevel values sent to the client. + // audioLevel will be an average of `smooth_intervals`, 0 to disable + SmoothIntervals uint32 `yaml:"smooth_intervals,omitempty"` + // enable red encoding downtrack for opus only audio up track + ActiveREDEncoding bool `yaml:"active_red_encoding,omitempty"` + // enable proxying weakest subscriber loss to publisher in RTCP Receiver Report + EnableLossProxying bool `yaml:"enable_loss_proxying,omitempty"` +} + +var ( + DefaultAudioConfig = AudioConfig{ + ActiveLevel: 35, // -35dBov + MinPercentile: 40, + UpdateInterval: 400, + SmoothIntervals: 2, + } +) + +// -------------------------------------- + type AudioLevelHandle func(level uint8, duration uint32) type Bitrates [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64 @@ -94,8 +139,8 @@ type TrackReceiver interface { type WebRTCReceiver struct { logger logger.Logger - pliThrottleConfig config.PLIThrottleConfig - audioConfig config.AudioConfig + pliThrottleConfig PLIThrottleConfig + audioConfig AudioConfig trackID livekit.TrackID streamID string @@ -138,7 +183,7 @@ type WebRTCReceiver struct { type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver // WithPliThrottleConfig indicates minimum time(ms) between sending PLIs -func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts { +func WithPliThrottleConfig(pliThrottleConfig PLIThrottleConfig) ReceiverOpts { return func(w *WebRTCReceiver) *WebRTCReceiver { w.pliThrottleConfig = pliThrottleConfig return w @@ -146,7 +191,7 @@ func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverO } // WithAudioConfig sets up parameters for active speaker detection -func WithAudioConfig(audioConfig config.AudioConfig) ReceiverOpts { +func WithAudioConfig(audioConfig AudioConfig) ReceiverOpts { return func(w *WebRTCReceiver) *WebRTCReceiver { w.audioConfig = audioConfig return w @@ -187,7 +232,7 @@ func NewWebRTCReceiver( trackInfo *livekit.TrackInfo, logger logger.Logger, onRTCP func([]rtcp.Packet), - trackersConfig config.StreamTrackersConfig, + streamTrackerManagerConfig StreamTrackerManagerConfig, opts ...ReceiverOpts, ) *WebRTCReceiver { w := &WebRTCReceiver{ @@ -227,7 +272,7 @@ func NewWebRTCReceiver( strings.EqualFold(w.codec.MimeType, MimeTypeAudioRed) || strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "useinbandfec=1"), ) - w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) + w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, streamTrackerManagerConfig) w.streamTrackerManager.SetListener(w) // SVC-TODO: Handle DD for non-SVC cases??? if w.isSVC { diff --git a/pkg/sfu/streamallocator/channelobserver.go b/pkg/sfu/streamallocator/channelobserver.go index 585484408..70b67a3cf 100644 --- a/pkg/sfu/streamallocator/channelobserver.go +++ b/pkg/sfu/streamallocator/channelobserver.go @@ -17,7 +17,6 @@ package streamallocator import ( "fmt" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/logger" ) @@ -69,9 +68,28 @@ func (c ChannelCongestionReason) String() string { // ------------------------------------------------ +type ChannelObserverConfig struct { + Estimate TrendDetectorConfig `yaml:"estimate,omitempty"` + Nack NackTrackerConfig `yaml:"nack,omitempty"` +} + +var ( + DefaultChannelObserverConfigProbe = ChannelObserverConfig{ + Estimate: DefaultTrendDetectorConfigProbe, + Nack: DefaultNackTrackerConfigProbe, + } + + DefaultChannelObserverConfigNonProbe = ChannelObserverConfig{ + Estimate: DefaultTrendDetectorConfigNonProbe, + Nack: DefaultNackTrackerConfigNonProbe, + } +) + +// ------------------------------------------------ + type ChannelObserverParams struct { Name string - Config config.CongestionControlChannelObserverConfig + Config ChannelObserverConfig } type ChannelObserver struct { @@ -87,21 +105,14 @@ func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *Cha params: params, logger: logger, estimateTrend: NewTrendDetector(TrendDetectorParams{ - Name: params.Name + "-estimate", - Logger: logger, - RequiredSamples: params.Config.EstimateRequiredSamples, - RequiredSamplesMin: params.Config.EstimateRequiredSamplesMin, - DownwardTrendThreshold: params.Config.EstimateDownwardTrendThreshold, - DownwardTrendMaxWait: params.Config.EstimateDownwardTrendMaxWait, - CollapseThreshold: params.Config.EstimateCollapseThreshold, - ValidityWindow: params.Config.EstimateValidityWindow, + Name: params.Name + "-estimate", + Logger: logger, + Config: params.Config.Estimate, }), nackTracker: NewNackTracker(NackTrackerParams{ - Name: params.Name + "-nack", - Logger: logger, - WindowMinDuration: params.Config.NackWindowMinDuration, - WindowMaxDuration: params.Config.NackWindowMaxDuration, - RatioThreshold: params.Config.NackRatioThreshold, + Name: params.Name + "-nack", + Logger: logger, + Config: params.Config.Nack, }), } } diff --git a/pkg/sfu/streamallocator/nacktracker.go b/pkg/sfu/streamallocator/nacktracker.go index c7131a01b..5b678a243 100644 --- a/pkg/sfu/streamallocator/nacktracker.go +++ b/pkg/sfu/streamallocator/nacktracker.go @@ -23,12 +23,32 @@ import ( // ------------------------------------------------ +type NackTrackerConfig struct { + WindowMinDuration time.Duration `yaml:"window_min_duration,omitempty"` + WindowMaxDuration time.Duration `yaml:"window_max_duration,omitempty"` + RatioThreshold float64 `yaml:"ratio_threshold,omitempty"` +} + +var ( + DefaultNackTrackerConfigProbe = NackTrackerConfig{ + WindowMinDuration: 500 * time.Millisecond, + WindowMaxDuration: 1 * time.Second, + RatioThreshold: 0.04, + } + + DefaultNackTrackerConfigNonProbe = NackTrackerConfig{ + WindowMinDuration: 2 * time.Second, + WindowMaxDuration: 3 * time.Second, + RatioThreshold: 0.08, + } +) + +// ------------------------------------------------ + type NackTrackerParams struct { - Name string - Logger logger.Logger - WindowMinDuration time.Duration - WindowMaxDuration time.Duration - RatioThreshold float64 + Name string + Logger logger.Logger + Config NackTrackerConfig } type NackTracker struct { @@ -52,7 +72,7 @@ func NewNackTracker(params NackTrackerParams) *NackTracker { } func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) { - if n.params.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.WindowMaxDuration { + if n.params.Config.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMaxDuration { // STREAM-ALLOCATOR-DATA n.updateHistory() n.windowStartTime = time.Time{} @@ -89,8 +109,8 @@ func (n *NackTracker) GetRatio() float64 { } func (n *NackTracker) IsTriggered() bool { - if n.params.WindowMinDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.WindowMinDuration { - return n.GetRatio() > n.params.RatioThreshold + if n.params.Config.WindowMinDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMinDuration { + return n.GetRatio() > n.params.Config.RatioThreshold } return false diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go index 0a49af5de..c2b21b8e1 100644 --- a/pkg/sfu/streamallocator/probe_controller.go +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -18,14 +18,53 @@ import ( "sync" "time" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/logger" ) // --------------------------------------------------------------------------- +type ProbeControllerConfig struct { + BaseInterval time.Duration `yaml:"base_interval,omitempty"` + BackoffFactor float64 `yaml:"backoff_factor,omitempty"` + MaxInterval time.Duration `yaml:"max_interval,omitempty"` + + SettleWait time.Duration `yaml:"settle_wait,omitempty"` + SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"` + + TrendWait time.Duration `yaml:"trend_wait,omitempty"` + + OveragePct int64 `yaml:"overage_pct,omitempty"` + MinBps int64 `yaml:"min_bps,omitempty"` + MinDuration time.Duration `yaml:"min_duration,omitempty"` + MaxDuration time.Duration `yaml:"max_duration,omitempty"` + DurationOverflowFactor float64 `yaml:"duration_overflow_factor,omitempty"` + DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"` +} + +var ( + DefaultProbeControllerConfig = ProbeControllerConfig{ + BaseInterval: 3 * time.Second, + BackoffFactor: 1.5, + MaxInterval: 2 * time.Minute, + + SettleWait: 250 * time.Millisecond, + SettleWaitMax: 10 * time.Second, + + TrendWait: 2 * time.Second, + + OveragePct: 120, + MinBps: 200_000, + MinDuration: 200 * time.Millisecond, + MaxDuration: 20 * time.Second, + DurationOverflowFactor: 1.25, + DurationIncreaseFactor: 1.5, + } +) + +// --------------------------------------------------------------------------- + type ProbeControllerParams struct { - Config config.CongestionControlProbeConfig + Config ProbeControllerConfig Prober *Prober Logger logger.Logger } diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index e1595b427..c7d422500 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -28,7 +28,6 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/utils" @@ -137,8 +136,41 @@ func (e Event) String() string { // --------------------------------------------------------------------------- +type ( + ProbeMode string +) + +const ( + ProbeModePadding ProbeMode = "padding" + ProbeModeMedia ProbeMode = "media" +) + +type StreamAllocatorConfig struct { + NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` + ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` + ProbeMode ProbeMode `yaml:"probe_mode,omitempty"` + MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` + ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"` + ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"` + ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"` + DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"` +} + +var ( + DefaultStreamAllocatorConfig = StreamAllocatorConfig{ + NackRatioAttenuator: 0.4, + ExpectedUsageThreshold: 0.95, + ProbeMode: ProbeModePadding, + ProbeController: DefaultProbeControllerConfig, + ChannelObserverProbe: DefaultChannelObserverConfigProbe, + ChannelObserverNonProbe: DefaultChannelObserverConfigNonProbe, + } +) + +// --------------------------------------------------------------------------- + type StreamAllocatorParams struct { - Config config.CongestionControlConfig + Config StreamAllocatorConfig Logger logger.Logger } @@ -149,6 +181,7 @@ type StreamAllocator struct { bwe cc.BandwidthEstimator + enabled bool allowPause bool lastReceivedEstimate int64 @@ -174,10 +207,11 @@ type StreamAllocator struct { isStopped atomic.Bool } -func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { +func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause bool) *StreamAllocator { s := &StreamAllocator{ params: params, - allowPause: params.Config.AllowPause, + enabled: enabled, + allowPause: allowPause, prober: NewProber(ProberParams{ Logger: params.Logger, }), @@ -191,7 +225,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { } s.probeController = NewProbeController(ProbeControllerParams{ - Config: s.params.Config.ProbeConfig, + Config: s.params.Config.ProbeController, Prober: s.prober, Logger: params.Logger, }) @@ -869,7 +903,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { s.probeController.AbortProbe() // if not deficient, free pass allocate track - if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() { + if !s.enabled || s.state == streamAllocatorStateStable || !track.IsManaged() { update := NewStreamStateUpdate() allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal) updateStreamStateChange(track, allocation, update) @@ -1096,7 +1130,7 @@ boost_loop: } func (s *StreamAllocator) allocateAllTracks() { - if !s.params.Config.Enabled { + if !s.enabled { // nothing else to do when disabled return } @@ -1276,7 +1310,7 @@ func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver { return NewChannelObserver( ChannelObserverParams{ Name: "probe", - Config: s.params.Config.ChannelObserverProbeConfig, + Config: s.params.Config.ChannelObserverProbe, }, s.params.Logger, ) @@ -1286,7 +1320,7 @@ func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver { return NewChannelObserver( ChannelObserverParams{ Name: "non-probe", - Config: s.params.Config.ChannelObserverNonProbeConfig, + Config: s.params.Config.ChannelObserverNonProbe, }, s.params.Logger, ) @@ -1325,10 +1359,10 @@ func (s *StreamAllocator) maybeProbe() { } switch s.params.Config.ProbeMode { - case config.CongestionControlProbeModeMedia: + case ProbeModeMedia: s.maybeProbeWithMedia() s.adjustState() - case config.CongestionControlProbeModePadding: + case ProbeModePadding: s.maybeProbeWithPadding() } } diff --git a/pkg/sfu/streamallocator/trenddetector.go b/pkg/sfu/streamallocator/trenddetector.go index a42e54a1f..164ca419e 100644 --- a/pkg/sfu/streamallocator/trenddetector.go +++ b/pkg/sfu/streamallocator/trenddetector.go @@ -70,15 +70,41 @@ func trendDetectorSampleListToString(samples []trendDetectorSample) string { // ------------------------------------------------ +type TrendDetectorConfig struct { + RequiredSamples int `yaml:"required_samples,omitempty"` + RequiredSamplesMin int `yaml:"required_samples_min,omitempty"` + DownwardTrendThreshold float64 `yaml:"downward_trend_threshold,omitempty"` + DownwardTrendMaxWait time.Duration `yaml:"downward_trend_max_wait,omitempty"` + CollapseThreshold time.Duration `yaml:"collapse_threshold,omitempty"` + ValidityWindow time.Duration `yaml:"validity_window,omitempty"` +} + +var ( + DefaultTrendDetectorConfigProbe = TrendDetectorConfig{ + RequiredSamples: 3, + RequiredSamplesMin: 3, + DownwardTrendThreshold: 0.0, + DownwardTrendMaxWait: 5 * time.Second, + CollapseThreshold: 0, + ValidityWindow: 10 * time.Second, + } + + DefaultTrendDetectorConfigNonProbe = TrendDetectorConfig{ + RequiredSamples: 12, + RequiredSamplesMin: 8, + DownwardTrendThreshold: -0.6, + DownwardTrendMaxWait: 5 * time.Second, + CollapseThreshold: 500 * time.Millisecond, + ValidityWindow: 10 * time.Second, + } +) + +// ------------------------------------------------ + type TrendDetectorParams struct { - Name string - Logger logger.Logger - RequiredSamples int - RequiredSamplesMin int - DownwardTrendThreshold float64 - DownwardTrendMaxWait time.Duration - CollapseThreshold time.Duration - ValidityWindow time.Duration + Name string + Logger logger.Logger + Config TrendDetectorConfig } type TrendDetector struct { @@ -134,7 +160,7 @@ func (t *TrendDetector) AddValue(value int64) { if len(t.samples) != 0 { lastSample = &t.samples[len(t.samples)-1] } - if lastSample != nil && lastSample.value == value && t.params.CollapseThreshold > 0 && time.Since(lastSample.at) < t.params.CollapseThreshold { + if lastSample != nil && lastSample.value == value && t.params.Config.CollapseThreshold > 0 && time.Since(lastSample.at) < t.params.Config.CollapseThreshold { return } @@ -156,7 +182,7 @@ func (t *TrendDetector) GetDirection() TrendDirection { } func (t *TrendDetector) HasEnoughSamples() bool { - return t.numSamples >= t.params.RequiredSamples + return t.numSamples >= t.params.Config.RequiredSamples } func (t *TrendDetector) ToString() string { @@ -173,13 +199,13 @@ func (t *TrendDetector) prune() { // prune based on a few rules // 1. If there are more than required samples - if len(t.samples) > t.params.RequiredSamples { - t.samples = t.samples[len(t.samples)-t.params.RequiredSamples:] + if len(t.samples) > t.params.Config.RequiredSamples { + t.samples = t.samples[len(t.samples)-t.params.Config.RequiredSamples:] } // 2. drop samples that are too old - if len(t.samples) != 0 && t.params.ValidityWindow > 0 { - cutoffTime := time.Now().Add(-t.params.ValidityWindow) + if len(t.samples) != 0 && t.params.Config.ValidityWindow > 0 { + cutoffTime := time.Now().Add(-t.params.Config.ValidityWindow) cutoffIndex := -1 for i := 0; i < len(t.samples); i++ { if t.samples[i].at.After(cutoffTime) { @@ -213,7 +239,7 @@ func (t *TrendDetector) prune() { } func (t *TrendDetector) updateDirection() { - if len(t.samples) < t.params.RequiredSamplesMin { + if len(t.samples) < t.params.Config.RequiredSamplesMin { t.direction = TrendDirectionNeutral return } @@ -223,9 +249,9 @@ func (t *TrendDetector) updateDirection() { t.direction = TrendDirectionNeutral switch { - case kt > 0 && len(t.samples) >= t.params.RequiredSamples: + case kt > 0 && len(t.samples) >= t.params.Config.RequiredSamples: t.direction = TrendDirectionUpward - case kt < t.params.DownwardTrendThreshold && (len(t.samples) >= t.params.RequiredSamples || t.samples[len(t.samples)-1].at.Sub(t.samples[0].at) > t.params.DownwardTrendMaxWait): + case kt < t.params.Config.DownwardTrendThreshold && (len(t.samples) >= t.params.Config.RequiredSamples || t.samples[len(t.samples)-1].at.Sub(t.samples[0].at) > t.params.Config.DownwardTrendMaxWait): t.direction = TrendDirectionDownward } } diff --git a/pkg/sfu/streamtracker/streamtracker_frame.go b/pkg/sfu/streamtracker/streamtracker_frame.go index db08d9343..e0f9f5770 100644 --- a/pkg/sfu/streamtracker/streamtracker_frame.go +++ b/pkg/sfu/streamtracker/streamtracker_frame.go @@ -18,7 +18,6 @@ import ( "math" "time" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/logger" ) @@ -30,8 +29,42 @@ const ( frameRateDecreaseFactor = 0.9 // fast decrease ) +// ------------------------------------------------------- + +type StreamTrackerFrameConfig struct { + MinFPS float64 `yaml:"min_fps,omitempty"` +} + +var ( + DefaultStreamTrackerFrameConfigVideo = map[int32]StreamTrackerFrameConfig{ + 0: { + MinFPS: 5.0, + }, + 1: { + MinFPS: 5.0, + }, + 2: { + MinFPS: 5.0, + }, + } + + DefaultStreamTrackerFrameConfigScreenshare = map[int32]StreamTrackerFrameConfig{ + 0: { + MinFPS: 0.5, + }, + 1: { + MinFPS: 0.5, + }, + 2: { + MinFPS: 0.5, + }, + } +) + +// ------------------------------------------------------- + type StreamTrackerFrameParams struct { - Config config.StreamTrackerFrameConfig + Config StreamTrackerFrameConfig ClockRate uint32 Logger logger.Logger } diff --git a/pkg/sfu/streamtracker/streamtracker_packet.go b/pkg/sfu/streamtracker/streamtracker_packet.go index e95629580..1ff3be1cd 100644 --- a/pkg/sfu/streamtracker/streamtracker_packet.go +++ b/pkg/sfu/streamtracker/streamtracker_packet.go @@ -17,12 +17,56 @@ package streamtracker import ( "time" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/logger" ) +// -------------------------------------------- + +type StreamTrackerPacketConfig struct { + SamplesRequired uint32 `yaml:"samples_required,omitempty"` // number of samples needed per cycle + CyclesRequired uint32 `yaml:"cycles_required,omitempty"` // number of cycles needed to be active + CycleDuration time.Duration `yaml:"cycle_duration,omitempty"` +} + +var ( + DefaultStreamTrackerPacketConfigVideo = map[int32]StreamTrackerPacketConfig{ + 0: {SamplesRequired: 1, + CyclesRequired: 4, + CycleDuration: 500 * time.Millisecond, + }, + 1: {SamplesRequired: 5, + CyclesRequired: 20, + CycleDuration: 500 * time.Millisecond, + }, + 2: {SamplesRequired: 5, + CyclesRequired: 20, + CycleDuration: 500 * time.Millisecond, + }, + } + + DefaultStreamTrackerPacketConfigScreenshare = map[int32]StreamTrackerPacketConfig{ + 0: { + SamplesRequired: 1, + CyclesRequired: 1, + CycleDuration: 2 * time.Second, + }, + 1: { + SamplesRequired: 1, + CyclesRequired: 1, + CycleDuration: 2 * time.Second, + }, + 2: { + SamplesRequired: 1, + CyclesRequired: 1, + CycleDuration: 2 * time.Second, + }, + } +) + +// -------------------------------------------- + type StreamTrackerPacketParams struct { - Config config.StreamTrackerPacketConfig + Config StreamTrackerPacketConfig Logger logger.Logger } diff --git a/pkg/sfu/streamtracker/streamtracker_packet_test.go b/pkg/sfu/streamtracker/streamtracker_packet_test.go index 276483e5e..3f1b65f00 100644 --- a/pkg/sfu/streamtracker/streamtracker_packet_test.go +++ b/pkg/sfu/streamtracker/streamtracker_packet_test.go @@ -23,14 +23,13 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/testutils" "github.com/livekit/protocol/logger" ) func newStreamTrackerPacket(samplesRequired uint32, cyclesRequired uint32, cycleDuration time.Duration) *StreamTracker { stp := NewStreamTrackerPacket(StreamTrackerPacketParams{ - Config: config.StreamTrackerPacketConfig{ + Config: StreamTrackerPacketConfig{ SamplesRequired: samplesRequired, CyclesRequired: cyclesRequired, CycleDuration: cycleDuration, diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index b25544b29..adacdfcf7 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -26,7 +26,6 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/streamtracker" ) @@ -44,13 +43,79 @@ type StreamTrackerManagerListener interface { // --------------------------------------------------- +type ( + StreamTrackerType string +) + +const ( + StreamTrackerTypePacket StreamTrackerType = "packet" + StreamTrackerTypeFrame StreamTrackerType = "frame" +) + +type StreamTrackerPacketConfig struct { + SamplesRequired uint32 `yaml:"samples_required,omitempty"` // number of samples needed per cycle + CyclesRequired uint32 `yaml:"cycles_required,omitempty"` // number of cycles needed to be active + CycleDuration time.Duration `yaml:"cycle_duration,omitempty"` +} + +type StreamTrackerFrameConfig struct { + MinFPS float64 `yaml:"min_fps,omitempty"` +} + +type StreamTrackerConfig struct { + StreamTrackerType StreamTrackerType `yaml:"stream_tracker_type,omitempty"` + BitrateReportInterval map[int32]time.Duration `yaml:"bitrate_report_interval,omitempty"` + PacketTracker map[int32]streamtracker.StreamTrackerPacketConfig `yaml:"packet_tracker,omitempty"` + FrameTracker map[int32]streamtracker.StreamTrackerFrameConfig `yaml:"frame_tracker,omitempty"` +} + +var ( + DefaultStreamTrackerConfigVideo = StreamTrackerConfig{ + StreamTrackerType: StreamTrackerTypePacket, + BitrateReportInterval: map[int32]time.Duration{ + 0: 1 * time.Second, + 1: 1 * time.Second, + 2: 1 * time.Second, + }, + PacketTracker: streamtracker.DefaultStreamTrackerPacketConfigVideo, + FrameTracker: streamtracker.DefaultStreamTrackerFrameConfigVideo, + } + + DefaultStreamTrackerConfigScreenshare = StreamTrackerConfig{ + StreamTrackerType: StreamTrackerTypePacket, + BitrateReportInterval: map[int32]time.Duration{ + 0: 4 * time.Second, + 1: 4 * time.Second, + 2: 4 * time.Second, + }, + PacketTracker: streamtracker.DefaultStreamTrackerPacketConfigScreenshare, + FrameTracker: streamtracker.DefaultStreamTrackerFrameConfigScreenshare, + } +) + +// --------------------------------------------------- + +type StreamTrackerManagerConfig struct { + Video StreamTrackerConfig `yaml:"video,omitempty"` + Screenshare StreamTrackerConfig `yaml:"screenshare,omitempty"` +} + +var ( + DefaultStreamTrackerManagerConfig = StreamTrackerManagerConfig{ + Video: DefaultStreamTrackerConfigVideo, + Screenshare: DefaultStreamTrackerConfigScreenshare, + } +) + +// --------------------------------------------------- + type StreamTrackerManager struct { logger logger.Logger trackInfo atomic.Pointer[livekit.TrackInfo] isSVC bool clockRate uint32 - trackerConfig config.StreamTrackerConfig + trackerConfig StreamTrackerConfig lock sync.RWMutex maxPublishedLayer int32 @@ -73,7 +138,7 @@ func NewStreamTrackerManager( trackInfo *livekit.TrackInfo, isSVC bool, clockRate uint32, - trackersConfig config.StreamTrackersConfig, + config StreamTrackerManagerConfig, ) *StreamTrackerManager { s := &StreamTrackerManager{ logger: logger, @@ -86,11 +151,11 @@ func NewStreamTrackerManager( switch trackInfo.Source { case livekit.TrackSource_SCREEN_SHARE: - s.trackerConfig = trackersConfig.Screenshare + s.trackerConfig = config.Screenshare case livekit.TrackSource_CAMERA: - s.trackerConfig = trackersConfig.Video + s.trackerConfig = config.Video default: - s.trackerConfig = trackersConfig.Video + s.trackerConfig = config.Video } s.maxExpectedLayerFromTrackInfo() @@ -193,9 +258,9 @@ func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrack if tracker == nil { var trackerImpl streamtracker.StreamTrackerImpl switch s.trackerConfig.StreamTrackerType { - case config.StreamTrackerTypePacket: + case StreamTrackerTypePacket: trackerImpl = s.createStreamTrackerPacket(layer) - case config.StreamTrackerTypeFrame: + case StreamTrackerTypeFrame: trackerImpl = s.createStreamTrackerFrame(layer) } if trackerImpl == nil { diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index 514220ca4..9dda64602 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -21,7 +21,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils/hwstats" @@ -29,6 +28,8 @@ import ( const ( livekitNamespace string = "livekit" + + statsUpdateInterval = time.Second * 10 ) var ( @@ -183,7 +184,7 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats updatedAt := time.Now().Unix() elapsed := updatedAt - prevAverage.UpdatedAt // include sufficient buffer to be sure a stats update had taken place - computeAverage := elapsed > int64(config.StatsUpdateInterval.Seconds()+2) + computeAverage := elapsed > int64(statsUpdateInterval.Seconds()+2) if bytesInNow != prevAverage.BytesIn || bytesOutNow != prevAverage.BytesOut || packetsInNow != prevAverage.PacketsIn || diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go index 1ec931563..2c728f06b 100644 --- a/pkg/telemetry/signalanddatastats.go +++ b/pkg/telemetry/signalanddatastats.go @@ -23,7 +23,6 @@ import ( "github.com/frostbyte73/core" "go.uber.org/atomic" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" ) @@ -122,7 +121,7 @@ func (s *BytesTrackStats) report() { } func (s *BytesTrackStats) reporter() { - ticker := time.NewTicker(config.TelemetryNonMediaStatsUpdateInterval) + ticker := time.NewTicker(telemetryNonMediaStatsUpdateInterval) defer func() { ticker.Stop() s.report() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index ba8ca50df..ca0aa022f 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -19,7 +19,6 @@ import ( "sync" "time" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -87,6 +86,9 @@ type TelemetryService interface { const ( workerCleanupWait = 3 * time.Minute jobsQueueMinSize = 2048 + + telemetryStatsUpdateInterval = time.Second * 30 + telemetryNonMediaStatsUpdateInterval = time.Minute * 5 ) type telemetryService struct { @@ -172,7 +174,7 @@ func (t *telemetryService) FlushStats() { } func (t *telemetryService) run() { - for range time.Tick(config.TelemetryStatsUpdateInterval) { + for range time.Tick(telemetryStatsUpdateInterval) { t.FlushStats() } }