From 08997c96b013a735be1b4afe725f4e5248b1a081 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 22 Oct 2023 00:08:41 +0530 Subject: [PATCH 1/2] Drop not relevant packet only if contiguous. (#2167) The probing + munging has not been set up to drop packets that follow a gap. Dropping such a packet leads to padding packet sequence numbers overlapping with regular packets. This change does two things though. - The not relevant packet will still not be sent over the wire. That could create holes in the sequence number leading to NACKs - Would the hole cause decode issues? Unclear as making this condition is hard. Simulating it is not showing issues, but that may not be producing the bad sequence if any. Will look at the ability to drop a packet after a gap later. --- pkg/sfu/forwarder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index f37bea03f..887c48566 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1691,7 +1691,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in tp.shouldDrop = true if f.started && result.IsRelevant { // call to update highest incoming sequence number and other internal structures - if _, err := f.rtpMunger.UpdateAndGetSnTs(extPkt); err == nil { + if tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt); err == nil && tpRTP.snOrdering == SequenceNumberOrderingContiguous { f.rtpMunger.PacketDropped(extPkt) } } From 325e5ca753e485a131023bb9f7df1b476420d31f Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 22 Oct 2023 22:49:38 -0700 Subject: [PATCH 2/2] add psrpc room service (#2171) * add psrpc room service * update deps * disable by default * feedback * config * test --- go.mod | 8 +- go.sum | 20 +- pkg/config/config.go | 15 + pkg/routing/roomclient.go | 27 ++ pkg/routing/topic.go | 22 ++ pkg/rtc/errors.go | 1 + pkg/rtc/helper_test.go | 3 +- pkg/rtc/participant.go | 8 +- pkg/rtc/types/interfaces.go | 7 +- .../typesfakes/fake_local_participant.go | 43 ++- pkg/service/errors.go | 29 +- pkg/service/redisstore.go | 2 +- pkg/service/roommanager.go | 257 +++++++++++------- pkg/service/roomservice.go | 67 ++++- pkg/service/roomservice_test.go | 14 +- pkg/service/wire.go | 7 + pkg/service/wire_gen.go | 14 +- 17 files changed, 401 insertions(+), 143 deletions(-) create mode 100644 pkg/routing/roomclient.go create mode 100644 pkg/routing/topic.go diff --git a/go.mod b/go.mod index f9a258872..8a990ac1a 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 + github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -70,14 +70,14 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect - github.com/nats-io/nats.go v1.30.2 // indirect + github.com/nats-io/nats.go v1.31.0 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect @@ -101,7 +101,7 @@ require ( golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index b1d7080f8..9927c1032 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79 github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs= github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= -github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= -github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 h1:0Ca5hQP9+DaRjrr5d9msNhfCL+CBvcxqLdh4xOXAZeU= -github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= +github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca h1:M3gymSJFriayMERPin3BSGgYTUpLVfGaMgmqp+3JV6I= +github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -158,13 +158,10 @@ github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod h1:GAFlyu4/ github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs= github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw= github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= -github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o= -github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= -github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -403,7 +400,6 @@ golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -416,8 +412,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/config/config.go b/pkg/config/config.go index 7997e7f93..06af44c34 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -71,6 +71,7 @@ type Config struct { Keys map[string]string `yaml:"keys,omitempty"` Region string `yaml:"region,omitempty"` SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"` + PSRPC PSRPCConfig `yaml:"psrpc,omitempty"` // LogLevel is deprecated LogLevel string `yaml:"log_level,omitempty"` Logging LoggingConfig `yaml:"logging,omitempty"` @@ -269,6 +270,14 @@ type SignalRelayConfig struct { StreamBufferSize int `yaml:"stream_buffer_size,omitempty"` } +type PSRPCConfig struct { + Enable bool `yaml:"enable,omitempty"` + MaxAttempts int `yaml:"retry_attempts,omitempty"` + Timeout time.Duration `yaml:"retry_timeout,omitempty"` + Backoff time.Duration `yaml:"retry_backoff,omitempty"` + BufferSize int `yaml:"stream_buffer_size,omitempty"` +} + // RegionConfig lists available regions and their latitude/longitude, so the selector would prefer // regions that are closer type RegionConfig struct { @@ -484,6 +493,12 @@ var DefaultConfig = Config{ MaxRetryInterval: 4 * time.Second, StreamBufferSize: 1000, }, + PSRPC: PSRPCConfig{ + MaxAttempts: 3, + Timeout: 500 * time.Millisecond, + Backoff: 500 * time.Millisecond, + BufferSize: 1000, + }, Keys: map[string]string{}, } diff --git a/pkg/routing/roomclient.go b/pkg/routing/roomclient.go new file mode 100644 index 000000000..d511fc282 --- /dev/null +++ b/pkg/routing/roomclient.go @@ -0,0 +1,27 @@ +package routing + +import ( + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + protopsrpc "github.com/livekit/protocol/psrpc" + "github.com/livekit/protocol/rpc" + "github.com/livekit/psrpc" + "github.com/livekit/psrpc/pkg/middleware" +) + +func NewRoomClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.PSRPCConfig) (rpc.TypedRoomClient, error) { + return rpc.NewTypedRoomClient( + nodeID, + bus, + protopsrpc.WithClientLogger(logger.GetLogger()), + middleware.WithClientMetrics(prometheus.PSRPCMetricsObserver{}), + psrpc.WithClientChannelSize(config.BufferSize), + middleware.WithRPCRetries(middleware.RetryOptions{ + MaxAttempts: config.MaxAttempts, + Timeout: config.Timeout, + Backoff: config.Backoff, + }), + ) +} diff --git a/pkg/routing/topic.go b/pkg/routing/topic.go new file mode 100644 index 000000000..24ebb0bba --- /dev/null +++ b/pkg/routing/topic.go @@ -0,0 +1,22 @@ +package routing + +import ( + "context" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" +) + +type topicFormatter struct{} + +func NewTopicFormatter() rpc.TopicFormatter { + return topicFormatter{} +} + +func (f topicFormatter) ParticipantTopic(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) rpc.ParticipantTopic { + return rpc.FormatParticipantTopic(roomName, identity) +} + +func (f topicFormatter) RoomTopic(ctx context.Context, roomName livekit.RoomName) rpc.RoomTopic { + return rpc.FormatRoomTopic(roomName) +} diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index 656995632..c8f66073b 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -27,6 +27,7 @@ var ( ErrEmptyIdentity = errors.New("participant identity cannot be empty") ErrEmptyParticipantID = errors.New("participant ID cannot be empty") ErrMissingGrants = errors.New("VideoGrant is missing") + ErrInternalError = errors.New("internal error") // Track subscription related ErrNoTrackPermission = errors.New("participant is not allowed to subscribe to this track") diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index c47b5a658..4a0087ea1 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -64,8 +64,9 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro } } - p.SetTrackMutedCalls(func(sid livekit.TrackID, muted bool, fromServer bool) { + p.SetTrackMutedCalls(func(sid livekit.TrackID, muted bool, fromServer bool) *livekit.TrackInfo { updateTrack() + return nil }) p.AddTrackCalls(func(req *livekit.AddTrackRequest) { updateTrack() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 38fac3f58..f09b13d2f 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1670,16 +1670,16 @@ func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) }) } -func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) { +func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo { // when request is coming from admin, send message to current participant if fromAdmin { p.sendTrackMuted(trackID, muted) } - p.setTrackMuted(trackID, muted) + return p.setTrackMuted(trackID, muted) } -func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) { +func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo { p.dirty.Store(true) p.supervisor.SetPublicationMute(trackID, muted) @@ -1713,6 +1713,8 @@ func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) { if !isPending && track == nil { p.pubLogger.Warnw("could not locate track", nil, "trackID", trackID) } + + return trackInfo } func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) (*MediaTrack, bool) { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 3a971fce4..da3ee7178 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -87,6 +87,7 @@ const ( ParticipantCloseReasonVerifyFailed ParticipantCloseReasonJoinFailed ParticipantCloseReasonJoinTimeout + ParticipantCloseReasonMessageBusFailed ParticipantCloseReasonStateDisconnected ParticipantCloseReasonPeerConnectionDisconnected ParticipantCloseReasonDuplicateIdentity @@ -119,6 +120,8 @@ func (p ParticipantCloseReason) String() string { return "JOIN_FAILED" case ParticipantCloseReasonJoinTimeout: return "JOIN_TIMEOUT" + case ParticipantCloseReasonMessageBusFailed: + return "MESSAGE_BUS_FAILED" case ParticipantCloseReasonStateDisconnected: return "STATE_DISCONNECTED" case ParticipantCloseReasonPeerConnectionDisconnected: @@ -162,7 +165,7 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason { return livekit.DisconnectReason_CLIENT_INITIATED case ParticipantCloseReasonRoomManagerStop: return livekit.DisconnectReason_SERVER_SHUTDOWN - case ParticipantCloseReasonVerifyFailed, ParticipantCloseReasonJoinFailed, ParticipantCloseReasonJoinTimeout: + case ParticipantCloseReasonVerifyFailed, ParticipantCloseReasonJoinFailed, ParticipantCloseReasonJoinTimeout, ParticipantCloseReasonMessageBusFailed: // expected to be connected but is not return livekit.DisconnectReason_JOIN_FAILURE case ParticipantCloseReasonPeerConnectionDisconnected: @@ -335,7 +338,7 @@ type LocalParticipant interface { AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) HandleOffer(sdp webrtc.SessionDescription) AddTrack(req *livekit.AddTrackRequest) - SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) + SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo HandleAnswer(sdp webrtc.SessionDescription) Negotiate(force bool) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index ac0b18932..4020502be 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -738,13 +738,19 @@ type FakeLocalParticipant struct { setSubscriberChannelCapacityArgsForCall []struct { arg1 int64 } - SetTrackMutedStub func(livekit.TrackID, bool, bool) + SetTrackMutedStub func(livekit.TrackID, bool, bool) *livekit.TrackInfo setTrackMutedMutex sync.RWMutex setTrackMutedArgsForCall []struct { arg1 livekit.TrackID arg2 bool arg3 bool } + setTrackMutedReturns struct { + result1 *livekit.TrackInfo + } + setTrackMutedReturnsOnCall map[int]struct { + result1 *livekit.TrackInfo + } StartStub func() startMutex sync.RWMutex startArgsForCall []struct { @@ -4877,19 +4883,25 @@ func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityArgsForCall(i int) return argsForCall.arg1 } -func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) { +func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) *livekit.TrackInfo { fake.setTrackMutedMutex.Lock() + ret, specificReturn := fake.setTrackMutedReturnsOnCall[len(fake.setTrackMutedArgsForCall)] fake.setTrackMutedArgsForCall = append(fake.setTrackMutedArgsForCall, struct { arg1 livekit.TrackID arg2 bool arg3 bool }{arg1, arg2, arg3}) stub := fake.SetTrackMutedStub + fakeReturns := fake.setTrackMutedReturns fake.recordInvocation("SetTrackMuted", []interface{}{arg1, arg2, arg3}) fake.setTrackMutedMutex.Unlock() if stub != nil { - fake.SetTrackMutedStub(arg1, arg2, arg3) + return stub(arg1, arg2, arg3) } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 } func (fake *FakeLocalParticipant) SetTrackMutedCallCount() int { @@ -4898,7 +4910,7 @@ func (fake *FakeLocalParticipant) SetTrackMutedCallCount() int { return len(fake.setTrackMutedArgsForCall) } -func (fake *FakeLocalParticipant) SetTrackMutedCalls(stub func(livekit.TrackID, bool, bool)) { +func (fake *FakeLocalParticipant) SetTrackMutedCalls(stub func(livekit.TrackID, bool, bool) *livekit.TrackInfo) { fake.setTrackMutedMutex.Lock() defer fake.setTrackMutedMutex.Unlock() fake.SetTrackMutedStub = stub @@ -4911,6 +4923,29 @@ func (fake *FakeLocalParticipant) SetTrackMutedArgsForCall(i int) (livekit.Track return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } +func (fake *FakeLocalParticipant) SetTrackMutedReturns(result1 *livekit.TrackInfo) { + fake.setTrackMutedMutex.Lock() + defer fake.setTrackMutedMutex.Unlock() + fake.SetTrackMutedStub = nil + fake.setTrackMutedReturns = struct { + result1 *livekit.TrackInfo + }{result1} +} + +func (fake *FakeLocalParticipant) SetTrackMutedReturnsOnCall(i int, result1 *livekit.TrackInfo) { + fake.setTrackMutedMutex.Lock() + defer fake.setTrackMutedMutex.Unlock() + fake.SetTrackMutedStub = nil + if fake.setTrackMutedReturnsOnCall == nil { + fake.setTrackMutedReturnsOnCall = make(map[int]struct { + result1 *livekit.TrackInfo + }) + } + fake.setTrackMutedReturnsOnCall[i] = struct { + result1 *livekit.TrackInfo + }{result1} +} + func (fake *FakeLocalParticipant) Start() { fake.startMutex.Lock() fake.startArgsForCall = append(fake.startArgsForCall, struct { diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 92093d682..7c27d0dac 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -19,18 +19,19 @@ import ( ) var ( - ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress does not exist") - ErrEgressNotConnected = psrpc.NewErrorf(psrpc.Internal, "egress not connected (redis required)") - ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty") - ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)") - ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist") - ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified") - ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits") - ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed") - ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist") - ErrRoomNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested room does not exist") - ErrRoomLockFailed = psrpc.NewErrorf(psrpc.Internal, "could not lock room") - ErrRoomUnlockFailed = psrpc.NewErrorf(psrpc.Internal, "could not unlock room, lock token does not match") - ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found") - ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks") + ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress does not exist") + ErrEgressNotConnected = psrpc.NewErrorf(psrpc.Internal, "egress not connected (redis required)") + ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty") + ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)") + ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist") + ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified") + ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits") + ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed") + ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist") + ErrRoomNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested room does not exist") + ErrRoomLockFailed = psrpc.NewErrorf(psrpc.Internal, "could not lock room") + ErrRoomUnlockFailed = psrpc.NewErrorf(psrpc.Internal, "could not unlock room, lock token does not match") + ErrRemoteUnmuteNoteEnabled = psrpc.NewErrorf(psrpc.FailedPrecondition, "remote unmute not enabled") + ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found") + ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks") ) diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 71fe77a9d..cc8c2e146 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -70,7 +70,7 @@ type RedisStore struct { func NewRedisStore(rc redis.UniversalClient) *RedisStore { unlockScript := `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) - else return 0 + else return 0 end` return &RedisStore{ diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 5797142e3..71b9cf5f9 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pkg/errors" + "golang.org/x/exp/maps" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/version" @@ -29,7 +30,9 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" @@ -67,6 +70,7 @@ type RoomManager struct { egressLauncher rtc.EgressLauncher versionGenerator utils.TimedVersionGenerator turnAuthHandler *TURNAuthHandler + roomServer rpc.TypedRoomServer rooms map[livekit.RoomName]*rtc.Room @@ -83,6 +87,7 @@ func NewLocalRoomManager( egressLauncher rtc.EgressLauncher, versionGenerator utils.TimedVersionGenerator, turnAuthHandler *TURNAuthHandler, + bus psrpc.MessageBus, ) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf) if err != nil { @@ -114,6 +119,11 @@ func NewLocalRoomManager( }, } + r.roomServer, err = rpc.NewTypedRoomServer(livekit.NodeID(r.currentNode.Id), r, bus) + if err != nil { + return nil, err + } + // hook up to router router.OnNewParticipantRTC(r.StartSession) router.OnRTCMessage(r.handleRTCMessage) @@ -126,8 +136,8 @@ func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc return r.rooms[roomName] } -// DeleteRoom completely deletes all room information, including active sessions, room store, and routing info -func (r *RoomManager) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error { +// deleteRoom completely deletes all room information, including active sessions, room store, and routing info +func (r *RoomManager) deleteRoom(ctx context.Context, roomName livekit.RoomName) error { logger.Infow("deleting room state", "room", roomName) r.lock.Lock() delete(r.rooms, roomName) @@ -167,7 +177,7 @@ func (r *RoomManager) CleanupRooms() error { now := time.Now().Unix() for _, room := range rooms { if (now - room.CreationTime) > roomPurgeSeconds { - if err := r.DeleteRoom(ctx, livekit.RoomName(room.Name)); err != nil { + if err := r.deleteRoom(ctx, livekit.RoomName(room.Name)); err != nil { return err } } @@ -177,10 +187,7 @@ func (r *RoomManager) CleanupRooms() error { func (r *RoomManager) CloseIdleRooms() { r.lock.RLock() - rooms := make([]*rtc.Room, 0, len(r.rooms)) - for _, rm := range r.rooms { - rooms = append(rooms, rm) - } + rooms := maps.Values(r.rooms) r.lock.RUnlock() for _, room := range rooms { @@ -203,10 +210,7 @@ func (r *RoomManager) HasParticipants() bool { func (r *RoomManager) Stop() { // disconnect all clients r.lock.RLock() - rooms := make([]*rtc.Room, 0, len(r.rooms)) - for _, rm := range r.rooms { - rooms = append(rooms, rm) - } + rooms := maps.Values(r.rooms) r.lock.RUnlock() for _, room := range rooms { @@ -216,6 +220,8 @@ func (r *RoomManager) Stop() { room.Close() } + r.roomServer.Kill() + if r.rtcConfig != nil { if r.rtcConfig.UDPMux != nil { _ = r.rtcConfig.UDPMux.Close() @@ -426,6 +432,11 @@ func (r *RoomManager) StartSession( _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false) return err } + if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil { + pLogger.Errorw("could not join register participant topic", err) + _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) + return err + } if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { pLogger.Errorw("could not store participant", err) } @@ -449,6 +460,8 @@ func (r *RoomManager) StartSession( pLogger.Errorw("could not delete participant", err) } + r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())) + // update room store with new numParticipants proto := room.ToProto() persistRoomForParticipantCount(proto) @@ -489,6 +502,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room return nil, err } + if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil { + return nil, err + } + r.lock.Lock() currentRoom := r.rooms[roomName] @@ -507,10 +524,12 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher) newRoom.OnClose(func() { + r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName)) + roomInfo := newRoom.ToProto() r.telemetry.RoomEnded(ctx, roomInfo) prometheus.RoomEnded(time.Unix(roomInfo.CreationTime, 0)) - if err := r.DeleteRoom(ctx, roomName); err != nil { + if err := r.deleteRoom(ctx, roomName); err != nil { newRoom.Logger.Errorw("could not delete room", err) } @@ -602,96 +621,152 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa // handles RTC messages resulted from Room API calls func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) { - r.lock.RLock() - room := r.rooms[roomName] - r.lock.RUnlock() - - if room == nil { - if _, ok := msg.Message.(*livekit.RTCNodeMessage_DeleteRoom); ok { - // special case of a non-RTC room e.g. room created but no participants joined - logger.Debugw("Deleting non-rtc room, loading from roomstore") - err := r.roomStore.DeleteRoom(ctx, roomName) - if err != nil { - logger.Debugw("Error deleting non-rtc room", "err", err) - } - return - } else { - logger.Warnw("Could not find room", nil, "room", roomName) - return - } - } - - participant := room.GetParticipant(identity) - var sid livekit.ParticipantID - if participant != nil { - sid = participant.ID() - } - pLogger := rtc.LoggerWithParticipant( - rtc.LoggerWithRoom(logger.GetLogger(), roomName, room.ID()), - identity, - sid, - false, - ) - switch rm := msg.Message.(type) { case *livekit.RTCNodeMessage_RemoveParticipant: - if participant == nil { - return - } - pLogger.Infow("removing participant") - // remove participant by identity, any SID - room.RemoveParticipant(identity, "", types.ParticipantCloseReasonServiceRequestRemoveParticipant) + r.RemoveParticipant(ctx, rm.RemoveParticipant) case *livekit.RTCNodeMessage_MuteTrack: - if participant == nil { - return - } - pLogger.Debugw("setting track muted", - "trackID", rm.MuteTrack.TrackSid, "muted", rm.MuteTrack.Muted) - if !rm.MuteTrack.Muted && !r.config.Room.EnableRemoteUnmute { - pLogger.Errorw("cannot unmute track, remote unmute is disabled", nil) - return - } - participant.SetTrackMuted(livekit.TrackID(rm.MuteTrack.TrackSid), rm.MuteTrack.Muted, true) + r.MutePublishedTrack(ctx, rm.MuteTrack) case *livekit.RTCNodeMessage_UpdateParticipant: - if participant == nil { - return - } - pLogger.Debugw("updating participant", "metadata", rm.UpdateParticipant.Metadata, - "permission", rm.UpdateParticipant.Permission) - room.UpdateParticipantMetadata(participant, rm.UpdateParticipant.Name, rm.UpdateParticipant.Metadata) - if rm.UpdateParticipant.Permission != nil { - participant.SetPermission(rm.UpdateParticipant.Permission) - } + r.UpdateParticipant(ctx, rm.UpdateParticipant) case *livekit.RTCNodeMessage_DeleteRoom: + r.DeleteRoom(ctx, rm.DeleteRoom) + case *livekit.RTCNodeMessage_UpdateSubscriptions: + r.UpdateSubscriptions(ctx, rm.UpdateSubscriptions) + case *livekit.RTCNodeMessage_SendData: + r.SendData(ctx, rm.SendData) + case *livekit.RTCNodeMessage_UpdateRoomMetadata: + r.UpdateRoomMetadata(ctx, rm.UpdateRoomMetadata) + } +} + +func (r *RoomManager) roomLogger(room *rtc.Room) logger.Logger { + return rtc.LoggerWithParticipant(rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), "", "", false) +} + +func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Infow("removing participant") + room.RemoveParticipant(livekit.ParticipantIdentity(req.Identity), "", types.ParticipantCloseReasonServiceRequestRemoveParticipant) + return &livekit.RemoveParticipantResponse{}, nil +} + +func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Debugw("setting track muted", + "trackID", req.TrackSid, "muted", req.Muted) + if !req.Muted && !r.config.Room.EnableRemoteUnmute { + participant.GetLogger().Errorw("cannot unmute track, remote unmute is disabled", nil) + return nil, ErrRemoteUnmuteNoteEnabled + } + track := participant.SetTrackMuted(livekit.TrackID(req.TrackSid), req.Muted, true) + return &livekit.MuteRoomTrackResponse{Track: track}, nil +} + +func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Debugw("updating participant", + "metadata", req.Metadata, "permission", req.Permission) + room.UpdateParticipantMetadata(participant, req.Name, req.Metadata) + if req.Permission != nil { + participant.SetPermission(req.Permission) + } + return participant.ToProto(), nil +} + +func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + // special case of a non-RTC room e.g. room created but no participants joined + logger.Debugw("Deleting non-rtc room, loading from roomstore") + err := r.roomStore.DeleteRoom(ctx, livekit.RoomName(req.Room)) + if err != nil { + logger.Debugw("Error deleting non-rtc room", "err", err) + return nil, err + } + } else { room.Logger.Infow("deleting room") for _, p := range room.GetParticipants() { _ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom, false) } room.Close() - case *livekit.RTCNodeMessage_UpdateSubscriptions: - if participant == nil { - return - } - pLogger.Debugw("updating participant subscriptions") - room.UpdateSubscriptions( - participant, - livekit.StringsAsIDs[livekit.TrackID](rm.UpdateSubscriptions.TrackSids), - rm.UpdateSubscriptions.ParticipantTracks, - rm.UpdateSubscriptions.Subscribe, - ) - case *livekit.RTCNodeMessage_SendData: - pLogger.Debugw("api send data", "size", len(rm.SendData.Data)) - up := &livekit.UserPacket{ - Payload: rm.SendData.Data, - DestinationSids: rm.SendData.DestinationSids, - DestinationIdentities: rm.SendData.DestinationIdentities, - Topic: rm.SendData.Topic, - } - room.SendDataPacket(up, rm.SendData.Kind) - case *livekit.RTCNodeMessage_UpdateRoomMetadata: - pLogger.Debugw("updating room") - room.SetMetadata(rm.UpdateRoomMetadata.Metadata) } + return &livekit.DeleteRoomResponse{}, nil +} + +func (r *RoomManager) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Debugw("updating participant subscriptions") + room.UpdateSubscriptions( + participant, + livekit.StringsAsIDs[livekit.TrackID](req.TrackSids), + req.ParticipantTracks, + req.Subscribe, + ) + return &livekit.UpdateSubscriptionsResponse{}, nil +} + +func (r *RoomManager) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + r.roomLogger(room).Debugw("api send data", "size", len(req.Data)) + up := &livekit.UserPacket{ + Payload: req.Data, + DestinationSids: req.DestinationSids, + DestinationIdentities: req.DestinationIdentities, + Topic: req.Topic, + } + room.SendDataPacket(up, req.Kind) + return &livekit.SendDataResponse{}, nil +} + +func (r *RoomManager) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + r.roomLogger(room).Debugw("updating room") + room.SetMetadata(req.Metadata) + return room.ToProto(), nil } func (r *RoomManager) iceServersForParticipant(apiKey string, participant types.LocalParticipant, tlsOnly bool) []*livekit.ICEServer { diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index d59d3ce82..fe49550f9 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -37,27 +37,36 @@ import ( type RoomService struct { roomConf config.RoomConfig apiConf config.APIConfig + psrpcConf config.PSRPCConfig router routing.MessageRouter roomAllocator RoomAllocator roomStore ServiceStore egressLauncher rtc.EgressLauncher + topicFormatter rpc.TopicFormatter + roomClient rpc.TypedRoomClient } func NewRoomService( roomConf config.RoomConfig, apiConf config.APIConfig, + psrpcConf config.PSRPCConfig, router routing.MessageRouter, roomAllocator RoomAllocator, serviceStore ServiceStore, egressLauncher rtc.EgressLauncher, + topicFormatter rpc.TopicFormatter, + roomClient rpc.TypedRoomClient, ) (svc *RoomService, err error) { svc = &RoomService{ roomConf: roomConf, apiConf: apiConf, + psrpcConf: psrpcConf, router: router, roomAllocator: roomAllocator, roomStore: serviceStore, egressLauncher: egressLauncher, + topicFormatter: topicFormatter, + roomClient: roomClient, } return } @@ -142,6 +151,10 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq return nil, twirpAuthError(err) } + if s.psrpcConf.Enable { + return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + } + if _, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false); err == ErrRoomNotFound { return nil, twirp.NotFoundError("room not found") } @@ -207,10 +220,18 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) { AppendLogFields(ctx, "room", req.Room, "participant", req.Identity) + if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { + return nil, twirpAuthError(err) + } + if _, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)); err == ErrParticipantNotFound { return nil, twirp.NotFoundError("participant not found") } + if s.psrpcConf.Enable { + return s.roomClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_RemoveParticipant{ RemoveParticipant: req, @@ -243,6 +264,10 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirpAuthError(err) } + if s.psrpcConf.Enable { + return s.roomClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_MuteTrack{ MuteTrack: req, @@ -289,6 +314,14 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(maxMetadataSize)) } + if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { + return nil, twirpAuthError(err) + } + + if s.psrpcConf.Enable { + return s.roomClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateParticipant{ UpdateParticipant: req, @@ -329,6 +362,15 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda trackSIDs = append(trackSIDs, pt.TrackSids...) } AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", trackSIDs) + + if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { + return nil, twirpAuthError(err) + } + + if s.psrpcConf.Enable { + return s.roomClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateSubscriptions{ UpdateSubscriptions: req, @@ -348,6 +390,10 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest return nil, twirpAuthError(err) } + if s.psrpcConf.Enable { + return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + } + err := s.router.WriteRoomRTC(ctx, roomName, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_SendData{ SendData: req, @@ -386,13 +432,20 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, err } - err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ - UpdateRoomMetadata: req, - }, - }) - if err != nil { - return nil, err + if s.psrpcConf.Enable { + _, err := s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + if err != nil { + return nil, err + } + } else { + err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{ + Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ + UpdateRoomMetadata: req, + }, + }) + if err != nil { + return nil, err + } } err = s.confirmExecution(func() error { diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index a7433a090..bf25870e7 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -23,8 +23,10 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc/rpcfakes" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/routing/routingfakes" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/livekit-server/pkg/service/servicefakes" @@ -126,9 +128,17 @@ func newTestRoomService(conf config.RoomConfig) *TestRoomService { router := &routingfakes.FakeRouter{} allocator := &servicefakes.FakeRoomAllocator{} store := &servicefakes.FakeServiceStore{} - svc, err := service.NewRoomService(conf, + svc, err := service.NewRoomService( + conf, config.APIConfig{ExecutionTimeout: 2}, - router, allocator, store, nil) + config.PSRPCConfig{}, + router, + allocator, + store, + nil, + routing.NewTopicFormatter(), + &rpcfakes.FakeTypedRoomClient{}, + ) if err != nil { panic(err) } diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 05502a16c..0aa93088b 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -73,6 +73,9 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live getSignalRelayConfig, NewDefaultSignalServer, routing.NewSignalClient, + getPSRPCConfig, + routing.NewTopicFormatter, + routing.NewRoomClient, NewLocalRoomManager, NewTURNAuthHandler, getTURNAuthHandlerFunc, @@ -197,6 +200,10 @@ func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig { return config.SignalRelay } +func getPSRPCConfig(config *config.Config) config.PSRPCConfig { + return config.PSRPC +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index fa788a829..69412c1fe 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -35,6 +35,7 @@ import ( func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { roomConfig := getRoomConf(conf) apiConfig := config.DefaultAPIConfig() + psrpcConfig := getPSRPCConfig(conf) universalClient, err := createRedisClient(conf) if err != nil { return nil, err @@ -73,7 +74,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) - roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher) + topicFormatter := routing.NewTopicFormatter() + roomClient, err := routing.NewRoomClient(nodeID, messageBus, psrpcConfig) + if err != nil { + return nil, err + } + roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient) if err != nil { return nil, err } @@ -88,7 +94,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live clientConfigurationManager := createClientConfiguration() timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus) if err != nil { return nil, err } @@ -227,6 +233,10 @@ func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig { return config2.SignalRelay } +func getPSRPCConfig(config2 *config.Config) config.PSRPCConfig { + return config2.PSRPC +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) }