diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index 0aef06ae5..d0e1e8c36 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -3,6 +3,7 @@ package client import ( "container/ring" "context" + "errors" "fmt" "io" "net/http" @@ -254,6 +255,20 @@ func (c *RTCClient) Run() error { return nil } +func (c *RTCClient) WaitUntilConnected() error { + ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) + for { + select { + case <-ctx.Done(): + return errors.New("could not connect after timeout") + case <-time.After(10 * time.Millisecond): + if c.iceConnected { + return nil + } + } + } +} + func (c *RTCClient) ReadResponse() (*livekit.SignalResponse, error) { for { // handle special messages and pass on the rest diff --git a/go.sum b/go.sum index 379e78a28..34dfc3b64 100644 --- a/go.sum +++ b/go.sum @@ -314,8 +314,6 @@ github.com/pion/interceptor v0.0.9 h1:fk5hTdyLO3KURQsf/+RjMpEm4NE3yeTY9Kh97b5Bvw github.com/pion/interceptor v0.0.9/go.mod h1:dHgEP5dtxOTf21MObuBAjJeAayPxLUAZjerGH8Xr07c= github.com/pion/ion-log v1.0.0 h1:2lJLImCmfCWCR38hLWsjQfBWe6NFz/htbqiYHwvOP/Q= github.com/pion/ion-log v1.0.0/go.mod h1:jwcla9KoB9bB/4FxYDSRJPcPYSLp5XiUUMnOLaqwl4E= -github.com/pion/ion-sfu v1.7.2 h1:jW59IxIQtcGotK/BYlTqOF1Vsp/UqM4BoRD6MMnYw1Y= -github.com/pion/ion-sfu v1.7.2/go.mod h1:61HfTCWVx6rTpYCc7kmvnTToEpyBoMwDhOyvMiUtNhk= github.com/pion/ion-sfu v1.7.7 h1:qbBOsUJrU8ZlFUC8Gmz8P3FBfZ5rzr6X/FErt87dioA= github.com/pion/ion-sfu v1.7.7/go.mod h1:D6Qnd7GHbYiRI1ye5a1IHst6cbbATawCb8fDs0oLxEI= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= diff --git a/pkg/service/auth.go b/pkg/service/auth.go index 14cb2d7a9..94ad20531 100644 --- a/pkg/service/auth.go +++ b/pkg/service/auth.go @@ -9,6 +9,7 @@ import ( "github.com/twitchtv/twirp" "github.com/livekit/livekit-server/pkg/auth" + "github.com/livekit/livekit-server/pkg/logger" ) const ( @@ -40,8 +41,7 @@ func (m *APIKeyAuthMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request, if authHeader != "" { if !strings.HasPrefix(authHeader, bearerPrefix) { - w.WriteHeader(http.StatusUnauthorized) - w.Write([]byte("invalid authorization header. Must start with " + bearerPrefix)) + handleError(w, http.StatusUnauthorized, "invalid authorization header. Must start with "+bearerPrefix) return } @@ -54,22 +54,19 @@ func (m *APIKeyAuthMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request, if authToken != "" { v, err := auth.ParseAPIToken(authToken) if err != nil { - w.WriteHeader(http.StatusUnauthorized) - w.Write([]byte("invalid authorization token")) + handleError(w, http.StatusUnauthorized, "invalid authorization token") return } secret := m.provider.GetSecret(v.APIKey()) if secret == "" { - w.WriteHeader(http.StatusUnauthorized) - w.Write([]byte("invalid API key")) + handleError(w, http.StatusUnauthorized, "invalid API key") return } grants, err := v.Verify(secret) if err != nil { - w.WriteHeader(http.StatusUnauthorized) - w.Write([]byte("invalid token: " + err.Error())) + handleError(w, http.StatusUnauthorized, "invalid token: "+err.Error()) return } @@ -130,3 +127,9 @@ func EnsureCreatePermission(ctx context.Context) error { func twirpAuthError(err error) error { return twirp.NewError(twirp.Unauthenticated, err.Error()) } + +func handleError(w http.ResponseWriter, status int, msg string) { + logger.GetLogger().Debugw("error handling request", "error", msg, "status", status) + w.WriteHeader(status) + w.Write([]byte(msg)) +} diff --git a/pkg/service/rtc.go b/pkg/service/rtc.go index b2d652e62..07b7b4c7f 100644 --- a/pkg/service/rtc.go +++ b/pkg/service/rtc.go @@ -46,7 +46,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { claims := GetGrants(r.Context()) // require a claim if claims == nil || claims.Video == nil { - writeJSONError(w, http.StatusUnauthorized, rtc.ErrPermissionDenied.Error()) + handleError(w, http.StatusUnauthorized, rtc.ErrPermissionDenied.Error()) } pName = claims.Identity } @@ -54,14 +54,14 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { onlyName, err := EnsureJoinPermission(r.Context()) if err != nil { - writeJSONError(w, http.StatusUnauthorized, err.Error()) + handleError(w, http.StatusUnauthorized, err.Error()) return } room, err := s.manager.GetRoomWithConstraint(roomId, onlyName) if err != nil { // TODO: return errors/status correctly - writeJSONError(w, http.StatusNotFound, err.Error()) + handleError(w, http.StatusNotFound, err.Error()) return } @@ -71,7 +71,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger.GetLogger().Warnw("could not upgrade to WS", "err", err, ) - writeJSONError(w, http.StatusInternalServerError, err.Error()) + handleError(w, http.StatusInternalServerError, err.Error()) return } conn.SetCloseHandler(func(code int, text string) error { @@ -82,12 +82,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { pc, err := rtc.NewPeerConnection(s.manager.Config()) if err != nil { - writeJSONError(w, http.StatusInternalServerError, "could not create peerConnection", err.Error()) + handleError(w, http.StatusInternalServerError, "could not create peerConnection: "+err.Error()) return } participant, err := rtc.NewParticipant(pc, signalConn, pName, s.manager.Config().Receiver) if err != nil { - writeJSONError(w, http.StatusInternalServerError, "could not create participant", err.Error()) + handleError(w, http.StatusInternalServerError, "could not create participant: "+err.Error()) return } @@ -98,7 +98,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { ) if err := room.Join(participant); err != nil { - writeJSONError(w, http.StatusInternalServerError, "could not join room", err.Error()) + handleError(w, http.StatusInternalServerError, "could not join room: "+err.Error()) return } diff --git a/test/integration_helpers.go b/test/integration_helpers.go index a923c96b7..86242e8a4 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -3,22 +3,32 @@ package test import ( "context" "fmt" + "net/http" + "testing" "time" "github.com/livekit/livekit-server/cmd/cli/client" "github.com/livekit/livekit-server/pkg/auth" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/livekit-server/proto/livekit" ) const ( testApiKey = "apikey" testApiSecret = "apiSecret" + testRoom = "mytestroom" +) + +var ( + serverConfig *config.Config + roomClient livekit.RoomService ) func waitForServerToStart(s *service.LivekitServer) { // wait till ready - ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) for { select { case <-ctx.Done(): @@ -31,20 +41,51 @@ func waitForServerToStart(s *service.LivekitServer) { } } +func withTimeout(t *testing.T, f func() bool) { + ctx, _ := context.WithTimeout(context.Background(), time.Second) + for { + select { + case <-ctx.Done(): + t.Fatal("timed out") + case <-time.After(10 * time.Millisecond): + if f() { + return + } + } + } +} + func createServer() *service.LivekitServer { - conf, err := config.NewConfig("") + var err error + serverConfig, err = config.NewConfig("") if err != nil { panic(fmt.Sprintf("could not create config: %v", err)) } - s, err := service.InitializeServer(conf, &StaticKeyProvider{}) + s, err := service.InitializeServer(serverConfig, &StaticKeyProvider{}) if err != nil { panic(fmt.Sprintf("could not create server: %v", err)) } + + roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d", serverConfig.APIPort), &http.Client{}) return s } -func createClient(room, name string) *client.RTCClient { - return nil +// creates a client and runs against server +func createClient(name string) *client.RTCClient { + token := joinToken(testRoom, name) + ws, err := client.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", serverConfig.RTCPort), token) + if err != nil { + panic(err) + } + + c, err := client.NewRTCClient(ws) + if err != nil { + panic(err) + } + + go c.Run() + + return c } func joinToken(room, name string) string { @@ -78,6 +119,7 @@ func (p *StaticKeyProvider) NumKeys() int { func (p *StaticKeyProvider) GetSecret(key string) string { if key == testApiKey { + logger.GetLogger().Debugf("returning secret: %s", testApiSecret) return testApiSecret } return "" diff --git a/test/integration_test.go b/test/integration_test.go index 1f1776479..af3ec66fe 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1,26 +1,58 @@ package test import ( + "context" + "net/http" "os" "testing" "github.com/stretchr/testify/assert" + "github.com/twitchtv/twirp" + + "github.com/livekit/livekit-server/pkg/logger" + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/livekit-server/proto/livekit" ) -func TestScenarioDefault(t *testing.T) { - assert.True(t, true) +func TestClientCouldConnect(t *testing.T) { + c1 := createClient("c1") + assert.NoError(t, c1.WaitUntilConnected()) + c2 := createClient("c2") + assert.NoError(t, c2.WaitUntilConnected()) + + // ensure they both see each other + withTimeout(t, func() bool { + return len(c1.RemoteParticipants()) == 1 && len(c2.RemoteParticipants()) == 1 + }) } func TestMain(m *testing.M) { + logger.InitDevelopment() s := createServer() + service.AuthRequired = true go func() { s.Start() }() waitForServerToStart(s) + // create test room + token := createRoomToken() + header := make(http.Header) + logger.GetLogger().Debugw("auth token", "token", token) + header.Set("Authorization", "Bearer "+token) + tctx, err := twirp.WithHTTPRequestHeaders(context.Background(), header) + if err != nil { + panic(err) + } + _, err = roomClient.CreateRoom(tctx, &livekit.CreateRoomRequest{Name: testRoom}) + if err != nil { + panic(err) + } + code := m.Run() + roomClient.DeleteRoom(tctx, &livekit.DeleteRoomRequest{Room: testRoom}) s.Stop() os.Exit(code) }