Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-10-23 13:46:21 +05:30
9 changed files with 122 additions and 77 deletions
+25 -1
View File
@@ -112,6 +112,15 @@ rtc:
# batch_io:
# batch_size: 128
# max_flush_interval: 2ms
# # force a reconnect on a publication error
# reconnect_on_publication_error: true
# # force a reconnect on a subscription error
# reconnect_on_subscription_error: true
# # force a reconnect on a data channel error
# reconnect_on_data_channel_error: true
# # max number of bytes to buffer for data channel. 0 means unlimited.
# # when this limit is breached, data messages will be dropped till the buffered amount drops below this limit.
# data_channel_max_buffered_amount: 0
# when enabled, LiveKit will expose prometheus metrics on :6789/metrics
# prometheus_port: 6789
@@ -162,7 +171,7 @@ keys:
# enabled: true
# min: 100
# max: 2000
# # improves A/V sync when playout_delay set to a value larger than 200ms. It will disables transceiver re-use
# # improves A/V sync when playout_delay set to a value larger than 200ms. It will disables transceiver re-use
# # so not recommended for rooms with frequent subscription changes
# sync_streams: true
@@ -193,6 +202,21 @@ keys:
# # number of messages to buffer before dropping
# stream_buffer_size: 1000
# PSRPC
# since v1.5.1, a more reliable, psrpc based signal relay is available
# this gives us the ability to reliably proxy messages between a signal server and RTC node
# psrpc:
# # enable the internal psrpc api client for roomservice api calls
# enabled: true
# # maximum number of rpc attempts
# max_attempts: 3
# # initial time to wait for calls to complete
# timeout: 500ms
# # amount of time added to the timeout after each failure
# backoff: 500ms
# # number of messages to buffer before dropping
# buffer_size: 1000
# customize audio level sensitivity
# audio:
# # minimum level to be considered active, 0-127, where 0 is loudest
+16 -6
View File
@@ -106,6 +106,9 @@ type RTCConfig struct {
// force a reconnect on a data channel error
ReconnectOnDataChannelError *bool `yaml:"reconnect_on_data_channel_error,omitempty"`
// max number of bytes to buffer for data channel. 0 means unlimited
DataChannelMaxBufferedAmount uint64 `yaml:"data_channel_max_buffered_amount,omitempty"`
}
type TURNServer struct {
@@ -271,11 +274,11 @@ type SignalRelayConfig struct {
}
type PSRPCConfig struct {
Enable bool `yaml:"enable,omitempty"`
MaxAttempts int `yaml:"retry_attempts,omitempty"`
Timeout time.Duration `yaml:"retry_timeout,omitempty"`
Backoff time.Duration `yaml:"retry_backoff,omitempty"`
BufferSize int `yaml:"stream_buffer_size,omitempty"`
Enabled bool `yaml:"enabled,omitempty"`
MaxAttempts int `yaml:"max_attempts,omitempty"`
Timeout time.Duration `yaml:"timeout,omitempty"`
Backoff time.Duration `yaml:"backoff,omitempty"`
BufferSize int `yaml:"buffer_size,omitempty"`
}
// RegionConfig lists available regions and their latitude/longitude, so the selector would prefer
@@ -723,6 +726,13 @@ func GenerateCLIFlags(existingFlags []cli.Flag, hidden bool) ([]cli.Flag, error)
Usage: generatedCLIFlagUsage,
Hidden: hidden,
}
case reflect.Uint64:
flag = &cli.Uint64Flag{
Name: name,
EnvVars: []string{envVar},
Usage: generatedCLIFlagUsage,
Hidden: hidden,
}
case reflect.Float32:
flag = &cli.Float64Flag{
Name: name,
@@ -784,7 +794,7 @@ func (conf *Config) updateFromCLI(c *cli.Context, baseFlags []cli.Flag) error {
configValue.SetString(c.String(flagName))
case reflect.Int, reflect.Int32, reflect.Int64:
configValue.SetInt(c.Int64(flagName))
case reflect.Uint8, reflect.Uint16, reflect.Uint32:
case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
configValue.SetUint(c.Uint64(flagName))
case reflect.Float32:
configValue.SetFloat(c.Float64(flagName))
+1
View File
@@ -23,6 +23,7 @@ var (
ErrLimitExceeded = errors.New("node has exceeded its configured limit")
ErrAlreadyJoined = errors.New("a participant with the same identity is already in the room")
ErrDataChannelUnavailable = errors.New("data channel is not available")
ErrDataChannelBufferFull = errors.New("data channel buffer is full")
ErrTransportFailure = errors.New("transport failure")
ErrEmptyIdentity = errors.New("participant identity cannot be empty")
ErrEmptyParticipantID = errors.New("participant ID cannot be empty")
+19 -17
View File
@@ -118,6 +118,7 @@ type ParticipantParams struct {
ReconnectOnPublicationError bool
ReconnectOnSubscriptionError bool
ReconnectOnDataChannelError bool
DataChannelMaxBufferedAmount uint64
VersionGenerator utils.TimedVersionGenerator
TrackResolver types.MediaTrackResolver
DisableDynacast bool
@@ -1103,22 +1104,23 @@ func (p *ParticipantImpl) setupTransportManager() error {
SID: p.params.SID,
// primary connection does not change, canSubscribe can change if permission was updated
// after the participant has joined
SubscriberAsPrimary: p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe(),
Config: p.params.Config,
ProtocolVersion: p.params.ProtocolVersion,
Telemetry: p.params.Telemetry,
CongestionControlConfig: p.params.CongestionControlConfig,
EnabledCodecs: p.params.EnabledCodecs,
SimTracks: p.params.SimTracks,
ClientConf: p.params.ClientConf,
ClientInfo: p.params.ClientInfo,
Migration: p.params.Migration,
AllowTCPFallback: p.params.AllowTCPFallback,
TCPFallbackRTTThreshold: p.params.TCPFallbackRTTThreshold,
AllowUDPUnstableFallback: p.params.AllowUDPUnstableFallback,
TURNSEnabled: p.params.TURNSEnabled,
AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled(),
Logger: p.params.Logger.WithComponent(sutils.ComponentTransport),
SubscriberAsPrimary: p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe(),
Config: p.params.Config,
ProtocolVersion: p.params.ProtocolVersion,
Telemetry: p.params.Telemetry,
CongestionControlConfig: p.params.CongestionControlConfig,
EnabledCodecs: p.params.EnabledCodecs,
SimTracks: p.params.SimTracks,
ClientConf: p.params.ClientConf,
ClientInfo: p.params.ClientInfo,
Migration: p.params.Migration,
AllowTCPFallback: p.params.AllowTCPFallback,
TCPFallbackRTTThreshold: p.params.TCPFallbackRTTThreshold,
AllowUDPUnstableFallback: p.params.AllowUDPUnstableFallback,
TURNSEnabled: p.params.TURNSEnabled,
AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled(),
DataChannelMaxBufferedAmount: p.params.DataChannelMaxBufferedAmount,
Logger: p.params.Logger.WithComponent(sutils.ComponentTransport),
}
if p.params.SyncStreams && p.params.PlayoutDelay.GetEnabled() && p.params.ClientInfo.isFirefox() {
// we will disable playout delay for Firefox if the user is expecting
@@ -2303,7 +2305,7 @@ func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) er
err := p.TransportManager.SendDataPacket(dp, data)
if err != nil {
if (errors.Is(err, sctp.ErrStreamClosed) || errors.Is(err, io.ErrClosedPipe)) && p.params.ReconnectOnDataChannelError {
p.params.Logger.Infow("issuing full reconnect on data channel error")
p.params.Logger.Infow("issuing full reconnect on data channel error", "error", err)
p.IssueFullReconnect(types.ParticipantCloseReasonDataChannelError)
}
} else {
+1 -1
View File
@@ -1354,7 +1354,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp
utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) {
err := op.SendDataPacket(dp, dpData)
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) &&
!errors.Is(err, ErrTransportFailure) {
!errors.Is(err, ErrTransportFailure) && !errors.Is(err, ErrDataChannelBufferFull) {
op.GetLogger().Infow("send data packet error", "error", err)
}
})
+19 -14
View File
@@ -242,20 +242,21 @@ type PCTransport struct {
}
type TransportParams struct {
ParticipantID livekit.ParticipantID
ParticipantIdentity livekit.ParticipantIdentity
ProtocolVersion types.ProtocolVersion
Config *WebRTCConfig
DirectionConfig DirectionConfig
CongestionControlConfig config.CongestionControlConfig
Telemetry telemetry.TelemetryService
EnabledCodecs []*livekit.Codec
Logger logger.Logger
SimTracks map[uint32]SimulcastTrackInfo
ClientInfo ClientInfo
IsOfferer bool
IsSendSide bool
AllowPlayoutDelay bool
ParticipantID livekit.ParticipantID
ParticipantIdentity livekit.ParticipantIdentity
ProtocolVersion types.ProtocolVersion
Config *WebRTCConfig
DirectionConfig DirectionConfig
CongestionControlConfig config.CongestionControlConfig
Telemetry telemetry.TelemetryService
EnabledCodecs []*livekit.Codec
Logger logger.Logger
SimTracks map[uint32]SimulcastTrackInfo
ClientInfo ClientInfo
IsOfferer bool
IsSendSide bool
AllowPlayoutDelay bool
DataChannelMaxBufferedAmount uint64
}
func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
@@ -930,6 +931,10 @@ func (t *PCTransport) SendDataPacket(dp *livekit.DataPacket, data []byte) error
return ErrTransportFailure
}
if t.params.DataChannelMaxBufferedAmount > 0 && dc.BufferedAmount() > t.params.DataChannelMaxBufferedAmount {
return ErrDataChannelBufferFull
}
return dc.Send(data)
}
+33 -31
View File
@@ -49,24 +49,25 @@ const (
)
type TransportManagerParams struct {
Identity livekit.ParticipantIdentity
SID livekit.ParticipantID
SubscriberAsPrimary bool
Config *WebRTCConfig
ProtocolVersion types.ProtocolVersion
Telemetry telemetry.TelemetryService
CongestionControlConfig config.CongestionControlConfig
EnabledCodecs []*livekit.Codec
SimTracks map[uint32]SimulcastTrackInfo
ClientConf *livekit.ClientConfiguration
ClientInfo ClientInfo
Migration bool
AllowTCPFallback bool
TCPFallbackRTTThreshold int
AllowUDPUnstableFallback bool
TURNSEnabled bool
AllowPlayoutDelay bool
Logger logger.Logger
Identity livekit.ParticipantIdentity
SID livekit.ParticipantID
SubscriberAsPrimary bool
Config *WebRTCConfig
ProtocolVersion types.ProtocolVersion
Telemetry telemetry.TelemetryService
CongestionControlConfig config.CongestionControlConfig
EnabledCodecs []*livekit.Codec
SimTracks map[uint32]SimulcastTrackInfo
ClientConf *livekit.ClientConfiguration
ClientInfo ClientInfo
Migration bool
AllowTCPFallback bool
TCPFallbackRTTThreshold int
AllowUDPUnstableFallback bool
TURNSEnabled bool
AllowPlayoutDelay bool
DataChannelMaxBufferedAmount uint64
Logger logger.Logger
}
type TransportManager struct {
@@ -173,19 +174,20 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
})
subscriber, err := NewPCTransport(TransportParams{
ParticipantID: params.SID,
ParticipantIdentity: params.Identity,
ProtocolVersion: params.ProtocolVersion,
Config: params.Config,
DirectionConfig: params.Config.Subscriber,
CongestionControlConfig: params.CongestionControlConfig,
Telemetry: params.Telemetry,
EnabledCodecs: subscribeCodecs,
Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER),
ClientInfo: params.ClientInfo,
IsOfferer: true,
IsSendSide: true,
AllowPlayoutDelay: params.AllowPlayoutDelay,
ParticipantID: params.SID,
ParticipantIdentity: params.Identity,
ProtocolVersion: params.ProtocolVersion,
Config: params.Config,
DirectionConfig: params.Config.Subscriber,
CongestionControlConfig: params.CongestionControlConfig,
Telemetry: params.Telemetry,
EnabledCodecs: subscribeCodecs,
Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER),
ClientInfo: params.ClientInfo,
IsOfferer: true,
IsSendSide: true,
AllowPlayoutDelay: params.AllowPlayoutDelay,
DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount,
})
if err != nil {
return nil, err
+1
View File
@@ -409,6 +409,7 @@ func (r *RoomManager) StartSession(
ReconnectOnPublicationError: reconnectOnPublicationError,
ReconnectOnSubscriptionError: reconnectOnSubscriptionError,
ReconnectOnDataChannelError: reconnectOnDataChannelError,
DataChannelMaxBufferedAmount: r.config.RTC.DataChannelMaxBufferedAmount,
VersionGenerator: r.versionGenerator,
TrackResolver: room.ResolveMediaTrackForSubscriber,
SubscriberAllowPause: subscriberAllowPause,
+7 -7
View File
@@ -151,7 +151,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
if s.psrpcConf.Enabled {
return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
}
@@ -228,7 +228,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa
return nil, twirp.NotFoundError("participant not found")
}
if s.psrpcConf.Enable {
if s.psrpcConf.Enabled {
return s.roomClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
@@ -264,7 +264,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
if s.psrpcConf.Enabled {
return s.roomClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
@@ -318,7 +318,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
if s.psrpcConf.Enabled {
return s.roomClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
@@ -367,7 +367,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
if s.psrpcConf.Enabled {
return s.roomClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
@@ -390,7 +390,7 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
if s.psrpcConf.Enabled {
return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
}
@@ -432,7 +432,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return nil, err
}
if s.psrpcConf.Enable {
if s.psrpcConf.Enabled {
_, err := s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
if err != nil {
return nil, err