Only launch room egress once (#2175)

* only launch room egress once

* regenerate fakes
This commit is contained in:
David Colburn
2023-10-24 13:05:23 -07:00
committed by GitHub
parent f80e87b216
commit b8ac836b9b
6 changed files with 38 additions and 33 deletions

View File

@@ -83,6 +83,6 @@ type IOClient interface {
//counterfeiter:generate . RoomAllocator
type RoomAllocator interface {
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error)
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error)
ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error
}

View File

@@ -50,10 +50,10 @@ func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore
// CreateRoom creates a new room from a request and allocates it to a node to handle
// it'll also monitor its state, and cleans it up when appropriate
func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) {
token, err := r.roomStore.LockRoom(ctx, livekit.RoomName(req.Name), 5*time.Second)
if err != nil {
return nil, err
return nil, false, err
}
defer func() {
_ = r.roomStore.UnlockRoom(ctx, livekit.RoomName(req.Name), token)
@@ -71,7 +71,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
internal = &livekit.RoomInternal{}
applyDefaultRoomConfig(rm, internal, &r.config.Room)
} else if err != nil {
return nil, err
return nil, false, err
}
if req.EmptyTimeout > 0 {
@@ -98,23 +98,23 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
}
if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil {
return nil, err
return nil, false, err
}
// check if room already assigned
existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name))
if err != routing.ErrNotFound && err != nil {
return nil, err
return nil, false, err
}
// if already assigned and still available, keep it on that node
if err == nil && selector.IsAvailable(existing) {
// if node hosting the room is full, deny entry
if selector.LimitsReached(r.config.Limit, existing.Stats) {
return nil, routing.ErrNodeLimitReached
return nil, false, routing.ErrNodeLimitReached
}
return rm, nil
return rm, false, nil
}
// select a new node
@@ -122,12 +122,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
if nodeID == "" {
nodes, err := r.router.ListNodes()
if err != nil {
return nil, err
return nil, false, err
}
node, err := r.selector.SelectNode(nodes)
if err != nil {
return nil, err
return nil, false, err
}
nodeID = livekit.NodeID(node.Id)
@@ -136,10 +136,10 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
logger.Infow("selected node for room", "room", rm.Name, "roomID", rm.Sid, "selectedNodeID", nodeID)
err = r.router.SetNodeForRoom(ctx, livekit.RoomName(rm.Name), nodeID)
if err != nil {
return nil, err
return nil, false, err
}
return rm, nil
return rm, true, nil
}
func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error {

View File

@@ -39,7 +39,7 @@ func TestCreateRoom(t *testing.T) {
ra, conf := newTestRoomAllocator(t, conf, node)
room, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
room, _, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
require.NoError(t, err)
require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout)
require.NotEmpty(t, room.EnabledCodecs)
@@ -57,7 +57,7 @@ func TestCreateRoom(t *testing.T) {
ra, _ := newTestRoomAllocator(t, conf, node)
_, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
_, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
require.ErrorIs(t, err, routing.ErrNodeLimitReached)
})
@@ -73,7 +73,7 @@ func TestCreateRoom(t *testing.T) {
ra, _ := newTestRoomAllocator(t, conf, node)
_, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
_, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
require.ErrorIs(t, err, routing.ErrNodeLimitReached)
})
}

View File

@@ -79,7 +79,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, ErrEgressNotConnected
}
rm, err := s.roomAllocator.CreateRoom(ctx, req)
rm, created, err := s.roomAllocator.CreateRoom(ctx, req)
if err != nil {
err = errors.Wrap(err, "could not create room")
return nil, err
@@ -109,7 +109,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, err
}
if req.Egress != nil && req.Egress.Room != nil {
if created && req.Egress != nil && req.Egress.Room != nil {
egress := &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: req.Egress.Room,
@@ -424,7 +424,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
// no one has joined the room, would not have been created on an RTC node.
// in this case, we'd want to run create again
_, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{
_, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{
Name: req.Room,
Metadata: req.Metadata,
})

View File

@@ -517,7 +517,7 @@ type connectionResult struct {
func (s *RTCService) startConnection(ctx context.Context, roomName livekit.RoomName, pi routing.ParticipantInit, timeout time.Duration) (connectionResult, *livekit.SignalResponse, error) {
var cr connectionResult
var err error
cr.Room, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)})
cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)})
if err != nil {
return cr, nil, err
}

View File

@@ -10,7 +10,7 @@ import (
)
type FakeRoomAllocator struct {
CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error)
CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error)
createRoomMutex sync.RWMutex
createRoomArgsForCall []struct {
arg1 context.Context
@@ -18,11 +18,13 @@ type FakeRoomAllocator struct {
}
createRoomReturns struct {
result1 *livekit.Room
result2 error
result2 bool
result3 error
}
createRoomReturnsOnCall map[int]struct {
result1 *livekit.Room
result2 error
result2 bool
result3 error
}
ValidateCreateRoomStub func(context.Context, livekit.RoomName) error
validateCreateRoomMutex sync.RWMutex
@@ -40,7 +42,7 @@ type FakeRoomAllocator struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, error) {
func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, bool, error) {
fake.createRoomMutex.Lock()
ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)]
fake.createRoomArgsForCall = append(fake.createRoomArgsForCall, struct {
@@ -55,9 +57,9 @@ func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.Cr
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
return ret.result1, ret.result2, ret.result3
}
return fakeReturns.result1, fakeReturns.result2
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeRoomAllocator) CreateRoomCallCount() int {
@@ -66,7 +68,7 @@ func (fake *FakeRoomAllocator) CreateRoomCallCount() int {
return len(fake.createRoomArgsForCall)
}
func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error)) {
func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error)) {
fake.createRoomMutex.Lock()
defer fake.createRoomMutex.Unlock()
fake.CreateRoomStub = stub
@@ -79,30 +81,33 @@ func (fake *FakeRoomAllocator) CreateRoomArgsForCall(i int) (context.Context, *l
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 error) {
func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 bool, result3 error) {
fake.createRoomMutex.Lock()
defer fake.createRoomMutex.Unlock()
fake.CreateRoomStub = nil
fake.createRoomReturns = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) {
func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 bool, result3 error) {
fake.createRoomMutex.Lock()
defer fake.createRoomMutex.Unlock()
fake.CreateRoomStub = nil
if fake.createRoomReturnsOnCall == nil {
fake.createRoomReturnsOnCall = make(map[int]struct {
result1 *livekit.Room
result2 error
result2 bool
result3 error
})
}
fake.createRoomReturnsOnCall[i] = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeRoomAllocator) ValidateCreateRoom(arg1 context.Context, arg2 livekit.RoomName) error {