From abde72a907f4f5598086af7934cd91501d60fc31 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 16 Sep 2021 23:29:29 -0700 Subject: [PATCH] Remove room manager from room service (#119) * start splitting * room allocator * remove room manager * Update pkg/service/roomallocator.go Co-authored-by: David Zhao Co-authored-by: David Zhao --- pkg/routing/routingfakes/fake_router.go | 32 +++++- pkg/service/interfaces.go | 1 - pkg/service/roomallocator.go | 99 +++++++++++++++++++ ...mmanager_test.go => roomallocator_test.go} | 12 +-- pkg/service/roommanager.go | 71 ------------- pkg/service/roomservice.go | 61 +++++++----- pkg/service/rtcservice.go | 24 ++--- pkg/service/utils.go | 1 + pkg/service/wire_gen.go | 20 ++-- 9 files changed, 196 insertions(+), 125 deletions(-) create mode 100644 pkg/service/roomallocator.go rename pkg/service/{roommanager_test.go => roomallocator_test.go} (74%) diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index 18716d13c..8a5849095 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -71,6 +71,10 @@ type FakeRouter struct { onRTCMessageArgsForCall []struct { arg1 routing.RTCMessageCallback } + PreStopStub func() + preStopMutex sync.RWMutex + preStopArgsForCall []struct { + } RegisterNodeStub func() error registerNodeMutex sync.RWMutex registerNodeArgsForCall []struct { @@ -476,6 +480,30 @@ func (fake *FakeRouter) OnRTCMessageArgsForCall(i int) routing.RTCMessageCallbac return argsForCall.arg1 } +func (fake *FakeRouter) PreStop() { + fake.preStopMutex.Lock() + fake.preStopArgsForCall = append(fake.preStopArgsForCall, struct { + }{}) + stub := fake.PreStopStub + fake.recordInvocation("PreStop", []interface{}{}) + fake.preStopMutex.Unlock() + if stub != nil { + fake.PreStopStub() + } +} + +func (fake *FakeRouter) PreStopCallCount() int { + fake.preStopMutex.RLock() + defer fake.preStopMutex.RUnlock() + return len(fake.preStopArgsForCall) +} + +func (fake *FakeRouter) PreStopCalls(stub func()) { + fake.preStopMutex.Lock() + defer fake.preStopMutex.Unlock() + fake.PreStopStub = stub +} + func (fake *FakeRouter) RegisterNode() error { fake.registerNodeMutex.Lock() ret, specificReturn := fake.registerNodeReturnsOnCall[len(fake.registerNodeArgsForCall)] @@ -782,8 +810,6 @@ func (fake *FakeRouter) Stop() { } } -func (fake *FakeRouter) PreStop() {} - func (fake *FakeRouter) StopCallCount() int { fake.stopMutex.RLock() defer fake.stopMutex.RUnlock() @@ -928,6 +954,8 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} { defer fake.onNewParticipantRTCMutex.RUnlock() fake.onRTCMessageMutex.RLock() defer fake.onRTCMessageMutex.RUnlock() + fake.preStopMutex.RLock() + defer fake.preStopMutex.RUnlock() fake.registerNodeMutex.RLock() defer fake.registerNodeMutex.RUnlock() fake.removeDeadNodesMutex.RLock() diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 242efcf1b..2aab7ff00 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -35,7 +35,6 @@ type RoomStore interface { type RoomManager interface { RoomStore - CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) GetRoom(ctx context.Context, roomName string) *rtc.Room DeleteRoom(ctx context.Context, roomName string) error StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go new file mode 100644 index 000000000..f3f6556c8 --- /dev/null +++ b/pkg/service/roomallocator.go @@ -0,0 +1,99 @@ +package service + +import ( + "context" + "time" + + "github.com/livekit/protocol/logger" + livekit "github.com/livekit/protocol/proto" + "github.com/livekit/protocol/utils" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/routing" +) + +type RoomAllocator struct { + config *config.Config + router routing.Router + selector routing.NodeSelector + roomStore RoomStore +} + +func NewRoomAllocator(conf *config.Config, router routing.Router, selector routing.NodeSelector, rs RoomStore) *RoomAllocator { + return &RoomAllocator{ + config: conf, + router: router, + selector: selector, + roomStore: rs, + } +} + +// CreateRoom creates a new room from a request and allocates it to a node to handle +// it'll also monitor its state, and cleans it up when appropriate +func (r *RoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) { + token, err := r.roomStore.LockRoom(ctx, req.Name, 5*time.Second) + if err != nil { + return nil, err + } + defer func() { + _ = r.roomStore.UnlockRoom(ctx, req.Name, token) + }() + + // find existing room and update it + rm, err := r.roomStore.LoadRoom(ctx, req.Name) + if err == ErrRoomNotFound { + rm = &livekit.Room{ + Sid: utils.NewGuid(utils.RoomPrefix), + Name: req.Name, + CreationTime: time.Now().Unix(), + TurnPassword: utils.RandomSecret(), + } + applyDefaultRoomConfig(rm, &r.config.Room) + } else if err != nil { + return nil, err + } + + if req.EmptyTimeout > 0 { + rm.EmptyTimeout = req.EmptyTimeout + } + if req.MaxParticipants > 0 { + rm.MaxParticipants = req.MaxParticipants + } + if err := r.roomStore.StoreRoom(ctx, rm); err != nil { + return nil, err + } + + // Is that node still available? + node, err := r.router.GetNodeForRoom(ctx, rm.Name) + if err != routing.ErrNotFound && err != nil { + return nil, err + } + + // keep it on that node + if err == nil && routing.IsAvailable(node) { + return rm, nil + } + + // select a new node + nodeId := req.NodeId + if nodeId == "" { + // select a node for room + nodes, err := r.router.ListNodes() + if err != nil { + return nil, err + } + + node, err := r.selector.SelectNode(nodes, rm) + if err != nil { + return nil, err + } + nodeId = node.Id + } + + logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeId) + if err := r.router.SetNodeForRoom(ctx, req.Name, nodeId); err != nil { + return nil, err + } + + return rm, nil +} diff --git a/pkg/service/roommanager_test.go b/pkg/service/roomallocator_test.go similarity index 74% rename from pkg/service/roommanager_test.go rename to pkg/service/roomallocator_test.go index e0ece4ea2..44d717fdf 100644 --- a/pkg/service/roommanager_test.go +++ b/pkg/service/roomallocator_test.go @@ -15,17 +15,17 @@ import ( ) func TestCreateRoom(t *testing.T) { - manager, conf := newTestRoomManager(t) + ra, conf := newTestRoomAllocator(t) t.Run("ensure default room settings are applied", func(t *testing.T) { - room, err := manager.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) + room, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) require.NoError(t, err) require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout) require.NotEmpty(t, room.EnabledCodecs) }) } -func newTestRoomManager(t *testing.T) (*service.LocalRoomManager, *config.Config) { +func newTestRoomAllocator(t *testing.T) (*service.RoomAllocator, *config.Config) { store := &servicefakes.FakeRoomStore{} store.LoadRoomReturns(nil, service.ErrRoomNotFound) router := &routingfakes.FakeRouter{} @@ -37,8 +37,6 @@ func newTestRoomManager(t *testing.T) (*service.LocalRoomManager, *config.Config router.GetNodeForRoomReturns(node, nil) - rm, err := service.NewLocalRoomManager(store, router, node, selector, nil, conf) - require.NoError(t, err) - - return rm, conf + ra := service.NewRoomAllocator(conf, router, selector, store) + return ra, conf } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 3054232fc..7cce7c0d9 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -9,7 +9,6 @@ import ( "github.com/gammazero/workerpool" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" - "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" "github.com/livekit/livekit-server/pkg/config" @@ -64,76 +63,6 @@ func NewLocalRoomManager(rp RoomStore, router routing.Router, currentNode routin return r, nil } -// CreateRoom creates a new room from a request and allocates it to a node to handle -// it'll also monitor fits state, and cleans it up when appropriate -func (r *LocalRoomManager) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) { - token, err := r.LockRoom(ctx, req.Name, 5*time.Second) - if err != nil { - return nil, err - } - defer func() { - _ = r.UnlockRoom(ctx, req.Name, token) - }() - - // find existing room and update it - rm, err := r.LoadRoom(ctx, req.Name) - if err == ErrRoomNotFound { - rm = &livekit.Room{ - Sid: utils.NewGuid(utils.RoomPrefix), - Name: req.Name, - CreationTime: time.Now().Unix(), - TurnPassword: utils.RandomSecret(), - } - applyDefaultRoomConfig(rm, &r.config.Room) - } else if err != nil { - return nil, err - } - - if req.EmptyTimeout > 0 { - rm.EmptyTimeout = req.EmptyTimeout - } - if req.MaxParticipants > 0 { - rm.MaxParticipants = req.MaxParticipants - } - if err := r.StoreRoom(ctx, rm); err != nil { - return nil, err - } - - // Is that node still available? - node, err := r.router.GetNodeForRoom(ctx, rm.Name) - if err != routing.ErrNotFound && err != nil { - return nil, err - } - - // keep it on that node - if err == nil && routing.IsAvailable(node) { - return rm, nil - } - - // select a new node - nodeId := req.NodeId - if nodeId == "" { - // select a node for room - nodes, err := r.router.ListNodes() - if err != nil { - return nil, err - } - - node, err := r.selector.SelectNode(nodes, rm) - if err != nil { - return nil, err - } - nodeId = node.Id - } - - logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeId) - if err := r.router.SetNodeForRoom(ctx, req.Name, nodeId); err != nil { - return nil, err - } - - return rm, nil -} - func (r *LocalRoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room { r.lock.RLock() defer r.lock.RUnlock() diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 24d846f10..524cf2df0 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -2,6 +2,7 @@ package service import ( "context" + "sync" livekit "github.com/livekit/protocol/proto" "github.com/pkg/errors" @@ -13,14 +14,17 @@ import ( // A rooms service that supports a single node type RoomService struct { - router routing.Router - roomManager RoomManager + router routing.Router + selector routing.NodeSelector + roomAllocator *RoomAllocator + roomStore RoomStore } -func NewRoomService(roomManager RoomManager, router routing.Router) (svc *RoomService, err error) { +func NewRoomService(ra *RoomAllocator, rs RoomStore, router routing.Router) (svc *RoomService, err error) { svc = &RoomService{ - router: router, - roomManager: roomManager, + router: router, + roomAllocator: ra, + roomStore: rs, } return } @@ -30,7 +34,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, twirpAuthError(err) } - rm, err = s.roomManager.CreateRoom(ctx, req) + rm, err = s.roomAllocator.CreateRoom(ctx, req) if err != nil { err = errors.Wrap(err, "could not create room") } @@ -44,7 +48,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque return nil, twirpAuthError(err) } - rooms, err := s.roomManager.ListRooms(ctx) + rooms, err := s.roomStore.ListRooms(ctx) if err != nil { // TODO: translate error codes to twirp return @@ -62,27 +66,40 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } // if the room is currently active, RTC node needs to disconnect clients // here we are using any user's identity, due to how it works with routing - participants, err := s.roomManager.ListParticipants(ctx, req.Room) + participants, err := s.roomStore.ListParticipants(ctx, req.Room) if err != nil { return nil, err } if len(participants) > 0 { - err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{ + err = s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_DeleteRoom{ DeleteRoom: req, }, }) - if err != nil { - return nil, err - } } else { - // if a room hasn't started, delete locally - if err = s.roomManager.DeleteRoom(ctx, req.Room); err != nil { - err = twirp.WrapError(twirp.InternalError("could not delete room"), err) - return nil, err + var err2 error + wg := sync.WaitGroup{} + wg.Add(2) + // clear routing information + go func() { + defer wg.Done() + err = s.router.ClearRoomState(ctx, req.Room) + }() + // also delete room from db + go func() { + defer wg.Done() + err2 = s.roomStore.DeleteRoom(ctx, req.Room) + }() + + wg.Wait() + if err2 != nil { + err = err2 } } + if err != nil { + return nil, err + } return &livekit.DeleteRoomResponse{}, nil } @@ -92,7 +109,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar return nil, twirpAuthError(err) } - participants, err := s.roomManager.ListParticipants(ctx, req.Room) + participants, err := s.roomStore.ListParticipants(ctx, req.Room) if err != nil { return } @@ -108,7 +125,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti return nil, twirpAuthError(err) } - participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity) + participant, err := s.roomStore.LoadParticipant(ctx, req.Room, req.Identity) if err != nil { return } @@ -136,7 +153,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirpAuthError(err) } - participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity) + participant, err := s.roomStore.LoadParticipant(ctx, req.Room, req.Identity) if err != nil { return nil, err } @@ -175,7 +192,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, err } - participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity) + participant, err := s.roomStore.LoadParticipant(ctx, req.Room, req.Identity) if err != nil { return nil, err } @@ -199,7 +216,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { // here we are using any user's identity, due to how it works with routing - participants, err := s.roomManager.ListParticipants(ctx, req.Room) + participants, err := s.roomStore.ListParticipants(ctx, req.Room) if err != nil { return nil, err } @@ -223,7 +240,7 @@ func (s *RoomService) writeMessage(ctx context.Context, room, identity string, m return twirpAuthError(err) } - _, err := s.roomManager.LoadParticipant(ctx, room, identity) + _, err := s.roomStore.LoadParticipant(ctx, room, identity) if err != nil { return err } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 6fc5ac29b..6b390acec 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -18,20 +18,20 @@ import ( ) type RTCService struct { - router routing.Router - roomManager RoomManager - upgrader websocket.Upgrader - currentNode routing.LocalNode - isDev bool + router routing.Router + roomAllocator *RoomAllocator + upgrader websocket.Upgrader + currentNode routing.LocalNode + isDev bool } -func NewRTCService(conf *config.Config, roomManager RoomManager, router routing.Router, currentNode routing.LocalNode) *RTCService { +func NewRTCService(conf *config.Config, ra *RoomAllocator, router routing.Router, currentNode routing.LocalNode) *RTCService { s := &RTCService{ - router: router, - roomManager: roomManager, - upgrader: websocket.Upgrader{}, - currentNode: currentNode, - isDev: conf.Development, + router: router, + roomAllocator: ra, + upgrader: websocket.Upgrader{}, + currentNode: currentNode, + isDev: conf.Development, } // allow connections from any origin, since script may be hosted anywhere @@ -105,7 +105,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // create room if it doesn't exist, also assigns an RTC node for the room - rm, err := s.roomManager.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName}) + rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName}) if err != nil { handleError(w, http.StatusInternalServerError, err.Error()) return diff --git a/pkg/service/utils.go b/pkg/service/utils.go index 13ede9ae6..dac1d7b3c 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -29,6 +29,7 @@ var ServiceSet = wire.NewSet( CreateWebhookNotifier, CreateNodeSelector, NewRecordingService, + NewRoomAllocator, NewRoomService, NewRTCService, NewLivekitServer, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 1a9cf9fc2..e0de0f5de 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,8 +1,7 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//go:build !wireinject -// +build !wireinject +//+build !wireinject package service @@ -19,9 +18,17 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - roomStore := createStore(client) router := createRouter(client, currentNode) nodeSelector := CreateNodeSelector(conf) + roomStore := createStore(client) + roomAllocator := NewRoomAllocator(conf, router, nodeSelector, roomStore) + roomService, err := NewRoomService(roomAllocator, roomStore, router) + if err != nil { + return nil, err + } + messageBus := utils.NewRedisMessageBus(client) + recordingService := NewRecordingService(messageBus) + rtcService := NewRTCService(conf, roomAllocator, router, currentNode) keyProvider, err := CreateKeyProvider(conf) if err != nil { return nil, err @@ -34,13 +41,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - roomService, err := NewRoomService(localRoomManager, router) - if err != nil { - return nil, err - } - messageBus := utils.NewRedisMessageBus(client) - recordingService := NewRecordingService(messageBus) - rtcService := NewRTCService(conf, localRoomManager, router, currentNode) server, err := NewTurnServer(conf, roomStore, currentNode) if err != nil { return nil, err