mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 09:19:53 +00:00
* Make new path for signalling v1.5 support. To be able to support newer clients to interact with older servers, move signalling v1.5 to new path (`/rtc1`). On the new path, `join_request` is required and single peer connection is used. With the existing path `/rtc`, single peer connection is still supported if `join_request` is used. Newer clients connecting to old server should follow 1. Try new path WebSocket 2. If that fails, try new path validate at `/rtc1/validate`. 3. If the above gets a 404 which will happen with older server, revert back to old path and signalling 1.0. Open to suggestions on path name. * test on both paths * change path from /rtc1 -> /rtc/v1 * test all rtc service path combinations
245 lines
7.1 KiB
Go
245 lines
7.1 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/livekit/livekit-server/pkg/testutils"
|
|
"github.com/livekit/protocol/auth"
|
|
"github.com/livekit/protocol/livekit"
|
|
)
|
|
|
|
var (
|
|
RegisterTimeout = 2 * time.Second
|
|
AssignJobTimeout = 3 * time.Second
|
|
)
|
|
|
|
func TestAgents(t *testing.T) {
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
_, finish := setupSingleNodeTest("TestAgents")
|
|
defer finish()
|
|
|
|
ac1, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
ac2, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
ac3, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
ac4, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
ac5, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
ac6, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
defer ac1.close()
|
|
defer ac2.close()
|
|
defer ac3.close()
|
|
defer ac4.close()
|
|
defer ac5.close()
|
|
defer ac6.close()
|
|
ac1.Run(livekit.JobType_JT_ROOM, "default")
|
|
ac2.Run(livekit.JobType_JT_ROOM, "default")
|
|
ac3.Run(livekit.JobType_JT_PUBLISHER, "default")
|
|
ac4.Run(livekit.JobType_JT_PUBLISHER, "default")
|
|
ac5.Run(livekit.JobType_JT_PARTICIPANT, "default")
|
|
ac6.Run(livekit.JobType_JT_PARTICIPANT, "default")
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 || ac5.registered.Load() != 1 || ac6.registered.Load() != 1 {
|
|
return "worker not registered"
|
|
}
|
|
|
|
return ""
|
|
}, RegisterTimeout)
|
|
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
|
|
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1, c2)
|
|
|
|
// publish 2 tracks
|
|
t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro")
|
|
require.NoError(t, err)
|
|
defer t1.Stop()
|
|
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
|
|
require.NoError(t, err)
|
|
defer t2.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 {
|
|
return "room job not assigned"
|
|
}
|
|
|
|
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 1 {
|
|
return fmt.Sprintf("publisher jobs not assigned, ac3: %d, ac4: %d", ac3.publisherJobs.Load(), ac4.publisherJobs.Load())
|
|
}
|
|
|
|
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
|
|
return fmt.Sprintf("participant jobs not assigned, ac5: %d, ac6: %d", ac5.participantJobs.Load(), ac6.participantJobs.Load())
|
|
}
|
|
|
|
return ""
|
|
}, 6*time.Second)
|
|
|
|
// publish 2 tracks
|
|
t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro")
|
|
require.NoError(t, err)
|
|
defer t3.Stop()
|
|
t4, err := c2.AddStaticTrack("video/vp8", "video", "webcam")
|
|
require.NoError(t, err)
|
|
defer t4.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 {
|
|
return "room job must be assigned 1 time"
|
|
}
|
|
|
|
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 2 {
|
|
return "2 publisher jobs must assigned"
|
|
}
|
|
|
|
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
|
|
return "2 participant jobs must assigned"
|
|
}
|
|
|
|
return ""
|
|
}, AssignJobTimeout)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestAgentNamespaces(t *testing.T) {
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
_, finish := setupSingleNodeTest("TestAgentNamespaces")
|
|
defer finish()
|
|
|
|
ac1, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
ac2, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
defer ac1.close()
|
|
defer ac2.close()
|
|
ac1.Run(livekit.JobType_JT_ROOM, "namespace1")
|
|
ac2.Run(livekit.JobType_JT_ROOM, "namespace2")
|
|
|
|
_, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
|
|
Name: testRoom,
|
|
Agents: []*livekit.RoomAgentDispatch{
|
|
{},
|
|
{
|
|
AgentName: "ag",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {
|
|
return "worker not registered"
|
|
}
|
|
return ""
|
|
}, RegisterTimeout)
|
|
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1)
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if ac1.roomJobs.Load() != 1 || ac2.roomJobs.Load() != 1 {
|
|
return "room job not assigned"
|
|
}
|
|
|
|
job1 := <-ac1.requestedJobs
|
|
job2 := <-ac2.requestedJobs
|
|
|
|
if job1.Namespace != "namespace1" {
|
|
return "namespace is not 'namespace'"
|
|
}
|
|
|
|
if job2.Namespace != "namespace2" {
|
|
return "namespace is not 'namespace2'"
|
|
}
|
|
|
|
if job1.Id == job2.Id {
|
|
return "job ids are the same"
|
|
}
|
|
|
|
return ""
|
|
}, AssignJobTimeout)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestAgentMultiNode(t *testing.T) {
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
_, _, finish := setupMultiNodeTest("TestAgentMultiNode")
|
|
defer finish()
|
|
|
|
ac1, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
ac2, err := newAgentClient(agentToken(), defaultServerPort)
|
|
require.NoError(t, err)
|
|
defer ac1.close()
|
|
defer ac2.close()
|
|
ac1.Run(livekit.JobType_JT_ROOM, "default")
|
|
ac2.Run(livekit.JobType_JT_PUBLISHER, "default")
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {
|
|
return "worker not registered"
|
|
}
|
|
return ""
|
|
}, RegisterTimeout)
|
|
|
|
c1 := createRTCClient("c1", secondServerPort, testRTCServicePath, nil) // Create a room on the second node
|
|
waitUntilConnected(t, c1)
|
|
|
|
t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro")
|
|
require.NoError(t, err)
|
|
defer t1.Stop()
|
|
|
|
time.Sleep(time.Second * 10)
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if ac1.roomJobs.Load() != 1 {
|
|
return "room job not assigned"
|
|
}
|
|
|
|
if ac2.publisherJobs.Load() != 1 {
|
|
return "participant job not assigned"
|
|
}
|
|
|
|
return ""
|
|
}, AssignJobTimeout)
|
|
})
|
|
}
|
|
}
|
|
|
|
func agentToken() string {
|
|
at := auth.NewAccessToken(testApiKey, testApiSecret).
|
|
AddGrant(&auth.VideoGrant{Agent: true})
|
|
t, err := at.ToJWT()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return t
|
|
}
|