split room and participant services (#2205)

* merge

* tidy

* deps
This commit is contained in:
Paul Wells
2023-10-31 21:28:21 -07:00
committed by GitHub
parent 33a629065d
commit de7532b5a1
11 changed files with 136 additions and 173 deletions
+2 -3
View File
@@ -197,10 +197,9 @@ keys:
# stream_buffer_size: 1000
# PSRPC
# since v1.5.1, a more reliable, psrpc based signal relay is available
# this gives us the ability to reliably proxy messages between a signal server and RTC node
# since v1.5.1, a more reliable, psrpc based internal rpc
# psrpc:
# # enable the internal psrpc api client for roomservice api calls
# # enable the psrpc internal api client for roomservice calls
# enabled: true
# # maximum number of rpc attempts
# max_attempts: 3
+4 -4
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.8.2-0.20231026194854-da745471e848
github.com/livekit/protocol v1.8.2-0.20231101040827-02a4a42603b1
github.com/livekit/psrpc v0.5.0
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -37,7 +37,7 @@ require (
github.com/pion/webrtc/v3 v3.2.21
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/redis/go-redis/v9 v9.2.1
github.com/redis/go-redis/v9 v9.3.0
github.com/rs/cors v1.10.1
github.com/stretchr/testify v1.8.4
github.com/thoas/go-funk v0.9.3
@@ -61,7 +61,7 @@ require (
github.com/eapache/channels v1.1.0 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/subcommands v1.2.0 // indirect
@@ -101,7 +101,7 @@ require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
google.golang.org/grpc v1.59.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+8 -8
View File
@@ -42,8 +42,8 @@ github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo=
github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
@@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.8.2-0.20231026194854-da745471e848 h1:UtItjtQRAu5mryPebr/ewMlAGUNDNEJJNh6kQ/lkfNc=
github.com/livekit/protocol v1.8.2-0.20231026194854-da745471e848/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ=
github.com/livekit/protocol v1.8.2-0.20231101040827-02a4a42603b1 h1:WPWxU9w5XHAsonxnSSIIXbWMty9b5uHnTnyKS9TpaXM=
github.com/livekit/protocol v1.8.2-0.20231101040827-02a4a42603b1/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ=
github.com/livekit/psrpc v0.5.0 h1:g+yYNSs6Y1/vM7UlFkB2s/ARe2y3RKWZhX8ata5j+eo=
github.com/livekit/psrpc v0.5.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -234,8 +234,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo=
@@ -412,8 +412,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+4 -16
View File
@@ -30,6 +30,7 @@ import (
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/logger"
redisLiveKit "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
)
type CongestionControlProbeMode string
@@ -71,7 +72,7 @@ type Config struct {
Keys map[string]string `yaml:"keys,omitempty"`
Region string `yaml:"region,omitempty"`
SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"`
PSRPC PSRPCConfig `yaml:"psrpc,omitempty"`
PSRPC rpc.PSRPCConfig `yaml:"psrpc,omitempty"`
// LogLevel is deprecated
LogLevel string `yaml:"log_level,omitempty"`
Logging LoggingConfig `yaml:"logging,omitempty"`
@@ -273,14 +274,6 @@ type SignalRelayConfig struct {
StreamBufferSize int `yaml:"stream_buffer_size,omitempty"`
}
type PSRPCConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
MaxAttempts int `yaml:"max_attempts,omitempty"`
Timeout time.Duration `yaml:"timeout,omitempty"`
Backoff time.Duration `yaml:"backoff,omitempty"`
BufferSize int `yaml:"buffer_size,omitempty"`
}
// RegionConfig lists available regions and their latitude/longitude, so the selector would prefer
// regions that are closer
type RegionConfig struct {
@@ -496,13 +489,8 @@ var DefaultConfig = Config{
MaxRetryInterval: 4 * time.Second,
StreamBufferSize: 1000,
},
PSRPC: PSRPCConfig{
MaxAttempts: 3,
Timeout: 500 * time.Millisecond,
Backoff: 500 * time.Millisecond,
BufferSize: 1000,
},
Keys: map[string]string{},
PSRPC: rpc.DefaultPSRPCConfig,
Keys: map[string]string{},
}
func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []cli.Flag) (*Config, error) {
-25
View File
@@ -1,25 +0,0 @@
package routing
import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/logger"
protopsrpc "github.com/livekit/protocol/psrpc"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
"github.com/livekit/psrpc/pkg/middleware"
)
func NewRoomClient(bus psrpc.MessageBus, config config.PSRPCConfig) (rpc.TypedRoomClient, error) {
return rpc.NewTypedRoomClient(
bus,
protopsrpc.WithClientLogger(logger.GetLogger()),
middleware.WithClientMetrics(prometheus.PSRPCMetricsObserver{}),
psrpc.WithClientChannelSize(config.BufferSize),
middleware.WithRPCRetries(middleware.RetryOptions{
MaxAttempts: config.MaxAttempts,
Timeout: config.Timeout,
Backoff: config.Backoff,
}),
)
}
-22
View File
@@ -1,22 +0,0 @@
package routing
import (
"context"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
)
type topicFormatter struct{}
func NewTopicFormatter() rpc.TopicFormatter {
return topicFormatter{}
}
func (f topicFormatter) ParticipantTopic(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) rpc.ParticipantTopic {
return rpc.FormatParticipantTopic(roomName, identity)
}
func (f topicFormatter) RoomTopic(ctx context.Context, roomName livekit.RoomName) rpc.RoomTopic {
return rpc.FormatRoomTopic(roomName)
}
+62 -62
View File
@@ -49,6 +49,8 @@ const (
iceConfigTTL = 5 * time.Minute
)
var affinityEpoch = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC)
type iceConfigCacheEntry struct {
iceConfig *livekit.ICEConfig
modifiedAt time.Time
@@ -70,10 +72,13 @@ type RoomManager struct {
egressLauncher rtc.EgressLauncher
versionGenerator utils.TimedVersionGenerator
turnAuthHandler *TURNAuthHandler
roomServer rpc.TypedRoomServer
bus psrpc.MessageBus
rooms map[livekit.RoomName]*rtc.Room
roomServers utils.MultitonService[rpc.RoomTopic]
participantServers utils.MultitonService[rpc.ParticipantTopic]
iceConfigCache map[livekit.ParticipantIdentity]*iceConfigCacheEntry
}
@@ -105,6 +110,7 @@ func NewLocalRoomManager(
egressLauncher: egressLauncher,
versionGenerator: versionGenerator,
turnAuthHandler: turnAuthHandler,
bus: bus,
rooms: make(map[livekit.RoomName]*rtc.Room),
@@ -119,11 +125,6 @@ func NewLocalRoomManager(
},
}
r.roomServer, err = rpc.NewTypedRoomServer(r, bus)
if err != nil {
return nil, err
}
// hook up to router
router.OnNewParticipantRTC(r.StartSession)
router.OnRTCMessage(r.handleRTCMessage)
@@ -220,7 +221,8 @@ func (r *RoomManager) Stop() {
room.Close()
}
r.roomServer.Kill()
r.roomServers.Kill()
r.participantServers.Kill()
if r.rtcConfig != nil {
if r.rtcConfig.UDPMux != nil {
@@ -433,13 +435,17 @@ func (r *RoomManager) StartSession(
_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false)
return err
}
if r.config.PSRPC.Enabled {
if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil {
pLogger.Errorw("could not join register participant topic", err)
_ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false)
return err
}
participantTopic := rpc.FormatParticipantTopic(roomName, participant.Identity())
participantServer := utils.Must(rpc.NewTypedParticipantServer(r, r.bus))
killParticipantServer := r.participantServers.Replace(participantTopic, participantServer)
if err := participantServer.RegisterAllParticipantTopics(participantTopic); err != nil {
killParticipantServer()
pLogger.Errorw("could not join register participant topic", err)
_ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false)
return err
}
if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil {
pLogger.Errorw("could not store participant", err)
}
@@ -459,14 +465,12 @@ func (r *RoomManager) StartSession(
clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id}
r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true)
participant.OnClose(func(p types.LocalParticipant) {
killParticipantServer()
if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil {
pLogger.Errorw("could not delete participant", err)
}
if r.config.PSRPC.Enabled {
r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity()))
}
// update room store with new numParticipants
proto := room.ToProto()
persistRoomForParticipantCount(proto)
@@ -507,12 +511,6 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
return nil, err
}
if r.config.PSRPC.Enabled {
if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil {
return nil, err
}
}
r.lock.Lock()
currentRoom := r.rooms[roomName]
@@ -530,10 +528,17 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
// construct ice servers
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher)
roomTopic := rpc.FormatRoomTopic(roomName)
roomServer := utils.Must(rpc.NewTypedRoomServer(r, r.bus))
killRoomServer := r.roomServers.Replace(roomTopic, roomServer)
if err := roomServer.RegisterAllRoomTopics(roomTopic); err != nil {
killRoomServer()
r.lock.Unlock()
return nil, err
}
newRoom.OnClose(func() {
if r.config.PSRPC.Enabled {
r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName))
}
killRoomServer()
roomInfo := newRoom.ToProto()
r.telemetry.RoomEnded(ctx, roomInfo)
@@ -648,19 +653,29 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo
}
}
func (r *RoomManager) roomLogger(room *rtc.Room) logger.Logger {
return rtc.LoggerWithParticipant(rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), "", "", false)
type participantReq interface {
GetRoom() string
GetIdentity() string
}
func (r *RoomManager) roomAndParticipantForReq(ctx context.Context, req participantReq) (*rtc.Room, types.LocalParticipant, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.GetRoom()))
if room == nil {
return nil, nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.GetIdentity()))
if participant == nil {
return nil, nil, ErrParticipantNotFound
}
return room, participant, nil
}
func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
room, participant, err := r.roomAndParticipantForReq(ctx, req)
if err != nil {
return nil, err
}
participant.GetLogger().Infow("removing participant")
@@ -669,14 +684,9 @@ func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomPa
}
func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
_, participant, err := r.roomAndParticipantForReq(ctx, req)
if err != nil {
return nil, err
}
participant.GetLogger().Debugw("setting track muted",
@@ -690,14 +700,9 @@ func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
}
func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
room, participant, err := r.roomAndParticipantForReq(ctx, req)
if err != nil {
return nil, err
}
participant.GetLogger().Debugw("updating participant",
@@ -730,14 +735,9 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
}
func (r *RoomManager) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
room, participant, err := r.roomAndParticipantForReq(ctx, req)
if err != nil {
return nil, err
}
participant.GetLogger().Debugw("updating participant subscriptions")
@@ -756,7 +756,7 @@ func (r *RoomManager) SendData(ctx context.Context, req *livekit.SendDataRequest
return nil, ErrRoomNotFound
}
r.roomLogger(room).Debugw("api send data", "size", len(req.Data))
room.Logger.Debugw("api send data", "size", len(req.Data))
up := &livekit.UserPacket{
Payload: req.Data,
DestinationSids: req.DestinationSids,
@@ -773,7 +773,7 @@ func (r *RoomManager) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return nil, ErrRoomNotFound
}
r.roomLogger(room).Debugw("updating room")
room.Logger.Debugw("updating room")
room.SetMetadata(req.Metadata)
return room.ToProto(), nil
}
+26 -23
View File
@@ -35,38 +35,41 @@ import (
// A rooms service that supports a single node
type RoomService struct {
roomConf config.RoomConfig
apiConf config.APIConfig
psrpcConf config.PSRPCConfig
router routing.MessageRouter
roomAllocator RoomAllocator
roomStore ServiceStore
egressLauncher rtc.EgressLauncher
topicFormatter rpc.TopicFormatter
roomClient rpc.TypedRoomClient
roomConf config.RoomConfig
apiConf config.APIConfig
psrpcConf rpc.PSRPCConfig
router routing.MessageRouter
roomAllocator RoomAllocator
roomStore ServiceStore
egressLauncher rtc.EgressLauncher
topicFormatter rpc.TopicFormatter
roomClient rpc.TypedRoomClient
participantClient rpc.TypedParticipantClient
}
func NewRoomService(
roomConf config.RoomConfig,
apiConf config.APIConfig,
psrpcConf config.PSRPCConfig,
psrpcConf rpc.PSRPCConfig,
router routing.MessageRouter,
roomAllocator RoomAllocator,
serviceStore ServiceStore,
egressLauncher rtc.EgressLauncher,
topicFormatter rpc.TopicFormatter,
roomClient rpc.TypedRoomClient,
participantClient rpc.TypedParticipantClient,
) (svc *RoomService, err error) {
svc = &RoomService{
roomConf: roomConf,
apiConf: apiConf,
psrpcConf: psrpcConf,
router: router,
roomAllocator: roomAllocator,
roomStore: serviceStore,
egressLauncher: egressLauncher,
topicFormatter: topicFormatter,
roomClient: roomClient,
roomConf: roomConf,
apiConf: apiConf,
psrpcConf: psrpcConf,
router: router,
roomAllocator: roomAllocator,
roomStore: serviceStore,
egressLauncher: egressLauncher,
topicFormatter: topicFormatter,
roomClient: roomClient,
participantClient: participantClient,
}
return
}
@@ -229,7 +232,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa
}
if s.psrpcConf.Enabled {
return s.roomClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
@@ -265,7 +268,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
}
if s.psrpcConf.Enabled {
return s.roomClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
@@ -319,7 +322,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
}
if s.psrpcConf.Enabled {
return s.roomClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
@@ -368,7 +371,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
}
if s.psrpcConf.Enabled {
return s.roomClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
+4 -3
View File
@@ -23,10 +23,10 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/rpc/rpcfakes"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/routing/routingfakes"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/pkg/service/servicefakes"
@@ -131,13 +131,14 @@ func newTestRoomService(conf config.RoomConfig) *TestRoomService {
svc, err := service.NewRoomService(
conf,
config.APIConfig{ExecutionTimeout: 2},
config.PSRPCConfig{},
rpc.PSRPCConfig{},
router,
allocator,
store,
nil,
routing.NewTopicFormatter(),
rpc.NewTopicFormatter(),
&rpcfakes.FakeTypedRoomClient{},
&rpcfakes.FakeTypedParticipantClient{},
)
if err != nil {
panic(err)
+11 -3
View File
@@ -31,8 +31,10 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
redisLiveKit "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
@@ -74,8 +76,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewDefaultSignalServer,
routing.NewSignalClient,
getPSRPCConfig,
routing.NewTopicFormatter,
routing.NewRoomClient,
getPSRPCClientParams,
rpc.NewTopicFormatter,
rpc.NewTypedRoomClient,
rpc.NewTypedParticipantClient,
NewLocalRoomManager,
NewTURNAuthHandler,
getTURNAuthHandlerFunc,
@@ -200,10 +204,14 @@ func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig {
return config.SignalRelay
}
func getPSRPCConfig(config *config.Config) config.PSRPCConfig {
func getPSRPCConfig(config *config.Config) rpc.PSRPCConfig {
return config.PSRPC
}
func getPSRPCClientParams(config rpc.PSRPCConfig, bus psrpc.MessageBus) rpc.ClientParams {
return rpc.NewClientParams(config, bus, logger.GetLogger(), prometheus.PSRPCMetricsObserver{})
}
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}
+15 -4
View File
@@ -12,8 +12,10 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
redis2 "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
@@ -74,12 +76,17 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService)
topicFormatter := routing.NewTopicFormatter()
roomClient, err := routing.NewRoomClient(messageBus, psrpcConfig)
topicFormatter := rpc.NewTopicFormatter()
clientParams := getPSRPCClientParams(psrpcConfig, messageBus)
roomClient, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient)
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
if err != nil {
return nil, err
}
@@ -233,10 +240,14 @@ func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig {
return config2.SignalRelay
}
func getPSRPCConfig(config2 *config.Config) config.PSRPCConfig {
func getPSRPCConfig(config2 *config.Config) rpc.PSRPCConfig {
return config2.PSRPC
}
func getPSRPCClientParams(config2 rpc.PSRPCConfig, bus psrpc.MessageBus) rpc.ClientParams {
return rpc.NewClientParams(config2, bus, logger.GetLogger(), prometheus.PSRPCMetricsObserver{})
}
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}