From 6895eff496027fc08d7913389d0ea86fb27ddd48 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 21 Feb 2024 22:58:56 +0530 Subject: [PATCH] Buffer size config for video and audio. (#2498) * Buffer size config for video and audio. There was only one buffer size in config. In upstream, config value was used for video. Audio used a hard coded value of 200 packets. But, in the down stream sequencer, the config value was used for both video and audio. So, if video was set up for high bit rate (deep buffers), audio sequencer ended up using a lot of memory too in sequencer. Split config to be able to control that and also not hard code audio. Another optimisation here would be to not instantiate sequencer unkess NACK is negotiated. * deprecate packet_buffer_size --- config-sample.yaml | 7 ++++--- pkg/config/config.go | 14 ++++++++++---- pkg/rtc/config.go | 12 ++++++++++-- pkg/rtc/mediatracksubscriptions.go | 5 ++++- pkg/rtc/participant_internal_test.go | 2 +- pkg/rtc/room.go | 4 ++-- pkg/sfu/buffer/factory.go | 6 +++--- 7 files changed, 34 insertions(+), 16 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index ae34d759f..da88700be 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -94,8 +94,10 @@ rtc: # allow_pause: true # # allows automatic connection fallback to TCP and TURN/TLS (if configured) when UDP has been unstable, default true # allow_tcp_fallback: true - # # number of packets to buffer in the SFU, defaults to 500 - # packet_buffer_size: 500 + # # number of packets to buffer in the SFU for video, defaults to 500 + # packet_buffer_size_video: 500 + # # number of packets to buffer in the SFU for audio, defaults to 200 + # packet_buffer_size_audio: 200 # # minimum amount of time between pli/fir rtcp packets being sent to an individual # # producer. Increasing these times can lead to longer black screens when new participants join, # # while reducing them can lead to higher stream bitrate. @@ -309,4 +311,3 @@ keys: # # value less or equal than 0 means no limit. # subscription_limit_video: 0 # subscription_limit_audio: 0 - diff --git a/pkg/config/config.go b/pkg/config/config.go index f6f0891a3..c7d86c0bd 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -76,7 +76,7 @@ type Config struct { Region string `yaml:"region,omitempty"` SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"` PSRPC rpc.PSRPCConfig `yaml:"psrpc,omitempty"` - // LogLevel is deprecated + // Deprecated: LogLevel is deprecated LogLevel string `yaml:"log_level,omitempty"` Logging LoggingConfig `yaml:"logging,omitempty"` Limit LimitConfig `yaml:"limit,omitempty"` @@ -91,8 +91,12 @@ type RTCConfig struct { StrictACKs bool `yaml:"strict_acks,omitempty"` - // Number of packets to buffer for NACK + // Deprecated: use PacketBufferSizeVideo and PacketBufferSizeAudio PacketBufferSize int `yaml:"packet_buffer_size,omitempty"` + // Number of packets to buffer for NACK - video + PacketBufferSizeVideo int `yaml:"packet_buffer_size_video,omitempty"` + // Number of packets to buffer for NACK - audio + PacketBufferSizeAudio int `yaml:"packet_buffer_size_audio,omitempty"` // Throttle periods for pli/fir rtcp packets PLIThrottle PLIThrottleConfig `yaml:"pli_throttle,omitempty"` @@ -327,8 +331,10 @@ var DefaultConfig = Config{ ICEPortRangeEnd: 0, STUNServers: []string{}, }, - PacketBufferSize: 500, - StrictACKs: true, + PacketBufferSize: 500, + PacketBufferSizeVideo: 500, + PacketBufferSizeAudio: 200, + StrictACKs: true, PLIThrottle: PLIThrottleConfig{ LowQuality: 500 * time.Millisecond, MidQuality: time.Second, diff --git a/pkg/rtc/config.go b/pkg/rtc/config.go index 239b08608..f8a8054be 100644 --- a/pkg/rtc/config.go +++ b/pkg/rtc/config.go @@ -39,7 +39,8 @@ type WebRTCConfig struct { } type ReceiverConfig struct { - PacketBufferSize int + PacketBufferSizeVideo int + PacketBufferSizeAudio int } type RTPHeaderExtensionConfig struct { @@ -72,6 +73,12 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) { if rtcConf.PacketBufferSize == 0 { rtcConf.PacketBufferSize = 500 } + if rtcConf.PacketBufferSizeVideo == 0 { + rtcConf.PacketBufferSizeVideo = rtcConf.PacketBufferSize + } + if rtcConf.PacketBufferSizeAudio == 0 { + rtcConf.PacketBufferSizeAudio = rtcConf.PacketBufferSize + } // publisher configuration publisherConfig := DirectionConfig{ @@ -129,7 +136,8 @@ func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) { return &WebRTCConfig{ WebRTCConfig: *webRTCConfig, Receiver: ReceiverConfig{ - PacketBufferSize: rtcConf.PacketBufferSize, + PacketBufferSizeVideo: rtcConf.PacketBufferSizeVideo, + PacketBufferSizeAudio: rtcConf.PacketBufferSizeAudio, }, Publisher: publisherConfig, Subscriber: subscriberConfig, diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index d960f8de2..3f92e922d 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -103,11 +103,14 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * t.subscribedTracksMu.Unlock() var rtcpFeedback []webrtc.RTCPFeedback + var maxTrack int switch t.params.MediaTrack.Kind() { case livekit.TrackType_AUDIO: rtcpFeedback = t.params.SubscriberConfig.RTCPFeedback.Audio + maxTrack = t.params.ReceiverConfig.PacketBufferSizeAudio case livekit.TrackType_VIDEO: rtcpFeedback = t.params.SubscriberConfig.RTCPFeedback.Video + maxTrack = t.params.ReceiverConfig.PacketBufferSizeVideo } codecs := wr.Codecs() for _, c := range codecs { @@ -130,7 +133,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * BufferFactory: sub.GetBufferFactory(), SubID: subscriberID, StreamID: streamID, - MaxTrack: t.params.ReceiverConfig.PacketBufferSize, + MaxTrack: maxTrack, PlayoutDelayLimit: sub.GetPlayoutDelayConfig(), Pacer: sub.GetPacer(), Trailer: trailer, diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 5f5f8eac6..a7bd9b4e3 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -731,7 +731,7 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p if err != nil { panic(err) } - ff := buffer.NewFactoryOfBufferFactory(500) + ff := buffer.NewFactoryOfBufferFactory(500, 200) rtcConf.SetBufferFactory(ff.CreateBufferFactory()) grants := &auth.ClaimGrants{ Video: &auth.VideoGrant{}, diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 3b94a0cf9..082f91f71 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -87,7 +87,7 @@ type Room struct { joinedAt atomic.Int64 // time that the last participant left the room leftAt atomic.Int64 - holds atomic.Int32 + holds atomic.Int32 lock sync.RWMutex @@ -164,7 +164,7 @@ func NewRoom( participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions), participantRequestSources: make(map[livekit.ParticipantIdentity]routing.MessageSource), hasPublished: make(map[livekit.ParticipantIdentity]bool), - bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSize), + bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSizeVideo, config.Receiver.PacketBufferSizeAudio), batchedUpdates: make(map[livekit.ParticipantIdentity]*participantUpdate), closed: make(chan struct{}), trailer: []byte(utils.RandomSecret()), diff --git a/pkg/sfu/buffer/factory.go b/pkg/sfu/buffer/factory.go index d59038d28..2520c6cce 100644 --- a/pkg/sfu/buffer/factory.go +++ b/pkg/sfu/buffer/factory.go @@ -28,17 +28,17 @@ type FactoryOfBufferFactory struct { audioPool *sync.Pool } -func NewFactoryOfBufferFactory(trackingPackets int) *FactoryOfBufferFactory { +func NewFactoryOfBufferFactory(trackingPacketsVideo int, trackingPacketsAudio int) *FactoryOfBufferFactory { return &FactoryOfBufferFactory{ videoPool: &sync.Pool{ New: func() interface{} { - b := make([]byte, trackingPackets*bucket.MaxPktSize) + b := make([]byte, trackingPacketsVideo*bucket.MaxPktSize) return &b }, }, audioPool: &sync.Pool{ New: func() interface{} { - b := make([]byte, bucket.MaxPktSize*200) + b := make([]byte, trackingPacketsAudio*bucket.MaxPktSize) return &b }, },