multi-node integration tests

This commit is contained in:
David Zhao
2021-01-19 20:34:11 -08:00
parent 0b00b26d05
commit c04fd86f9c
9 changed files with 234 additions and 131 deletions
+1
View File
@@ -322,6 +322,7 @@ func (c *RTCClient) Stop() {
c.connected = false
c.iceConnected = false
c.conn.Close()
c.PeerConn.Close()
c.cancel()
}
+9
View File
@@ -134,7 +134,16 @@ func Build() error {
return nil
}
// run unit tests, skipping integration
func Test() error {
mg.Deps(Proto)
cmd := exec.Command("go", "test", "-short", "./...")
connectStd(cmd)
return cmd.Run()
}
// run all thests including integration
func TestAll() error {
mg.Deps(Proto)
cmd := exec.Command("go", "test", "./...")
connectStd(cmd)
+13 -10
View File
@@ -54,18 +54,21 @@ func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room,
return nil, err
}
// allocate room to a node
nodes, err := r.router.ListNodes()
if err != nil {
return nil, err
if req.NodeId == "" {
// select a node for room
nodes, err := r.router.ListNodes()
if err != nil {
return nil, err
}
node, err := r.selector.SelectNode(nodes, rm)
if err != nil {
return nil, err
}
req.NodeId = node.Id
}
node, err := r.selector.SelectNode(nodes, rm)
if err != nil {
return nil, err
}
if err = r.router.SetNodeForRoom(req.Name, node.Id); err != nil {
if err := r.router.SetNodeForRoom(req.Name, req.NodeId); err != nil {
return nil, err
}
+45 -34
View File
@@ -34,6 +34,8 @@ type CreateRoomRequest struct {
// number of seconds the room should cleanup after being empty
EmptyTimeout uint32 `protobuf:"varint,2,opt,name=empty_timeout,json=emptyTimeout,proto3" json:"empty_timeout,omitempty"`
MaxParticipants uint32 `protobuf:"varint,3,opt,name=max_participants,json=maxParticipants,proto3" json:"max_participants,omitempty"`
// override the node room is allocated to, for debugging
NodeId string `protobuf:"bytes,4,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"`
}
func (x *CreateRoomRequest) Reset() {
@@ -89,6 +91,13 @@ func (x *CreateRoomRequest) GetMaxParticipants() uint32 {
return 0
}
func (x *CreateRoomRequest) GetNodeId() string {
if x != nil {
return x.NodeId
}
return ""
}
type ListRoomsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -264,40 +273,42 @@ var File_room_proto protoreflect.FileDescriptor
var file_room_proto_rawDesc = []byte{
0x0a, 0x0a, 0x72, 0x6f, 0x6f, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, 0x69,
0x76, 0x65, 0x6b, 0x69, 0x74, 0x1a, 0x0b, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x22, 0x77, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x65,
0x6d, 0x70, 0x74, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0d, 0x52, 0x0c, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74,
0x12, 0x29, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70,
0x61, 0x6e, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6d, 0x61, 0x78, 0x50,
0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x22, 0x12, 0x0a, 0x10, 0x4c,
0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22,
0x38, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x05, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x18, 0x01, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f,
0x6f, 0x6d, 0x52, 0x05, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x22, 0x27, 0x0a, 0x11, 0x44, 0x65, 0x6c,
0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12,
0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x72, 0x6f,
0x6f, 0x6d, 0x22, 0x14, 0x0a, 0x12, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xd1, 0x01, 0x0a, 0x0b, 0x52, 0x6f, 0x6f,
0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61,
0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74,
0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f, 0x6f,
0x6d, 0x12, 0x42, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x12, 0x19,
0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f,
0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65,
0x6b, 0x69, 0x74, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x0a, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52,
0x6f, 0x6f, 0x6d, 0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65,
0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x1b, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65,
0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x31, 0x5a, 0x2f,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b,
0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x74, 0x6f, 0x22, 0x90, 0x01, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f,
0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d,
0x65, 0x6d, 0x70, 0x74, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0d, 0x52, 0x0c, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75,
0x74, 0x12, 0x29, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
0x70, 0x61, 0x6e, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6d, 0x61, 0x78,
0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x12, 0x17, 0x0a, 0x07,
0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e,
0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x12, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f,
0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x38, 0x0a, 0x11, 0x4c, 0x69, 0x73,
0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23,
0x0a, 0x05, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e,
0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x05, 0x72, 0x6f,
0x6f, 0x6d, 0x73, 0x22, 0x27, 0x0a, 0x11, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f,
0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x6d,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x72, 0x6f, 0x6f, 0x6d, 0x22, 0x14, 0x0a, 0x12,
0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x32, 0xd1, 0x01, 0x0a, 0x0b, 0x52, 0x6f, 0x6f, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x12, 0x37, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d,
0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74,
0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x6c,
0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x52, 0x6f, 0x6f, 0x6d, 0x12, 0x42, 0x0a, 0x09, 0x4c,
0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x12, 0x19, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b,
0x69, 0x74, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x4c, 0x69,
0x73, 0x74, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x45, 0x0a, 0x0a, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x12, 0x1a, 0x2e,
0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f,
0x6f, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6c, 0x69, 0x76, 0x65,
0x6b, 0x69, 0x74, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76,
0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
}
var (
+22 -21
View File
@@ -1594,25 +1594,26 @@ func callClientError(ctx context.Context, h *twirp.ClientHooks, err twirp.Error)
}
var twirpFileDescriptor0 = []byte{
// 311 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x3f, 0x4f, 0xf3, 0x30,
0x10, 0xc6, 0x95, 0xb7, 0x2f, 0xa0, 0x5e, 0xa9, 0x68, 0x2d, 0x86, 0x60, 0x96, 0x2a, 0x0c, 0x94,
0x81, 0x54, 0x94, 0x01, 0xe6, 0x02, 0x1b, 0x03, 0x0a, 0x4c, 0x2c, 0x95, 0x1b, 0x4e, 0x60, 0x11,
0xc7, 0xc1, 0x76, 0x42, 0xf9, 0x98, 0x7c, 0x23, 0x64, 0xe7, 0x4f, 0x03, 0xe9, 0xe4, 0xf3, 0xf3,
0x9c, 0xfc, 0xdc, 0xef, 0x64, 0x00, 0x25, 0xa5, 0x08, 0x33, 0x25, 0x8d, 0x24, 0x7b, 0x09, 0x2f,
0xf0, 0x9d, 0x1b, 0x3a, 0x10, 0xf2, 0x05, 0x93, 0x52, 0x0d, 0x3e, 0x61, 0x7c, 0xa3, 0x90, 0x19,
0x8c, 0xa4, 0x14, 0x11, 0x7e, 0xe4, 0xa8, 0x0d, 0x21, 0xf0, 0x3f, 0x65, 0x02, 0x7d, 0x6f, 0xe2,
0x4d, 0xfb, 0x91, 0xab, 0xc9, 0x09, 0x0c, 0x51, 0x64, 0xe6, 0x6b, 0x69, 0xb8, 0x40, 0x99, 0x1b,
0xff, 0xdf, 0xc4, 0x9b, 0x0e, 0xa3, 0x7d, 0x27, 0x3e, 0x95, 0x1a, 0x39, 0x83, 0x91, 0x60, 0xeb,
0x65, 0xc6, 0x94, 0xe1, 0x31, 0xcf, 0x58, 0x6a, 0xb4, 0xdf, 0x73, 0x7d, 0x07, 0x82, 0xad, 0x1f,
0x5a, 0x72, 0x40, 0x60, 0x74, 0xcf, 0xb5, 0xb1, 0xb1, 0xba, 0xca, 0x0d, 0xae, 0x61, 0xdc, 0xd2,
0x74, 0x26, 0x53, 0x6d, 0x83, 0x77, 0x2c, 0x85, 0xf6, 0xbd, 0x49, 0x6f, 0x3a, 0x98, 0x0f, 0xc3,
0x8a, 0x23, 0x74, 0x13, 0x97, 0x5e, 0x70, 0x0a, 0xe3, 0x5b, 0x4c, 0xb0, 0x83, 0x61, 0xdd, 0x1a,
0xc3, 0xd6, 0xc1, 0x21, 0x90, 0x76, 0x63, 0x99, 0x31, 0xff, 0xf6, 0x60, 0x60, 0x85, 0x47, 0x54,
0x05, 0x8f, 0x91, 0x5c, 0x01, 0x6c, 0xb6, 0x42, 0x68, 0x13, 0xd9, 0x59, 0x15, 0xfd, 0x3d, 0x0e,
0x59, 0x40, 0xbf, 0x21, 0x20, 0x47, 0x8d, 0xf7, 0x97, 0x94, 0xd2, 0x6d, 0x56, 0x05, 0x7c, 0x07,
0xb0, 0x19, 0xb1, 0x15, 0xde, 0x01, 0xa4, 0xc7, 0x5b, 0xbd, 0xf2, 0x99, 0xc5, 0xc5, 0xf3, 0xec,
0x95, 0x9b, 0xb7, 0x7c, 0x15, 0xc6, 0x52, 0xcc, 0xaa, 0xc6, 0xfa, 0x3c, 0xd7, 0xa8, 0x0a, 0x54,
0x33, 0xf7, 0x09, 0x6a, 0x71, 0xb5, 0xeb, 0xae, 0x97, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe4,
0x3c, 0x6a, 0xb4, 0x37, 0x02, 0x00, 0x00,
// 331 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xc1, 0x4e, 0xf2, 0x40,
0x14, 0x85, 0xd3, 0x1f, 0x7e, 0x08, 0x17, 0x89, 0x70, 0x63, 0x62, 0xad, 0x1b, 0x52, 0x17, 0xe2,
0xc2, 0x12, 0x71, 0xa1, 0x6b, 0xd4, 0x85, 0x89, 0x0b, 0x53, 0x5d, 0xb9, 0x21, 0x03, 0xbd, 0xd1,
0x89, 0x4c, 0xa7, 0xce, 0x0c, 0x04, 0xdf, 0xc2, 0x57, 0xf3, 0x8d, 0xcc, 0x4c, 0x0b, 0x54, 0xcb,
0xaa, 0x33, 0xe7, 0xdc, 0xf4, 0x9c, 0xef, 0x66, 0x00, 0x94, 0x94, 0x22, 0xca, 0x94, 0x34, 0x12,
0x9b, 0x73, 0xbe, 0xa4, 0x77, 0x6e, 0x82, 0xb6, 0x90, 0x09, 0xcd, 0x73, 0x35, 0xfc, 0xf2, 0xa0,
0x77, 0xa3, 0x88, 0x19, 0x8a, 0xa5, 0x14, 0x31, 0x7d, 0x2c, 0x48, 0x1b, 0x44, 0xa8, 0xa7, 0x4c,
0x90, 0xef, 0xf5, 0xbd, 0x41, 0x2b, 0x76, 0x67, 0x3c, 0x81, 0x0e, 0x89, 0xcc, 0x7c, 0x4e, 0x0c,
0x17, 0x24, 0x17, 0xc6, 0xff, 0xd7, 0xf7, 0x06, 0x9d, 0x78, 0xcf, 0x89, 0xcf, 0xb9, 0x86, 0x67,
0xd0, 0x15, 0x6c, 0x35, 0xc9, 0x98, 0x32, 0x7c, 0xc6, 0x33, 0x96, 0x1a, 0xed, 0xd7, 0xdc, 0xdc,
0xbe, 0x60, 0xab, 0xc7, 0x92, 0x8c, 0x87, 0xd0, 0x4c, 0x65, 0x42, 0x13, 0x9e, 0xf8, 0x75, 0x17,
0xd3, 0xb0, 0xd7, 0xfb, 0x24, 0x44, 0xe8, 0x3e, 0x70, 0x6d, 0x6c, 0x1f, 0x5d, 0x14, 0x0a, 0xaf,
0xa1, 0x57, 0xd2, 0x74, 0x26, 0x53, 0x6d, 0x1b, 0xfd, 0xb7, 0x7c, 0xda, 0xf7, 0xfa, 0xb5, 0x41,
0x7b, 0xd4, 0x89, 0x0a, 0xc2, 0xc8, 0xa1, 0xe4, 0x5e, 0x78, 0x0a, 0xbd, 0x5b, 0x9a, 0x53, 0x85,
0xcf, 0xba, 0x6b, 0x3e, 0x7b, 0x0e, 0x0f, 0x00, 0xcb, 0x83, 0x79, 0xc6, 0xe8, 0xdb, 0x83, 0xb6,
0x15, 0x9e, 0x48, 0x2d, 0xf9, 0x8c, 0xf0, 0x0a, 0x60, 0xbb, 0x2e, 0x0c, 0x36, 0x91, 0x95, 0x1d,
0x06, 0xbf, 0xeb, 0xe0, 0x18, 0x5a, 0x1b, 0x02, 0x3c, 0xda, 0x78, 0x7f, 0x49, 0x83, 0x60, 0x97,
0x55, 0x00, 0xdf, 0x01, 0x6c, 0x2b, 0x96, 0xc2, 0x2b, 0x80, 0xc1, 0xf1, 0x4e, 0x2f, 0xff, 0xcd,
0xf8, 0xe2, 0x65, 0xf8, 0xca, 0xcd, 0xdb, 0x62, 0x1a, 0xcd, 0xa4, 0x18, 0x16, 0x83, 0xeb, 0xef,
0xb9, 0x26, 0xb5, 0x24, 0x35, 0x74, 0xcf, 0x63, 0x2d, 0x4e, 0x1b, 0xee, 0x7a, 0xf9, 0x13, 0x00,
0x00, 0xff, 0xff, 0x7a, 0x20, 0xfb, 0xae, 0x51, 0x02, 0x00, 0x00,
}
+2
View File
@@ -20,6 +20,8 @@ message CreateRoomRequest {
// number of seconds the room should cleanup after being empty
uint32 empty_timeout = 2;
uint32 max_participants = 3;
// override the node room is allocated to, for debugging
string node_id = 4;
}
message ListRoomsRequest {
+49 -51
View File
@@ -22,14 +22,17 @@ import (
)
const (
testApiKey = "apikey"
testApiSecret = "apiSecret"
testRoom = "mytestroom"
testApiKey = "apikey"
testApiSecret = "apiSecret"
testRoom = "mytestroom"
defaultServerPort = 7880
secondServerPort = 7881
nodeId1 = "integration-test-1"
nodeId2 = "integration-test-2"
)
var (
serverConfig *config.Config
roomClient livekit.RoomService
roomClient livekit.RoomService
)
func setupSingleNodeTest(roomName string) *service.LivekitServer {
@@ -42,51 +45,39 @@ func setupSingleNodeTest(roomName string) *service.LivekitServer {
waitForServerToStart(s)
// create test room
header := make(http.Header)
client.SetAuthorizationToken(header, createRoomToken())
tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header)
if err != nil {
panic(err)
}
_, err = roomClient.CreateRoom(tctx, &livekit.CreateRoomRequest{Name: roomName})
_, err := roomClient.CreateRoom(contextWithCreateRoomToken(), &livekit.CreateRoomRequest{Name: roomName})
if err != nil {
panic(err)
}
return s
}
func setupMultiNodeTest(roomName string) *service.LivekitServer {
func setupMultiNodeTest() (*service.LivekitServer, *service.LivekitServer) {
logger.InitDevelopment("")
s := createMultiNodeServer()
go func() {
s.Start()
}()
s1 := createMultiNodeServer(nodeId1, defaultServerPort)
s2 := createMultiNodeServer(nodeId2, secondServerPort)
go s1.Start()
go s2.Start()
waitForServerToStart(s)
waitForServerToStart(s1)
waitForServerToStart(s2)
// create test room
header := make(http.Header)
client.SetAuthorizationToken(header, createRoomToken())
tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header)
if err != nil {
panic(err)
}
_, err = roomClient.CreateRoom(tctx, &livekit.CreateRoomRequest{Name: roomName})
if err != nil {
panic(err)
}
return s
return s1, s2
}
func teardownTest(s *service.LivekitServer, roomName string) {
roomClient.DeleteRoom(contextWithCreateRoomToken(), &livekit.DeleteRoomRequest{Room: roomName})
s.Stop()
}
func contextWithCreateRoomToken() context.Context {
header := make(http.Header)
client.SetAuthorizationToken(header, createRoomToken())
tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header)
if err != nil {
panic(err)
}
roomClient.DeleteRoom(tctx, &livekit.DeleteRoomRequest{Room: roomName})
s.Stop()
return tctx
}
func waitForServerToStart(s *service.LivekitServer) {
@@ -105,7 +96,7 @@ func waitForServerToStart(s *service.LivekitServer) {
}
func withTimeout(t *testing.T, description string, f func() bool) {
ctx, _ := context.WithTimeout(context.Background(), time.Second)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
for {
select {
case <-ctx.Done():
@@ -124,7 +115,9 @@ func waitUntilConnected(t *testing.T, clients ...*client.RTCClient) {
c := clients[i]
wg.Add(1)
go func() {
assert.NoError(t, c.WaitUntilConnected())
if !assert.NoError(t, c.WaitUntilConnected()) {
t.Fatal("one or more clients could not connect")
}
wg.Done()
}()
}
@@ -133,12 +126,13 @@ func waitUntilConnected(t *testing.T, clients ...*client.RTCClient) {
func createSingleNodeServer() *service.LivekitServer {
var err error
serverConfig, err = config.NewConfig("")
conf, err := config.NewConfig("")
if err != nil {
panic(fmt.Sprintf("could not create config: %v", err))
}
currentNode, err := routing.NewLocalNode(serverConfig)
currentNode, err := routing.NewLocalNode(conf)
currentNode.Id = nodeId1
if err != nil {
panic(err)
}
@@ -146,53 +140,51 @@ func createSingleNodeServer() *service.LivekitServer {
// local routing and store
router := routing.NewLocalRouter(currentNode)
roomStore := service.NewLocalRoomStore()
s, err := service.InitializeServer(serverConfig, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{})
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{})
if err != nil {
panic(fmt.Sprintf("could not create server: %v", err))
}
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", serverConfig.Port), &http.Client{})
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", defaultServerPort), &http.Client{})
return s
}
func createMultiNodeServer() *service.LivekitServer {
func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer {
var err error
serverConfig, err = config.NewConfig("")
conf, err := config.NewConfig("")
if err != nil {
panic(fmt.Sprintf("could not create config: %v", err))
}
serverConfig.MultiNode = true
serverConfig.Redis.Address = "localhost:6379"
conf.Port = port
conf.MultiNode = true
currentNode, err := routing.NewLocalNode(serverConfig)
currentNode, err := routing.NewLocalNode(conf)
currentNode.Id = nodeId
if err != nil {
panic(err)
}
// local routing and store
rc := redis.NewClient(&redis.Options{
Addr: serverConfig.Redis.Address,
Password: serverConfig.Redis.Password,
})
rc := redisClient()
if err = rc.Ping(context.Background()).Err(); err != nil {
panic(err)
}
router := routing.NewRedisRouter(currentNode, rc, false)
roomStore := service.NewRedisRoomStore(rc)
s, err := service.InitializeServer(serverConfig, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{})
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, roomStore, router, currentNode, &routing.RandomSelector{})
if err != nil {
panic(fmt.Sprintf("could not create server: %v", err))
}
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", serverConfig.Port), &http.Client{})
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", port), &http.Client{})
return s
}
// creates a client and runs against server
func createRTCClient(name string) *client.RTCClient {
func createRTCClient(name string, port int) *client.RTCClient {
token := joinToken(testRoom, name)
ws, err := client.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", serverConfig.Port), token)
ws, err := client.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token)
if err != nil {
panic(err)
}
@@ -207,6 +199,12 @@ func createRTCClient(name string) *client.RTCClient {
return c
}
func redisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
}
func joinToken(room, name string) string {
at := auth.NewAccessToken(testApiKey, testApiSecret).
AddGrant(&auth.VideoGrant{RoomJoin: true, Room: room}).
+70
View File
@@ -0,0 +1,70 @@
package test
import (
"context"
"testing"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/proto/livekit"
)
func TestMultiNodeRouting(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
s1, s2 := setupMultiNodeTest()
defer s1.Stop()
defer s2.Stop()
// creating room on node 1
_, err := roomClient.CreateRoom(contextWithCreateRoomToken(), &livekit.CreateRoomRequest{
Name: testRoom,
NodeId: nodeId1,
})
assert.NoError(t, err)
// one node connecting to node 1, and another connecting to node 2
c1 := createRTCClient("c1", defaultServerPort)
c2 := createRTCClient("c2", defaultServerPort)
waitUntilConnected(t, c1, c2)
// c1 publishing, and c2 receiving
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
assert.NoError(t, err)
defer t1.Stop()
withTimeout(t, "c2 should receive one track", func() bool {
if len(c2.SubscribedTracks()) == 0 {
return false
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 1 {
return false
}
tr1 := c2.SubscribedTracks()[c1.ID()][0]
assert.Equal(t, "webcam", tr1.StreamID())
return true
})
c1.Stop()
c2.Stop()
// ensure that room is closed
rc := redisClient()
ctx := context.Background()
withTimeout(t, "room should be closed", func() bool {
if rc.HGet(ctx, service.RoomsKey, testRoom).Err() == nil {
return false
}
return true
})
assert.Equal(t, redis.Nil, rc.HGet(ctx, routing.NodeRoomKey, testRoom).Err())
assert.Equal(t, redis.Nil, rc.HGet(ctx, service.RoomIdMap, testRoom).Err())
}
@@ -1,15 +1,23 @@
package test
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestClientCouldConnect(t *testing.T) {
c1 := createRTCClient("c1")
c2 := createRTCClient("c2")
if testing.Short() {
t.SkipNow()
}
s := setupSingleNodeTest(testRoom)
defer func() {
teardownTest(s, testRoom)
}()
c1 := createRTCClient("c1", defaultServerPort)
c2 := createRTCClient("c2", defaultServerPort)
waitUntilConnected(t, c1, c2)
// ensure they both see each other
@@ -23,8 +31,17 @@ func TestClientCouldConnect(t *testing.T) {
}
func TestSinglePublisher(t *testing.T) {
c1 := createRTCClient("c1")
c2 := createRTCClient("c2")
if testing.Short() {
t.SkipNow()
}
s := setupSingleNodeTest(testRoom)
defer func() {
teardownTest(s, testRoom)
}()
c1 := createRTCClient("c1", defaultServerPort)
c2 := createRTCClient("c2", defaultServerPort)
waitUntilConnected(t, c1, c2)
// publish a track and ensure clients receive it ok
@@ -36,7 +53,7 @@ func TestSinglePublisher(t *testing.T) {
defer t2.Stop()
// a new client joins and should get the initial stream
c3 := createRTCClient("c3")
c3 := createRTCClient("c3", defaultServerPort)
withTimeout(t, "c2 should receive two tracks", func() bool {
if len(c2.SubscribedTracks()) == 0 {
@@ -65,12 +82,3 @@ func TestSinglePublisher(t *testing.T) {
return true
})
}
func TestMain(m *testing.M) {
s := setupSingleNodeTest(testRoom)
code := m.Run()
teardownTest(s, testRoom)
os.Exit(code)
}