decouple websocket and RTC nodes, prep for distributed

This commit is contained in:
David Zhao
2021-01-16 00:25:13 -08:00
parent 9a10a57b30
commit 9064f6ade1
61 changed files with 2683 additions and 1361 deletions
+15 -5
View File
@@ -13,6 +13,7 @@ import (
"github.com/livekit/livekit-server/cmd/cli/client"
"github.com/livekit/livekit-server/pkg/auth"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/proto/livekit"
)
@@ -43,12 +44,12 @@ func waitForServerToStart(s *service.LivekitServer) {
}
}
func withTimeout(t *testing.T, f func() bool) {
func withTimeout(t *testing.T, description string, f func() bool) {
ctx, _ := context.WithTimeout(context.Background(), time.Second)
for {
select {
case <-ctx.Done():
t.Fatal("timed out")
t.Fatal("timed out: " + description)
case <-time.After(10 * time.Millisecond):
if f() {
return
@@ -76,19 +77,28 @@ func createServer() *service.LivekitServer {
if err != nil {
panic(fmt.Sprintf("could not create config: %v", err))
}
s, err := service.InitializeServer(serverConfig, &StaticKeyProvider{})
currentNode, err := routing.NewLocalNode(serverConfig)
if err != nil {
panic(err)
}
// local routing and store
router := routing.NewLocalRouter(currentNode)
roomStore := service.NewLocalRoomStore()
s, err := service.InitializeServer(serverConfig, &StaticKeyProvider{}, roomStore, router, currentNode)
if err != nil {
panic(fmt.Sprintf("could not create server: %v", err))
}
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", serverConfig.APIPort), &http.Client{})
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", serverConfig.Port), &http.Client{})
return s
}
// creates a client and runs against server
func createClient(name string) *client.RTCClient {
token := joinToken(testRoom, name)
ws, err := client.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", serverConfig.RTCPort), token)
ws, err := client.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", serverConfig.Port), token)
if err != nil {
panic(err)
}
+4 -6
View File
@@ -10,7 +10,6 @@ import (
"github.com/twitchtv/twirp"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/proto/livekit"
)
@@ -20,7 +19,7 @@ func TestClientCouldConnect(t *testing.T) {
waitUntilConnected(t, c1, c2)
// ensure they both see each other
withTimeout(t, func() bool {
withTimeout(t, "c1 and c2 could connect", func() bool {
if len(c1.RemoteParticipants()) == 0 || len(c2.RemoteParticipants()) == 0 {
return false
}
@@ -45,7 +44,7 @@ func TestSinglePublisher(t *testing.T) {
// a new client joins and should get the initial stream
c3 := createClient("c3")
withTimeout(t, func() bool {
withTimeout(t, "c2 should receive two tracks", func() bool {
if len(c2.SubscribedTracks()) == 0 {
return false
}
@@ -61,7 +60,7 @@ func TestSinglePublisher(t *testing.T) {
// ensure that new client that has joined also received tracks
waitUntilConnected(t, c3)
withTimeout(t, func() bool {
withTimeout(t, "c2 should receive two tracks", func() bool {
if len(c3.SubscribedTracks()) == 0 {
return false
}
@@ -76,7 +75,6 @@ func TestSinglePublisher(t *testing.T) {
func TestMain(m *testing.M) {
logger.InitDevelopment()
s := createServer()
service.AuthRequired = true
go func() {
s.Start()
}()
@@ -86,7 +84,7 @@ func TestMain(m *testing.M) {
// create test room
token := createRoomToken()
header := make(http.Header)
logger.GetLogger().Debugw("auth token", "token", token)
logger.Debugw("auth token", "token", token)
header.Set("Authorization", "Bearer "+token)
tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header)
if err != nil {