From a010b618afe2d5066e98d9e6c066854e03da97fe Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 20 Feb 2021 21:09:04 -0800 Subject: [PATCH] re-assign node for room if existing node for room is no longer available --- cmd/cli/client/client.go | 5 ++- pkg/routing/interfaces.go | 2 +- pkg/routing/localrouter.go | 4 +- pkg/routing/redisrouter.go | 24 ++++++----- pkg/routing/routingfakes/fake_router.go | 20 ++++----- pkg/service/roommanager.go | 35 +++++++++++++--- pkg/service/rtcservice.go | 7 ++-- pkg/service/server.go | 4 ++ test/multinode_test.go | 54 +++++++++++++++++-------- 9 files changed, 106 insertions(+), 49 deletions(-) diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index f7666918e..f28f773ae 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -507,7 +507,10 @@ func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription) error { } func (c *RTCClient) negotiate() { - logger.Debugw("starting negotiation", "participant", c.localParticipant.Identity) + if c.localParticipant != nil { + logger.Debugw("starting negotiation", "participant", c.localParticipant.Identity) + } + offer, err := c.publisher.PeerConnection().CreateOffer(nil) if err != nil { return diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index fbf0638f5..a304dae25 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -29,7 +29,7 @@ type RTCMessageCallback func(roomName, identity string, msg *livekit.RTCNodeMess // Router allows multiple nodes to coordinate the participant session //counterfeiter:generate . Router type Router interface { - GetNodeForRoom(roomName string) (string, error) + GetNodeForRoom(roomName string) (*livekit.Node, error) SetNodeForRoom(roomName string, nodeId string) error ClearRoomState(roomName string) error RegisterNode() error diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 3fd593493..7daa6ec59 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -33,8 +33,8 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter { } } -func (r *LocalRouter) GetNodeForRoom(roomName string) (string, error) { - return r.currentNode.Id, nil +func (r *LocalRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) { + return r.currentNode, nil } func (r *LocalRouter) SetNodeForRoom(roomName string, nodeId string) error { diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index b396a4473..ea653641f 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -76,12 +76,15 @@ func (r *RedisRouter) RemoveDeadNodes() error { return nil } -func (r *RedisRouter) GetNodeForRoom(roomName string) (string, error) { - val, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result() - if err != nil { - err = errors.Wrap(err, "could not get node for room") +func (r *RedisRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) { + nodeId, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result() + if err == redis.Nil { + return nil, ErrNotFound + } else if err != nil { + return nil, errors.Wrap(err, "could not get node for room") } - return val, err + + return r.GetNode(nodeId) } func (r *RedisRouter) SetNodeForRoom(roomName string, nodeId string) error { @@ -97,7 +100,9 @@ func (r *RedisRouter) ClearRoomState(roomName string) error { func (r *RedisRouter) GetNode(nodeId string) (*livekit.Node, error) { data, err := r.rc.HGet(r.ctx, NodesKey, nodeId).Result() - if err != nil { + if err == redis.Nil { + return nil, ErrNotFound + } else if err != nil { return nil, err } n := livekit.Node{} @@ -140,7 +145,7 @@ func (r *RedisRouter) StartParticipantSignal(roomName, identity, metadata string return } - sink := NewRTCNodeSink(r.rc, rtcNode, pKey) + sink := NewRTCNodeSink(r.rc, rtcNode.Id, pKey) // sends a message to start session err = sink.WriteMessage(&livekit.StartSession{ @@ -177,13 +182,13 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK return err } - if rtcNode != r.currentNode.Id { + if rtcNode.Id != r.currentNode.Id { logger.Errorw("called participant on incorrect node", "rtcNode", rtcNode, "currentNode", r.currentNode.Id) return ErrIncorrectRTCNode } - if err := r.setParticipantRTCNode(participantKey, rtcNode); err != nil { + if err := r.setParticipantRTCNode(participantKey, rtcNode.Id); err != nil { return err } @@ -238,6 +243,7 @@ func (r *RedisRouter) Stop() { } logger.Debugw("stopping RedisRouter") r.pubsub.Close() + r.UnregisterNode() r.cancel() } diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index 320b79fbb..ac6482555 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -47,17 +47,17 @@ type FakeRouter struct { result1 *livekit.Node result2 error } - GetNodeForRoomStub func(string) (string, error) + GetNodeForRoomStub func(string) (*livekit.Node, error) getNodeForRoomMutex sync.RWMutex getNodeForRoomArgsForCall []struct { arg1 string } getNodeForRoomReturns struct { - result1 string + result1 *livekit.Node result2 error } getNodeForRoomReturnsOnCall map[int]struct { - result1 string + result1 *livekit.Node result2 error } ListNodesStub func() ([]*livekit.Node, error) @@ -352,7 +352,7 @@ func (fake *FakeRouter) GetNodeReturnsOnCall(i int, result1 *livekit.Node, resul }{result1, result2} } -func (fake *FakeRouter) GetNodeForRoom(arg1 string) (string, error) { +func (fake *FakeRouter) GetNodeForRoom(arg1 string) (*livekit.Node, error) { fake.getNodeForRoomMutex.Lock() ret, specificReturn := fake.getNodeForRoomReturnsOnCall[len(fake.getNodeForRoomArgsForCall)] fake.getNodeForRoomArgsForCall = append(fake.getNodeForRoomArgsForCall, struct { @@ -377,7 +377,7 @@ func (fake *FakeRouter) GetNodeForRoomCallCount() int { return len(fake.getNodeForRoomArgsForCall) } -func (fake *FakeRouter) GetNodeForRoomCalls(stub func(string) (string, error)) { +func (fake *FakeRouter) GetNodeForRoomCalls(stub func(string) (*livekit.Node, error)) { fake.getNodeForRoomMutex.Lock() defer fake.getNodeForRoomMutex.Unlock() fake.GetNodeForRoomStub = stub @@ -390,28 +390,28 @@ func (fake *FakeRouter) GetNodeForRoomArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeRouter) GetNodeForRoomReturns(result1 string, result2 error) { +func (fake *FakeRouter) GetNodeForRoomReturns(result1 *livekit.Node, result2 error) { fake.getNodeForRoomMutex.Lock() defer fake.getNodeForRoomMutex.Unlock() fake.GetNodeForRoomStub = nil fake.getNodeForRoomReturns = struct { - result1 string + result1 *livekit.Node result2 error }{result1, result2} } -func (fake *FakeRouter) GetNodeForRoomReturnsOnCall(i int, result1 string, result2 error) { +func (fake *FakeRouter) GetNodeForRoomReturnsOnCall(i int, result1 *livekit.Node, result2 error) { fake.getNodeForRoomMutex.Lock() defer fake.getNodeForRoomMutex.Unlock() fake.GetNodeForRoomStub = nil if fake.getNodeForRoomReturnsOnCall == nil { fake.getNodeForRoomReturnsOnCall = make(map[int]struct { - result1 string + result1 *livekit.Node result2 error }) } fake.getNodeForRoomReturnsOnCall[i] = struct { - result1 string + result1 *livekit.Node result2 error }{result1, result2} } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 1a31ee7c7..f5242c01d 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -49,17 +49,40 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc // CreateRoom creates a new room from a request and allocates it to a node to handle // it'll also monitor its state, and cleans it up when appropriate func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) { - rm := &livekit.Room{ - Sid: utils.NewGuid(utils.RoomPrefix), - Name: req.Name, - EmptyTimeout: req.EmptyTimeout, - MaxParticipants: req.MaxParticipants, - CreationTime: time.Now().Unix(), + // find existing room and update it + rm, err := r.roomStore.GetRoom(req.Name) + if err == ErrRoomNotFound { + rm = &livekit.Room{ + Sid: utils.NewGuid(utils.RoomPrefix), + Name: req.Name, + CreationTime: time.Now().Unix(), + } + } else if err != nil { + return nil, err + } + + if req.EmptyTimeout > 0 { + rm.EmptyTimeout = req.EmptyTimeout + } + if req.MaxParticipants > 0 { + rm.MaxParticipants = req.MaxParticipants } if err := r.roomStore.CreateRoom(rm); err != nil { return nil, err } + // Is that node still available? + node, err := r.router.GetNodeForRoom(rm.Name) + if err != routing.ErrNotFound && err != nil { + return nil, err + } + + // keep it on that node + if err == nil && routing.IsAvailable(node) { + return rm, nil + } + + // select a new node nodeId := req.NodeId if nodeId == "" { // select a node for room diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index dda5db4e4..70704c501 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -67,10 +67,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { roomName = onlyName } - rm, err := s.roomManager.roomStore.GetRoom(roomName) - if err == ErrRoomNotFound { - rm, err = s.roomManager.CreateRoom(&livekit.CreateRoomRequest{Name: roomName}) - } else if err != nil { + // create room if it doesn't exist, also assigns an RTC node for the room + rm, err := s.roomManager.CreateRoom(&livekit.CreateRoomRequest{Name: roomName}) + if err != nil { handleError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/pkg/service/server.go b/pkg/service/server.go index 233ed8853..41d8eb093 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -80,6 +80,10 @@ func NewLivekitServer(conf *config.Config, return } +func (s *LivekitServer) Node() *livekit.Node { + return s.currentNode +} + func (s *LivekitServer) IsRunning() bool { return s.running.Get() } diff --git a/test/multinode_test.go b/test/multinode_test.go index 0672a8c19..4cb214339 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -2,6 +2,7 @@ package test import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -54,22 +55,6 @@ func TestMultiNodeRouting(t *testing.T) { assert.Equal(t, c1.ID(), tr1.StreamID()) return true }) - - // TODO: delete room explicitly and ensure it's closed - // - //// ensure that room is closed - // - //rc := redisClient() - //ctx := context.Background() - //withTimeout(t, "room should be closed", func() bool { - // if rc.HGet(ctx, service.RoomsKey, testRoom).Err() == nil { - // return false - // } - // return true - //}) - // - //assert.Equal(t, redis.Nil, rc.HGet(ctx, routing.NodeRoomKey, testRoom).Err()) - //assert.Equal(t, redis.Nil, rc.HGet(ctx, service.RoomIdMap, testRoom).Err()) } func TestConnectWithoutCreation(t *testing.T) { @@ -122,3 +107,40 @@ func TestMultinodeReceiveBeforePublish(t *testing.T) { scenarioReceiveBeforePublish(t) } + +// reconnecting to the same room, after one of the servers has gone away +func TestMultinodeReconnectAfterNodeShutdown(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + logger.Infow("\n\n---Starting TestMultiNodeRouting---") + defer logger.Infow("---Finishing TestMultiNodeRouting---") + + s1, s2 := setupMultiNodeTest() + defer s1.Stop() + defer s2.Stop() + + // creating room on node 1 + _, err := roomClient.CreateRoom(contextWithCreateRoomToken(), &livekit.CreateRoomRequest{ + Name: testRoom, + NodeId: s2.Node().Id, + }) + assert.NoError(t, err) + + // one node connecting to node 1, and another connecting to node 2 + c1 := createRTCClient("c1", defaultServerPort) + c2 := createRTCClient("c2", secondServerPort) + + waitUntilConnected(t, c1, c2) + stopClients(c1, c2) + + // stop s2, and connect to room again + s2.Stop() + + time.Sleep(syncDelay) + + c3 := createRTCClient("c3", defaultServerPort) + waitUntilConnected(t, c3) +}