diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index ba390554f..653269225 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -83,6 +83,6 @@ type IOClient interface { //counterfeiter:generate . RoomAllocator type RoomAllocator interface { - CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) + CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error } diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 2d538cdd1..f222f7b15 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -50,10 +50,10 @@ func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore // CreateRoom creates a new room from a request and allocates it to a node to handle // it'll also monitor its state, and cleans it up when appropriate -func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) { +func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) { token, err := r.roomStore.LockRoom(ctx, livekit.RoomName(req.Name), 5*time.Second) if err != nil { - return nil, err + return nil, false, err } defer func() { _ = r.roomStore.UnlockRoom(ctx, livekit.RoomName(req.Name), token) @@ -71,7 +71,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre internal = &livekit.RoomInternal{} applyDefaultRoomConfig(rm, internal, &r.config.Room) } else if err != nil { - return nil, err + return nil, false, err } if req.EmptyTimeout > 0 { @@ -98,23 +98,23 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre } if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil { - return nil, err + return nil, false, err } // check if room already assigned existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name)) if err != routing.ErrNotFound && err != nil { - return nil, err + return nil, false, err } // if already assigned and still available, keep it on that node if err == nil && selector.IsAvailable(existing) { // if node hosting the room is full, deny entry if selector.LimitsReached(r.config.Limit, existing.Stats) { - return nil, routing.ErrNodeLimitReached + return nil, false, routing.ErrNodeLimitReached } - return rm, nil + return rm, false, nil } // select a new node @@ -122,12 +122,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre if nodeID == "" { nodes, err := r.router.ListNodes() if err != nil { - return nil, err + return nil, false, err } node, err := r.selector.SelectNode(nodes) if err != nil { - return nil, err + return nil, false, err } nodeID = livekit.NodeID(node.Id) @@ -136,10 +136,10 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre logger.Infow("selected node for room", "room", rm.Name, "roomID", rm.Sid, "selectedNodeID", nodeID) err = r.router.SetNodeForRoom(ctx, livekit.RoomName(rm.Name), nodeID) if err != nil { - return nil, err + return nil, false, err } - return rm, nil + return rm, true, nil } func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error { diff --git a/pkg/service/roomallocator_test.go b/pkg/service/roomallocator_test.go index 4397e22c4..8c87dc8c2 100644 --- a/pkg/service/roomallocator_test.go +++ b/pkg/service/roomallocator_test.go @@ -39,7 +39,7 @@ func TestCreateRoom(t *testing.T) { ra, conf := newTestRoomAllocator(t, conf, node) - room, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) + room, _, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) require.NoError(t, err) require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout) require.NotEmpty(t, room.EnabledCodecs) @@ -57,7 +57,7 @@ func TestCreateRoom(t *testing.T) { ra, _ := newTestRoomAllocator(t, conf, node) - _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) + _, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) require.ErrorIs(t, err, routing.ErrNodeLimitReached) }) @@ -73,7 +73,7 @@ func TestCreateRoom(t *testing.T) { ra, _ := newTestRoomAllocator(t, conf, node) - _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) + _, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) require.ErrorIs(t, err, routing.ErrNodeLimitReached) }) } diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 2f75ac00c..dd9cefebc 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -79,7 +79,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, ErrEgressNotConnected } - rm, err := s.roomAllocator.CreateRoom(ctx, req) + rm, created, err := s.roomAllocator.CreateRoom(ctx, req) if err != nil { err = errors.Wrap(err, "could not create room") return nil, err @@ -109,7 +109,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, err } - if req.Egress != nil && req.Egress.Room != nil { + if created && req.Egress != nil && req.Egress.Room != nil { egress := &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_RoomComposite{ RoomComposite: req.Egress.Room, @@ -424,7 +424,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat // no one has joined the room, would not have been created on an RTC node. // in this case, we'd want to run create again - _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{ + _, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{ Name: req.Room, Metadata: req.Metadata, }) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index dc203fec9..b0683115b 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -517,7 +517,7 @@ type connectionResult struct { func (s *RTCService) startConnection(ctx context.Context, roomName livekit.RoomName, pi routing.ParticipantInit, timeout time.Duration) (connectionResult, *livekit.SignalResponse, error) { var cr connectionResult var err error - cr.Room, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)}) + cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)}) if err != nil { return cr, nil, err } diff --git a/pkg/service/servicefakes/fake_room_allocator.go b/pkg/service/servicefakes/fake_room_allocator.go index e88dde554..134b9649b 100644 --- a/pkg/service/servicefakes/fake_room_allocator.go +++ b/pkg/service/servicefakes/fake_room_allocator.go @@ -10,7 +10,7 @@ import ( ) type FakeRoomAllocator struct { - CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error) + CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error) createRoomMutex sync.RWMutex createRoomArgsForCall []struct { arg1 context.Context @@ -18,11 +18,13 @@ type FakeRoomAllocator struct { } createRoomReturns struct { result1 *livekit.Room - result2 error + result2 bool + result3 error } createRoomReturnsOnCall map[int]struct { result1 *livekit.Room - result2 error + result2 bool + result3 error } ValidateCreateRoomStub func(context.Context, livekit.RoomName) error validateCreateRoomMutex sync.RWMutex @@ -40,7 +42,7 @@ type FakeRoomAllocator struct { invocationsMutex sync.RWMutex } -func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, error) { +func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, bool, error) { fake.createRoomMutex.Lock() ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)] fake.createRoomArgsForCall = append(fake.createRoomArgsForCall, struct { @@ -55,9 +57,9 @@ func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.Cr return stub(arg1, arg2) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1, ret.result2, ret.result3 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } func (fake *FakeRoomAllocator) CreateRoomCallCount() int { @@ -66,7 +68,7 @@ func (fake *FakeRoomAllocator) CreateRoomCallCount() int { return len(fake.createRoomArgsForCall) } -func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error)) { +func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error)) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = stub @@ -79,30 +81,33 @@ func (fake *FakeRoomAllocator) CreateRoomArgsForCall(i int) (context.Context, *l return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 error) { +func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 bool, result3 error) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = nil fake.createRoomReturns = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 bool + result3 error + }{result1, result2, result3} } -func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) { +func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 bool, result3 error) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = nil if fake.createRoomReturnsOnCall == nil { fake.createRoomReturnsOnCall = make(map[int]struct { result1 *livekit.Room - result2 error + result2 bool + result3 error }) } fake.createRoomReturnsOnCall[i] = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 bool + result3 error + }{result1, result2, result3} } func (fake *FakeRoomAllocator) ValidateCreateRoom(arg1 context.Context, arg2 livekit.RoomName) error {