From 3f2f850bdb5cbf87b79f7d549872ed512f2109ec Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 14 Jan 2024 01:49:26 -0800 Subject: [PATCH] clean up legacy rpc (#2384) * clean up legacy rpc * cleanup * cleanup * cleanup * tidy * cleanup * cleanup --- config-sample.yaml | 4 - pkg/config/config.go | 2 - pkg/routing/interfaces.go | 15 +- pkg/routing/localrouter.go | 129 +--------- pkg/routing/redisrouter.go | 310 ++---------------------- pkg/routing/routingfakes/fake_router.go | 236 ------------------ pkg/service/roommanager.go | 29 +-- pkg/service/roomservice.go | 201 +-------------- pkg/service/roomservice_test.go | 20 -- pkg/service/signal.go | 30 +-- pkg/service/signal_test.go | 1 - pkg/service/wire_gen.go | 4 +- test/integration_helpers.go | 1 - 13 files changed, 45 insertions(+), 937 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index 4a70f2ae8..0d001bd10 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -183,8 +183,6 @@ keys: # since v1.4.0, a more reliable, psrpc based signal relay is available # this gives us the ability to reliably proxy messages between a signal server and RTC node # signal_relay: -# # enabled by default as of v1.5.0, legacy signal proxy will be removed in v1.6 -# enabled: true # # amount of time a message delivery is tried before giving up # retry_timeout: 30s # # minimum amount of time to wait for RTC node to ack, @@ -199,8 +197,6 @@ keys: # PSRPC # since v1.5.1, a more reliable, psrpc based internal rpc # psrpc: -# # enable the psrpc internal api client for roomservice calls -# enabled: true # # maximum number of rpc attempts # max_attempts: 3 # # initial time to wait for calls to complete diff --git a/pkg/config/config.go b/pkg/config/config.go index be9920cce..1a515f86f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -268,7 +268,6 @@ type NodeSelectorConfig struct { } type SignalRelayConfig struct { - Enabled bool `yaml:"enabled,omitempty"` RetryTimeout time.Duration `yaml:"retry_timeout,omitempty"` MinRetryInterval time.Duration `yaml:"min_retry_interval,omitempty"` MaxRetryInterval time.Duration `yaml:"max_retry_interval,omitempty"` @@ -487,7 +486,6 @@ var DefaultConfig = Config{ CPULoadLimit: 0.9, }, SignalRelay: SignalRelayConfig{ - Enabled: true, RetryTimeout: 7500 * time.Millisecond, MinRetryInterval: 500 * time.Millisecond, MaxRetryInterval: 4 * time.Second, diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 37985bb9d..0b4f31c41 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -21,7 +21,6 @@ import ( "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -99,12 +98,6 @@ type Router interface { Start() error Drain() Stop() - - // OnNewParticipantRTC is called to start a new participant's RTC connection - OnNewParticipantRTC(callback NewParticipantCallback) - - // OnRTCMessage is called to execute actions on the RTC node - OnRTCMessage(callback RTCMessageCallback) } type StartParticipantSignalResults struct { @@ -118,17 +111,13 @@ type StartParticipantSignalResults struct { type MessageRouter interface { // StartParticipantSignal participant signal connection is ready to start StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) - - // Write a message to a participant or room - WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error - WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error } -func CreateRouter(config *config.Config, rc redis.UniversalClient, node LocalNode, signalClient SignalClient) Router { +func CreateRouter(rc redis.UniversalClient, node LocalNode, signalClient SignalClient) Router { lr := NewLocalRouter(node, signalClient) if rc != nil { - return NewRedisRouter(config, lr, rc) + return NewRedisRouter(lr, rc) } // local routing and store diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index d83f5ea60..85279ead0 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -26,8 +26,7 @@ import ( "github.com/livekit/protocol/logger" ) -// aggregated channel for all participants -const localRTCChannelSize = 10000 +var _ Router = (*LocalRouter)(nil) // a router of messages on the same node, basic implementation for local testing type LocalRouter struct { @@ -39,11 +38,6 @@ type LocalRouter struct { requestChannels map[string]*MessageChannel responseChannels map[string]*MessageChannel isStarted atomic.Bool - - rtcMessageChan *MessageChannel - - onNewParticipant NewParticipantCallback - onRTCMessage RTCMessageCallback } func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRouter { @@ -52,7 +46,6 @@ func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRout signalClient: signalClient, requestChannels: make(map[string]*MessageChannel), responseChannels: make(map[string]*MessageChannel), - rtcMessageChan: NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize), } } @@ -68,7 +61,6 @@ func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ li } func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error { - // do nothing return nil } @@ -120,65 +112,22 @@ func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, room return } -func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { - r.lock.Lock() - if r.rtcMessageChan.IsClosed() { - // create a new one - r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize) - } - r.lock.Unlock() - msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity)) - msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity)) - return r.writeRTCMessage(r.rtcMessageChan, msg) -} - -func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error { - msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, "")) - msg.ParticipantKeyB62 = string(ParticipantKey(roomName, "")) - return r.WriteNodeRTC(ctx, r.currentNode.Id, msg) -} - -func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error { - r.lock.Lock() - if r.rtcMessageChan.IsClosed() { - // create a new one - r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize) - } - r.lock.Unlock() - return r.writeRTCMessage(r.rtcMessageChan, msg) -} - -func (r *LocalRouter) writeRTCMessage(sink MessageSink, msg *livekit.RTCNodeMessage) error { - msg.SenderTime = time.Now().Unix() - return sink.WriteMessage(msg) -} - -func (r *LocalRouter) OnNewParticipantRTC(callback NewParticipantCallback) { - r.onNewParticipant = callback -} - -func (r *LocalRouter) OnRTCMessage(callback RTCMessageCallback) { - r.onRTCMessage = callback -} - func (r *LocalRouter) Start() error { if r.isStarted.Swap(true) { return nil } go r.statsWorker() // go r.memStatsWorker() - // on local routers, Start doesn't do anything, websocket connections initiate the connections - go r.rtcMessageWorker() return nil } func (r *LocalRouter) Drain() { + r.lock.Lock() + defer r.lock.Unlock() r.currentNode.State = livekit.NodeState_SHUTTING_DOWN } -func (r *LocalRouter) Stop() { - r.rtcMessageChan.Close() -} +func (r *LocalRouter) Stop() {} func (r *LocalRouter) GetRegion() string { return r.currentNode.Region @@ -214,73 +163,3 @@ func (r *LocalRouter) statsWorker() { } } */ -func (r *LocalRouter) rtcMessageWorker() { - // is a new channel available? if so swap to that one - if !r.isStarted.Load() { - return - } - - // start a new worker after this finished - defer func() { - go r.rtcMessageWorker() - }() - - r.lock.RLock() - isClosed := r.rtcMessageChan.IsClosed() - r.lock.RUnlock() - if isClosed { - // sleep and retry - time.Sleep(time.Second) - } - - r.lock.RLock() - msgChan := r.rtcMessageChan.ReadChan() - r.lock.RUnlock() - // consume messages from - for msg := range msgChan { - if rtcMsg, ok := msg.(*livekit.RTCNodeMessage); ok { - var room livekit.RoomName - var identity livekit.ParticipantIdentity - var err error - if rtcMsg.ParticipantKeyB62 != "" { - room, identity, err = parseParticipantKey(livekit.ParticipantKey(rtcMsg.ParticipantKeyB62)) - } - if err != nil { - room, identity, err = parseParticipantKeyLegacy(livekit.ParticipantKey(rtcMsg.ParticipantKey)) - } - if err != nil { - logger.Errorw("could not process RTC message", err) - continue - } - if r.onRTCMessage != nil { - r.onRTCMessage(context.Background(), room, identity, rtcMsg) - } - } - } -} - -func (r *LocalRouter) getMessageChannel(target map[string]*MessageChannel, key string) *MessageChannel { - r.lock.RLock() - defer r.lock.RUnlock() - return target[key] -} - -func (r *LocalRouter) getOrCreateMessageChannel(target map[string]*MessageChannel, key string) *MessageChannel { - r.lock.Lock() - defer r.lock.Unlock() - mc := target[key] - - if mc != nil { - return mc - } - - mc = NewMessageChannel(livekit.ConnectionID(key), DefaultMessageChannelSize) - mc.OnClose(func() { - r.lock.Lock() - delete(target, key) - r.lock.Unlock() - }) - target[key] = mc - - return mc -} diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 5a920b9c5..743d63d1c 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -28,9 +28,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing/selector" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) @@ -42,17 +40,18 @@ const ( statsMaxDelaySeconds = 30 ) +var _ Router = (*RedisRouter)(nil) + // RedisRouter uses Redis pub/sub to route signaling messages across different nodes // It relies on the RTC node to be the primary driver of the participant connection. // Because type RedisRouter struct { *LocalRouter - rc redis.UniversalClient - usePSRPCSignal bool - ctx context.Context - isStarted atomic.Bool - nodeMu sync.RWMutex + rc redis.UniversalClient + ctx context.Context + isStarted atomic.Bool + nodeMu sync.RWMutex // previous stats for computing averages prevStats *livekit.NodeStats @@ -60,11 +59,10 @@ type RedisRouter struct { cancel func() } -func NewRedisRouter(config *config.Config, lr *LocalRouter, rc redis.UniversalClient) *RedisRouter { +func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient) *RedisRouter { rr := &RedisRouter{ - LocalRouter: lr, - rc: rc, - usePSRPCSignal: config.SignalRelay.Enabled, + LocalRouter: lr, + rc: rc, } rr.ctx, rr.cancel = context.WithCancel(context.Background()) return rr @@ -163,72 +161,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek return } - if r.usePSRPCSignal { - res, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id)) - if err != nil { - return - } - - // map signal & rtc nodes - err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id) - return - } - - res.ConnectionID = livekit.ConnectionID(utils.NewGuid("CO_")) - pKey := ParticipantKeyLegacy(roomName, pi.Identity) - pKeyB62 := ParticipantKey(roomName, pi.Identity) - - // map signal & rtc nodes - if err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id); err != nil { - return - } - - // index by connectionID, since there may be multiple connections for the participant - // set up response channel before sending StartSession and be ready to receive responses. - resChan := r.getOrCreateMessageChannel(r.responseChannels, string(res.ConnectionID)) - - sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), res.ConnectionID, pKey, pKeyB62) - - // serialize claims - ss, err := pi.ToStartSession(roomName, res.ConnectionID) - if err != nil { - return - } - - // sends a message to start session - err = sink.WriteMessage(ss) - if err != nil { - return - } - - res.RequestSink = sink - res.ResponseSource = resChan - return res, nil -} - -func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { - pkey := ParticipantKeyLegacy(roomName, identity) - pkeyB62 := ParticipantKey(roomName, identity) - rtcNode, err := r.getParticipantRTCNode(pkey, pkeyB62) - if err != nil { - return err - } - - rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), "ephemeral", pkey, pkeyB62) - msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity)) - msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity)) - defer rtcSink.Close() - return r.writeRTCMessage(rtcSink, msg) -} - -func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error { - node, err := r.GetNodeForRoom(ctx, roomName) - if err != nil { - return err - } - msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, "")) - msg.ParticipantKeyB62 = string(ParticipantKey(roomName, "")) - return r.WriteNodeRTC(ctx, node.Id, msg) + return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id)) } func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { @@ -237,82 +170,9 @@ func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *liv return r.writeRTCMessage(rtcSink, msg) } -func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) error { - prometheus.IncrementParticipantRtcInit(1) - // find the node where the room is hosted at - rtcNode, err := r.GetNodeForRoom(r.ctx, livekit.RoomName(ss.RoomName)) - if err != nil { - return err - } - - if rtcNode.Id != r.currentNode.Id { - err = ErrIncorrectRTCNode - logger.Errorw("called participant on incorrect node", err, - "rtcNode", rtcNode, - ) - return err - } - - if err := r.SetParticipantRTCNode(participantKey, participantKeyB62, rtcNode.Id); err != nil { - return err - } - - // find signal node to send responses back - signalNode, err := r.getParticipantSignalNode(livekit.ConnectionID(ss.ConnectionId)) - if err != nil { - return err - } - - // treat it as a new participant connecting - if r.onNewParticipant == nil { - return ErrHandlerNotDefined - } - - // we do not want to re-use the same response sink - // the previous rtc worker thread is still consuming off of it. - // we'll want to sever the connection and switch to the new one - r.lock.RLock() - var requestChan *MessageChannel - var ok bool - var pkey livekit.ParticipantKey - if participantKeyB62 != "" { - requestChan, ok = r.requestChannels[string(participantKeyB62)] - pkey = participantKeyB62 - } else { - requestChan, ok = r.requestChannels[string(participantKey)] - pkey = participantKey - } - r.lock.RUnlock() - if ok { - requestChan.Close() - } - - pi, err := ParticipantInitFromStartSession(ss, r.currentNode.Region) - if err != nil { - return err - } - - reqChan := r.getOrCreateMessageChannel(r.requestChannels, string(pkey)) - resSink := NewSignalNodeSink(r.rc, livekit.NodeID(signalNode), livekit.ConnectionID(ss.ConnectionId)) - go func() { - err := r.onNewParticipant( - r.ctx, - livekit.RoomName(ss.RoomName), - *pi, - reqChan, - resSink, - ) - if err != nil { - logger.Errorw("could not handle new participant", err, - "room", ss.RoomName, - "participant", ss.Identity, - ) - // cleanup request channels - reqChan.Close() - resSink.Close() - } - }() - return nil +func (r *LocalRouter) writeRTCMessage(sink MessageSink, msg *livekit.RTCNodeMessage) error { + msg.SenderTime = time.Now().Unix() + return sink.WriteMessage(msg) } func (r *RedisRouter) Start() error { @@ -352,58 +212,6 @@ func (r *RedisRouter) Stop() { r.cancel() } -func (r *RedisRouter) SetParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error { - var err error - if participantKey != "" { - err1 := r.rc.Set(r.ctx, participantRTCKey(participantKey), nodeID, participantMappingTTL).Err() - if err1 != nil { - err = errors.Wrap(err, "could not set rtc node") - } - } - if participantKeyB62 != "" { - err2 := r.rc.Set(r.ctx, participantRTCKey(participantKeyB62), nodeID, participantMappingTTL).Err() - if err2 != nil { - err = errors.Wrap(err, "could not set rtc node") - } - } - return err -} - -func (r *RedisRouter) setParticipantSignalNode(connectionID livekit.ConnectionID, nodeID string) error { - if err := r.rc.Set(r.ctx, participantSignalKey(connectionID), nodeID, participantMappingTTL).Err(); err != nil { - return errors.Wrap(err, "could not set signal node") - } - return nil -} - -func (r *RedisRouter) getParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) (string, error) { - var val string - var err error - if participantKeyB62 != "" { - val, err = r.rc.Get(r.ctx, participantRTCKey(participantKeyB62)).Result() - if err == redis.Nil { - val, err = r.rc.Get(r.ctx, participantRTCKey(participantKey)).Result() - if err == redis.Nil { - err = ErrNodeNotFound - } - } - } else { - val, err = r.rc.Get(r.ctx, participantRTCKey(participantKey)).Result() - if err == redis.Nil { - err = ErrNodeNotFound - } - } - return val, err -} - -func (r *RedisRouter) getParticipantSignalNode(connectionID livekit.ConnectionID) (nodeID string, err error) { - val, err := r.rc.Get(r.ctx, participantSignalKey(connectionID)).Result() - if err == redis.Nil { - err = ErrNodeNotFound - } - return val, err -} - // update node stats and cleanup func (r *RedisRouter) statsWorker() { goroutineDumped := false @@ -444,9 +252,8 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) { }() logger.Debugw("starting redisWorker", "nodeID", r.currentNode.Id) - sigChannel := signalNodeChannel(livekit.NodeID(r.currentNode.Id)) rtcChannel := rtcNodeChannel(livekit.NodeID(r.currentNode.Id)) - r.pubsub = r.rc.Subscribe(r.ctx, sigChannel, rtcChannel) + r.pubsub = r.rc.Subscribe(r.ctx, rtcChannel) close(startedChan) for msg := range r.pubsub.Channel() { @@ -454,20 +261,7 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) { return } - if msg.Channel == sigChannel { - sm := livekit.SignalNodeMessage{} - if err := proto.Unmarshal([]byte(msg.Payload), &sm); err != nil { - logger.Errorw("could not unmarshal signal message on sigchan", err) - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) - continue - } - if err := r.handleSignalMessage(&sm); err != nil { - logger.Errorw("error processing signal message", err) - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) - continue - } - prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1) - } else if msg.Channel == rtcChannel { + if msg.Channel == rtcChannel { rm := livekit.RTCNodeMessage{} if err := proto.Unmarshal([]byte(msg.Payload), &rm); err != nil { logger.Errorw("could not unmarshal RTC message on rtcchan", err) @@ -484,62 +278,8 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) { } } -func (r *RedisRouter) handleSignalMessage(sm *livekit.SignalNodeMessage) error { - connectionID := sm.ConnectionId - - r.lock.RLock() - resSink := r.responseChannels[connectionID] - r.lock.RUnlock() - - // if a client closed the channel, then sent more messages after that, - if resSink == nil { - return nil - } - - switch rmb := sm.Message.(type) { - case *livekit.SignalNodeMessage_Response: - // logger.Debugw("forwarding signal message", - // "connID", connectionID, - // "type", fmt.Sprintf("%T", rmb.Response.Message)) - if err := resSink.WriteMessage(rmb.Response); err != nil { - return err - } - - case *livekit.SignalNodeMessage_EndSession: - // logger.Debugw("received EndSession, closing signal connection", - // "connID", connectionID) - resSink.Close() - } - return nil -} - func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error { - pKey := livekit.ParticipantKey(rm.ParticipantKey) - pKeyB62 := livekit.ParticipantKey(rm.ParticipantKeyB62) - - switch rmb := rm.Message.(type) { - case *livekit.RTCNodeMessage_StartSession: - // RTC session should start on this node - if err := r.startParticipantRTC(rmb.StartSession, pKey, pKeyB62); err != nil { - return errors.Wrap(err, "could not start participant") - } - - case *livekit.RTCNodeMessage_Request: - r.lock.RLock() - var requestChan *MessageChannel - if pKeyB62 != "" { - requestChan = r.requestChannels[string(pKeyB62)] - } else { - requestChan = r.requestChannels[string(pKey)] - } - r.lock.RUnlock() - if requestChan == nil { - return ErrChannelClosed - } - if err := requestChan.WriteMessage(rmb.Request); err != nil { - return err - } - + switch rm.Message.(type) { case *livekit.RTCNodeMessage_KeepAlive: if time.Since(time.Unix(rm.SenderTime, 0)) > statsUpdateInterval { logger.Infow("keep alive too old, skipping", "senderTime", rm.SenderTime) @@ -566,24 +306,6 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error { if err := r.RegisterNode(); err != nil { logger.Errorw("could not update node", err) } - - default: - // route it to handler - if r.onRTCMessage != nil { - var roomName livekit.RoomName - var identity livekit.ParticipantIdentity - var err error - if pKeyB62 != "" { - roomName, identity, err = parseParticipantKey(pKeyB62) - } - if err != nil || pKeyB62 == "" { - roomName, identity, err = parseParticipantKeyLegacy(pKey) - } - if err != nil { - return err - } - r.onRTCMessage(r.ctx, roomName, identity, rm) - } } return nil } diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index b52f36bb0..88fa9bf58 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -62,16 +62,6 @@ type FakeRouter struct { result1 []*livekit.Node result2 error } - OnNewParticipantRTCStub func(routing.NewParticipantCallback) - onNewParticipantRTCMutex sync.RWMutex - onNewParticipantRTCArgsForCall []struct { - arg1 routing.NewParticipantCallback - } - OnRTCMessageStub func(routing.RTCMessageCallback) - onRTCMessageMutex sync.RWMutex - onRTCMessageArgsForCall []struct { - arg1 routing.RTCMessageCallback - } RegisterNodeStub func() error registerNodeMutex sync.RWMutex registerNodeArgsForCall []struct { @@ -144,33 +134,6 @@ type FakeRouter struct { unregisterNodeReturnsOnCall map[int]struct { result1 error } - WriteParticipantRTCStub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) error - writeParticipantRTCMutex sync.RWMutex - writeParticipantRTCArgsForCall []struct { - arg1 context.Context - arg2 livekit.RoomName - arg3 livekit.ParticipantIdentity - arg4 *livekit.RTCNodeMessage - } - writeParticipantRTCReturns struct { - result1 error - } - writeParticipantRTCReturnsOnCall map[int]struct { - result1 error - } - WriteRoomRTCStub func(context.Context, livekit.RoomName, *livekit.RTCNodeMessage) error - writeRoomRTCMutex sync.RWMutex - writeRoomRTCArgsForCall []struct { - arg1 context.Context - arg2 livekit.RoomName - arg3 *livekit.RTCNodeMessage - } - writeRoomRTCReturns struct { - result1 error - } - writeRoomRTCReturnsOnCall map[int]struct { - result1 error - } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -435,70 +398,6 @@ func (fake *FakeRouter) ListNodesReturnsOnCall(i int, result1 []*livekit.Node, r }{result1, result2} } -func (fake *FakeRouter) OnNewParticipantRTC(arg1 routing.NewParticipantCallback) { - fake.onNewParticipantRTCMutex.Lock() - fake.onNewParticipantRTCArgsForCall = append(fake.onNewParticipantRTCArgsForCall, struct { - arg1 routing.NewParticipantCallback - }{arg1}) - stub := fake.OnNewParticipantRTCStub - fake.recordInvocation("OnNewParticipantRTC", []interface{}{arg1}) - fake.onNewParticipantRTCMutex.Unlock() - if stub != nil { - fake.OnNewParticipantRTCStub(arg1) - } -} - -func (fake *FakeRouter) OnNewParticipantRTCCallCount() int { - fake.onNewParticipantRTCMutex.RLock() - defer fake.onNewParticipantRTCMutex.RUnlock() - return len(fake.onNewParticipantRTCArgsForCall) -} - -func (fake *FakeRouter) OnNewParticipantRTCCalls(stub func(routing.NewParticipantCallback)) { - fake.onNewParticipantRTCMutex.Lock() - defer fake.onNewParticipantRTCMutex.Unlock() - fake.OnNewParticipantRTCStub = stub -} - -func (fake *FakeRouter) OnNewParticipantRTCArgsForCall(i int) routing.NewParticipantCallback { - fake.onNewParticipantRTCMutex.RLock() - defer fake.onNewParticipantRTCMutex.RUnlock() - argsForCall := fake.onNewParticipantRTCArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeRouter) OnRTCMessage(arg1 routing.RTCMessageCallback) { - fake.onRTCMessageMutex.Lock() - fake.onRTCMessageArgsForCall = append(fake.onRTCMessageArgsForCall, struct { - arg1 routing.RTCMessageCallback - }{arg1}) - stub := fake.OnRTCMessageStub - fake.recordInvocation("OnRTCMessage", []interface{}{arg1}) - fake.onRTCMessageMutex.Unlock() - if stub != nil { - fake.OnRTCMessageStub(arg1) - } -} - -func (fake *FakeRouter) OnRTCMessageCallCount() int { - fake.onRTCMessageMutex.RLock() - defer fake.onRTCMessageMutex.RUnlock() - return len(fake.onRTCMessageArgsForCall) -} - -func (fake *FakeRouter) OnRTCMessageCalls(stub func(routing.RTCMessageCallback)) { - fake.onRTCMessageMutex.Lock() - defer fake.onRTCMessageMutex.Unlock() - fake.OnRTCMessageStub = stub -} - -func (fake *FakeRouter) OnRTCMessageArgsForCall(i int) routing.RTCMessageCallback { - fake.onRTCMessageMutex.RLock() - defer fake.onRTCMessageMutex.RUnlock() - argsForCall := fake.onRTCMessageArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeRouter) RegisterNode() error { fake.registerNodeMutex.Lock() ret, specificReturn := fake.registerNodeReturnsOnCall[len(fake.registerNodeArgsForCall)] @@ -864,133 +763,6 @@ func (fake *FakeRouter) UnregisterNodeReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) WriteParticipantRTC(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.ParticipantIdentity, arg4 *livekit.RTCNodeMessage) error { - fake.writeParticipantRTCMutex.Lock() - ret, specificReturn := fake.writeParticipantRTCReturnsOnCall[len(fake.writeParticipantRTCArgsForCall)] - fake.writeParticipantRTCArgsForCall = append(fake.writeParticipantRTCArgsForCall, struct { - arg1 context.Context - arg2 livekit.RoomName - arg3 livekit.ParticipantIdentity - arg4 *livekit.RTCNodeMessage - }{arg1, arg2, arg3, arg4}) - stub := fake.WriteParticipantRTCStub - fakeReturns := fake.writeParticipantRTCReturns - fake.recordInvocation("WriteParticipantRTC", []interface{}{arg1, arg2, arg3, arg4}) - fake.writeParticipantRTCMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3, arg4) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRouter) WriteParticipantRTCCallCount() int { - fake.writeParticipantRTCMutex.RLock() - defer fake.writeParticipantRTCMutex.RUnlock() - return len(fake.writeParticipantRTCArgsForCall) -} - -func (fake *FakeRouter) WriteParticipantRTCCalls(stub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) error) { - fake.writeParticipantRTCMutex.Lock() - defer fake.writeParticipantRTCMutex.Unlock() - fake.WriteParticipantRTCStub = stub -} - -func (fake *FakeRouter) WriteParticipantRTCArgsForCall(i int) (context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) { - fake.writeParticipantRTCMutex.RLock() - defer fake.writeParticipantRTCMutex.RUnlock() - argsForCall := fake.writeParticipantRTCArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 -} - -func (fake *FakeRouter) WriteParticipantRTCReturns(result1 error) { - fake.writeParticipantRTCMutex.Lock() - defer fake.writeParticipantRTCMutex.Unlock() - fake.WriteParticipantRTCStub = nil - fake.writeParticipantRTCReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeRouter) WriteParticipantRTCReturnsOnCall(i int, result1 error) { - fake.writeParticipantRTCMutex.Lock() - defer fake.writeParticipantRTCMutex.Unlock() - fake.WriteParticipantRTCStub = nil - if fake.writeParticipantRTCReturnsOnCall == nil { - fake.writeParticipantRTCReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.writeParticipantRTCReturnsOnCall[i] = struct { - result1 error - }{result1} -} - -func (fake *FakeRouter) WriteRoomRTC(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.RTCNodeMessage) error { - fake.writeRoomRTCMutex.Lock() - ret, specificReturn := fake.writeRoomRTCReturnsOnCall[len(fake.writeRoomRTCArgsForCall)] - fake.writeRoomRTCArgsForCall = append(fake.writeRoomRTCArgsForCall, struct { - arg1 context.Context - arg2 livekit.RoomName - arg3 *livekit.RTCNodeMessage - }{arg1, arg2, arg3}) - stub := fake.WriteRoomRTCStub - fakeReturns := fake.writeRoomRTCReturns - fake.recordInvocation("WriteRoomRTC", []interface{}{arg1, arg2, arg3}) - fake.writeRoomRTCMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRouter) WriteRoomRTCCallCount() int { - fake.writeRoomRTCMutex.RLock() - defer fake.writeRoomRTCMutex.RUnlock() - return len(fake.writeRoomRTCArgsForCall) -} - -func (fake *FakeRouter) WriteRoomRTCCalls(stub func(context.Context, livekit.RoomName, *livekit.RTCNodeMessage) error) { - fake.writeRoomRTCMutex.Lock() - defer fake.writeRoomRTCMutex.Unlock() - fake.WriteRoomRTCStub = stub -} - -func (fake *FakeRouter) WriteRoomRTCArgsForCall(i int) (context.Context, livekit.RoomName, *livekit.RTCNodeMessage) { - fake.writeRoomRTCMutex.RLock() - defer fake.writeRoomRTCMutex.RUnlock() - argsForCall := fake.writeRoomRTCArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeRouter) WriteRoomRTCReturns(result1 error) { - fake.writeRoomRTCMutex.Lock() - defer fake.writeRoomRTCMutex.Unlock() - fake.WriteRoomRTCStub = nil - fake.writeRoomRTCReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeRouter) WriteRoomRTCReturnsOnCall(i int, result1 error) { - fake.writeRoomRTCMutex.Lock() - defer fake.writeRoomRTCMutex.Unlock() - fake.WriteRoomRTCStub = nil - if fake.writeRoomRTCReturnsOnCall == nil { - fake.writeRoomRTCReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.writeRoomRTCReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeRouter) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -1004,10 +776,6 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} { defer fake.getRegionMutex.RUnlock() fake.listNodesMutex.RLock() defer fake.listNodesMutex.RUnlock() - fake.onNewParticipantRTCMutex.RLock() - defer fake.onNewParticipantRTCMutex.RUnlock() - fake.onRTCMessageMutex.RLock() - defer fake.onRTCMessageMutex.RUnlock() fake.registerNodeMutex.RLock() defer fake.registerNodeMutex.RUnlock() fake.removeDeadNodesMutex.RLock() @@ -1022,10 +790,6 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} { defer fake.stopMutex.RUnlock() fake.unregisterNodeMutex.RLock() defer fake.unregisterNodeMutex.RUnlock() - fake.writeParticipantRTCMutex.RLock() - defer fake.writeParticipantRTCMutex.RUnlock() - fake.writeRoomRTCMutex.RLock() - defer fake.writeRoomRTCMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 6d122fd6e..51ab919a0 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -101,7 +101,7 @@ func NewLocalRoomManager( return nil, err } - r := &RoomManager{ + return &RoomManager{ config: conf, rtcConfig: rtcConf, currentNode: currentNode, @@ -126,12 +126,7 @@ func NewLocalRoomManager( Region: conf.Region, NodeId: currentNode.Id, }, - } - - // hook up to router - router.OnNewParticipantRTC(r.StartSession) - router.OnRTCMessage(r.handleRTCMessage) - return r, nil + }, nil } func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc.Room { @@ -638,26 +633,6 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa } } -// handles RTC messages resulted from Room API calls -func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) { - switch rm := msg.Message.(type) { - case *livekit.RTCNodeMessage_RemoveParticipant: - r.RemoveParticipant(ctx, rm.RemoveParticipant) - case *livekit.RTCNodeMessage_MuteTrack: - r.MutePublishedTrack(ctx, rm.MuteTrack) - case *livekit.RTCNodeMessage_UpdateParticipant: - r.UpdateParticipant(ctx, rm.UpdateParticipant) - case *livekit.RTCNodeMessage_DeleteRoom: - r.DeleteRoom(ctx, rm.DeleteRoom) - case *livekit.RTCNodeMessage_UpdateSubscriptions: - r.UpdateSubscriptions(ctx, rm.UpdateSubscriptions) - case *livekit.RTCNodeMessage_SendData: - r.SendData(ctx, rm.SendData) - case *livekit.RTCNodeMessage_UpdateRoomMetadata: - r.UpdateRoomMetadata(ctx, rm.UpdateRoomMetadata) - } -} - type participantReq interface { GetRoom() string GetIdentity() string diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 2076b5ca4..003da7f19 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -16,20 +16,16 @@ package service import ( "context" - "fmt" "strconv" "time" "github.com/pkg/errors" - "github.com/thoas/go-funk" "github.com/twitchtv/twirp" - "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" ) @@ -170,39 +166,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq return nil, twirpAuthError(err) } - if s.psrpcConf.Enabled { - return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) - } - - if _, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false); err == ErrRoomNotFound { - return nil, twirp.NotFoundError("room not found") - } - - err := s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_DeleteRoom{ - DeleteRoom: req, - }, - }) - if err != nil { - return nil, err - } - - // we should not return until when the room is confirmed deleted - err = s.confirmExecution(func() error { - _, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false) - if err == nil { - return ErrOperationFailed - } else if err != ErrRoomNotFound { - return err - } else { - return nil - } - }) - if err != nil { - return nil, err - } - - return &livekit.DeleteRoomResponse{}, nil + return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) } func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (*livekit.ListParticipantsResponse, error) { @@ -247,34 +211,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa return nil, twirp.NotFoundError("participant not found") } - if s.psrpcConf.Enabled { - return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) - } - - err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_RemoveParticipant{ - RemoveParticipant: req, - }, - }) - if err != nil { - return nil, err - } - - err = s.confirmExecution(func() error { - _, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)) - if err == ErrParticipantNotFound { - return nil - } else if err != nil { - return err - } else { - return ErrOperationFailed - } - }) - if err != nil { - return nil, err - } - - return &livekit.RemoveParticipantResponse{}, nil + return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) { @@ -283,47 +220,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirpAuthError(err) } - if s.psrpcConf.Enabled { - return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) - } - - err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_MuteTrack{ - MuteTrack: req, - }, - }) - if err != nil { - return nil, err - } - - var track *livekit.TrackInfo - err = s.confirmExecution(func() error { - p, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)) - if err != nil { - return err - } - // ensure track is muted - t := funk.Find(p.Tracks, func(t *livekit.TrackInfo) bool { - return t.Sid == req.TrackSid - }) - var ok bool - track, ok = t.(*livekit.TrackInfo) - if !ok { - return ErrTrackNotFound - } - if track.Muted != req.Muted { - return ErrOperationFailed - } - return nil - }) - if err != nil { - return nil, err - } - - res := &livekit.MuteRoomTrackResponse{ - Track: track, - } - return res, nil + return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) { @@ -337,42 +234,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, twirpAuthError(err) } - if s.psrpcConf.Enabled { - return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) - } - - err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_UpdateParticipant{ - UpdateParticipant: req, - }, - }) - if err != nil { - return nil, err - } - - var participant *livekit.ParticipantInfo - var detailedError error - err = s.confirmExecution(func() error { - participant, err = s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)) - if err != nil { - return err - } - if req.Metadata != "" && participant.Metadata != req.Metadata { - detailedError = fmt.Errorf("metadata does not match") - return ErrOperationFailed - } - if req.Permission != nil && !proto.Equal(req.Permission, participant.Permission) { - detailedError = fmt.Errorf("permissions do not match, expected: %v, actual: %v", req.Permission, participant.Permission) - return ErrOperationFailed - } - return nil - }) - if err != nil { - logger.Warnw("could not confirm participant update", detailedError) - return nil, err - } - - return participant, nil + return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) { @@ -386,20 +248,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda return nil, twirpAuthError(err) } - if s.psrpcConf.Enabled { - return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) - } - - err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_UpdateSubscriptions{ - UpdateSubscriptions: req, - }, - }) - if err != nil { - return nil, err - } - - return &livekit.UpdateSubscriptionsResponse{}, nil + return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) } func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { @@ -409,20 +258,7 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest return nil, twirpAuthError(err) } - if s.psrpcConf.Enabled { - return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) - } - - err := s.router.WriteRoomRTC(ctx, roomName, &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_SendData{ - SendData: req, - }, - }) - if err != nil { - return nil, err - } - - return &livekit.SendDataResponse{}, nil + return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) } func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) { @@ -451,20 +287,9 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, err } - if s.psrpcConf.Enabled { - _, err := s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) - if err != nil { - return nil, err - } - } else { - err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ - UpdateRoomMetadata: req, - }, - }) - if err != nil { - return nil, err - } + _, err = s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + if err != nil { + return nil, err } err = s.confirmExecution(func() error { @@ -494,14 +319,6 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return room, nil } -func (s *RoomService) writeParticipantMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { - if err := EnsureAdminPermission(ctx, room); err != nil { - return twirpAuthError(err) - } - - return s.router.WriteParticipantRTC(ctx, room, identity, msg) -} - func (s *RoomService) confirmExecution(f func() error) error { expired := time.After(s.apiConf.ExecutionTimeout) var err error diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index f9de24a51..0237117f4 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -33,26 +33,6 @@ import ( ) func TestDeleteRoom(t *testing.T) { - t.Run("delete non-existent", func(t *testing.T) { - svc := newTestRoomService(config.RoomConfig{}) - grant := &auth.ClaimGrants{ - Video: &auth.VideoGrant{ - RoomCreate: true, - }, - } - ctx := service.WithGrants(context.Background(), grant) - svc.store.LoadRoomReturns(nil, nil, service.ErrRoomNotFound) - _, err := svc.DeleteRoom(ctx, &livekit.DeleteRoomRequest{ - Room: "testroom", - }) - require.Error(t, err) - if terr, ok := err.(twirp.Error); ok { - require.Equal(t, twirp.NotFound, terr.Code()) - } else { - require.Fail(t, "should be twirp error") - } - }) - t.Run("missing permissions", func(t *testing.T) { svc := newTestRoomService(config.RoomConfig{}) grant := &auth.ClaimGrants{ diff --git a/pkg/service/signal.go b/pkg/service/signal.go index a0abc5952..6c4ff820e 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -102,27 +102,17 @@ func (s *defaultSessionHandler) HandleSession( ) error { prometheus.IncrementParticipantRtcInit(1) - if rr, ok := s.router.(*routing.RedisRouter); ok { - rtcNode, err := s.router.GetNodeForRoom(ctx, roomName) - if err != nil { - return err - } + rtcNode, err := s.router.GetNodeForRoom(ctx, roomName) + if err != nil { + return err + } - if rtcNode.Id != s.currentNode.Id { - err = routing.ErrIncorrectRTCNode - logger.Errorw("called participant on incorrect node", err, - "rtcNode", rtcNode, - ) - return err - } - - pKey := routing.ParticipantKeyLegacy(roomName, pi.Identity) - pKeyB62 := routing.ParticipantKey(roomName, pi.Identity) - - // RTC session should start on this node - if err := rr.SetParticipantRTCNode(pKey, pKeyB62, s.currentNode.Id); err != nil { - return err - } + if rtcNode.Id != s.currentNode.Id { + err = routing.ErrIncorrectRTCNode + logger.Errorw("called participant on incorrect node", err, + "rtcNode", rtcNode, + ) + return err } return s.roomManager.StartSession(ctx, roomName, pi, requestSource, responseSink) diff --git a/pkg/service/signal_test.go b/pkg/service/signal_test.go index a80935182..951837176 100644 --- a/pkg/service/signal_test.go +++ b/pkg/service/signal_test.go @@ -40,7 +40,6 @@ func init() { func TestSignal(t *testing.T) { cfg := config.SignalRelayConfig{ - Enabled: false, RetryTimeout: 30 * time.Second, MinRetryInterval: 500 * time.Millisecond, MaxRetryInterval: 5 * time.Second, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 9c9a741e5..5b2a3aab6 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -50,7 +50,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - router := routing.CreateRouter(conf, universalClient, currentNode, signalClient) + router := routing.CreateRouter(universalClient, currentNode, signalClient) objectStore := createStore(universalClient) roomAllocator, err := NewRoomAllocator(conf, router, objectStore) if err != nil { @@ -149,7 +149,7 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi if err != nil { return nil, err } - router := routing.CreateRouter(conf, universalClient, currentNode, signalClient) + router := routing.CreateRouter(universalClient, currentNode, signalClient) return router, nil } diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 7a11b353e..0ce079c23 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -183,7 +183,6 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer { conf.RTC.TCPPort = port + 2 conf.Redis.Address = "localhost:6379" conf.Keys = map[string]string{testApiKey: testApiSecret} - conf.SignalRelay.Enabled = true currentNode, err := routing.NewLocalNode(conf) if err != nil {