From de7532b5a1717c18262843eb558f5ae5cd8b0989 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 31 Oct 2023 21:28:21 -0700 Subject: [PATCH] split room and participant services (#2205) * merge * tidy * deps --- config-sample.yaml | 5 +- go.mod | 8 +-- go.sum | 16 ++--- pkg/config/config.go | 20 ++---- pkg/routing/roomclient.go | 25 ------- pkg/routing/topic.go | 22 ------ pkg/service/roommanager.go | 124 ++++++++++++++++---------------- pkg/service/roomservice.go | 49 +++++++------ pkg/service/roomservice_test.go | 7 +- pkg/service/wire.go | 14 +++- pkg/service/wire_gen.go | 19 +++-- 11 files changed, 136 insertions(+), 173 deletions(-) delete mode 100644 pkg/routing/roomclient.go delete mode 100644 pkg/routing/topic.go diff --git a/config-sample.yaml b/config-sample.yaml index 4f9fb3a1a..4a70f2ae8 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -197,10 +197,9 @@ keys: # stream_buffer_size: 1000 # PSRPC -# since v1.5.1, a more reliable, psrpc based signal relay is available -# this gives us the ability to reliably proxy messages between a signal server and RTC node +# since v1.5.1, a more reliable, psrpc based internal rpc # psrpc: -# # enable the internal psrpc api client for roomservice api calls +# # enable the psrpc internal api client for roomservice calls # enabled: true # # maximum number of rpc attempts # max_attempts: 3 diff --git a/go.mod b/go.mod index 9c351b5cd..5df1d6509 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.8.2-0.20231026194854-da745471e848 + github.com/livekit/protocol v1.8.2-0.20231101040827-02a4a42603b1 github.com/livekit/psrpc v0.5.0 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -37,7 +37,7 @@ require ( github.com/pion/webrtc/v3 v3.2.21 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 - github.com/redis/go-redis/v9 v9.2.1 + github.com/redis/go-redis/v9 v9.3.0 github.com/rs/cors v1.10.1 github.com/stretchr/testify v1.8.4 github.com/thoas/go-funk v0.9.3 @@ -61,7 +61,7 @@ require ( github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/go-jose/go-jose/v3 v3.0.0 // indirect - github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/logr v1.3.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/subcommands v1.2.0 // indirect @@ -101,7 +101,7 @@ require ( golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 102ebe137..4aedde2df 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44 github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo= github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.8.2-0.20231026194854-da745471e848 h1:UtItjtQRAu5mryPebr/ewMlAGUNDNEJJNh6kQ/lkfNc= -github.com/livekit/protocol v1.8.2-0.20231026194854-da745471e848/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ= +github.com/livekit/protocol v1.8.2-0.20231101040827-02a4a42603b1 h1:WPWxU9w5XHAsonxnSSIIXbWMty9b5uHnTnyKS9TpaXM= +github.com/livekit/protocol v1.8.2-0.20231101040827-02a4a42603b1/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ= github.com/livekit/psrpc v0.5.0 h1:g+yYNSs6Y1/vM7UlFkB2s/ARe2y3RKWZhX8ata5j+eo= github.com/livekit/psrpc v0.5.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -234,8 +234,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= -github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= -github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= +github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= @@ -412,8 +412,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/config/config.go b/pkg/config/config.go index 13d780d81..a87caf55b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -30,6 +30,7 @@ import ( "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/logger" redisLiveKit "github.com/livekit/protocol/redis" + "github.com/livekit/protocol/rpc" ) type CongestionControlProbeMode string @@ -71,7 +72,7 @@ type Config struct { Keys map[string]string `yaml:"keys,omitempty"` Region string `yaml:"region,omitempty"` SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"` - PSRPC PSRPCConfig `yaml:"psrpc,omitempty"` + PSRPC rpc.PSRPCConfig `yaml:"psrpc,omitempty"` // LogLevel is deprecated LogLevel string `yaml:"log_level,omitempty"` Logging LoggingConfig `yaml:"logging,omitempty"` @@ -273,14 +274,6 @@ type SignalRelayConfig struct { StreamBufferSize int `yaml:"stream_buffer_size,omitempty"` } -type PSRPCConfig struct { - Enabled bool `yaml:"enabled,omitempty"` - MaxAttempts int `yaml:"max_attempts,omitempty"` - Timeout time.Duration `yaml:"timeout,omitempty"` - Backoff time.Duration `yaml:"backoff,omitempty"` - BufferSize int `yaml:"buffer_size,omitempty"` -} - // RegionConfig lists available regions and their latitude/longitude, so the selector would prefer // regions that are closer type RegionConfig struct { @@ -496,13 +489,8 @@ var DefaultConfig = Config{ MaxRetryInterval: 4 * time.Second, StreamBufferSize: 1000, }, - PSRPC: PSRPCConfig{ - MaxAttempts: 3, - Timeout: 500 * time.Millisecond, - Backoff: 500 * time.Millisecond, - BufferSize: 1000, - }, - Keys: map[string]string{}, + PSRPC: rpc.DefaultPSRPCConfig, + Keys: map[string]string{}, } func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []cli.Flag) (*Config, error) { diff --git a/pkg/routing/roomclient.go b/pkg/routing/roomclient.go deleted file mode 100644 index f5b6d2bf4..000000000 --- a/pkg/routing/roomclient.go +++ /dev/null @@ -1,25 +0,0 @@ -package routing - -import ( - "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" - "github.com/livekit/protocol/logger" - protopsrpc "github.com/livekit/protocol/psrpc" - "github.com/livekit/protocol/rpc" - "github.com/livekit/psrpc" - "github.com/livekit/psrpc/pkg/middleware" -) - -func NewRoomClient(bus psrpc.MessageBus, config config.PSRPCConfig) (rpc.TypedRoomClient, error) { - return rpc.NewTypedRoomClient( - bus, - protopsrpc.WithClientLogger(logger.GetLogger()), - middleware.WithClientMetrics(prometheus.PSRPCMetricsObserver{}), - psrpc.WithClientChannelSize(config.BufferSize), - middleware.WithRPCRetries(middleware.RetryOptions{ - MaxAttempts: config.MaxAttempts, - Timeout: config.Timeout, - Backoff: config.Backoff, - }), - ) -} diff --git a/pkg/routing/topic.go b/pkg/routing/topic.go deleted file mode 100644 index 24ebb0bba..000000000 --- a/pkg/routing/topic.go +++ /dev/null @@ -1,22 +0,0 @@ -package routing - -import ( - "context" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" -) - -type topicFormatter struct{} - -func NewTopicFormatter() rpc.TopicFormatter { - return topicFormatter{} -} - -func (f topicFormatter) ParticipantTopic(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) rpc.ParticipantTopic { - return rpc.FormatParticipantTopic(roomName, identity) -} - -func (f topicFormatter) RoomTopic(ctx context.Context, roomName livekit.RoomName) rpc.RoomTopic { - return rpc.FormatRoomTopic(roomName) -} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index f37bfeee8..918e847bf 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -49,6 +49,8 @@ const ( iceConfigTTL = 5 * time.Minute ) +var affinityEpoch = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC) + type iceConfigCacheEntry struct { iceConfig *livekit.ICEConfig modifiedAt time.Time @@ -70,10 +72,13 @@ type RoomManager struct { egressLauncher rtc.EgressLauncher versionGenerator utils.TimedVersionGenerator turnAuthHandler *TURNAuthHandler - roomServer rpc.TypedRoomServer + bus psrpc.MessageBus rooms map[livekit.RoomName]*rtc.Room + roomServers utils.MultitonService[rpc.RoomTopic] + participantServers utils.MultitonService[rpc.ParticipantTopic] + iceConfigCache map[livekit.ParticipantIdentity]*iceConfigCacheEntry } @@ -105,6 +110,7 @@ func NewLocalRoomManager( egressLauncher: egressLauncher, versionGenerator: versionGenerator, turnAuthHandler: turnAuthHandler, + bus: bus, rooms: make(map[livekit.RoomName]*rtc.Room), @@ -119,11 +125,6 @@ func NewLocalRoomManager( }, } - r.roomServer, err = rpc.NewTypedRoomServer(r, bus) - if err != nil { - return nil, err - } - // hook up to router router.OnNewParticipantRTC(r.StartSession) router.OnRTCMessage(r.handleRTCMessage) @@ -220,7 +221,8 @@ func (r *RoomManager) Stop() { room.Close() } - r.roomServer.Kill() + r.roomServers.Kill() + r.participantServers.Kill() if r.rtcConfig != nil { if r.rtcConfig.UDPMux != nil { @@ -433,13 +435,17 @@ func (r *RoomManager) StartSession( _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false) return err } - if r.config.PSRPC.Enabled { - if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil { - pLogger.Errorw("could not join register participant topic", err) - _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) - return err - } + + participantTopic := rpc.FormatParticipantTopic(roomName, participant.Identity()) + participantServer := utils.Must(rpc.NewTypedParticipantServer(r, r.bus)) + killParticipantServer := r.participantServers.Replace(participantTopic, participantServer) + if err := participantServer.RegisterAllParticipantTopics(participantTopic); err != nil { + killParticipantServer() + pLogger.Errorw("could not join register participant topic", err) + _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) + return err } + if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { pLogger.Errorw("could not store participant", err) } @@ -459,14 +465,12 @@ func (r *RoomManager) StartSession( clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id} r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true) participant.OnClose(func(p types.LocalParticipant) { + killParticipantServer() + if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil { pLogger.Errorw("could not delete participant", err) } - if r.config.PSRPC.Enabled { - r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())) - } - // update room store with new numParticipants proto := room.ToProto() persistRoomForParticipantCount(proto) @@ -507,12 +511,6 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room return nil, err } - if r.config.PSRPC.Enabled { - if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil { - return nil, err - } - } - r.lock.Lock() currentRoom := r.rooms[roomName] @@ -530,10 +528,17 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room // construct ice servers newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher) + roomTopic := rpc.FormatRoomTopic(roomName) + roomServer := utils.Must(rpc.NewTypedRoomServer(r, r.bus)) + killRoomServer := r.roomServers.Replace(roomTopic, roomServer) + if err := roomServer.RegisterAllRoomTopics(roomTopic); err != nil { + killRoomServer() + r.lock.Unlock() + return nil, err + } + newRoom.OnClose(func() { - if r.config.PSRPC.Enabled { - r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName)) - } + killRoomServer() roomInfo := newRoom.ToProto() r.telemetry.RoomEnded(ctx, roomInfo) @@ -648,19 +653,29 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo } } -func (r *RoomManager) roomLogger(room *rtc.Room) logger.Logger { - return rtc.LoggerWithParticipant(rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), "", "", false) +type participantReq interface { + GetRoom() string + GetIdentity() string +} + +func (r *RoomManager) roomAndParticipantForReq(ctx context.Context, req participantReq) (*rtc.Room, types.LocalParticipant, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.GetRoom())) + if room == nil { + return nil, nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.GetIdentity())) + if participant == nil { + return nil, nil, ErrParticipantNotFound + } + + return room, participant, nil } func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) { - room := r.GetRoom(ctx, livekit.RoomName(req.Room)) - if room == nil { - return nil, ErrRoomNotFound - } - - participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) - if participant == nil { - return nil, ErrParticipantNotFound + room, participant, err := r.roomAndParticipantForReq(ctx, req) + if err != nil { + return nil, err } participant.GetLogger().Infow("removing participant") @@ -669,14 +684,9 @@ func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomPa } func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) { - room := r.GetRoom(ctx, livekit.RoomName(req.Room)) - if room == nil { - return nil, ErrRoomNotFound - } - - participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) - if participant == nil { - return nil, ErrParticipantNotFound + _, participant, err := r.roomAndParticipantForReq(ctx, req) + if err != nil { + return nil, err } participant.GetLogger().Debugw("setting track muted", @@ -690,14 +700,9 @@ func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteR } func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) { - room := r.GetRoom(ctx, livekit.RoomName(req.Room)) - if room == nil { - return nil, ErrRoomNotFound - } - - participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) - if participant == nil { - return nil, ErrParticipantNotFound + room, participant, err := r.roomAndParticipantForReq(ctx, req) + if err != nil { + return nil, err } participant.GetLogger().Debugw("updating participant", @@ -730,14 +735,9 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } func (r *RoomManager) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) { - room := r.GetRoom(ctx, livekit.RoomName(req.Room)) - if room == nil { - return nil, ErrRoomNotFound - } - - participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) - if participant == nil { - return nil, ErrParticipantNotFound + room, participant, err := r.roomAndParticipantForReq(ctx, req) + if err != nil { + return nil, err } participant.GetLogger().Debugw("updating participant subscriptions") @@ -756,7 +756,7 @@ func (r *RoomManager) SendData(ctx context.Context, req *livekit.SendDataRequest return nil, ErrRoomNotFound } - r.roomLogger(room).Debugw("api send data", "size", len(req.Data)) + room.Logger.Debugw("api send data", "size", len(req.Data)) up := &livekit.UserPacket{ Payload: req.Data, DestinationSids: req.DestinationSids, @@ -773,7 +773,7 @@ func (r *RoomManager) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, ErrRoomNotFound } - r.roomLogger(room).Debugw("updating room") + room.Logger.Debugw("updating room") room.SetMetadata(req.Metadata) return room.ToProto(), nil } diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index dd9cefebc..aa5e323d2 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -35,38 +35,41 @@ import ( // A rooms service that supports a single node type RoomService struct { - roomConf config.RoomConfig - apiConf config.APIConfig - psrpcConf config.PSRPCConfig - router routing.MessageRouter - roomAllocator RoomAllocator - roomStore ServiceStore - egressLauncher rtc.EgressLauncher - topicFormatter rpc.TopicFormatter - roomClient rpc.TypedRoomClient + roomConf config.RoomConfig + apiConf config.APIConfig + psrpcConf rpc.PSRPCConfig + router routing.MessageRouter + roomAllocator RoomAllocator + roomStore ServiceStore + egressLauncher rtc.EgressLauncher + topicFormatter rpc.TopicFormatter + roomClient rpc.TypedRoomClient + participantClient rpc.TypedParticipantClient } func NewRoomService( roomConf config.RoomConfig, apiConf config.APIConfig, - psrpcConf config.PSRPCConfig, + psrpcConf rpc.PSRPCConfig, router routing.MessageRouter, roomAllocator RoomAllocator, serviceStore ServiceStore, egressLauncher rtc.EgressLauncher, topicFormatter rpc.TopicFormatter, roomClient rpc.TypedRoomClient, + participantClient rpc.TypedParticipantClient, ) (svc *RoomService, err error) { svc = &RoomService{ - roomConf: roomConf, - apiConf: apiConf, - psrpcConf: psrpcConf, - router: router, - roomAllocator: roomAllocator, - roomStore: serviceStore, - egressLauncher: egressLauncher, - topicFormatter: topicFormatter, - roomClient: roomClient, + roomConf: roomConf, + apiConf: apiConf, + psrpcConf: psrpcConf, + router: router, + roomAllocator: roomAllocator, + roomStore: serviceStore, + egressLauncher: egressLauncher, + topicFormatter: topicFormatter, + roomClient: roomClient, + participantClient: participantClient, } return } @@ -229,7 +232,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa } if s.psrpcConf.Enabled { - return s.roomClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ @@ -265,7 +268,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR } if s.psrpcConf.Enabled { - return s.roomClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ @@ -319,7 +322,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update } if s.psrpcConf.Enabled { - return s.roomClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ @@ -368,7 +371,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda } if s.psrpcConf.Enabled { - return s.roomClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index bf25870e7..f15a72cd7 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -23,10 +23,10 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/rpc/rpcfakes" "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/routing/routingfakes" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/livekit-server/pkg/service/servicefakes" @@ -131,13 +131,14 @@ func newTestRoomService(conf config.RoomConfig) *TestRoomService { svc, err := service.NewRoomService( conf, config.APIConfig{ExecutionTimeout: 2}, - config.PSRPCConfig{}, + rpc.PSRPCConfig{}, router, allocator, store, nil, - routing.NewTopicFormatter(), + rpc.NewTopicFormatter(), &rpcfakes.FakeTypedRoomClient{}, + &rpcfakes.FakeTypedParticipantClient{}, ) if err != nil { panic(err) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 0aa93088b..2e2b39592 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -31,8 +31,10 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" redisLiveKit "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" @@ -74,8 +76,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewDefaultSignalServer, routing.NewSignalClient, getPSRPCConfig, - routing.NewTopicFormatter, - routing.NewRoomClient, + getPSRPCClientParams, + rpc.NewTopicFormatter, + rpc.NewTypedRoomClient, + rpc.NewTypedParticipantClient, NewLocalRoomManager, NewTURNAuthHandler, getTURNAuthHandlerFunc, @@ -200,10 +204,14 @@ func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig { return config.SignalRelay } -func getPSRPCConfig(config *config.Config) config.PSRPCConfig { +func getPSRPCConfig(config *config.Config) rpc.PSRPCConfig { return config.PSRPC } +func getPSRPCClientParams(config rpc.PSRPCConfig, bus psrpc.MessageBus) rpc.ClientParams { + return rpc.NewClientParams(config, bus, logger.GetLogger(), prometheus.PSRPCMetricsObserver{}) +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index b3bd269cf..322d125e6 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -12,8 +12,10 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" redis2 "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" @@ -74,12 +76,17 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) - topicFormatter := routing.NewTopicFormatter() - roomClient, err := routing.NewRoomClient(messageBus, psrpcConfig) + topicFormatter := rpc.NewTopicFormatter() + clientParams := getPSRPCClientParams(psrpcConfig, messageBus) + roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient) + participantClient, err := rpc.NewTypedParticipantClient(clientParams) + if err != nil { + return nil, err + } + roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } @@ -233,10 +240,14 @@ func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig { return config2.SignalRelay } -func getPSRPCConfig(config2 *config.Config) config.PSRPCConfig { +func getPSRPCConfig(config2 *config.Config) rpc.PSRPCConfig { return config2.PSRPC } +func getPSRPCClientParams(config2 rpc.PSRPCConfig, bus psrpc.MessageBus) rpc.ClientParams { + return rpc.NewClientParams(config2, bus, logger.GetLogger(), prometheus.PSRPCMetricsObserver{}) +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) }