Remove room manager from room service (#119)

* start splitting

* room allocator

* remove room manager

* Update pkg/service/roomallocator.go

Co-authored-by: David Zhao <david@davidzhao.com>

Co-authored-by: David Zhao <david@davidzhao.com>
This commit is contained in:
David Colburn
2021-09-16 23:29:29 -07:00
committed by GitHub
parent 700a879c0b
commit abde72a907
9 changed files with 196 additions and 125 deletions
+30 -2
View File
@@ -71,6 +71,10 @@ type FakeRouter struct {
onRTCMessageArgsForCall []struct {
arg1 routing.RTCMessageCallback
}
PreStopStub func()
preStopMutex sync.RWMutex
preStopArgsForCall []struct {
}
RegisterNodeStub func() error
registerNodeMutex sync.RWMutex
registerNodeArgsForCall []struct {
@@ -476,6 +480,30 @@ func (fake *FakeRouter) OnRTCMessageArgsForCall(i int) routing.RTCMessageCallbac
return argsForCall.arg1
}
func (fake *FakeRouter) PreStop() {
fake.preStopMutex.Lock()
fake.preStopArgsForCall = append(fake.preStopArgsForCall, struct {
}{})
stub := fake.PreStopStub
fake.recordInvocation("PreStop", []interface{}{})
fake.preStopMutex.Unlock()
if stub != nil {
fake.PreStopStub()
}
}
func (fake *FakeRouter) PreStopCallCount() int {
fake.preStopMutex.RLock()
defer fake.preStopMutex.RUnlock()
return len(fake.preStopArgsForCall)
}
func (fake *FakeRouter) PreStopCalls(stub func()) {
fake.preStopMutex.Lock()
defer fake.preStopMutex.Unlock()
fake.PreStopStub = stub
}
func (fake *FakeRouter) RegisterNode() error {
fake.registerNodeMutex.Lock()
ret, specificReturn := fake.registerNodeReturnsOnCall[len(fake.registerNodeArgsForCall)]
@@ -782,8 +810,6 @@ func (fake *FakeRouter) Stop() {
}
}
func (fake *FakeRouter) PreStop() {}
func (fake *FakeRouter) StopCallCount() int {
fake.stopMutex.RLock()
defer fake.stopMutex.RUnlock()
@@ -928,6 +954,8 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} {
defer fake.onNewParticipantRTCMutex.RUnlock()
fake.onRTCMessageMutex.RLock()
defer fake.onRTCMessageMutex.RUnlock()
fake.preStopMutex.RLock()
defer fake.preStopMutex.RUnlock()
fake.registerNodeMutex.RLock()
defer fake.registerNodeMutex.RUnlock()
fake.removeDeadNodesMutex.RLock()
-1
View File
@@ -35,7 +35,6 @@ type RoomStore interface {
type RoomManager interface {
RoomStore
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error)
GetRoom(ctx context.Context, roomName string) *rtc.Room
DeleteRoom(ctx context.Context, roomName string) error
StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink)
+99
View File
@@ -0,0 +1,99 @@
package service
import (
"context"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
)
type RoomAllocator struct {
config *config.Config
router routing.Router
selector routing.NodeSelector
roomStore RoomStore
}
func NewRoomAllocator(conf *config.Config, router routing.Router, selector routing.NodeSelector, rs RoomStore) *RoomAllocator {
return &RoomAllocator{
config: conf,
router: router,
selector: selector,
roomStore: rs,
}
}
// CreateRoom creates a new room from a request and allocates it to a node to handle
// it'll also monitor its state, and cleans it up when appropriate
func (r *RoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
token, err := r.roomStore.LockRoom(ctx, req.Name, 5*time.Second)
if err != nil {
return nil, err
}
defer func() {
_ = r.roomStore.UnlockRoom(ctx, req.Name, token)
}()
// find existing room and update it
rm, err := r.roomStore.LoadRoom(ctx, req.Name)
if err == ErrRoomNotFound {
rm = &livekit.Room{
Sid: utils.NewGuid(utils.RoomPrefix),
Name: req.Name,
CreationTime: time.Now().Unix(),
TurnPassword: utils.RandomSecret(),
}
applyDefaultRoomConfig(rm, &r.config.Room)
} else if err != nil {
return nil, err
}
if req.EmptyTimeout > 0 {
rm.EmptyTimeout = req.EmptyTimeout
}
if req.MaxParticipants > 0 {
rm.MaxParticipants = req.MaxParticipants
}
if err := r.roomStore.StoreRoom(ctx, rm); err != nil {
return nil, err
}
// Is that node still available?
node, err := r.router.GetNodeForRoom(ctx, rm.Name)
if err != routing.ErrNotFound && err != nil {
return nil, err
}
// keep it on that node
if err == nil && routing.IsAvailable(node) {
return rm, nil
}
// select a new node
nodeId := req.NodeId
if nodeId == "" {
// select a node for room
nodes, err := r.router.ListNodes()
if err != nil {
return nil, err
}
node, err := r.selector.SelectNode(nodes, rm)
if err != nil {
return nil, err
}
nodeId = node.Id
}
logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeId)
if err := r.router.SetNodeForRoom(ctx, req.Name, nodeId); err != nil {
return nil, err
}
return rm, nil
}
@@ -15,17 +15,17 @@ import (
)
func TestCreateRoom(t *testing.T) {
manager, conf := newTestRoomManager(t)
ra, conf := newTestRoomAllocator(t)
t.Run("ensure default room settings are applied", func(t *testing.T) {
room, err := manager.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
room, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
require.NoError(t, err)
require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout)
require.NotEmpty(t, room.EnabledCodecs)
})
}
func newTestRoomManager(t *testing.T) (*service.LocalRoomManager, *config.Config) {
func newTestRoomAllocator(t *testing.T) (*service.RoomAllocator, *config.Config) {
store := &servicefakes.FakeRoomStore{}
store.LoadRoomReturns(nil, service.ErrRoomNotFound)
router := &routingfakes.FakeRouter{}
@@ -37,8 +37,6 @@ func newTestRoomManager(t *testing.T) (*service.LocalRoomManager, *config.Config
router.GetNodeForRoomReturns(node, nil)
rm, err := service.NewLocalRoomManager(store, router, node, selector, nil, conf)
require.NoError(t, err)
return rm, conf
ra := service.NewRoomAllocator(conf, router, selector, store)
return ra, conf
}
-71
View File
@@ -9,7 +9,6 @@ import (
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/livekit/livekit-server/pkg/config"
@@ -64,76 +63,6 @@ func NewLocalRoomManager(rp RoomStore, router routing.Router, currentNode routin
return r, nil
}
// CreateRoom creates a new room from a request and allocates it to a node to handle
// it'll also monitor fits state, and cleans it up when appropriate
func (r *LocalRoomManager) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
token, err := r.LockRoom(ctx, req.Name, 5*time.Second)
if err != nil {
return nil, err
}
defer func() {
_ = r.UnlockRoom(ctx, req.Name, token)
}()
// find existing room and update it
rm, err := r.LoadRoom(ctx, req.Name)
if err == ErrRoomNotFound {
rm = &livekit.Room{
Sid: utils.NewGuid(utils.RoomPrefix),
Name: req.Name,
CreationTime: time.Now().Unix(),
TurnPassword: utils.RandomSecret(),
}
applyDefaultRoomConfig(rm, &r.config.Room)
} else if err != nil {
return nil, err
}
if req.EmptyTimeout > 0 {
rm.EmptyTimeout = req.EmptyTimeout
}
if req.MaxParticipants > 0 {
rm.MaxParticipants = req.MaxParticipants
}
if err := r.StoreRoom(ctx, rm); err != nil {
return nil, err
}
// Is that node still available?
node, err := r.router.GetNodeForRoom(ctx, rm.Name)
if err != routing.ErrNotFound && err != nil {
return nil, err
}
// keep it on that node
if err == nil && routing.IsAvailable(node) {
return rm, nil
}
// select a new node
nodeId := req.NodeId
if nodeId == "" {
// select a node for room
nodes, err := r.router.ListNodes()
if err != nil {
return nil, err
}
node, err := r.selector.SelectNode(nodes, rm)
if err != nil {
return nil, err
}
nodeId = node.Id
}
logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeId)
if err := r.router.SetNodeForRoom(ctx, req.Name, nodeId); err != nil {
return nil, err
}
return rm, nil
}
func (r *LocalRoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room {
r.lock.RLock()
defer r.lock.RUnlock()
+39 -22
View File
@@ -2,6 +2,7 @@ package service
import (
"context"
"sync"
livekit "github.com/livekit/protocol/proto"
"github.com/pkg/errors"
@@ -13,14 +14,17 @@ import (
// A rooms service that supports a single node
type RoomService struct {
router routing.Router
roomManager RoomManager
router routing.Router
selector routing.NodeSelector
roomAllocator *RoomAllocator
roomStore RoomStore
}
func NewRoomService(roomManager RoomManager, router routing.Router) (svc *RoomService, err error) {
func NewRoomService(ra *RoomAllocator, rs RoomStore, router routing.Router) (svc *RoomService, err error) {
svc = &RoomService{
router: router,
roomManager: roomManager,
router: router,
roomAllocator: ra,
roomStore: rs,
}
return
}
@@ -30,7 +34,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, twirpAuthError(err)
}
rm, err = s.roomManager.CreateRoom(ctx, req)
rm, err = s.roomAllocator.CreateRoom(ctx, req)
if err != nil {
err = errors.Wrap(err, "could not create room")
}
@@ -44,7 +48,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque
return nil, twirpAuthError(err)
}
rooms, err := s.roomManager.ListRooms(ctx)
rooms, err := s.roomStore.ListRooms(ctx)
if err != nil {
// TODO: translate error codes to twirp
return
@@ -62,27 +66,40 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
}
// if the room is currently active, RTC node needs to disconnect clients
// here we are using any user's identity, due to how it works with routing
participants, err := s.roomManager.ListParticipants(ctx, req.Room)
participants, err := s.roomStore.ListParticipants(ctx, req.Room)
if err != nil {
return nil, err
}
if len(participants) > 0 {
err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{
err = s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_DeleteRoom{
DeleteRoom: req,
},
})
if err != nil {
return nil, err
}
} else {
// if a room hasn't started, delete locally
if err = s.roomManager.DeleteRoom(ctx, req.Room); err != nil {
err = twirp.WrapError(twirp.InternalError("could not delete room"), err)
return nil, err
var err2 error
wg := sync.WaitGroup{}
wg.Add(2)
// clear routing information
go func() {
defer wg.Done()
err = s.router.ClearRoomState(ctx, req.Room)
}()
// also delete room from db
go func() {
defer wg.Done()
err2 = s.roomStore.DeleteRoom(ctx, req.Room)
}()
wg.Wait()
if err2 != nil {
err = err2
}
}
if err != nil {
return nil, err
}
return &livekit.DeleteRoomResponse{}, nil
}
@@ -92,7 +109,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar
return nil, twirpAuthError(err)
}
participants, err := s.roomManager.ListParticipants(ctx, req.Room)
participants, err := s.roomStore.ListParticipants(ctx, req.Room)
if err != nil {
return
}
@@ -108,7 +125,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti
return nil, twirpAuthError(err)
}
participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity)
participant, err := s.roomStore.LoadParticipant(ctx, req.Room, req.Identity)
if err != nil {
return
}
@@ -136,7 +153,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
return nil, twirpAuthError(err)
}
participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity)
participant, err := s.roomStore.LoadParticipant(ctx, req.Room, req.Identity)
if err != nil {
return nil, err
}
@@ -175,7 +192,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
return nil, err
}
participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity)
participant, err := s.roomStore.LoadParticipant(ctx, req.Room, req.Identity)
if err != nil {
return nil, err
}
@@ -199,7 +216,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
// here we are using any user's identity, due to how it works with routing
participants, err := s.roomManager.ListParticipants(ctx, req.Room)
participants, err := s.roomStore.ListParticipants(ctx, req.Room)
if err != nil {
return nil, err
}
@@ -223,7 +240,7 @@ func (s *RoomService) writeMessage(ctx context.Context, room, identity string, m
return twirpAuthError(err)
}
_, err := s.roomManager.LoadParticipant(ctx, room, identity)
_, err := s.roomStore.LoadParticipant(ctx, room, identity)
if err != nil {
return err
}
+12 -12
View File
@@ -18,20 +18,20 @@ import (
)
type RTCService struct {
router routing.Router
roomManager RoomManager
upgrader websocket.Upgrader
currentNode routing.LocalNode
isDev bool
router routing.Router
roomAllocator *RoomAllocator
upgrader websocket.Upgrader
currentNode routing.LocalNode
isDev bool
}
func NewRTCService(conf *config.Config, roomManager RoomManager, router routing.Router, currentNode routing.LocalNode) *RTCService {
func NewRTCService(conf *config.Config, ra *RoomAllocator, router routing.Router, currentNode routing.LocalNode) *RTCService {
s := &RTCService{
router: router,
roomManager: roomManager,
upgrader: websocket.Upgrader{},
currentNode: currentNode,
isDev: conf.Development,
router: router,
roomAllocator: ra,
upgrader: websocket.Upgrader{},
currentNode: currentNode,
isDev: conf.Development,
}
// allow connections from any origin, since script may be hosted anywhere
@@ -105,7 +105,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// create room if it doesn't exist, also assigns an RTC node for the room
rm, err := s.roomManager.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName})
rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName})
if err != nil {
handleError(w, http.StatusInternalServerError, err.Error())
return
+1
View File
@@ -29,6 +29,7 @@ var ServiceSet = wire.NewSet(
CreateWebhookNotifier,
CreateNodeSelector,
NewRecordingService,
NewRoomAllocator,
NewRoomService,
NewRTCService,
NewLivekitServer,
+10 -10
View File
@@ -1,8 +1,7 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//go:build !wireinject
// +build !wireinject
//+build !wireinject
package service
@@ -19,9 +18,17 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
roomStore := createStore(client)
router := createRouter(client, currentNode)
nodeSelector := CreateNodeSelector(conf)
roomStore := createStore(client)
roomAllocator := NewRoomAllocator(conf, router, nodeSelector, roomStore)
roomService, err := NewRoomService(roomAllocator, roomStore, router)
if err != nil {
return nil, err
}
messageBus := utils.NewRedisMessageBus(client)
recordingService := NewRecordingService(messageBus)
rtcService := NewRTCService(conf, roomAllocator, router, currentNode)
keyProvider, err := CreateKeyProvider(conf)
if err != nil {
return nil, err
@@ -34,13 +41,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
roomService, err := NewRoomService(localRoomManager, router)
if err != nil {
return nil, err
}
messageBus := utils.NewRedisMessageBus(client)
recordingService := NewRecordingService(messageBus)
rtcService := NewRTCService(conf, localRoomManager, router, currentNode)
server, err := NewTurnServer(conf, roomStore, currentNode)
if err != nil {
return nil, err