diff --git a/go.mod b/go.mod index f9a258872..8a990ac1a 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 + github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -70,14 +70,14 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect - github.com/nats-io/nats.go v1.30.2 // indirect + github.com/nats-io/nats.go v1.31.0 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect @@ -101,7 +101,7 @@ require ( golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index b1d7080f8..9927c1032 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79 github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs= github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= -github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= -github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 h1:0Ca5hQP9+DaRjrr5d9msNhfCL+CBvcxqLdh4xOXAZeU= -github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= +github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca h1:M3gymSJFriayMERPin3BSGgYTUpLVfGaMgmqp+3JV6I= +github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -158,13 +158,10 @@ github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod h1:GAFlyu4/ github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs= github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw= github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= -github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o= -github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= -github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -403,7 +400,6 @@ golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -416,8 +412,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/config/config.go b/pkg/config/config.go index 44a12648e..8c5f290f1 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -71,6 +71,7 @@ type Config struct { Keys map[string]string `yaml:"keys,omitempty"` Region string `yaml:"region,omitempty"` SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"` + PSRPC PSRPCConfig `yaml:"psrpc,omitempty"` // LogLevel is deprecated LogLevel string `yaml:"log_level,omitempty"` Logging LoggingConfig `yaml:"logging,omitempty"` @@ -269,6 +270,14 @@ type SignalRelayConfig struct { StreamBufferSize int `yaml:"stream_buffer_size,omitempty"` } +type PSRPCConfig struct { + Enable bool `yaml:"enable,omitempty"` + MaxAttempts int `yaml:"retry_attempts,omitempty"` + Timeout time.Duration `yaml:"retry_timeout,omitempty"` + Backoff time.Duration `yaml:"retry_backoff,omitempty"` + BufferSize int `yaml:"stream_buffer_size,omitempty"` +} + // RegionConfig lists available regions and their latitude/longitude, so the selector would prefer // regions that are closer type RegionConfig struct { @@ -484,6 +493,12 @@ var DefaultConfig = Config{ MaxRetryInterval: 4 * time.Second, StreamBufferSize: 1000, }, + PSRPC: PSRPCConfig{ + MaxAttempts: 3, + Timeout: 500 * time.Millisecond, + Backoff: 500 * time.Millisecond, + BufferSize: 1000, + }, Keys: map[string]string{}, } diff --git a/pkg/routing/roomclient.go b/pkg/routing/roomclient.go new file mode 100644 index 000000000..d511fc282 --- /dev/null +++ b/pkg/routing/roomclient.go @@ -0,0 +1,27 @@ +package routing + +import ( + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + protopsrpc "github.com/livekit/protocol/psrpc" + "github.com/livekit/protocol/rpc" + "github.com/livekit/psrpc" + "github.com/livekit/psrpc/pkg/middleware" +) + +func NewRoomClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.PSRPCConfig) (rpc.TypedRoomClient, error) { + return rpc.NewTypedRoomClient( + nodeID, + bus, + protopsrpc.WithClientLogger(logger.GetLogger()), + middleware.WithClientMetrics(prometheus.PSRPCMetricsObserver{}), + psrpc.WithClientChannelSize(config.BufferSize), + middleware.WithRPCRetries(middleware.RetryOptions{ + MaxAttempts: config.MaxAttempts, + Timeout: config.Timeout, + Backoff: config.Backoff, + }), + ) +} diff --git a/pkg/routing/topic.go b/pkg/routing/topic.go new file mode 100644 index 000000000..24ebb0bba --- /dev/null +++ b/pkg/routing/topic.go @@ -0,0 +1,22 @@ +package routing + +import ( + "context" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" +) + +type topicFormatter struct{} + +func NewTopicFormatter() rpc.TopicFormatter { + return topicFormatter{} +} + +func (f topicFormatter) ParticipantTopic(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) rpc.ParticipantTopic { + return rpc.FormatParticipantTopic(roomName, identity) +} + +func (f topicFormatter) RoomTopic(ctx context.Context, roomName livekit.RoomName) rpc.RoomTopic { + return rpc.FormatRoomTopic(roomName) +} diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index 656995632..c8f66073b 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -27,6 +27,7 @@ var ( ErrEmptyIdentity = errors.New("participant identity cannot be empty") ErrEmptyParticipantID = errors.New("participant ID cannot be empty") ErrMissingGrants = errors.New("VideoGrant is missing") + ErrInternalError = errors.New("internal error") // Track subscription related ErrNoTrackPermission = errors.New("participant is not allowed to subscribe to this track") diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index c47b5a658..4a0087ea1 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -64,8 +64,9 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro } } - p.SetTrackMutedCalls(func(sid livekit.TrackID, muted bool, fromServer bool) { + p.SetTrackMutedCalls(func(sid livekit.TrackID, muted bool, fromServer bool) *livekit.TrackInfo { updateTrack() + return nil }) p.AddTrackCalls(func(req *livekit.AddTrackRequest) { updateTrack() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 38fac3f58..f09b13d2f 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1670,16 +1670,16 @@ func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) }) } -func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) { +func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo { // when request is coming from admin, send message to current participant if fromAdmin { p.sendTrackMuted(trackID, muted) } - p.setTrackMuted(trackID, muted) + return p.setTrackMuted(trackID, muted) } -func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) { +func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo { p.dirty.Store(true) p.supervisor.SetPublicationMute(trackID, muted) @@ -1713,6 +1713,8 @@ func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) { if !isPending && track == nil { p.pubLogger.Warnw("could not locate track", nil, "trackID", trackID) } + + return trackInfo } func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) (*MediaTrack, bool) { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 3a971fce4..da3ee7178 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -87,6 +87,7 @@ const ( ParticipantCloseReasonVerifyFailed ParticipantCloseReasonJoinFailed ParticipantCloseReasonJoinTimeout + ParticipantCloseReasonMessageBusFailed ParticipantCloseReasonStateDisconnected ParticipantCloseReasonPeerConnectionDisconnected ParticipantCloseReasonDuplicateIdentity @@ -119,6 +120,8 @@ func (p ParticipantCloseReason) String() string { return "JOIN_FAILED" case ParticipantCloseReasonJoinTimeout: return "JOIN_TIMEOUT" + case ParticipantCloseReasonMessageBusFailed: + return "MESSAGE_BUS_FAILED" case ParticipantCloseReasonStateDisconnected: return "STATE_DISCONNECTED" case ParticipantCloseReasonPeerConnectionDisconnected: @@ -162,7 +165,7 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason { return livekit.DisconnectReason_CLIENT_INITIATED case ParticipantCloseReasonRoomManagerStop: return livekit.DisconnectReason_SERVER_SHUTDOWN - case ParticipantCloseReasonVerifyFailed, ParticipantCloseReasonJoinFailed, ParticipantCloseReasonJoinTimeout: + case ParticipantCloseReasonVerifyFailed, ParticipantCloseReasonJoinFailed, ParticipantCloseReasonJoinTimeout, ParticipantCloseReasonMessageBusFailed: // expected to be connected but is not return livekit.DisconnectReason_JOIN_FAILURE case ParticipantCloseReasonPeerConnectionDisconnected: @@ -335,7 +338,7 @@ type LocalParticipant interface { AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) HandleOffer(sdp webrtc.SessionDescription) AddTrack(req *livekit.AddTrackRequest) - SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) + SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo HandleAnswer(sdp webrtc.SessionDescription) Negotiate(force bool) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index ac0b18932..4020502be 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -738,13 +738,19 @@ type FakeLocalParticipant struct { setSubscriberChannelCapacityArgsForCall []struct { arg1 int64 } - SetTrackMutedStub func(livekit.TrackID, bool, bool) + SetTrackMutedStub func(livekit.TrackID, bool, bool) *livekit.TrackInfo setTrackMutedMutex sync.RWMutex setTrackMutedArgsForCall []struct { arg1 livekit.TrackID arg2 bool arg3 bool } + setTrackMutedReturns struct { + result1 *livekit.TrackInfo + } + setTrackMutedReturnsOnCall map[int]struct { + result1 *livekit.TrackInfo + } StartStub func() startMutex sync.RWMutex startArgsForCall []struct { @@ -4877,19 +4883,25 @@ func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityArgsForCall(i int) return argsForCall.arg1 } -func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) { +func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) *livekit.TrackInfo { fake.setTrackMutedMutex.Lock() + ret, specificReturn := fake.setTrackMutedReturnsOnCall[len(fake.setTrackMutedArgsForCall)] fake.setTrackMutedArgsForCall = append(fake.setTrackMutedArgsForCall, struct { arg1 livekit.TrackID arg2 bool arg3 bool }{arg1, arg2, arg3}) stub := fake.SetTrackMutedStub + fakeReturns := fake.setTrackMutedReturns fake.recordInvocation("SetTrackMuted", []interface{}{arg1, arg2, arg3}) fake.setTrackMutedMutex.Unlock() if stub != nil { - fake.SetTrackMutedStub(arg1, arg2, arg3) + return stub(arg1, arg2, arg3) } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 } func (fake *FakeLocalParticipant) SetTrackMutedCallCount() int { @@ -4898,7 +4910,7 @@ func (fake *FakeLocalParticipant) SetTrackMutedCallCount() int { return len(fake.setTrackMutedArgsForCall) } -func (fake *FakeLocalParticipant) SetTrackMutedCalls(stub func(livekit.TrackID, bool, bool)) { +func (fake *FakeLocalParticipant) SetTrackMutedCalls(stub func(livekit.TrackID, bool, bool) *livekit.TrackInfo) { fake.setTrackMutedMutex.Lock() defer fake.setTrackMutedMutex.Unlock() fake.SetTrackMutedStub = stub @@ -4911,6 +4923,29 @@ func (fake *FakeLocalParticipant) SetTrackMutedArgsForCall(i int) (livekit.Track return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } +func (fake *FakeLocalParticipant) SetTrackMutedReturns(result1 *livekit.TrackInfo) { + fake.setTrackMutedMutex.Lock() + defer fake.setTrackMutedMutex.Unlock() + fake.SetTrackMutedStub = nil + fake.setTrackMutedReturns = struct { + result1 *livekit.TrackInfo + }{result1} +} + +func (fake *FakeLocalParticipant) SetTrackMutedReturnsOnCall(i int, result1 *livekit.TrackInfo) { + fake.setTrackMutedMutex.Lock() + defer fake.setTrackMutedMutex.Unlock() + fake.SetTrackMutedStub = nil + if fake.setTrackMutedReturnsOnCall == nil { + fake.setTrackMutedReturnsOnCall = make(map[int]struct { + result1 *livekit.TrackInfo + }) + } + fake.setTrackMutedReturnsOnCall[i] = struct { + result1 *livekit.TrackInfo + }{result1} +} + func (fake *FakeLocalParticipant) Start() { fake.startMutex.Lock() fake.startArgsForCall = append(fake.startArgsForCall, struct { diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 92093d682..7c27d0dac 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -19,18 +19,19 @@ import ( ) var ( - ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress does not exist") - ErrEgressNotConnected = psrpc.NewErrorf(psrpc.Internal, "egress not connected (redis required)") - ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty") - ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)") - ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist") - ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified") - ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits") - ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed") - ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist") - ErrRoomNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested room does not exist") - ErrRoomLockFailed = psrpc.NewErrorf(psrpc.Internal, "could not lock room") - ErrRoomUnlockFailed = psrpc.NewErrorf(psrpc.Internal, "could not unlock room, lock token does not match") - ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found") - ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks") + ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress does not exist") + ErrEgressNotConnected = psrpc.NewErrorf(psrpc.Internal, "egress not connected (redis required)") + ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty") + ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)") + ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist") + ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified") + ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits") + ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed") + ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist") + ErrRoomNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested room does not exist") + ErrRoomLockFailed = psrpc.NewErrorf(psrpc.Internal, "could not lock room") + ErrRoomUnlockFailed = psrpc.NewErrorf(psrpc.Internal, "could not unlock room, lock token does not match") + ErrRemoteUnmuteNoteEnabled = psrpc.NewErrorf(psrpc.FailedPrecondition, "remote unmute not enabled") + ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found") + ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks") ) diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 71fe77a9d..cc8c2e146 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -70,7 +70,7 @@ type RedisStore struct { func NewRedisStore(rc redis.UniversalClient) *RedisStore { unlockScript := `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) - else return 0 + else return 0 end` return &RedisStore{ diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 5797142e3..71b9cf5f9 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pkg/errors" + "golang.org/x/exp/maps" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/version" @@ -29,7 +30,9 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" @@ -67,6 +70,7 @@ type RoomManager struct { egressLauncher rtc.EgressLauncher versionGenerator utils.TimedVersionGenerator turnAuthHandler *TURNAuthHandler + roomServer rpc.TypedRoomServer rooms map[livekit.RoomName]*rtc.Room @@ -83,6 +87,7 @@ func NewLocalRoomManager( egressLauncher rtc.EgressLauncher, versionGenerator utils.TimedVersionGenerator, turnAuthHandler *TURNAuthHandler, + bus psrpc.MessageBus, ) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf) if err != nil { @@ -114,6 +119,11 @@ func NewLocalRoomManager( }, } + r.roomServer, err = rpc.NewTypedRoomServer(livekit.NodeID(r.currentNode.Id), r, bus) + if err != nil { + return nil, err + } + // hook up to router router.OnNewParticipantRTC(r.StartSession) router.OnRTCMessage(r.handleRTCMessage) @@ -126,8 +136,8 @@ func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc return r.rooms[roomName] } -// DeleteRoom completely deletes all room information, including active sessions, room store, and routing info -func (r *RoomManager) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error { +// deleteRoom completely deletes all room information, including active sessions, room store, and routing info +func (r *RoomManager) deleteRoom(ctx context.Context, roomName livekit.RoomName) error { logger.Infow("deleting room state", "room", roomName) r.lock.Lock() delete(r.rooms, roomName) @@ -167,7 +177,7 @@ func (r *RoomManager) CleanupRooms() error { now := time.Now().Unix() for _, room := range rooms { if (now - room.CreationTime) > roomPurgeSeconds { - if err := r.DeleteRoom(ctx, livekit.RoomName(room.Name)); err != nil { + if err := r.deleteRoom(ctx, livekit.RoomName(room.Name)); err != nil { return err } } @@ -177,10 +187,7 @@ func (r *RoomManager) CleanupRooms() error { func (r *RoomManager) CloseIdleRooms() { r.lock.RLock() - rooms := make([]*rtc.Room, 0, len(r.rooms)) - for _, rm := range r.rooms { - rooms = append(rooms, rm) - } + rooms := maps.Values(r.rooms) r.lock.RUnlock() for _, room := range rooms { @@ -203,10 +210,7 @@ func (r *RoomManager) HasParticipants() bool { func (r *RoomManager) Stop() { // disconnect all clients r.lock.RLock() - rooms := make([]*rtc.Room, 0, len(r.rooms)) - for _, rm := range r.rooms { - rooms = append(rooms, rm) - } + rooms := maps.Values(r.rooms) r.lock.RUnlock() for _, room := range rooms { @@ -216,6 +220,8 @@ func (r *RoomManager) Stop() { room.Close() } + r.roomServer.Kill() + if r.rtcConfig != nil { if r.rtcConfig.UDPMux != nil { _ = r.rtcConfig.UDPMux.Close() @@ -426,6 +432,11 @@ func (r *RoomManager) StartSession( _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false) return err } + if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil { + pLogger.Errorw("could not join register participant topic", err) + _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) + return err + } if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { pLogger.Errorw("could not store participant", err) } @@ -449,6 +460,8 @@ func (r *RoomManager) StartSession( pLogger.Errorw("could not delete participant", err) } + r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())) + // update room store with new numParticipants proto := room.ToProto() persistRoomForParticipantCount(proto) @@ -489,6 +502,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room return nil, err } + if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil { + return nil, err + } + r.lock.Lock() currentRoom := r.rooms[roomName] @@ -507,10 +524,12 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher) newRoom.OnClose(func() { + r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName)) + roomInfo := newRoom.ToProto() r.telemetry.RoomEnded(ctx, roomInfo) prometheus.RoomEnded(time.Unix(roomInfo.CreationTime, 0)) - if err := r.DeleteRoom(ctx, roomName); err != nil { + if err := r.deleteRoom(ctx, roomName); err != nil { newRoom.Logger.Errorw("could not delete room", err) } @@ -602,96 +621,152 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa // handles RTC messages resulted from Room API calls func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) { - r.lock.RLock() - room := r.rooms[roomName] - r.lock.RUnlock() - - if room == nil { - if _, ok := msg.Message.(*livekit.RTCNodeMessage_DeleteRoom); ok { - // special case of a non-RTC room e.g. room created but no participants joined - logger.Debugw("Deleting non-rtc room, loading from roomstore") - err := r.roomStore.DeleteRoom(ctx, roomName) - if err != nil { - logger.Debugw("Error deleting non-rtc room", "err", err) - } - return - } else { - logger.Warnw("Could not find room", nil, "room", roomName) - return - } - } - - participant := room.GetParticipant(identity) - var sid livekit.ParticipantID - if participant != nil { - sid = participant.ID() - } - pLogger := rtc.LoggerWithParticipant( - rtc.LoggerWithRoom(logger.GetLogger(), roomName, room.ID()), - identity, - sid, - false, - ) - switch rm := msg.Message.(type) { case *livekit.RTCNodeMessage_RemoveParticipant: - if participant == nil { - return - } - pLogger.Infow("removing participant") - // remove participant by identity, any SID - room.RemoveParticipant(identity, "", types.ParticipantCloseReasonServiceRequestRemoveParticipant) + r.RemoveParticipant(ctx, rm.RemoveParticipant) case *livekit.RTCNodeMessage_MuteTrack: - if participant == nil { - return - } - pLogger.Debugw("setting track muted", - "trackID", rm.MuteTrack.TrackSid, "muted", rm.MuteTrack.Muted) - if !rm.MuteTrack.Muted && !r.config.Room.EnableRemoteUnmute { - pLogger.Errorw("cannot unmute track, remote unmute is disabled", nil) - return - } - participant.SetTrackMuted(livekit.TrackID(rm.MuteTrack.TrackSid), rm.MuteTrack.Muted, true) + r.MutePublishedTrack(ctx, rm.MuteTrack) case *livekit.RTCNodeMessage_UpdateParticipant: - if participant == nil { - return - } - pLogger.Debugw("updating participant", "metadata", rm.UpdateParticipant.Metadata, - "permission", rm.UpdateParticipant.Permission) - room.UpdateParticipantMetadata(participant, rm.UpdateParticipant.Name, rm.UpdateParticipant.Metadata) - if rm.UpdateParticipant.Permission != nil { - participant.SetPermission(rm.UpdateParticipant.Permission) - } + r.UpdateParticipant(ctx, rm.UpdateParticipant) case *livekit.RTCNodeMessage_DeleteRoom: + r.DeleteRoom(ctx, rm.DeleteRoom) + case *livekit.RTCNodeMessage_UpdateSubscriptions: + r.UpdateSubscriptions(ctx, rm.UpdateSubscriptions) + case *livekit.RTCNodeMessage_SendData: + r.SendData(ctx, rm.SendData) + case *livekit.RTCNodeMessage_UpdateRoomMetadata: + r.UpdateRoomMetadata(ctx, rm.UpdateRoomMetadata) + } +} + +func (r *RoomManager) roomLogger(room *rtc.Room) logger.Logger { + return rtc.LoggerWithParticipant(rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), "", "", false) +} + +func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Infow("removing participant") + room.RemoveParticipant(livekit.ParticipantIdentity(req.Identity), "", types.ParticipantCloseReasonServiceRequestRemoveParticipant) + return &livekit.RemoveParticipantResponse{}, nil +} + +func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Debugw("setting track muted", + "trackID", req.TrackSid, "muted", req.Muted) + if !req.Muted && !r.config.Room.EnableRemoteUnmute { + participant.GetLogger().Errorw("cannot unmute track, remote unmute is disabled", nil) + return nil, ErrRemoteUnmuteNoteEnabled + } + track := participant.SetTrackMuted(livekit.TrackID(req.TrackSid), req.Muted, true) + return &livekit.MuteRoomTrackResponse{Track: track}, nil +} + +func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Debugw("updating participant", + "metadata", req.Metadata, "permission", req.Permission) + room.UpdateParticipantMetadata(participant, req.Name, req.Metadata) + if req.Permission != nil { + participant.SetPermission(req.Permission) + } + return participant.ToProto(), nil +} + +func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + // special case of a non-RTC room e.g. room created but no participants joined + logger.Debugw("Deleting non-rtc room, loading from roomstore") + err := r.roomStore.DeleteRoom(ctx, livekit.RoomName(req.Room)) + if err != nil { + logger.Debugw("Error deleting non-rtc room", "err", err) + return nil, err + } + } else { room.Logger.Infow("deleting room") for _, p := range room.GetParticipants() { _ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom, false) } room.Close() - case *livekit.RTCNodeMessage_UpdateSubscriptions: - if participant == nil { - return - } - pLogger.Debugw("updating participant subscriptions") - room.UpdateSubscriptions( - participant, - livekit.StringsAsIDs[livekit.TrackID](rm.UpdateSubscriptions.TrackSids), - rm.UpdateSubscriptions.ParticipantTracks, - rm.UpdateSubscriptions.Subscribe, - ) - case *livekit.RTCNodeMessage_SendData: - pLogger.Debugw("api send data", "size", len(rm.SendData.Data)) - up := &livekit.UserPacket{ - Payload: rm.SendData.Data, - DestinationSids: rm.SendData.DestinationSids, - DestinationIdentities: rm.SendData.DestinationIdentities, - Topic: rm.SendData.Topic, - } - room.SendDataPacket(up, rm.SendData.Kind) - case *livekit.RTCNodeMessage_UpdateRoomMetadata: - pLogger.Debugw("updating room") - room.SetMetadata(rm.UpdateRoomMetadata.Metadata) } + return &livekit.DeleteRoomResponse{}, nil +} + +func (r *RoomManager) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity)) + if participant == nil { + return nil, ErrParticipantNotFound + } + + participant.GetLogger().Debugw("updating participant subscriptions") + room.UpdateSubscriptions( + participant, + livekit.StringsAsIDs[livekit.TrackID](req.TrackSids), + req.ParticipantTracks, + req.Subscribe, + ) + return &livekit.UpdateSubscriptionsResponse{}, nil +} + +func (r *RoomManager) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + r.roomLogger(room).Debugw("api send data", "size", len(req.Data)) + up := &livekit.UserPacket{ + Payload: req.Data, + DestinationSids: req.DestinationSids, + DestinationIdentities: req.DestinationIdentities, + Topic: req.Topic, + } + room.SendDataPacket(up, req.Kind) + return &livekit.SendDataResponse{}, nil +} + +func (r *RoomManager) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.Room)) + if room == nil { + return nil, ErrRoomNotFound + } + + r.roomLogger(room).Debugw("updating room") + room.SetMetadata(req.Metadata) + return room.ToProto(), nil } func (r *RoomManager) iceServersForParticipant(apiKey string, participant types.LocalParticipant, tlsOnly bool) []*livekit.ICEServer { diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index d59d3ce82..fe49550f9 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -37,27 +37,36 @@ import ( type RoomService struct { roomConf config.RoomConfig apiConf config.APIConfig + psrpcConf config.PSRPCConfig router routing.MessageRouter roomAllocator RoomAllocator roomStore ServiceStore egressLauncher rtc.EgressLauncher + topicFormatter rpc.TopicFormatter + roomClient rpc.TypedRoomClient } func NewRoomService( roomConf config.RoomConfig, apiConf config.APIConfig, + psrpcConf config.PSRPCConfig, router routing.MessageRouter, roomAllocator RoomAllocator, serviceStore ServiceStore, egressLauncher rtc.EgressLauncher, + topicFormatter rpc.TopicFormatter, + roomClient rpc.TypedRoomClient, ) (svc *RoomService, err error) { svc = &RoomService{ roomConf: roomConf, apiConf: apiConf, + psrpcConf: psrpcConf, router: router, roomAllocator: roomAllocator, roomStore: serviceStore, egressLauncher: egressLauncher, + topicFormatter: topicFormatter, + roomClient: roomClient, } return } @@ -142,6 +151,10 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq return nil, twirpAuthError(err) } + if s.psrpcConf.Enable { + return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + } + if _, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false); err == ErrRoomNotFound { return nil, twirp.NotFoundError("room not found") } @@ -207,10 +220,18 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) { AppendLogFields(ctx, "room", req.Room, "participant", req.Identity) + if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { + return nil, twirpAuthError(err) + } + if _, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)); err == ErrParticipantNotFound { return nil, twirp.NotFoundError("participant not found") } + if s.psrpcConf.Enable { + return s.roomClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_RemoveParticipant{ RemoveParticipant: req, @@ -243,6 +264,10 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirpAuthError(err) } + if s.psrpcConf.Enable { + return s.roomClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_MuteTrack{ MuteTrack: req, @@ -289,6 +314,14 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(maxMetadataSize)) } + if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { + return nil, twirpAuthError(err) + } + + if s.psrpcConf.Enable { + return s.roomClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateParticipant{ UpdateParticipant: req, @@ -329,6 +362,15 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda trackSIDs = append(trackSIDs, pt.TrackSids...) } AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", trackSIDs) + + if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { + return nil, twirpAuthError(err) + } + + if s.psrpcConf.Enable { + return s.roomClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + } + err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateSubscriptions{ UpdateSubscriptions: req, @@ -348,6 +390,10 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest return nil, twirpAuthError(err) } + if s.psrpcConf.Enable { + return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + } + err := s.router.WriteRoomRTC(ctx, roomName, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_SendData{ SendData: req, @@ -386,13 +432,20 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, err } - err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ - UpdateRoomMetadata: req, - }, - }) - if err != nil { - return nil, err + if s.psrpcConf.Enable { + _, err := s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + if err != nil { + return nil, err + } + } else { + err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{ + Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ + UpdateRoomMetadata: req, + }, + }) + if err != nil { + return nil, err + } } err = s.confirmExecution(func() error { diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index a7433a090..bf25870e7 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -23,8 +23,10 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc/rpcfakes" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/routing/routingfakes" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/livekit-server/pkg/service/servicefakes" @@ -126,9 +128,17 @@ func newTestRoomService(conf config.RoomConfig) *TestRoomService { router := &routingfakes.FakeRouter{} allocator := &servicefakes.FakeRoomAllocator{} store := &servicefakes.FakeServiceStore{} - svc, err := service.NewRoomService(conf, + svc, err := service.NewRoomService( + conf, config.APIConfig{ExecutionTimeout: 2}, - router, allocator, store, nil) + config.PSRPCConfig{}, + router, + allocator, + store, + nil, + routing.NewTopicFormatter(), + &rpcfakes.FakeTypedRoomClient{}, + ) if err != nil { panic(err) } diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 05502a16c..0aa93088b 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -73,6 +73,9 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live getSignalRelayConfig, NewDefaultSignalServer, routing.NewSignalClient, + getPSRPCConfig, + routing.NewTopicFormatter, + routing.NewRoomClient, NewLocalRoomManager, NewTURNAuthHandler, getTURNAuthHandlerFunc, @@ -197,6 +200,10 @@ func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig { return config.SignalRelay } +func getPSRPCConfig(config *config.Config) config.PSRPCConfig { + return config.PSRPC +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index fa788a829..69412c1fe 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -35,6 +35,7 @@ import ( func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { roomConfig := getRoomConf(conf) apiConfig := config.DefaultAPIConfig() + psrpcConfig := getPSRPCConfig(conf) universalClient, err := createRedisClient(conf) if err != nil { return nil, err @@ -73,7 +74,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) - roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher) + topicFormatter := routing.NewTopicFormatter() + roomClient, err := routing.NewRoomClient(nodeID, messageBus, psrpcConfig) + if err != nil { + return nil, err + } + roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient) if err != nil { return nil, err } @@ -88,7 +94,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live clientConfigurationManager := createClientConfiguration() timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus) if err != nil { return nil, err } @@ -227,6 +233,10 @@ func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig { return config2.SignalRelay } +func getPSRPCConfig(config2 *config.Config) config.PSRPCConfig { + return config2.PSRPC +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index f37bea03f..887c48566 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1691,7 +1691,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in tp.shouldDrop = true if f.started && result.IsRelevant { // call to update highest incoming sequence number and other internal structures - if _, err := f.rtpMunger.UpdateAndGetSnTs(extPkt); err == nil { + if tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt); err == nil && tpRTP.snOrdering == SequenceNumberOrderingContiguous { f.rtpMunger.PacketDropped(extPkt) } }