mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 16:15:25 +00:00
De-centralize some configs to where they are used. (#3162)
* De-centralize some configs to where they are used. And make default variables. Renaming a bit, but these are all internal config and have not been added to documented config. * Keep documented config as is. * test * typo
This commit is contained in:
+15
-235
@@ -28,6 +28,8 @@ import (
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/metric"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
|
||||
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
@@ -35,23 +37,8 @@ import (
|
||||
"github.com/livekit/protocol/rpc"
|
||||
)
|
||||
|
||||
type (
|
||||
CongestionControlProbeMode string
|
||||
StreamTrackerType string
|
||||
)
|
||||
|
||||
const (
|
||||
generatedCLIFlagUsage = "generated"
|
||||
|
||||
CongestionControlProbeModePadding CongestionControlProbeMode = "padding"
|
||||
CongestionControlProbeModeMedia CongestionControlProbeMode = "media"
|
||||
|
||||
StreamTrackerTypePacket StreamTrackerType = "packet"
|
||||
StreamTrackerTypeFrame StreamTrackerType = "frame"
|
||||
|
||||
StatsUpdateInterval = time.Second * 10
|
||||
TelemetryStatsUpdateInterval = time.Second * 30
|
||||
TelemetryNonMediaStatsUpdateInterval = time.Minute * 5
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -67,7 +54,7 @@ type Config struct {
|
||||
Prometheus PrometheusConfig `yaml:"prometheus,omitempty"`
|
||||
RTC RTCConfig `yaml:"rtc,omitempty"`
|
||||
Redis redisLiveKit.RedisConfig `yaml:"redis,omitempty"`
|
||||
Audio AudioConfig `yaml:"audio,omitempty"`
|
||||
Audio sfu.AudioConfig `yaml:"audio,omitempty"`
|
||||
Video VideoConfig `yaml:"video,omitempty"`
|
||||
Room RoomConfig `yaml:"room,omitempty"`
|
||||
TURN TURNConfig `yaml:"turn,omitempty"`
|
||||
@@ -105,7 +92,7 @@ type RTCConfig struct {
|
||||
PacketBufferSizeAudio int `yaml:"packet_buffer_size_audio,omitempty"`
|
||||
|
||||
// Throttle periods for pli/fir rtcp packets
|
||||
PLIThrottle PLIThrottleConfig `yaml:"pli_throttle,omitempty"`
|
||||
PLIThrottle sfu.PLIThrottleConfig `yaml:"pli_throttle,omitempty"`
|
||||
|
||||
CongestionControl CongestionControlConfig `yaml:"congestion_control,omitempty"`
|
||||
|
||||
@@ -135,93 +122,11 @@ type TURNServer struct {
|
||||
Credential string `yaml:"credential,omitempty"`
|
||||
}
|
||||
|
||||
type PLIThrottleConfig struct {
|
||||
LowQuality time.Duration `yaml:"low_quality,omitempty"`
|
||||
MidQuality time.Duration `yaml:"mid_quality,omitempty"`
|
||||
HighQuality time.Duration `yaml:"high_quality,omitempty"`
|
||||
}
|
||||
|
||||
type CongestionControlProbeConfig struct {
|
||||
BaseInterval time.Duration `yaml:"base_interval,omitempty"`
|
||||
BackoffFactor float64 `yaml:"backoff_factor,omitempty"`
|
||||
MaxInterval time.Duration `yaml:"max_interval,omitempty"`
|
||||
|
||||
SettleWait time.Duration `yaml:"settle_wait,omitempty"`
|
||||
SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"`
|
||||
|
||||
TrendWait time.Duration `yaml:"trend_wait,omitempty"`
|
||||
|
||||
OveragePct int64 `yaml:"overage_pct,omitempty"`
|
||||
MinBps int64 `yaml:"min_bps,omitempty"`
|
||||
MinDuration time.Duration `yaml:"min_duration,omitempty"`
|
||||
MaxDuration time.Duration `yaml:"max_duration,omitempty"`
|
||||
DurationOverflowFactor float64 `yaml:"duration_overflow_factor,omitempty"`
|
||||
DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"`
|
||||
}
|
||||
|
||||
type CongestionControlChannelObserverConfig struct {
|
||||
EstimateRequiredSamples int `yaml:"estimate_required_samples,omitempty"`
|
||||
EstimateRequiredSamplesMin int `yaml:"estimate_required_samples_min,omitempty"`
|
||||
EstimateDownwardTrendThreshold float64 `yaml:"estimate_downward_trend_threshold,omitempty"`
|
||||
EstimateDownwardTrendMaxWait time.Duration `yaml:"estimate_downward_trend_max_wait,omitempty"`
|
||||
EstimateCollapseThreshold time.Duration `yaml:"estimate_collapse_threshold,omitempty"`
|
||||
EstimateValidityWindow time.Duration `yaml:"estimate_validity_window,omitempty"`
|
||||
NackWindowMinDuration time.Duration `yaml:"nack_window_min_duration,omitempty"`
|
||||
NackWindowMaxDuration time.Duration `yaml:"nack_window_max_duration,omitempty"`
|
||||
NackRatioThreshold float64 `yaml:"nack_ratio_threshold,omitempty"`
|
||||
}
|
||||
|
||||
type CongestionControlConfig struct {
|
||||
Enabled bool `yaml:"enabled,omitempty"`
|
||||
AllowPause bool `yaml:"allow_pause,omitempty"`
|
||||
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
|
||||
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
|
||||
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
|
||||
ProbeMode CongestionControlProbeMode `yaml:"probe_mode,omitempty"`
|
||||
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
|
||||
ProbeConfig CongestionControlProbeConfig `yaml:"probe_config,omitempty"`
|
||||
ChannelObserverProbeConfig CongestionControlChannelObserverConfig `yaml:"channel_observer_probe_config,omitempty"`
|
||||
ChannelObserverNonProbeConfig CongestionControlChannelObserverConfig `yaml:"channel_observer_non_probe_config,omitempty"`
|
||||
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
|
||||
}
|
||||
|
||||
type AudioConfig struct {
|
||||
// minimum level to be considered active, 0-127, where 0 is loudest
|
||||
ActiveLevel uint8 `yaml:"active_level,omitempty"`
|
||||
// percentile to measure, a participant is considered active if it has exceeded the ActiveLevel more than
|
||||
// MinPercentile% of the time
|
||||
MinPercentile uint8 `yaml:"min_percentile,omitempty"`
|
||||
// interval to update clients, in ms
|
||||
UpdateInterval uint32 `yaml:"update_interval,omitempty"`
|
||||
// smoothing for audioLevel values sent to the client.
|
||||
// audioLevel will be an average of `smooth_intervals`, 0 to disable
|
||||
SmoothIntervals uint32 `yaml:"smooth_intervals,omitempty"`
|
||||
// enable red encoding downtrack for opus only audio up track
|
||||
ActiveREDEncoding bool `yaml:"active_red_encoding,omitempty"`
|
||||
// enable proxying weakest subscriber loss to publisher in RTCP Receiver Report
|
||||
EnableLossProxying bool `yaml:"enable_loss_proxying,omitempty"`
|
||||
}
|
||||
|
||||
type StreamTrackerPacketConfig struct {
|
||||
SamplesRequired uint32 `yaml:"samples_required,omitempty"` // number of samples needed per cycle
|
||||
CyclesRequired uint32 `yaml:"cycles_required,omitempty"` // number of cycles needed to be active
|
||||
CycleDuration time.Duration `yaml:"cycle_duration,omitempty"`
|
||||
}
|
||||
|
||||
type StreamTrackerFrameConfig struct {
|
||||
MinFPS float64 `yaml:"min_fps,omitempty"`
|
||||
}
|
||||
|
||||
type StreamTrackerConfig struct {
|
||||
StreamTrackerType StreamTrackerType `yaml:"stream_tracker_type,omitempty"`
|
||||
BitrateReportInterval map[int32]time.Duration `yaml:"bitrate_report_interval,omitempty"`
|
||||
PacketTracker map[int32]StreamTrackerPacketConfig `yaml:"packet_tracker,omitempty"`
|
||||
FrameTracker map[int32]StreamTrackerFrameConfig `yaml:"frame_tracker,omitempty"`
|
||||
}
|
||||
|
||||
type StreamTrackersConfig struct {
|
||||
Video StreamTrackerConfig `yaml:"video,omitempty"`
|
||||
Screenshare StreamTrackerConfig `yaml:"screenshare,omitempty"`
|
||||
Enabled bool `yaml:"enabled,omitempty"`
|
||||
AllowPause bool `yaml:"allow_pause,omitempty"`
|
||||
StreamAllocator streamallocator.StreamAllocatorConfig `yaml:"stream_allocator,omitempty"`
|
||||
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
|
||||
}
|
||||
|
||||
type PlayoutDelayConfig struct {
|
||||
@@ -231,8 +136,8 @@ type PlayoutDelayConfig struct {
|
||||
}
|
||||
|
||||
type VideoConfig struct {
|
||||
DynacastPauseDelay time.Duration `yaml:"dynacast_pause_delay,omitempty"`
|
||||
StreamTracker StreamTrackersConfig `yaml:"stream_tracker,omitempty"`
|
||||
DynacastPauseDelay time.Duration `yaml:"dynacast_pause_delay,omitempty"`
|
||||
StreamTrackerManager sfu.StreamTrackerManagerConfig `yaml:"stream_tracker_manager,omitempty"`
|
||||
}
|
||||
|
||||
type RoomConfig struct {
|
||||
@@ -398,140 +303,15 @@ var DefaultConfig = Config{
|
||||
PacketBufferSizeVideo: 500,
|
||||
PacketBufferSizeAudio: 200,
|
||||
StrictACKs: true,
|
||||
PLIThrottle: PLIThrottleConfig{
|
||||
LowQuality: 500 * time.Millisecond,
|
||||
MidQuality: time.Second,
|
||||
HighQuality: time.Second,
|
||||
},
|
||||
PLIThrottle: sfu.DefaultPLIThrottleConfig,
|
||||
CongestionControl: CongestionControlConfig{
|
||||
Enabled: true,
|
||||
AllowPause: false,
|
||||
NackRatioAttenuator: 0.4,
|
||||
ExpectedUsageThreshold: 0.95,
|
||||
ProbeMode: CongestionControlProbeModePadding,
|
||||
ProbeConfig: CongestionControlProbeConfig{
|
||||
BaseInterval: 3 * time.Second,
|
||||
BackoffFactor: 1.5,
|
||||
MaxInterval: 2 * time.Minute,
|
||||
|
||||
SettleWait: 250 * time.Millisecond,
|
||||
SettleWaitMax: 10 * time.Second,
|
||||
|
||||
TrendWait: 2 * time.Second,
|
||||
|
||||
OveragePct: 120,
|
||||
MinBps: 200_000,
|
||||
MinDuration: 200 * time.Millisecond,
|
||||
MaxDuration: 20 * time.Second,
|
||||
DurationOverflowFactor: 1.25,
|
||||
DurationIncreaseFactor: 1.5,
|
||||
},
|
||||
ChannelObserverProbeConfig: CongestionControlChannelObserverConfig{
|
||||
EstimateRequiredSamples: 3,
|
||||
EstimateRequiredSamplesMin: 3,
|
||||
EstimateDownwardTrendThreshold: 0.0,
|
||||
EstimateDownwardTrendMaxWait: 5 * time.Second,
|
||||
EstimateCollapseThreshold: 0,
|
||||
EstimateValidityWindow: 10 * time.Second,
|
||||
NackWindowMinDuration: 500 * time.Millisecond,
|
||||
NackWindowMaxDuration: 1 * time.Second,
|
||||
NackRatioThreshold: 0.04,
|
||||
},
|
||||
ChannelObserverNonProbeConfig: CongestionControlChannelObserverConfig{
|
||||
EstimateRequiredSamples: 12,
|
||||
EstimateRequiredSamplesMin: 8,
|
||||
EstimateDownwardTrendThreshold: -0.6,
|
||||
EstimateDownwardTrendMaxWait: 5 * time.Second,
|
||||
EstimateCollapseThreshold: 500 * time.Millisecond,
|
||||
EstimateValidityWindow: 10 * time.Second,
|
||||
NackWindowMinDuration: 2 * time.Second,
|
||||
NackWindowMaxDuration: 3 * time.Second,
|
||||
NackRatioThreshold: 0.08,
|
||||
},
|
||||
StreamAllocator: streamallocator.DefaultStreamAllocatorConfig,
|
||||
},
|
||||
},
|
||||
Audio: AudioConfig{
|
||||
ActiveLevel: 35, // -35dBov
|
||||
MinPercentile: 40,
|
||||
UpdateInterval: 400,
|
||||
SmoothIntervals: 2,
|
||||
},
|
||||
Audio: sfu.DefaultAudioConfig,
|
||||
Video: VideoConfig{
|
||||
DynacastPauseDelay: 5 * time.Second,
|
||||
StreamTracker: StreamTrackersConfig{
|
||||
Video: StreamTrackerConfig{
|
||||
StreamTrackerType: StreamTrackerTypePacket,
|
||||
BitrateReportInterval: map[int32]time.Duration{
|
||||
0: 1 * time.Second,
|
||||
1: 1 * time.Second,
|
||||
2: 1 * time.Second,
|
||||
},
|
||||
PacketTracker: map[int32]StreamTrackerPacketConfig{
|
||||
0: {
|
||||
SamplesRequired: 1,
|
||||
CyclesRequired: 4,
|
||||
CycleDuration: 500 * time.Millisecond,
|
||||
},
|
||||
1: {
|
||||
SamplesRequired: 5,
|
||||
CyclesRequired: 20,
|
||||
CycleDuration: 500 * time.Millisecond,
|
||||
},
|
||||
2: {
|
||||
SamplesRequired: 5,
|
||||
CyclesRequired: 20,
|
||||
CycleDuration: 500 * time.Millisecond,
|
||||
},
|
||||
},
|
||||
FrameTracker: map[int32]StreamTrackerFrameConfig{
|
||||
0: {
|
||||
MinFPS: 5.0,
|
||||
},
|
||||
1: {
|
||||
MinFPS: 5.0,
|
||||
},
|
||||
2: {
|
||||
MinFPS: 5.0,
|
||||
},
|
||||
},
|
||||
},
|
||||
Screenshare: StreamTrackerConfig{
|
||||
StreamTrackerType: StreamTrackerTypePacket,
|
||||
BitrateReportInterval: map[int32]time.Duration{
|
||||
0: 4 * time.Second,
|
||||
1: 4 * time.Second,
|
||||
2: 4 * time.Second,
|
||||
},
|
||||
PacketTracker: map[int32]StreamTrackerPacketConfig{
|
||||
0: {
|
||||
SamplesRequired: 1,
|
||||
CyclesRequired: 1,
|
||||
CycleDuration: 2 * time.Second,
|
||||
},
|
||||
1: {
|
||||
SamplesRequired: 1,
|
||||
CyclesRequired: 1,
|
||||
CycleDuration: 2 * time.Second,
|
||||
},
|
||||
2: {
|
||||
SamplesRequired: 1,
|
||||
CyclesRequired: 1,
|
||||
CycleDuration: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
FrameTracker: map[int32]StreamTrackerFrameConfig{
|
||||
0: {
|
||||
MinFPS: 0.5,
|
||||
},
|
||||
1: {
|
||||
MinFPS: 0.5,
|
||||
},
|
||||
2: {
|
||||
MinFPS: 0.5,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
DynacastPauseDelay: 5 * time.Second,
|
||||
StreamTrackerManager: sfu.DefaultStreamTrackerManagerConfig,
|
||||
},
|
||||
Redis: redisLiveKit.RedisConfig{},
|
||||
Room: RoomConfig{
|
||||
|
||||
@@ -65,8 +65,8 @@ type MediaTrackParams struct {
|
||||
BufferFactory *buffer.Factory
|
||||
ReceiverConfig ReceiverConfig
|
||||
SubscriberConfig DirectionConfig
|
||||
PLIThrottleConfig config.PLIThrottleConfig
|
||||
AudioConfig config.AudioConfig
|
||||
PLIThrottleConfig sfu.PLIThrottleConfig
|
||||
AudioConfig sfu.AudioConfig
|
||||
VideoConfig config.VideoConfig
|
||||
Telemetry telemetry.TelemetryService
|
||||
Logger logger.Logger
|
||||
@@ -280,7 +280,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
|
||||
ti,
|
||||
LoggerWithCodecMime(t.params.Logger, mime),
|
||||
t.params.OnRTCP,
|
||||
t.params.VideoConfig.StreamTracker,
|
||||
t.params.VideoConfig.StreamTrackerManager,
|
||||
sfu.WithPliThrottleConfig(t.params.PLIThrottleConfig),
|
||||
sfu.WithAudioConfig(t.params.AudioConfig),
|
||||
sfu.WithLoadBalanceThreshold(20),
|
||||
|
||||
@@ -32,7 +32,6 @@ import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/utils"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
@@ -91,7 +90,7 @@ type MediaTrackReceiverParams struct {
|
||||
ParticipantVersion uint32
|
||||
ReceiverConfig ReceiverConfig
|
||||
SubscriberConfig DirectionConfig
|
||||
AudioConfig config.AudioConfig
|
||||
AudioConfig sfu.AudioConfig
|
||||
Telemetry telemetry.TelemetryService
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
@@ -113,14 +113,14 @@ type ParticipantParams struct {
|
||||
SID livekit.ParticipantID
|
||||
Config *WebRTCConfig
|
||||
Sink routing.MessageSink
|
||||
AudioConfig config.AudioConfig
|
||||
AudioConfig sfu.AudioConfig
|
||||
VideoConfig config.VideoConfig
|
||||
LimitConfig config.LimitConfig
|
||||
ProtocolVersion types.ProtocolVersion
|
||||
SessionStartTime time.Time
|
||||
Telemetry telemetry.TelemetryService
|
||||
Trailer []byte
|
||||
PLIThrottleConfig config.PLIThrottleConfig
|
||||
PLIThrottleConfig sfu.PLIThrottleConfig
|
||||
CongestionControlConfig config.CongestionControlConfig
|
||||
// codecs that are enabled for this room
|
||||
PublishEnabledCodecs []*livekit.Codec
|
||||
|
||||
+3
-2
@@ -39,6 +39,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
@@ -107,7 +108,7 @@ type Room struct {
|
||||
Logger logger.Logger
|
||||
|
||||
config WebRTCConfig
|
||||
audioConfig *config.AudioConfig
|
||||
audioConfig *sfu.AudioConfig
|
||||
serverInfo *livekit.ServerInfo
|
||||
telemetry telemetry.TelemetryService
|
||||
egressLauncher EgressLauncher
|
||||
@@ -234,7 +235,7 @@ func NewRoom(
|
||||
internal *livekit.RoomInternal,
|
||||
config WebRTCConfig,
|
||||
roomConfig config.RoomConfig,
|
||||
audioConfig *config.AudioConfig,
|
||||
audioConfig *sfu.AudioConfig,
|
||||
serverInfo *livekit.ServerInfo,
|
||||
telemetry telemetry.TelemetryService,
|
||||
agentClient agent.Client,
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
@@ -802,7 +803,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
|
||||
EmptyTimeout: 5 * 60,
|
||||
DepartureTimeout: 1,
|
||||
},
|
||||
&config.AudioConfig{
|
||||
&sfu.AudioConfig{
|
||||
UpdateInterval: audioUpdateInterval,
|
||||
SmoothIntervals: opts.audioSmoothIntervals,
|
||||
},
|
||||
|
||||
@@ -444,9 +444,9 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
|
||||
}
|
||||
if params.IsSendSide {
|
||||
t.streamAllocator = streamallocator.NewStreamAllocator(streamallocator.StreamAllocatorParams{
|
||||
Config: params.CongestionControlConfig,
|
||||
Config: params.CongestionControlConfig.StreamAllocator,
|
||||
Logger: params.Logger.WithComponent(utils.ComponentCongestionControl),
|
||||
})
|
||||
}, params.CongestionControlConfig.Enabled, params.CongestionControlConfig.AllowPause)
|
||||
t.streamAllocator.OnStreamStateChange(params.Handler.OnStreamStateChange)
|
||||
t.streamAllocator.Start()
|
||||
t.pacer = pacer.NewPassThrough(params.Logger)
|
||||
|
||||
+52
-7
@@ -30,7 +30,6 @@ import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/utils"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
@@ -45,6 +44,52 @@ var (
|
||||
ErrDuplicateLayer = errors.New("duplicate layer")
|
||||
)
|
||||
|
||||
// --------------------------------------
|
||||
|
||||
type PLIThrottleConfig struct {
|
||||
LowQuality time.Duration `yaml:"low_quality,omitempty"`
|
||||
MidQuality time.Duration `yaml:"mid_quality,omitempty"`
|
||||
HighQuality time.Duration `yaml:"high_quality,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultPLIThrottleConfig = PLIThrottleConfig{
|
||||
LowQuality: 500 * time.Millisecond,
|
||||
MidQuality: time.Second,
|
||||
HighQuality: time.Second,
|
||||
}
|
||||
)
|
||||
|
||||
// --------------------------------------
|
||||
|
||||
type AudioConfig struct {
|
||||
// minimum level to be considered active, 0-127, where 0 is loudest
|
||||
ActiveLevel uint8 `yaml:"active_level,omitempty"`
|
||||
// percentile to measure, a participant is considered active if it has exceeded the ActiveLevel more than
|
||||
// MinPercentile% of the time
|
||||
MinPercentile uint8 `yaml:"min_percentile,omitempty"`
|
||||
// interval to update clients, in ms
|
||||
UpdateInterval uint32 `yaml:"update_interval,omitempty"`
|
||||
// smoothing for audioLevel values sent to the client.
|
||||
// audioLevel will be an average of `smooth_intervals`, 0 to disable
|
||||
SmoothIntervals uint32 `yaml:"smooth_intervals,omitempty"`
|
||||
// enable red encoding downtrack for opus only audio up track
|
||||
ActiveREDEncoding bool `yaml:"active_red_encoding,omitempty"`
|
||||
// enable proxying weakest subscriber loss to publisher in RTCP Receiver Report
|
||||
EnableLossProxying bool `yaml:"enable_loss_proxying,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultAudioConfig = AudioConfig{
|
||||
ActiveLevel: 35, // -35dBov
|
||||
MinPercentile: 40,
|
||||
UpdateInterval: 400,
|
||||
SmoothIntervals: 2,
|
||||
}
|
||||
)
|
||||
|
||||
// --------------------------------------
|
||||
|
||||
type AudioLevelHandle func(level uint8, duration uint32)
|
||||
|
||||
type Bitrates [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
|
||||
@@ -94,8 +139,8 @@ type TrackReceiver interface {
|
||||
type WebRTCReceiver struct {
|
||||
logger logger.Logger
|
||||
|
||||
pliThrottleConfig config.PLIThrottleConfig
|
||||
audioConfig config.AudioConfig
|
||||
pliThrottleConfig PLIThrottleConfig
|
||||
audioConfig AudioConfig
|
||||
|
||||
trackID livekit.TrackID
|
||||
streamID string
|
||||
@@ -138,7 +183,7 @@ type WebRTCReceiver struct {
|
||||
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
|
||||
|
||||
// WithPliThrottleConfig indicates minimum time(ms) between sending PLIs
|
||||
func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts {
|
||||
func WithPliThrottleConfig(pliThrottleConfig PLIThrottleConfig) ReceiverOpts {
|
||||
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
||||
w.pliThrottleConfig = pliThrottleConfig
|
||||
return w
|
||||
@@ -146,7 +191,7 @@ func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverO
|
||||
}
|
||||
|
||||
// WithAudioConfig sets up parameters for active speaker detection
|
||||
func WithAudioConfig(audioConfig config.AudioConfig) ReceiverOpts {
|
||||
func WithAudioConfig(audioConfig AudioConfig) ReceiverOpts {
|
||||
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
||||
w.audioConfig = audioConfig
|
||||
return w
|
||||
@@ -187,7 +232,7 @@ func NewWebRTCReceiver(
|
||||
trackInfo *livekit.TrackInfo,
|
||||
logger logger.Logger,
|
||||
onRTCP func([]rtcp.Packet),
|
||||
trackersConfig config.StreamTrackersConfig,
|
||||
streamTrackerManagerConfig StreamTrackerManagerConfig,
|
||||
opts ...ReceiverOpts,
|
||||
) *WebRTCReceiver {
|
||||
w := &WebRTCReceiver{
|
||||
@@ -227,7 +272,7 @@ func NewWebRTCReceiver(
|
||||
strings.EqualFold(w.codec.MimeType, MimeTypeAudioRed) || strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "useinbandfec=1"),
|
||||
)
|
||||
|
||||
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig)
|
||||
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, streamTrackerManagerConfig)
|
||||
w.streamTrackerManager.SetListener(w)
|
||||
// SVC-TODO: Handle DD for non-SVC cases???
|
||||
if w.isSVC {
|
||||
|
||||
@@ -17,7 +17,6 @@ package streamallocator
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
@@ -69,9 +68,28 @@ func (c ChannelCongestionReason) String() string {
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
type ChannelObserverConfig struct {
|
||||
Estimate TrendDetectorConfig `yaml:"estimate,omitempty"`
|
||||
Nack NackTrackerConfig `yaml:"nack,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultChannelObserverConfigProbe = ChannelObserverConfig{
|
||||
Estimate: DefaultTrendDetectorConfigProbe,
|
||||
Nack: DefaultNackTrackerConfigProbe,
|
||||
}
|
||||
|
||||
DefaultChannelObserverConfigNonProbe = ChannelObserverConfig{
|
||||
Estimate: DefaultTrendDetectorConfigNonProbe,
|
||||
Nack: DefaultNackTrackerConfigNonProbe,
|
||||
}
|
||||
)
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
type ChannelObserverParams struct {
|
||||
Name string
|
||||
Config config.CongestionControlChannelObserverConfig
|
||||
Config ChannelObserverConfig
|
||||
}
|
||||
|
||||
type ChannelObserver struct {
|
||||
@@ -87,21 +105,14 @@ func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *Cha
|
||||
params: params,
|
||||
logger: logger,
|
||||
estimateTrend: NewTrendDetector(TrendDetectorParams{
|
||||
Name: params.Name + "-estimate",
|
||||
Logger: logger,
|
||||
RequiredSamples: params.Config.EstimateRequiredSamples,
|
||||
RequiredSamplesMin: params.Config.EstimateRequiredSamplesMin,
|
||||
DownwardTrendThreshold: params.Config.EstimateDownwardTrendThreshold,
|
||||
DownwardTrendMaxWait: params.Config.EstimateDownwardTrendMaxWait,
|
||||
CollapseThreshold: params.Config.EstimateCollapseThreshold,
|
||||
ValidityWindow: params.Config.EstimateValidityWindow,
|
||||
Name: params.Name + "-estimate",
|
||||
Logger: logger,
|
||||
Config: params.Config.Estimate,
|
||||
}),
|
||||
nackTracker: NewNackTracker(NackTrackerParams{
|
||||
Name: params.Name + "-nack",
|
||||
Logger: logger,
|
||||
WindowMinDuration: params.Config.NackWindowMinDuration,
|
||||
WindowMaxDuration: params.Config.NackWindowMaxDuration,
|
||||
RatioThreshold: params.Config.NackRatioThreshold,
|
||||
Name: params.Name + "-nack",
|
||||
Logger: logger,
|
||||
Config: params.Config.Nack,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,12 +23,32 @@ import (
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
type NackTrackerConfig struct {
|
||||
WindowMinDuration time.Duration `yaml:"window_min_duration,omitempty"`
|
||||
WindowMaxDuration time.Duration `yaml:"window_max_duration,omitempty"`
|
||||
RatioThreshold float64 `yaml:"ratio_threshold,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultNackTrackerConfigProbe = NackTrackerConfig{
|
||||
WindowMinDuration: 500 * time.Millisecond,
|
||||
WindowMaxDuration: 1 * time.Second,
|
||||
RatioThreshold: 0.04,
|
||||
}
|
||||
|
||||
DefaultNackTrackerConfigNonProbe = NackTrackerConfig{
|
||||
WindowMinDuration: 2 * time.Second,
|
||||
WindowMaxDuration: 3 * time.Second,
|
||||
RatioThreshold: 0.08,
|
||||
}
|
||||
)
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
type NackTrackerParams struct {
|
||||
Name string
|
||||
Logger logger.Logger
|
||||
WindowMinDuration time.Duration
|
||||
WindowMaxDuration time.Duration
|
||||
RatioThreshold float64
|
||||
Name string
|
||||
Logger logger.Logger
|
||||
Config NackTrackerConfig
|
||||
}
|
||||
|
||||
type NackTracker struct {
|
||||
@@ -52,7 +72,7 @@ func NewNackTracker(params NackTrackerParams) *NackTracker {
|
||||
}
|
||||
|
||||
func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) {
|
||||
if n.params.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.WindowMaxDuration {
|
||||
if n.params.Config.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMaxDuration {
|
||||
// STREAM-ALLOCATOR-DATA n.updateHistory()
|
||||
|
||||
n.windowStartTime = time.Time{}
|
||||
@@ -89,8 +109,8 @@ func (n *NackTracker) GetRatio() float64 {
|
||||
}
|
||||
|
||||
func (n *NackTracker) IsTriggered() bool {
|
||||
if n.params.WindowMinDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.WindowMinDuration {
|
||||
return n.GetRatio() > n.params.RatioThreshold
|
||||
if n.params.Config.WindowMinDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMinDuration {
|
||||
return n.GetRatio() > n.params.Config.RatioThreshold
|
||||
}
|
||||
|
||||
return false
|
||||
|
||||
@@ -18,14 +18,53 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type ProbeControllerConfig struct {
|
||||
BaseInterval time.Duration `yaml:"base_interval,omitempty"`
|
||||
BackoffFactor float64 `yaml:"backoff_factor,omitempty"`
|
||||
MaxInterval time.Duration `yaml:"max_interval,omitempty"`
|
||||
|
||||
SettleWait time.Duration `yaml:"settle_wait,omitempty"`
|
||||
SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"`
|
||||
|
||||
TrendWait time.Duration `yaml:"trend_wait,omitempty"`
|
||||
|
||||
OveragePct int64 `yaml:"overage_pct,omitempty"`
|
||||
MinBps int64 `yaml:"min_bps,omitempty"`
|
||||
MinDuration time.Duration `yaml:"min_duration,omitempty"`
|
||||
MaxDuration time.Duration `yaml:"max_duration,omitempty"`
|
||||
DurationOverflowFactor float64 `yaml:"duration_overflow_factor,omitempty"`
|
||||
DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultProbeControllerConfig = ProbeControllerConfig{
|
||||
BaseInterval: 3 * time.Second,
|
||||
BackoffFactor: 1.5,
|
||||
MaxInterval: 2 * time.Minute,
|
||||
|
||||
SettleWait: 250 * time.Millisecond,
|
||||
SettleWaitMax: 10 * time.Second,
|
||||
|
||||
TrendWait: 2 * time.Second,
|
||||
|
||||
OveragePct: 120,
|
||||
MinBps: 200_000,
|
||||
MinDuration: 200 * time.Millisecond,
|
||||
MaxDuration: 20 * time.Second,
|
||||
DurationOverflowFactor: 1.25,
|
||||
DurationIncreaseFactor: 1.5,
|
||||
}
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type ProbeControllerParams struct {
|
||||
Config config.CongestionControlProbeConfig
|
||||
Config ProbeControllerConfig
|
||||
Prober *Prober
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/utils"
|
||||
@@ -137,8 +136,41 @@ func (e Event) String() string {
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type (
|
||||
ProbeMode string
|
||||
)
|
||||
|
||||
const (
|
||||
ProbeModePadding ProbeMode = "padding"
|
||||
ProbeModeMedia ProbeMode = "media"
|
||||
)
|
||||
|
||||
type StreamAllocatorConfig struct {
|
||||
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
|
||||
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
|
||||
ProbeMode ProbeMode `yaml:"probe_mode,omitempty"`
|
||||
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
|
||||
ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"`
|
||||
ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"`
|
||||
ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"`
|
||||
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultStreamAllocatorConfig = StreamAllocatorConfig{
|
||||
NackRatioAttenuator: 0.4,
|
||||
ExpectedUsageThreshold: 0.95,
|
||||
ProbeMode: ProbeModePadding,
|
||||
ProbeController: DefaultProbeControllerConfig,
|
||||
ChannelObserverProbe: DefaultChannelObserverConfigProbe,
|
||||
ChannelObserverNonProbe: DefaultChannelObserverConfigNonProbe,
|
||||
}
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type StreamAllocatorParams struct {
|
||||
Config config.CongestionControlConfig
|
||||
Config StreamAllocatorConfig
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
@@ -149,6 +181,7 @@ type StreamAllocator struct {
|
||||
|
||||
bwe cc.BandwidthEstimator
|
||||
|
||||
enabled bool
|
||||
allowPause bool
|
||||
|
||||
lastReceivedEstimate int64
|
||||
@@ -174,10 +207,11 @@ type StreamAllocator struct {
|
||||
isStopped atomic.Bool
|
||||
}
|
||||
|
||||
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
|
||||
func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause bool) *StreamAllocator {
|
||||
s := &StreamAllocator{
|
||||
params: params,
|
||||
allowPause: params.Config.AllowPause,
|
||||
enabled: enabled,
|
||||
allowPause: allowPause,
|
||||
prober: NewProber(ProberParams{
|
||||
Logger: params.Logger,
|
||||
}),
|
||||
@@ -191,7 +225,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
|
||||
}
|
||||
|
||||
s.probeController = NewProbeController(ProbeControllerParams{
|
||||
Config: s.params.Config.ProbeConfig,
|
||||
Config: s.params.Config.ProbeController,
|
||||
Prober: s.prober,
|
||||
Logger: params.Logger,
|
||||
})
|
||||
@@ -869,7 +903,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
s.probeController.AbortProbe()
|
||||
|
||||
// if not deficient, free pass allocate track
|
||||
if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() {
|
||||
if !s.enabled || s.state == streamAllocatorStateStable || !track.IsManaged() {
|
||||
update := NewStreamStateUpdate()
|
||||
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal)
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
@@ -1096,7 +1130,7 @@ boost_loop:
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) allocateAllTracks() {
|
||||
if !s.params.Config.Enabled {
|
||||
if !s.enabled {
|
||||
// nothing else to do when disabled
|
||||
return
|
||||
}
|
||||
@@ -1276,7 +1310,7 @@ func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver {
|
||||
return NewChannelObserver(
|
||||
ChannelObserverParams{
|
||||
Name: "probe",
|
||||
Config: s.params.Config.ChannelObserverProbeConfig,
|
||||
Config: s.params.Config.ChannelObserverProbe,
|
||||
},
|
||||
s.params.Logger,
|
||||
)
|
||||
@@ -1286,7 +1320,7 @@ func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
|
||||
return NewChannelObserver(
|
||||
ChannelObserverParams{
|
||||
Name: "non-probe",
|
||||
Config: s.params.Config.ChannelObserverNonProbeConfig,
|
||||
Config: s.params.Config.ChannelObserverNonProbe,
|
||||
},
|
||||
s.params.Logger,
|
||||
)
|
||||
@@ -1325,10 +1359,10 @@ func (s *StreamAllocator) maybeProbe() {
|
||||
}
|
||||
|
||||
switch s.params.Config.ProbeMode {
|
||||
case config.CongestionControlProbeModeMedia:
|
||||
case ProbeModeMedia:
|
||||
s.maybeProbeWithMedia()
|
||||
s.adjustState()
|
||||
case config.CongestionControlProbeModePadding:
|
||||
case ProbeModePadding:
|
||||
s.maybeProbeWithPadding()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,15 +70,41 @@ func trendDetectorSampleListToString(samples []trendDetectorSample) string {
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
type TrendDetectorConfig struct {
|
||||
RequiredSamples int `yaml:"required_samples,omitempty"`
|
||||
RequiredSamplesMin int `yaml:"required_samples_min,omitempty"`
|
||||
DownwardTrendThreshold float64 `yaml:"downward_trend_threshold,omitempty"`
|
||||
DownwardTrendMaxWait time.Duration `yaml:"downward_trend_max_wait,omitempty"`
|
||||
CollapseThreshold time.Duration `yaml:"collapse_threshold,omitempty"`
|
||||
ValidityWindow time.Duration `yaml:"validity_window,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultTrendDetectorConfigProbe = TrendDetectorConfig{
|
||||
RequiredSamples: 3,
|
||||
RequiredSamplesMin: 3,
|
||||
DownwardTrendThreshold: 0.0,
|
||||
DownwardTrendMaxWait: 5 * time.Second,
|
||||
CollapseThreshold: 0,
|
||||
ValidityWindow: 10 * time.Second,
|
||||
}
|
||||
|
||||
DefaultTrendDetectorConfigNonProbe = TrendDetectorConfig{
|
||||
RequiredSamples: 12,
|
||||
RequiredSamplesMin: 8,
|
||||
DownwardTrendThreshold: -0.6,
|
||||
DownwardTrendMaxWait: 5 * time.Second,
|
||||
CollapseThreshold: 500 * time.Millisecond,
|
||||
ValidityWindow: 10 * time.Second,
|
||||
}
|
||||
)
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
type TrendDetectorParams struct {
|
||||
Name string
|
||||
Logger logger.Logger
|
||||
RequiredSamples int
|
||||
RequiredSamplesMin int
|
||||
DownwardTrendThreshold float64
|
||||
DownwardTrendMaxWait time.Duration
|
||||
CollapseThreshold time.Duration
|
||||
ValidityWindow time.Duration
|
||||
Name string
|
||||
Logger logger.Logger
|
||||
Config TrendDetectorConfig
|
||||
}
|
||||
|
||||
type TrendDetector struct {
|
||||
@@ -134,7 +160,7 @@ func (t *TrendDetector) AddValue(value int64) {
|
||||
if len(t.samples) != 0 {
|
||||
lastSample = &t.samples[len(t.samples)-1]
|
||||
}
|
||||
if lastSample != nil && lastSample.value == value && t.params.CollapseThreshold > 0 && time.Since(lastSample.at) < t.params.CollapseThreshold {
|
||||
if lastSample != nil && lastSample.value == value && t.params.Config.CollapseThreshold > 0 && time.Since(lastSample.at) < t.params.Config.CollapseThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -156,7 +182,7 @@ func (t *TrendDetector) GetDirection() TrendDirection {
|
||||
}
|
||||
|
||||
func (t *TrendDetector) HasEnoughSamples() bool {
|
||||
return t.numSamples >= t.params.RequiredSamples
|
||||
return t.numSamples >= t.params.Config.RequiredSamples
|
||||
}
|
||||
|
||||
func (t *TrendDetector) ToString() string {
|
||||
@@ -173,13 +199,13 @@ func (t *TrendDetector) prune() {
|
||||
// prune based on a few rules
|
||||
|
||||
// 1. If there are more than required samples
|
||||
if len(t.samples) > t.params.RequiredSamples {
|
||||
t.samples = t.samples[len(t.samples)-t.params.RequiredSamples:]
|
||||
if len(t.samples) > t.params.Config.RequiredSamples {
|
||||
t.samples = t.samples[len(t.samples)-t.params.Config.RequiredSamples:]
|
||||
}
|
||||
|
||||
// 2. drop samples that are too old
|
||||
if len(t.samples) != 0 && t.params.ValidityWindow > 0 {
|
||||
cutoffTime := time.Now().Add(-t.params.ValidityWindow)
|
||||
if len(t.samples) != 0 && t.params.Config.ValidityWindow > 0 {
|
||||
cutoffTime := time.Now().Add(-t.params.Config.ValidityWindow)
|
||||
cutoffIndex := -1
|
||||
for i := 0; i < len(t.samples); i++ {
|
||||
if t.samples[i].at.After(cutoffTime) {
|
||||
@@ -213,7 +239,7 @@ func (t *TrendDetector) prune() {
|
||||
}
|
||||
|
||||
func (t *TrendDetector) updateDirection() {
|
||||
if len(t.samples) < t.params.RequiredSamplesMin {
|
||||
if len(t.samples) < t.params.Config.RequiredSamplesMin {
|
||||
t.direction = TrendDirectionNeutral
|
||||
return
|
||||
}
|
||||
@@ -223,9 +249,9 @@ func (t *TrendDetector) updateDirection() {
|
||||
|
||||
t.direction = TrendDirectionNeutral
|
||||
switch {
|
||||
case kt > 0 && len(t.samples) >= t.params.RequiredSamples:
|
||||
case kt > 0 && len(t.samples) >= t.params.Config.RequiredSamples:
|
||||
t.direction = TrendDirectionUpward
|
||||
case kt < t.params.DownwardTrendThreshold && (len(t.samples) >= t.params.RequiredSamples || t.samples[len(t.samples)-1].at.Sub(t.samples[0].at) > t.params.DownwardTrendMaxWait):
|
||||
case kt < t.params.Config.DownwardTrendThreshold && (len(t.samples) >= t.params.Config.RequiredSamples || t.samples[len(t.samples)-1].at.Sub(t.samples[0].at) > t.params.Config.DownwardTrendMaxWait):
|
||||
t.direction = TrendDirectionDownward
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
@@ -30,8 +29,42 @@ const (
|
||||
frameRateDecreaseFactor = 0.9 // fast decrease
|
||||
)
|
||||
|
||||
// -------------------------------------------------------
|
||||
|
||||
type StreamTrackerFrameConfig struct {
|
||||
MinFPS float64 `yaml:"min_fps,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultStreamTrackerFrameConfigVideo = map[int32]StreamTrackerFrameConfig{
|
||||
0: {
|
||||
MinFPS: 5.0,
|
||||
},
|
||||
1: {
|
||||
MinFPS: 5.0,
|
||||
},
|
||||
2: {
|
||||
MinFPS: 5.0,
|
||||
},
|
||||
}
|
||||
|
||||
DefaultStreamTrackerFrameConfigScreenshare = map[int32]StreamTrackerFrameConfig{
|
||||
0: {
|
||||
MinFPS: 0.5,
|
||||
},
|
||||
1: {
|
||||
MinFPS: 0.5,
|
||||
},
|
||||
2: {
|
||||
MinFPS: 0.5,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// -------------------------------------------------------
|
||||
|
||||
type StreamTrackerFrameParams struct {
|
||||
Config config.StreamTrackerFrameConfig
|
||||
Config StreamTrackerFrameConfig
|
||||
ClockRate uint32
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
@@ -17,12 +17,56 @@ package streamtracker
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
// --------------------------------------------
|
||||
|
||||
type StreamTrackerPacketConfig struct {
|
||||
SamplesRequired uint32 `yaml:"samples_required,omitempty"` // number of samples needed per cycle
|
||||
CyclesRequired uint32 `yaml:"cycles_required,omitempty"` // number of cycles needed to be active
|
||||
CycleDuration time.Duration `yaml:"cycle_duration,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultStreamTrackerPacketConfigVideo = map[int32]StreamTrackerPacketConfig{
|
||||
0: {SamplesRequired: 1,
|
||||
CyclesRequired: 4,
|
||||
CycleDuration: 500 * time.Millisecond,
|
||||
},
|
||||
1: {SamplesRequired: 5,
|
||||
CyclesRequired: 20,
|
||||
CycleDuration: 500 * time.Millisecond,
|
||||
},
|
||||
2: {SamplesRequired: 5,
|
||||
CyclesRequired: 20,
|
||||
CycleDuration: 500 * time.Millisecond,
|
||||
},
|
||||
}
|
||||
|
||||
DefaultStreamTrackerPacketConfigScreenshare = map[int32]StreamTrackerPacketConfig{
|
||||
0: {
|
||||
SamplesRequired: 1,
|
||||
CyclesRequired: 1,
|
||||
CycleDuration: 2 * time.Second,
|
||||
},
|
||||
1: {
|
||||
SamplesRequired: 1,
|
||||
CyclesRequired: 1,
|
||||
CycleDuration: 2 * time.Second,
|
||||
},
|
||||
2: {
|
||||
SamplesRequired: 1,
|
||||
CyclesRequired: 1,
|
||||
CycleDuration: 2 * time.Second,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// --------------------------------------------
|
||||
|
||||
type StreamTrackerPacketParams struct {
|
||||
Config config.StreamTrackerPacketConfig
|
||||
Config StreamTrackerPacketConfig
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
|
||||
@@ -23,14 +23,13 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/testutils"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
func newStreamTrackerPacket(samplesRequired uint32, cyclesRequired uint32, cycleDuration time.Duration) *StreamTracker {
|
||||
stp := NewStreamTrackerPacket(StreamTrackerPacketParams{
|
||||
Config: config.StreamTrackerPacketConfig{
|
||||
Config: StreamTrackerPacketConfig{
|
||||
SamplesRequired: samplesRequired,
|
||||
CyclesRequired: cyclesRequired,
|
||||
CycleDuration: cycleDuration,
|
||||
|
||||
@@ -26,7 +26,6 @@ import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/utils"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/streamtracker"
|
||||
)
|
||||
@@ -44,13 +43,79 @@ type StreamTrackerManagerListener interface {
|
||||
|
||||
// ---------------------------------------------------
|
||||
|
||||
type (
|
||||
StreamTrackerType string
|
||||
)
|
||||
|
||||
const (
|
||||
StreamTrackerTypePacket StreamTrackerType = "packet"
|
||||
StreamTrackerTypeFrame StreamTrackerType = "frame"
|
||||
)
|
||||
|
||||
type StreamTrackerPacketConfig struct {
|
||||
SamplesRequired uint32 `yaml:"samples_required,omitempty"` // number of samples needed per cycle
|
||||
CyclesRequired uint32 `yaml:"cycles_required,omitempty"` // number of cycles needed to be active
|
||||
CycleDuration time.Duration `yaml:"cycle_duration,omitempty"`
|
||||
}
|
||||
|
||||
type StreamTrackerFrameConfig struct {
|
||||
MinFPS float64 `yaml:"min_fps,omitempty"`
|
||||
}
|
||||
|
||||
type StreamTrackerConfig struct {
|
||||
StreamTrackerType StreamTrackerType `yaml:"stream_tracker_type,omitempty"`
|
||||
BitrateReportInterval map[int32]time.Duration `yaml:"bitrate_report_interval,omitempty"`
|
||||
PacketTracker map[int32]streamtracker.StreamTrackerPacketConfig `yaml:"packet_tracker,omitempty"`
|
||||
FrameTracker map[int32]streamtracker.StreamTrackerFrameConfig `yaml:"frame_tracker,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultStreamTrackerConfigVideo = StreamTrackerConfig{
|
||||
StreamTrackerType: StreamTrackerTypePacket,
|
||||
BitrateReportInterval: map[int32]time.Duration{
|
||||
0: 1 * time.Second,
|
||||
1: 1 * time.Second,
|
||||
2: 1 * time.Second,
|
||||
},
|
||||
PacketTracker: streamtracker.DefaultStreamTrackerPacketConfigVideo,
|
||||
FrameTracker: streamtracker.DefaultStreamTrackerFrameConfigVideo,
|
||||
}
|
||||
|
||||
DefaultStreamTrackerConfigScreenshare = StreamTrackerConfig{
|
||||
StreamTrackerType: StreamTrackerTypePacket,
|
||||
BitrateReportInterval: map[int32]time.Duration{
|
||||
0: 4 * time.Second,
|
||||
1: 4 * time.Second,
|
||||
2: 4 * time.Second,
|
||||
},
|
||||
PacketTracker: streamtracker.DefaultStreamTrackerPacketConfigScreenshare,
|
||||
FrameTracker: streamtracker.DefaultStreamTrackerFrameConfigScreenshare,
|
||||
}
|
||||
)
|
||||
|
||||
// ---------------------------------------------------
|
||||
|
||||
type StreamTrackerManagerConfig struct {
|
||||
Video StreamTrackerConfig `yaml:"video,omitempty"`
|
||||
Screenshare StreamTrackerConfig `yaml:"screenshare,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultStreamTrackerManagerConfig = StreamTrackerManagerConfig{
|
||||
Video: DefaultStreamTrackerConfigVideo,
|
||||
Screenshare: DefaultStreamTrackerConfigScreenshare,
|
||||
}
|
||||
)
|
||||
|
||||
// ---------------------------------------------------
|
||||
|
||||
type StreamTrackerManager struct {
|
||||
logger logger.Logger
|
||||
trackInfo atomic.Pointer[livekit.TrackInfo]
|
||||
isSVC bool
|
||||
clockRate uint32
|
||||
|
||||
trackerConfig config.StreamTrackerConfig
|
||||
trackerConfig StreamTrackerConfig
|
||||
|
||||
lock sync.RWMutex
|
||||
maxPublishedLayer int32
|
||||
@@ -73,7 +138,7 @@ func NewStreamTrackerManager(
|
||||
trackInfo *livekit.TrackInfo,
|
||||
isSVC bool,
|
||||
clockRate uint32,
|
||||
trackersConfig config.StreamTrackersConfig,
|
||||
config StreamTrackerManagerConfig,
|
||||
) *StreamTrackerManager {
|
||||
s := &StreamTrackerManager{
|
||||
logger: logger,
|
||||
@@ -86,11 +151,11 @@ func NewStreamTrackerManager(
|
||||
|
||||
switch trackInfo.Source {
|
||||
case livekit.TrackSource_SCREEN_SHARE:
|
||||
s.trackerConfig = trackersConfig.Screenshare
|
||||
s.trackerConfig = config.Screenshare
|
||||
case livekit.TrackSource_CAMERA:
|
||||
s.trackerConfig = trackersConfig.Video
|
||||
s.trackerConfig = config.Video
|
||||
default:
|
||||
s.trackerConfig = trackersConfig.Video
|
||||
s.trackerConfig = config.Video
|
||||
}
|
||||
|
||||
s.maxExpectedLayerFromTrackInfo()
|
||||
@@ -193,9 +258,9 @@ func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrack
|
||||
if tracker == nil {
|
||||
var trackerImpl streamtracker.StreamTrackerImpl
|
||||
switch s.trackerConfig.StreamTrackerType {
|
||||
case config.StreamTrackerTypePacket:
|
||||
case StreamTrackerTypePacket:
|
||||
trackerImpl = s.createStreamTrackerPacket(layer)
|
||||
case config.StreamTrackerTypeFrame:
|
||||
case StreamTrackerTypeFrame:
|
||||
trackerImpl = s.createStreamTrackerFrame(layer)
|
||||
}
|
||||
if trackerImpl == nil {
|
||||
|
||||
@@ -21,7 +21,6 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/rpc"
|
||||
"github.com/livekit/protocol/utils/hwstats"
|
||||
@@ -29,6 +28,8 @@ import (
|
||||
|
||||
const (
|
||||
livekitNamespace string = "livekit"
|
||||
|
||||
statsUpdateInterval = time.Second * 10
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -183,7 +184,7 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats
|
||||
updatedAt := time.Now().Unix()
|
||||
elapsed := updatedAt - prevAverage.UpdatedAt
|
||||
// include sufficient buffer to be sure a stats update had taken place
|
||||
computeAverage := elapsed > int64(config.StatsUpdateInterval.Seconds()+2)
|
||||
computeAverage := elapsed > int64(statsUpdateInterval.Seconds()+2)
|
||||
if bytesInNow != prevAverage.BytesIn ||
|
||||
bytesOutNow != prevAverage.BytesOut ||
|
||||
packetsInNow != prevAverage.PacketsIn ||
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"github.com/frostbyte73/core"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/utils"
|
||||
)
|
||||
@@ -122,7 +121,7 @@ func (s *BytesTrackStats) report() {
|
||||
}
|
||||
|
||||
func (s *BytesTrackStats) reporter() {
|
||||
ticker := time.NewTicker(config.TelemetryNonMediaStatsUpdateInterval)
|
||||
ticker := time.NewTicker(telemetryNonMediaStatsUpdateInterval)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
s.report()
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/utils"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
@@ -87,6 +86,9 @@ type TelemetryService interface {
|
||||
const (
|
||||
workerCleanupWait = 3 * time.Minute
|
||||
jobsQueueMinSize = 2048
|
||||
|
||||
telemetryStatsUpdateInterval = time.Second * 30
|
||||
telemetryNonMediaStatsUpdateInterval = time.Minute * 5
|
||||
)
|
||||
|
||||
type telemetryService struct {
|
||||
@@ -172,7 +174,7 @@ func (t *telemetryService) FlushStats() {
|
||||
}
|
||||
|
||||
func (t *telemetryService) run() {
|
||||
for range time.Tick(config.TelemetryStatsUpdateInterval) {
|
||||
for range time.Tick(telemetryStatsUpdateInterval) {
|
||||
t.FlushStats()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user