From c04fd86f9c2c143bbb377257680f85657f803118 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 19 Jan 2021 20:34:11 -0800 Subject: [PATCH] multi-node integration tests --- cmd/cli/client/client.go | 1 + magefile.go | 9 ++ pkg/service/roommanager.go | 23 ++-- proto/livekit/room.pb.go | 79 ++++++++------ proto/livekit/room.twirp.go | 43 ++++---- proto/room.proto | 2 + test/integration_helpers.go | 100 +++++++++--------- test/multinode_test.go | 70 ++++++++++++ ...integration_test.go => singlenode_test.go} | 38 ++++--- 9 files changed, 234 insertions(+), 131 deletions(-) create mode 100644 test/multinode_test.go rename test/{integration_test.go => singlenode_test.go} (76%) diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index f57e27c43..6b1efadcf 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -322,6 +322,7 @@ func (c *RTCClient) Stop() { c.connected = false c.iceConnected = false c.conn.Close() + c.PeerConn.Close() c.cancel() } diff --git a/magefile.go b/magefile.go index 94703a938..b8f54d190 100644 --- a/magefile.go +++ b/magefile.go @@ -134,7 +134,16 @@ func Build() error { return nil } +// run unit tests, skipping integration func Test() error { + mg.Deps(Proto) + cmd := exec.Command("go", "test", "-short", "./...") + connectStd(cmd) + return cmd.Run() +} + +// run all thests including integration +func TestAll() error { mg.Deps(Proto) cmd := exec.Command("go", "test", "./...") connectStd(cmd) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 1d78ea6ae..c6c823a21 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -54,18 +54,21 @@ func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, return nil, err } - // allocate room to a node - nodes, err := r.router.ListNodes() - if err != nil { - return nil, err + if req.NodeId == "" { + // select a node for room + nodes, err := r.router.ListNodes() + if err != nil { + return nil, err + } + + node, err := r.selector.SelectNode(nodes, rm) + if err != nil { + return nil, err + } + req.NodeId = node.Id } - node, err := r.selector.SelectNode(nodes, rm) - if err != nil { - return nil, err - } - - if err = r.router.SetNodeForRoom(req.Name, node.Id); err != nil { + if err := r.router.SetNodeForRoom(req.Name, req.NodeId); err != nil { return nil, err } diff --git a/proto/livekit/room.pb.go b/proto/livekit/room.pb.go index 16f6cb243..1bf695f86 100644 --- a/proto/livekit/room.pb.go +++ b/proto/livekit/room.pb.go @@ -34,6 +34,8 @@ type CreateRoomRequest struct { // number of seconds the room should cleanup after being empty EmptyTimeout uint32 `protobuf:"varint,2,opt,name=empty_timeout,json=emptyTimeout,proto3" json:"empty_timeout,omitempty"` MaxParticipants uint32 `protobuf:"varint,3,opt,name=max_participants,json=maxParticipants,proto3" json:"max_participants,omitempty"` + // override the node room is allocated to, for debugging + NodeId string `protobuf:"bytes,4,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` } func (x *CreateRoomRequest) Reset() { @@ -89,6 +91,13 @@ func (x *CreateRoomRequest) GetMaxParticipants() uint32 { return 0 } +func (x *CreateRoomRequest) GetNodeId() string { + if x != nil { + return x.NodeId + } + return "" +} + type ListRoomsRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -264,40 +273,42 @@ var File_room_proto protoreflect.FileDescriptor var file_room_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x72, 0x6f, 0x6f, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x1a, 0x0b, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x22, 0x77, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x65, - 0x6d, 0x70, 0x74, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x0c, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, - 0x12, 0x29, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, - 0x61, 0x6e, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6d, 0x61, 0x78, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x22, 0x12, 0x0a, 0x10, 0x4c, - 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, - 0x38, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x05, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x18, 0x01, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f, - 0x6f, 0x6d, 0x52, 0x05, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x22, 0x27, 0x0a, 0x11, 0x44, 0x65, 0x6c, - 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, - 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x72, 0x6f, - 0x6f, 0x6d, 0x22, 0x14, 0x0a, 0x12, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xd1, 0x01, 0x0a, 0x0b, 0x52, 0x6f, 0x6f, - 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, - 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f, 0x6f, - 0x6d, 0x12, 0x42, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x12, 0x19, - 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, - 0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, - 0x6b, 0x69, 0x74, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x0a, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, - 0x6f, 0x6f, 0x6d, 0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65, - 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1b, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, - 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x31, 0x5a, 0x2f, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, - 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x6f, 0x22, 0x90, 0x01, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f, + 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, + 0x65, 0x6d, 0x70, 0x74, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, + 0x74, 0x12, 0x29, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, + 0x70, 0x61, 0x6e, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6d, 0x61, 0x78, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x12, 0x17, 0x0a, 0x07, + 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, + 0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x12, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, + 0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x38, 0x0a, 0x11, 0x4c, 0x69, 0x73, + 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, + 0x0a, 0x05, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, + 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x05, 0x72, 0x6f, + 0x6f, 0x6d, 0x73, 0x22, 0x27, 0x0a, 0x11, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, + 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x6d, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x72, 0x6f, 0x6f, 0x6d, 0x22, 0x14, 0x0a, 0x12, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x32, 0xd1, 0x01, 0x0a, 0x0b, 0x52, 0x6f, 0x6f, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x12, 0x37, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, + 0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x6c, + 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f, 0x6f, 0x6d, 0x12, 0x42, 0x0a, 0x09, 0x4c, + 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x12, 0x19, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, + 0x69, 0x74, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x4c, 0x69, + 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x45, 0x0a, 0x0a, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x12, 0x1a, 0x2e, + 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, + 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6c, 0x69, 0x76, 0x65, + 0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, + 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( diff --git a/proto/livekit/room.twirp.go b/proto/livekit/room.twirp.go index f9dc4b360..cf846a30e 100644 --- a/proto/livekit/room.twirp.go +++ b/proto/livekit/room.twirp.go @@ -1594,25 +1594,26 @@ func callClientError(ctx context.Context, h *twirp.ClientHooks, err twirp.Error) } var twirpFileDescriptor0 = []byte{ - // 311 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x3f, 0x4f, 0xf3, 0x30, - 0x10, 0xc6, 0x95, 0xb7, 0x2f, 0xa0, 0x5e, 0xa9, 0x68, 0x2d, 0x86, 0x60, 0x96, 0x2a, 0x0c, 0x94, - 0x81, 0x54, 0x94, 0x01, 0xe6, 0x02, 0x1b, 0x03, 0x0a, 0x4c, 0x2c, 0x95, 0x1b, 0x4e, 0x60, 0x11, - 0xc7, 0xc1, 0x76, 0x42, 0xf9, 0x98, 0x7c, 0x23, 0x64, 0xe7, 0x4f, 0x03, 0xe9, 0xe4, 0xf3, 0xf3, - 0x9c, 0xfc, 0xdc, 0xef, 0x64, 0x00, 0x25, 0xa5, 0x08, 0x33, 0x25, 0x8d, 0x24, 0x7b, 0x09, 0x2f, - 0xf0, 0x9d, 0x1b, 0x3a, 0x10, 0xf2, 0x05, 0x93, 0x52, 0x0d, 0x3e, 0x61, 0x7c, 0xa3, 0x90, 0x19, - 0x8c, 0xa4, 0x14, 0x11, 0x7e, 0xe4, 0xa8, 0x0d, 0x21, 0xf0, 0x3f, 0x65, 0x02, 0x7d, 0x6f, 0xe2, - 0x4d, 0xfb, 0x91, 0xab, 0xc9, 0x09, 0x0c, 0x51, 0x64, 0xe6, 0x6b, 0x69, 0xb8, 0x40, 0x99, 0x1b, - 0xff, 0xdf, 0xc4, 0x9b, 0x0e, 0xa3, 0x7d, 0x27, 0x3e, 0x95, 0x1a, 0x39, 0x83, 0x91, 0x60, 0xeb, - 0x65, 0xc6, 0x94, 0xe1, 0x31, 0xcf, 0x58, 0x6a, 0xb4, 0xdf, 0x73, 0x7d, 0x07, 0x82, 0xad, 0x1f, - 0x5a, 0x72, 0x40, 0x60, 0x74, 0xcf, 0xb5, 0xb1, 0xb1, 0xba, 0xca, 0x0d, 0xae, 0x61, 0xdc, 0xd2, - 0x74, 0x26, 0x53, 0x6d, 0x83, 0x77, 0x2c, 0x85, 0xf6, 0xbd, 0x49, 0x6f, 0x3a, 0x98, 0x0f, 0xc3, - 0x8a, 0x23, 0x74, 0x13, 0x97, 0x5e, 0x70, 0x0a, 0xe3, 0x5b, 0x4c, 0xb0, 0x83, 0x61, 0xdd, 0x1a, - 0xc3, 0xd6, 0xc1, 0x21, 0x90, 0x76, 0x63, 0x99, 0x31, 0xff, 0xf6, 0x60, 0x60, 0x85, 0x47, 0x54, - 0x05, 0x8f, 0x91, 0x5c, 0x01, 0x6c, 0xb6, 0x42, 0x68, 0x13, 0xd9, 0x59, 0x15, 0xfd, 0x3d, 0x0e, - 0x59, 0x40, 0xbf, 0x21, 0x20, 0x47, 0x8d, 0xf7, 0x97, 0x94, 0xd2, 0x6d, 0x56, 0x05, 0x7c, 0x07, - 0xb0, 0x19, 0xb1, 0x15, 0xde, 0x01, 0xa4, 0xc7, 0x5b, 0xbd, 0xf2, 0x99, 0xc5, 0xc5, 0xf3, 0xec, - 0x95, 0x9b, 0xb7, 0x7c, 0x15, 0xc6, 0x52, 0xcc, 0xaa, 0xc6, 0xfa, 0x3c, 0xd7, 0xa8, 0x0a, 0x54, - 0x33, 0xf7, 0x09, 0x6a, 0x71, 0xb5, 0xeb, 0xae, 0x97, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe4, - 0x3c, 0x6a, 0xb4, 0x37, 0x02, 0x00, 0x00, + // 331 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xc1, 0x4e, 0xf2, 0x40, + 0x14, 0x85, 0xd3, 0x1f, 0x7e, 0x08, 0x17, 0x89, 0x70, 0x63, 0x62, 0xad, 0x1b, 0x52, 0x17, 0xe2, + 0xc2, 0x12, 0x71, 0xa1, 0x6b, 0xd4, 0x85, 0x89, 0x0b, 0x53, 0x5d, 0xb9, 0x21, 0x03, 0xbd, 0xd1, + 0x89, 0x4c, 0xa7, 0xce, 0x0c, 0x04, 0xdf, 0xc2, 0x57, 0xf3, 0x8d, 0xcc, 0x4c, 0x0b, 0x54, 0xcb, + 0xaa, 0x33, 0xe7, 0xdc, 0xf4, 0x9c, 0xef, 0x66, 0x00, 0x94, 0x94, 0x22, 0xca, 0x94, 0x34, 0x12, + 0x9b, 0x73, 0xbe, 0xa4, 0x77, 0x6e, 0x82, 0xb6, 0x90, 0x09, 0xcd, 0x73, 0x35, 0xfc, 0xf2, 0xa0, + 0x77, 0xa3, 0x88, 0x19, 0x8a, 0xa5, 0x14, 0x31, 0x7d, 0x2c, 0x48, 0x1b, 0x44, 0xa8, 0xa7, 0x4c, + 0x90, 0xef, 0xf5, 0xbd, 0x41, 0x2b, 0x76, 0x67, 0x3c, 0x81, 0x0e, 0x89, 0xcc, 0x7c, 0x4e, 0x0c, + 0x17, 0x24, 0x17, 0xc6, 0xff, 0xd7, 0xf7, 0x06, 0x9d, 0x78, 0xcf, 0x89, 0xcf, 0xb9, 0x86, 0x67, + 0xd0, 0x15, 0x6c, 0x35, 0xc9, 0x98, 0x32, 0x7c, 0xc6, 0x33, 0x96, 0x1a, 0xed, 0xd7, 0xdc, 0xdc, + 0xbe, 0x60, 0xab, 0xc7, 0x92, 0x8c, 0x87, 0xd0, 0x4c, 0x65, 0x42, 0x13, 0x9e, 0xf8, 0x75, 0x17, + 0xd3, 0xb0, 0xd7, 0xfb, 0x24, 0x44, 0xe8, 0x3e, 0x70, 0x6d, 0x6c, 0x1f, 0x5d, 0x14, 0x0a, 0xaf, + 0xa1, 0x57, 0xd2, 0x74, 0x26, 0x53, 0x6d, 0x1b, 0xfd, 0xb7, 0x7c, 0xda, 0xf7, 0xfa, 0xb5, 0x41, + 0x7b, 0xd4, 0x89, 0x0a, 0xc2, 0xc8, 0xa1, 0xe4, 0x5e, 0x78, 0x0a, 0xbd, 0x5b, 0x9a, 0x53, 0x85, + 0xcf, 0xba, 0x6b, 0x3e, 0x7b, 0x0e, 0x0f, 0x00, 0xcb, 0x83, 0x79, 0xc6, 0xe8, 0xdb, 0x83, 0xb6, + 0x15, 0x9e, 0x48, 0x2d, 0xf9, 0x8c, 0xf0, 0x0a, 0x60, 0xbb, 0x2e, 0x0c, 0x36, 0x91, 0x95, 0x1d, + 0x06, 0xbf, 0xeb, 0xe0, 0x18, 0x5a, 0x1b, 0x02, 0x3c, 0xda, 0x78, 0x7f, 0x49, 0x83, 0x60, 0x97, + 0x55, 0x00, 0xdf, 0x01, 0x6c, 0x2b, 0x96, 0xc2, 0x2b, 0x80, 0xc1, 0xf1, 0x4e, 0x2f, 0xff, 0xcd, + 0xf8, 0xe2, 0x65, 0xf8, 0xca, 0xcd, 0xdb, 0x62, 0x1a, 0xcd, 0xa4, 0x18, 0x16, 0x83, 0xeb, 0xef, + 0xb9, 0x26, 0xb5, 0x24, 0x35, 0x74, 0xcf, 0x63, 0x2d, 0x4e, 0x1b, 0xee, 0x7a, 0xf9, 0x13, 0x00, + 0x00, 0xff, 0xff, 0x7a, 0x20, 0xfb, 0xae, 0x51, 0x02, 0x00, 0x00, } diff --git a/proto/room.proto b/proto/room.proto index 9dc6d0c6d..fee5408aa 100644 --- a/proto/room.proto +++ b/proto/room.proto @@ -20,6 +20,8 @@ message CreateRoomRequest { // number of seconds the room should cleanup after being empty uint32 empty_timeout = 2; uint32 max_participants = 3; + // override the node room is allocated to, for debugging + string node_id = 4; } message ListRoomsRequest { diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 58d5d3f0e..53663190e 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -22,14 +22,17 @@ import ( ) const ( - testApiKey = "apikey" - testApiSecret = "apiSecret" - testRoom = "mytestroom" + testApiKey = "apikey" + testApiSecret = "apiSecret" + testRoom = "mytestroom" + defaultServerPort = 7880 + secondServerPort = 7881 + nodeId1 = "integration-test-1" + nodeId2 = "integration-test-2" ) var ( - serverConfig *config.Config - roomClient livekit.RoomService + roomClient livekit.RoomService ) func setupSingleNodeTest(roomName string) *service.LivekitServer { @@ -42,51 +45,39 @@ func setupSingleNodeTest(roomName string) *service.LivekitServer { waitForServerToStart(s) // create test room - header := make(http.Header) - client.SetAuthorizationToken(header, createRoomToken()) - tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header) - if err != nil { - panic(err) - } - _, err = roomClient.CreateRoom(tctx, &livekit.CreateRoomRequest{Name: roomName}) + _, err := roomClient.CreateRoom(contextWithCreateRoomToken(), &livekit.CreateRoomRequest{Name: roomName}) if err != nil { panic(err) } return s } -func setupMultiNodeTest(roomName string) *service.LivekitServer { +func setupMultiNodeTest() (*service.LivekitServer, *service.LivekitServer) { logger.InitDevelopment("") - s := createMultiNodeServer() - go func() { - s.Start() - }() + s1 := createMultiNodeServer(nodeId1, defaultServerPort) + s2 := createMultiNodeServer(nodeId2, secondServerPort) + go s1.Start() + go s2.Start() - waitForServerToStart(s) + waitForServerToStart(s1) + waitForServerToStart(s2) - // create test room - header := make(http.Header) - client.SetAuthorizationToken(header, createRoomToken()) - tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header) - if err != nil { - panic(err) - } - _, err = roomClient.CreateRoom(tctx, &livekit.CreateRoomRequest{Name: roomName}) - if err != nil { - panic(err) - } - return s + return s1, s2 } func teardownTest(s *service.LivekitServer, roomName string) { + roomClient.DeleteRoom(contextWithCreateRoomToken(), &livekit.DeleteRoomRequest{Room: roomName}) + s.Stop() +} + +func contextWithCreateRoomToken() context.Context { header := make(http.Header) client.SetAuthorizationToken(header, createRoomToken()) tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header) if err != nil { panic(err) } - roomClient.DeleteRoom(tctx, &livekit.DeleteRoomRequest{Room: roomName}) - s.Stop() + return tctx } func waitForServerToStart(s *service.LivekitServer) { @@ -105,7 +96,7 @@ func waitForServerToStart(s *service.LivekitServer) { } func withTimeout(t *testing.T, description string, f func() bool) { - ctx, _ := context.WithTimeout(context.Background(), time.Second) + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) for { select { case <-ctx.Done(): @@ -124,7 +115,9 @@ func waitUntilConnected(t *testing.T, clients ...*client.RTCClient) { c := clients[i] wg.Add(1) go func() { - assert.NoError(t, c.WaitUntilConnected()) + if !assert.NoError(t, c.WaitUntilConnected()) { + t.Fatal("one or more clients could not connect") + } wg.Done() }() } @@ -133,12 +126,13 @@ func waitUntilConnected(t *testing.T, clients ...*client.RTCClient) { func createSingleNodeServer() *service.LivekitServer { var err error - serverConfig, err = config.NewConfig("") + conf, err := config.NewConfig("") if err != nil { panic(fmt.Sprintf("could not create config: %v", err)) } - currentNode, err := routing.NewLocalNode(serverConfig) + currentNode, err := routing.NewLocalNode(conf) + currentNode.Id = nodeId1 if err != nil { panic(err) } @@ -146,53 +140,51 @@ func createSingleNodeServer() *service.LivekitServer { // local routing and store router := routing.NewLocalRouter(currentNode) roomStore := service.NewLocalRoomStore() - s, err := service.InitializeServer(serverConfig, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{}) + s, err := service.InitializeServer(conf, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{}) if err != nil { panic(fmt.Sprintf("could not create server: %v", err)) } - roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", serverConfig.Port), &http.Client{}) + roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", defaultServerPort), &http.Client{}) return s } -func createMultiNodeServer() *service.LivekitServer { +func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer { var err error - serverConfig, err = config.NewConfig("") + conf, err := config.NewConfig("") if err != nil { panic(fmt.Sprintf("could not create config: %v", err)) } - serverConfig.MultiNode = true - serverConfig.Redis.Address = "localhost:6379" + conf.Port = port + conf.MultiNode = true - currentNode, err := routing.NewLocalNode(serverConfig) + currentNode, err := routing.NewLocalNode(conf) + currentNode.Id = nodeId if err != nil { panic(err) } // local routing and store - rc := redis.NewClient(&redis.Options{ - Addr: serverConfig.Redis.Address, - Password: serverConfig.Redis.Password, - }) + rc := redisClient() if err = rc.Ping(context.Background()).Err(); err != nil { panic(err) } router := routing.NewRedisRouter(currentNode, rc, false) roomStore := service.NewRedisRoomStore(rc) - s, err := service.InitializeServer(serverConfig, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{}) + s, err := service.InitializeServer(conf, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{}) if err != nil { panic(fmt.Sprintf("could not create server: %v", err)) } - roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", serverConfig.Port), &http.Client{}) + roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", port), &http.Client{}) return s } // creates a client and runs against server -func createRTCClient(name string) *client.RTCClient { +func createRTCClient(name string, port int) *client.RTCClient { token := joinToken(testRoom, name) - ws, err := client.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", serverConfig.Port), token) + ws, err := client.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token) if err != nil { panic(err) } @@ -207,6 +199,12 @@ func createRTCClient(name string) *client.RTCClient { return c } +func redisClient() *redis.Client { + return redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) +} + func joinToken(room, name string) string { at := auth.NewAccessToken(testApiKey, testApiSecret). AddGrant(&auth.VideoGrant{RoomJoin: true, Room: room}). diff --git a/test/multinode_test.go b/test/multinode_test.go new file mode 100644 index 000000000..636a2f57d --- /dev/null +++ b/test/multinode_test.go @@ -0,0 +1,70 @@ +package test + +import ( + "context" + "testing" + + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" + + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/livekit-server/proto/livekit" +) + +func TestMultiNodeRouting(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + s1, s2 := setupMultiNodeTest() + defer s1.Stop() + defer s2.Stop() + + // creating room on node 1 + _, err := roomClient.CreateRoom(contextWithCreateRoomToken(), &livekit.CreateRoomRequest{ + Name: testRoom, + NodeId: nodeId1, + }) + assert.NoError(t, err) + + // one node connecting to node 1, and another connecting to node 2 + c1 := createRTCClient("c1", defaultServerPort) + c2 := createRTCClient("c2", defaultServerPort) + waitUntilConnected(t, c1, c2) + + // c1 publishing, and c2 receiving + t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") + assert.NoError(t, err) + defer t1.Stop() + + withTimeout(t, "c2 should receive one track", func() bool { + if len(c2.SubscribedTracks()) == 0 { + return false + } + // should have received two tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 1 { + return false + } + + tr1 := c2.SubscribedTracks()[c1.ID()][0] + assert.Equal(t, "webcam", tr1.StreamID()) + return true + }) + + c1.Stop() + c2.Stop() + + // ensure that room is closed + rc := redisClient() + ctx := context.Background() + withTimeout(t, "room should be closed", func() bool { + if rc.HGet(ctx, service.RoomsKey, testRoom).Err() == nil { + return false + } + return true + }) + + assert.Equal(t, redis.Nil, rc.HGet(ctx, routing.NodeRoomKey, testRoom).Err()) + assert.Equal(t, redis.Nil, rc.HGet(ctx, service.RoomIdMap, testRoom).Err()) +} diff --git a/test/integration_test.go b/test/singlenode_test.go similarity index 76% rename from test/integration_test.go rename to test/singlenode_test.go index 30d04a296..d8f944dbd 100644 --- a/test/integration_test.go +++ b/test/singlenode_test.go @@ -1,15 +1,23 @@ package test import ( - "os" "testing" "github.com/stretchr/testify/assert" ) func TestClientCouldConnect(t *testing.T) { - c1 := createRTCClient("c1") - c2 := createRTCClient("c2") + if testing.Short() { + t.SkipNow() + } + + s := setupSingleNodeTest(testRoom) + defer func() { + teardownTest(s, testRoom) + }() + + c1 := createRTCClient("c1", defaultServerPort) + c2 := createRTCClient("c2", defaultServerPort) waitUntilConnected(t, c1, c2) // ensure they both see each other @@ -23,8 +31,17 @@ func TestClientCouldConnect(t *testing.T) { } func TestSinglePublisher(t *testing.T) { - c1 := createRTCClient("c1") - c2 := createRTCClient("c2") + if testing.Short() { + t.SkipNow() + } + + s := setupSingleNodeTest(testRoom) + defer func() { + teardownTest(s, testRoom) + }() + + c1 := createRTCClient("c1", defaultServerPort) + c2 := createRTCClient("c2", defaultServerPort) waitUntilConnected(t, c1, c2) // publish a track and ensure clients receive it ok @@ -36,7 +53,7 @@ func TestSinglePublisher(t *testing.T) { defer t2.Stop() // a new client joins and should get the initial stream - c3 := createRTCClient("c3") + c3 := createRTCClient("c3", defaultServerPort) withTimeout(t, "c2 should receive two tracks", func() bool { if len(c2.SubscribedTracks()) == 0 { @@ -65,12 +82,3 @@ func TestSinglePublisher(t *testing.T) { return true }) } - -func TestMain(m *testing.M) { - s := setupSingleNodeTest(testRoom) - - code := m.Run() - - teardownTest(s, testRoom) - os.Exit(code) -}