mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 16:15:25 +00:00
re-assign node for room if existing node for room is no longer available
This commit is contained in:
@@ -507,7 +507,10 @@ func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription) error {
|
||||
}
|
||||
|
||||
func (c *RTCClient) negotiate() {
|
||||
logger.Debugw("starting negotiation", "participant", c.localParticipant.Identity)
|
||||
if c.localParticipant != nil {
|
||||
logger.Debugw("starting negotiation", "participant", c.localParticipant.Identity)
|
||||
}
|
||||
|
||||
offer, err := c.publisher.PeerConnection().CreateOffer(nil)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@@ -29,7 +29,7 @@ type RTCMessageCallback func(roomName, identity string, msg *livekit.RTCNodeMess
|
||||
// Router allows multiple nodes to coordinate the participant session
|
||||
//counterfeiter:generate . Router
|
||||
type Router interface {
|
||||
GetNodeForRoom(roomName string) (string, error)
|
||||
GetNodeForRoom(roomName string) (*livekit.Node, error)
|
||||
SetNodeForRoom(roomName string, nodeId string) error
|
||||
ClearRoomState(roomName string) error
|
||||
RegisterNode() error
|
||||
|
||||
@@ -33,8 +33,8 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *LocalRouter) GetNodeForRoom(roomName string) (string, error) {
|
||||
return r.currentNode.Id, nil
|
||||
func (r *LocalRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) {
|
||||
return r.currentNode, nil
|
||||
}
|
||||
|
||||
func (r *LocalRouter) SetNodeForRoom(roomName string, nodeId string) error {
|
||||
|
||||
@@ -76,12 +76,15 @@ func (r *RedisRouter) RemoveDeadNodes() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RedisRouter) GetNodeForRoom(roomName string) (string, error) {
|
||||
val, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result()
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "could not get node for room")
|
||||
func (r *RedisRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) {
|
||||
nodeId, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, ErrNotFound
|
||||
} else if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get node for room")
|
||||
}
|
||||
return val, err
|
||||
|
||||
return r.GetNode(nodeId)
|
||||
}
|
||||
|
||||
func (r *RedisRouter) SetNodeForRoom(roomName string, nodeId string) error {
|
||||
@@ -97,7 +100,9 @@ func (r *RedisRouter) ClearRoomState(roomName string) error {
|
||||
|
||||
func (r *RedisRouter) GetNode(nodeId string) (*livekit.Node, error) {
|
||||
data, err := r.rc.HGet(r.ctx, NodesKey, nodeId).Result()
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil, ErrNotFound
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
n := livekit.Node{}
|
||||
@@ -140,7 +145,7 @@ func (r *RedisRouter) StartParticipantSignal(roomName, identity, metadata string
|
||||
return
|
||||
}
|
||||
|
||||
sink := NewRTCNodeSink(r.rc, rtcNode, pKey)
|
||||
sink := NewRTCNodeSink(r.rc, rtcNode.Id, pKey)
|
||||
|
||||
// sends a message to start session
|
||||
err = sink.WriteMessage(&livekit.StartSession{
|
||||
@@ -177,13 +182,13 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
|
||||
return err
|
||||
}
|
||||
|
||||
if rtcNode != r.currentNode.Id {
|
||||
if rtcNode.Id != r.currentNode.Id {
|
||||
logger.Errorw("called participant on incorrect node",
|
||||
"rtcNode", rtcNode, "currentNode", r.currentNode.Id)
|
||||
return ErrIncorrectRTCNode
|
||||
}
|
||||
|
||||
if err := r.setParticipantRTCNode(participantKey, rtcNode); err != nil {
|
||||
if err := r.setParticipantRTCNode(participantKey, rtcNode.Id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -238,6 +243,7 @@ func (r *RedisRouter) Stop() {
|
||||
}
|
||||
logger.Debugw("stopping RedisRouter")
|
||||
r.pubsub.Close()
|
||||
r.UnregisterNode()
|
||||
r.cancel()
|
||||
}
|
||||
|
||||
|
||||
@@ -47,17 +47,17 @@ type FakeRouter struct {
|
||||
result1 *livekit.Node
|
||||
result2 error
|
||||
}
|
||||
GetNodeForRoomStub func(string) (string, error)
|
||||
GetNodeForRoomStub func(string) (*livekit.Node, error)
|
||||
getNodeForRoomMutex sync.RWMutex
|
||||
getNodeForRoomArgsForCall []struct {
|
||||
arg1 string
|
||||
}
|
||||
getNodeForRoomReturns struct {
|
||||
result1 string
|
||||
result1 *livekit.Node
|
||||
result2 error
|
||||
}
|
||||
getNodeForRoomReturnsOnCall map[int]struct {
|
||||
result1 string
|
||||
result1 *livekit.Node
|
||||
result2 error
|
||||
}
|
||||
ListNodesStub func() ([]*livekit.Node, error)
|
||||
@@ -352,7 +352,7 @@ func (fake *FakeRouter) GetNodeReturnsOnCall(i int, result1 *livekit.Node, resul
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) GetNodeForRoom(arg1 string) (string, error) {
|
||||
func (fake *FakeRouter) GetNodeForRoom(arg1 string) (*livekit.Node, error) {
|
||||
fake.getNodeForRoomMutex.Lock()
|
||||
ret, specificReturn := fake.getNodeForRoomReturnsOnCall[len(fake.getNodeForRoomArgsForCall)]
|
||||
fake.getNodeForRoomArgsForCall = append(fake.getNodeForRoomArgsForCall, struct {
|
||||
@@ -377,7 +377,7 @@ func (fake *FakeRouter) GetNodeForRoomCallCount() int {
|
||||
return len(fake.getNodeForRoomArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) GetNodeForRoomCalls(stub func(string) (string, error)) {
|
||||
func (fake *FakeRouter) GetNodeForRoomCalls(stub func(string) (*livekit.Node, error)) {
|
||||
fake.getNodeForRoomMutex.Lock()
|
||||
defer fake.getNodeForRoomMutex.Unlock()
|
||||
fake.GetNodeForRoomStub = stub
|
||||
@@ -390,28 +390,28 @@ func (fake *FakeRouter) GetNodeForRoomArgsForCall(i int) string {
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) GetNodeForRoomReturns(result1 string, result2 error) {
|
||||
func (fake *FakeRouter) GetNodeForRoomReturns(result1 *livekit.Node, result2 error) {
|
||||
fake.getNodeForRoomMutex.Lock()
|
||||
defer fake.getNodeForRoomMutex.Unlock()
|
||||
fake.GetNodeForRoomStub = nil
|
||||
fake.getNodeForRoomReturns = struct {
|
||||
result1 string
|
||||
result1 *livekit.Node
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) GetNodeForRoomReturnsOnCall(i int, result1 string, result2 error) {
|
||||
func (fake *FakeRouter) GetNodeForRoomReturnsOnCall(i int, result1 *livekit.Node, result2 error) {
|
||||
fake.getNodeForRoomMutex.Lock()
|
||||
defer fake.getNodeForRoomMutex.Unlock()
|
||||
fake.GetNodeForRoomStub = nil
|
||||
if fake.getNodeForRoomReturnsOnCall == nil {
|
||||
fake.getNodeForRoomReturnsOnCall = make(map[int]struct {
|
||||
result1 string
|
||||
result1 *livekit.Node
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.getNodeForRoomReturnsOnCall[i] = struct {
|
||||
result1 string
|
||||
result1 *livekit.Node
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
@@ -49,17 +49,40 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc
|
||||
// CreateRoom creates a new room from a request and allocates it to a node to handle
|
||||
// it'll also monitor its state, and cleans it up when appropriate
|
||||
func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) {
|
||||
rm := &livekit.Room{
|
||||
Sid: utils.NewGuid(utils.RoomPrefix),
|
||||
Name: req.Name,
|
||||
EmptyTimeout: req.EmptyTimeout,
|
||||
MaxParticipants: req.MaxParticipants,
|
||||
CreationTime: time.Now().Unix(),
|
||||
// find existing room and update it
|
||||
rm, err := r.roomStore.GetRoom(req.Name)
|
||||
if err == ErrRoomNotFound {
|
||||
rm = &livekit.Room{
|
||||
Sid: utils.NewGuid(utils.RoomPrefix),
|
||||
Name: req.Name,
|
||||
CreationTime: time.Now().Unix(),
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.EmptyTimeout > 0 {
|
||||
rm.EmptyTimeout = req.EmptyTimeout
|
||||
}
|
||||
if req.MaxParticipants > 0 {
|
||||
rm.MaxParticipants = req.MaxParticipants
|
||||
}
|
||||
if err := r.roomStore.CreateRoom(rm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Is that node still available?
|
||||
node, err := r.router.GetNodeForRoom(rm.Name)
|
||||
if err != routing.ErrNotFound && err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// keep it on that node
|
||||
if err == nil && routing.IsAvailable(node) {
|
||||
return rm, nil
|
||||
}
|
||||
|
||||
// select a new node
|
||||
nodeId := req.NodeId
|
||||
if nodeId == "" {
|
||||
// select a node for room
|
||||
|
||||
@@ -67,10 +67,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
roomName = onlyName
|
||||
}
|
||||
|
||||
rm, err := s.roomManager.roomStore.GetRoom(roomName)
|
||||
if err == ErrRoomNotFound {
|
||||
rm, err = s.roomManager.CreateRoom(&livekit.CreateRoomRequest{Name: roomName})
|
||||
} else if err != nil {
|
||||
// create room if it doesn't exist, also assigns an RTC node for the room
|
||||
rm, err := s.roomManager.CreateRoom(&livekit.CreateRoomRequest{Name: roomName})
|
||||
if err != nil {
|
||||
handleError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -80,6 +80,10 @@ func NewLivekitServer(conf *config.Config,
|
||||
return
|
||||
}
|
||||
|
||||
func (s *LivekitServer) Node() *livekit.Node {
|
||||
return s.currentNode
|
||||
}
|
||||
|
||||
func (s *LivekitServer) IsRunning() bool {
|
||||
return s.running.Get()
|
||||
}
|
||||
|
||||
+38
-16
@@ -2,6 +2,7 @@ package test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
@@ -54,22 +55,6 @@ func TestMultiNodeRouting(t *testing.T) {
|
||||
assert.Equal(t, c1.ID(), tr1.StreamID())
|
||||
return true
|
||||
})
|
||||
|
||||
// TODO: delete room explicitly and ensure it's closed
|
||||
//
|
||||
//// ensure that room is closed
|
||||
//
|
||||
//rc := redisClient()
|
||||
//ctx := context.Background()
|
||||
//withTimeout(t, "room should be closed", func() bool {
|
||||
// if rc.HGet(ctx, service.RoomsKey, testRoom).Err() == nil {
|
||||
// return false
|
||||
// }
|
||||
// return true
|
||||
//})
|
||||
//
|
||||
//assert.Equal(t, redis.Nil, rc.HGet(ctx, routing.NodeRoomKey, testRoom).Err())
|
||||
//assert.Equal(t, redis.Nil, rc.HGet(ctx, service.RoomIdMap, testRoom).Err())
|
||||
}
|
||||
|
||||
func TestConnectWithoutCreation(t *testing.T) {
|
||||
@@ -122,3 +107,40 @@ func TestMultinodeReceiveBeforePublish(t *testing.T) {
|
||||
|
||||
scenarioReceiveBeforePublish(t)
|
||||
}
|
||||
|
||||
// reconnecting to the same room, after one of the servers has gone away
|
||||
func TestMultinodeReconnectAfterNodeShutdown(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infow("\n\n---Starting TestMultiNodeRouting---")
|
||||
defer logger.Infow("---Finishing TestMultiNodeRouting---")
|
||||
|
||||
s1, s2 := setupMultiNodeTest()
|
||||
defer s1.Stop()
|
||||
defer s2.Stop()
|
||||
|
||||
// creating room on node 1
|
||||
_, err := roomClient.CreateRoom(contextWithCreateRoomToken(), &livekit.CreateRoomRequest{
|
||||
Name: testRoom,
|
||||
NodeId: s2.Node().Id,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// one node connecting to node 1, and another connecting to node 2
|
||||
c1 := createRTCClient("c1", defaultServerPort)
|
||||
c2 := createRTCClient("c2", secondServerPort)
|
||||
|
||||
waitUntilConnected(t, c1, c2)
|
||||
stopClients(c1, c2)
|
||||
|
||||
// stop s2, and connect to room again
|
||||
s2.Stop()
|
||||
|
||||
time.Sleep(syncDelay)
|
||||
|
||||
c3 := createRTCClient("c3", defaultServerPort)
|
||||
waitUntilConnected(t, c3)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user