From 0bc932e57ea33a0e5971443c4386f057108ed438 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 22 Oct 2023 23:43:03 -0700 Subject: [PATCH 1/2] fix config typo (#2172) * fix config typo * tidy * add sample config * cleanup --- config-sample.yaml | 17 ++++++++++++++++- pkg/config/config.go | 10 +++++----- pkg/service/roomservice.go | 14 +++++++------- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index 6025a3a9b..95fc9f7fe 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -162,7 +162,7 @@ keys: # enabled: true # min: 100 # max: 2000 -# # improves A/V sync when playout_delay set to a value larger than 200ms. It will disables transceiver re-use +# # improves A/V sync when playout_delay set to a value larger than 200ms. It will disables transceiver re-use # # so not recommended for rooms with frequent subscription changes # sync_streams: true @@ -193,6 +193,21 @@ keys: # # number of messages to buffer before dropping # stream_buffer_size: 1000 +# PSRPC +# since v1.5.1, a more reliable, psrpc based signal relay is available +# this gives us the ability to reliably proxy messages between a signal server and RTC node +# psrpc: +# # enable the internal psrpc api client for roomservice api calls +# enabled: true +# # maximum number of rpc attempts +# max_attempts: 3 +# # initial time to wait for calls to complete +# timeout: 500ms +# # amount of time added to the timeout after each failure +# backoff: 500ms +# # number of messages to buffer before dropping +# buffer_size: 1000 + # customize audio level sensitivity # audio: # # minimum level to be considered active, 0-127, where 0 is loudest diff --git a/pkg/config/config.go b/pkg/config/config.go index 06af44c34..048096c71 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -271,11 +271,11 @@ type SignalRelayConfig struct { } type PSRPCConfig struct { - Enable bool `yaml:"enable,omitempty"` - MaxAttempts int `yaml:"retry_attempts,omitempty"` - Timeout time.Duration `yaml:"retry_timeout,omitempty"` - Backoff time.Duration `yaml:"retry_backoff,omitempty"` - BufferSize int `yaml:"stream_buffer_size,omitempty"` + Enabled bool `yaml:"enabled,omitempty"` + MaxAttempts int `yaml:"max_attempts,omitempty"` + Timeout time.Duration `yaml:"timeout,omitempty"` + Backoff time.Duration `yaml:"backoff,omitempty"` + BufferSize int `yaml:"buffer_size,omitempty"` } // RegionConfig lists available regions and their latitude/longitude, so the selector would prefer diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index fe49550f9..2f75ac00c 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -151,7 +151,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq return nil, twirpAuthError(err) } - if s.psrpcConf.Enable { + if s.psrpcConf.Enabled { return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) } @@ -228,7 +228,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa return nil, twirp.NotFoundError("participant not found") } - if s.psrpcConf.Enable { + if s.psrpcConf.Enabled { return s.roomClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } @@ -264,7 +264,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirpAuthError(err) } - if s.psrpcConf.Enable { + if s.psrpcConf.Enabled { return s.roomClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } @@ -318,7 +318,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, twirpAuthError(err) } - if s.psrpcConf.Enable { + if s.psrpcConf.Enabled { return s.roomClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } @@ -367,7 +367,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda return nil, twirpAuthError(err) } - if s.psrpcConf.Enable { + if s.psrpcConf.Enabled { return s.roomClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } @@ -390,7 +390,7 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest return nil, twirpAuthError(err) } - if s.psrpcConf.Enable { + if s.psrpcConf.Enabled { return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) } @@ -432,7 +432,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, err } - if s.psrpcConf.Enable { + if s.psrpcConf.Enabled { _, err := s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) if err != nil { return nil, err From eca32792b872dbec66df1829c117b7a220c182f4 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 23 Oct 2023 15:03:58 +0800 Subject: [PATCH 2/2] Add configuration to limit MaxBufferedAmount for data channel (#2170) * Add configuration to limit MaxBufferedAmount for data channel * comment * Fix generate flags * fix test * Don't disconnect slow subscriber --- config-sample.yaml | 9 ++++++ pkg/config/config.go | 12 ++++++- pkg/rtc/errors.go | 1 + pkg/rtc/participant.go | 36 +++++++++++---------- pkg/rtc/room.go | 2 +- pkg/rtc/transport.go | 33 +++++++++++-------- pkg/rtc/transportmanager.go | 64 +++++++++++++++++++------------------ pkg/service/roommanager.go | 1 + 8 files changed, 94 insertions(+), 64 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index 95fc9f7fe..b204340c4 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -112,6 +112,15 @@ rtc: # batch_io: # batch_size: 128 # max_flush_interval: 2ms + # # force a reconnect on a publication error + # reconnect_on_publication_error: true + # # force a reconnect on a subscription error + # reconnect_on_subscription_error: true + # # force a reconnect on a data channel error + # reconnect_on_data_channel_error: true + # # max number of bytes to buffer for data channel. 0 means unlimited. + # # when this limit is breached, data messages will be dropped till the buffered amount drops below this limit. + # data_channel_max_buffered_amount: 0 # when enabled, LiveKit will expose prometheus metrics on :6789/metrics # prometheus_port: 6789 diff --git a/pkg/config/config.go b/pkg/config/config.go index 048096c71..13d780d81 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -106,6 +106,9 @@ type RTCConfig struct { // force a reconnect on a data channel error ReconnectOnDataChannelError *bool `yaml:"reconnect_on_data_channel_error,omitempty"` + + // max number of bytes to buffer for data channel. 0 means unlimited + DataChannelMaxBufferedAmount uint64 `yaml:"data_channel_max_buffered_amount,omitempty"` } type TURNServer struct { @@ -723,6 +726,13 @@ func GenerateCLIFlags(existingFlags []cli.Flag, hidden bool) ([]cli.Flag, error) Usage: generatedCLIFlagUsage, Hidden: hidden, } + case reflect.Uint64: + flag = &cli.Uint64Flag{ + Name: name, + EnvVars: []string{envVar}, + Usage: generatedCLIFlagUsage, + Hidden: hidden, + } case reflect.Float32: flag = &cli.Float64Flag{ Name: name, @@ -784,7 +794,7 @@ func (conf *Config) updateFromCLI(c *cli.Context, baseFlags []cli.Flag) error { configValue.SetString(c.String(flagName)) case reflect.Int, reflect.Int32, reflect.Int64: configValue.SetInt(c.Int64(flagName)) - case reflect.Uint8, reflect.Uint16, reflect.Uint32: + case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: configValue.SetUint(c.Uint64(flagName)) case reflect.Float32: configValue.SetFloat(c.Float64(flagName)) diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index c8f66073b..32df7d91f 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -23,6 +23,7 @@ var ( ErrLimitExceeded = errors.New("node has exceeded its configured limit") ErrAlreadyJoined = errors.New("a participant with the same identity is already in the room") ErrDataChannelUnavailable = errors.New("data channel is not available") + ErrDataChannelBufferFull = errors.New("data channel buffer is full") ErrTransportFailure = errors.New("transport failure") ErrEmptyIdentity = errors.New("participant identity cannot be empty") ErrEmptyParticipantID = errors.New("participant ID cannot be empty") diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index f09b13d2f..701482ff8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -118,6 +118,7 @@ type ParticipantParams struct { ReconnectOnPublicationError bool ReconnectOnSubscriptionError bool ReconnectOnDataChannelError bool + DataChannelMaxBufferedAmount uint64 VersionGenerator utils.TimedVersionGenerator TrackResolver types.MediaTrackResolver DisableDynacast bool @@ -1103,22 +1104,23 @@ func (p *ParticipantImpl) setupTransportManager() error { SID: p.params.SID, // primary connection does not change, canSubscribe can change if permission was updated // after the participant has joined - SubscriberAsPrimary: p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe(), - Config: p.params.Config, - ProtocolVersion: p.params.ProtocolVersion, - Telemetry: p.params.Telemetry, - CongestionControlConfig: p.params.CongestionControlConfig, - EnabledCodecs: p.params.EnabledCodecs, - SimTracks: p.params.SimTracks, - ClientConf: p.params.ClientConf, - ClientInfo: p.params.ClientInfo, - Migration: p.params.Migration, - AllowTCPFallback: p.params.AllowTCPFallback, - TCPFallbackRTTThreshold: p.params.TCPFallbackRTTThreshold, - AllowUDPUnstableFallback: p.params.AllowUDPUnstableFallback, - TURNSEnabled: p.params.TURNSEnabled, - AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled(), - Logger: p.params.Logger.WithComponent(sutils.ComponentTransport), + SubscriberAsPrimary: p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe(), + Config: p.params.Config, + ProtocolVersion: p.params.ProtocolVersion, + Telemetry: p.params.Telemetry, + CongestionControlConfig: p.params.CongestionControlConfig, + EnabledCodecs: p.params.EnabledCodecs, + SimTracks: p.params.SimTracks, + ClientConf: p.params.ClientConf, + ClientInfo: p.params.ClientInfo, + Migration: p.params.Migration, + AllowTCPFallback: p.params.AllowTCPFallback, + TCPFallbackRTTThreshold: p.params.TCPFallbackRTTThreshold, + AllowUDPUnstableFallback: p.params.AllowUDPUnstableFallback, + TURNSEnabled: p.params.TURNSEnabled, + AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled(), + DataChannelMaxBufferedAmount: p.params.DataChannelMaxBufferedAmount, + Logger: p.params.Logger.WithComponent(sutils.ComponentTransport), } if p.params.SyncStreams && p.params.PlayoutDelay.GetEnabled() && p.params.ClientInfo.isFirefox() { // we will disable playout delay for Firefox if the user is expecting @@ -2303,7 +2305,7 @@ func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) er err := p.TransportManager.SendDataPacket(dp, data) if err != nil { if (errors.Is(err, sctp.ErrStreamClosed) || errors.Is(err, io.ErrClosedPipe)) && p.params.ReconnectOnDataChannelError { - p.params.Logger.Infow("issuing full reconnect on data channel error") + p.params.Logger.Infow("issuing full reconnect on data channel error", "error", err) p.IssueFullReconnect(types.ParticipantCloseReasonDataChannelError) } } else { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index e797af6ba..77ef5d1bb 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1354,7 +1354,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) { err := op.SendDataPacket(dp, dpData) if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) && - !errors.Is(err, ErrTransportFailure) { + !errors.Is(err, ErrTransportFailure) && !errors.Is(err, ErrDataChannelBufferFull) { op.GetLogger().Infow("send data packet error", "error", err) } }) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index a24826c11..7d285eb27 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -242,20 +242,21 @@ type PCTransport struct { } type TransportParams struct { - ParticipantID livekit.ParticipantID - ParticipantIdentity livekit.ParticipantIdentity - ProtocolVersion types.ProtocolVersion - Config *WebRTCConfig - DirectionConfig DirectionConfig - CongestionControlConfig config.CongestionControlConfig - Telemetry telemetry.TelemetryService - EnabledCodecs []*livekit.Codec - Logger logger.Logger - SimTracks map[uint32]SimulcastTrackInfo - ClientInfo ClientInfo - IsOfferer bool - IsSendSide bool - AllowPlayoutDelay bool + ParticipantID livekit.ParticipantID + ParticipantIdentity livekit.ParticipantIdentity + ProtocolVersion types.ProtocolVersion + Config *WebRTCConfig + DirectionConfig DirectionConfig + CongestionControlConfig config.CongestionControlConfig + Telemetry telemetry.TelemetryService + EnabledCodecs []*livekit.Codec + Logger logger.Logger + SimTracks map[uint32]SimulcastTrackInfo + ClientInfo ClientInfo + IsOfferer bool + IsSendSide bool + AllowPlayoutDelay bool + DataChannelMaxBufferedAmount uint64 } func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) { @@ -930,6 +931,10 @@ func (t *PCTransport) SendDataPacket(dp *livekit.DataPacket, data []byte) error return ErrTransportFailure } + if t.params.DataChannelMaxBufferedAmount > 0 && dc.BufferedAmount() > t.params.DataChannelMaxBufferedAmount { + return ErrDataChannelBufferFull + } + return dc.Send(data) } diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index a36291f19..0864bf490 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -49,24 +49,25 @@ const ( ) type TransportManagerParams struct { - Identity livekit.ParticipantIdentity - SID livekit.ParticipantID - SubscriberAsPrimary bool - Config *WebRTCConfig - ProtocolVersion types.ProtocolVersion - Telemetry telemetry.TelemetryService - CongestionControlConfig config.CongestionControlConfig - EnabledCodecs []*livekit.Codec - SimTracks map[uint32]SimulcastTrackInfo - ClientConf *livekit.ClientConfiguration - ClientInfo ClientInfo - Migration bool - AllowTCPFallback bool - TCPFallbackRTTThreshold int - AllowUDPUnstableFallback bool - TURNSEnabled bool - AllowPlayoutDelay bool - Logger logger.Logger + Identity livekit.ParticipantIdentity + SID livekit.ParticipantID + SubscriberAsPrimary bool + Config *WebRTCConfig + ProtocolVersion types.ProtocolVersion + Telemetry telemetry.TelemetryService + CongestionControlConfig config.CongestionControlConfig + EnabledCodecs []*livekit.Codec + SimTracks map[uint32]SimulcastTrackInfo + ClientConf *livekit.ClientConfiguration + ClientInfo ClientInfo + Migration bool + AllowTCPFallback bool + TCPFallbackRTTThreshold int + AllowUDPUnstableFallback bool + TURNSEnabled bool + AllowPlayoutDelay bool + DataChannelMaxBufferedAmount uint64 + Logger logger.Logger } type TransportManager struct { @@ -173,19 +174,20 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro }) subscriber, err := NewPCTransport(TransportParams{ - ParticipantID: params.SID, - ParticipantIdentity: params.Identity, - ProtocolVersion: params.ProtocolVersion, - Config: params.Config, - DirectionConfig: params.Config.Subscriber, - CongestionControlConfig: params.CongestionControlConfig, - Telemetry: params.Telemetry, - EnabledCodecs: subscribeCodecs, - Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER), - ClientInfo: params.ClientInfo, - IsOfferer: true, - IsSendSide: true, - AllowPlayoutDelay: params.AllowPlayoutDelay, + ParticipantID: params.SID, + ParticipantIdentity: params.Identity, + ProtocolVersion: params.ProtocolVersion, + Config: params.Config, + DirectionConfig: params.Config.Subscriber, + CongestionControlConfig: params.CongestionControlConfig, + Telemetry: params.Telemetry, + EnabledCodecs: subscribeCodecs, + Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER), + ClientInfo: params.ClientInfo, + IsOfferer: true, + IsSendSide: true, + AllowPlayoutDelay: params.AllowPlayoutDelay, + DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount, }) if err != nil { return nil, err diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 71b9cf5f9..8a21933dd 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -409,6 +409,7 @@ func (r *RoomManager) StartSession( ReconnectOnPublicationError: reconnectOnPublicationError, ReconnectOnSubscriptionError: reconnectOnSubscriptionError, ReconnectOnDataChannelError: reconnectOnDataChannelError, + DataChannelMaxBufferedAmount: r.config.RTC.DataChannelMaxBufferedAmount, VersionGenerator: r.versionGenerator, TrackResolver: room.ResolveMediaTrackForSubscriber, SubscriberAllowPause: subscriberAllowPause,