tests for both dual and single peer connection

This commit is contained in:
boks1971
2025-08-15 16:30:42 +05:30
parent 7183dcaad3
commit 8aaac9b397
9 changed files with 1389 additions and 1242 deletions

View File

@@ -16,7 +16,8 @@ package types
type ProtocolVersion int
const CurrentProtocol = 16
const CurrentProtocol = 17
const MaxProtocolDualPeerConnection = 16
func (v ProtocolVersion) SupportsPackedStreamId() bool {
return v > 0

View File

@@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/testutils"
testclient "github.com/livekit/livekit-server/test/client"
"github.com/livekit/protocol/auth"
@@ -33,194 +34,205 @@ var (
)
func TestAgents(t *testing.T) {
_, finish := setupSingleNodeTest("TestAgents")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, finish := setupSingleNodeTest("TestAgents")
defer finish()
ac1, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac2, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac3, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac4, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac5, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac6, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
defer ac3.close()
defer ac4.close()
defer ac5.close()
defer ac6.close()
ac1.Run(livekit.JobType_JT_ROOM, "default")
ac2.Run(livekit.JobType_JT_ROOM, "default")
ac3.Run(livekit.JobType_JT_PUBLISHER, "default")
ac4.Run(livekit.JobType_JT_PUBLISHER, "default")
ac5.Run(livekit.JobType_JT_PARTICIPANT, "default")
ac6.Run(livekit.JobType_JT_PARTICIPANT, "default")
ac1, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac2, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac3, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac4, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac5, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac6, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
defer ac3.close()
defer ac4.close()
defer ac5.close()
defer ac6.close()
ac1.Run(livekit.JobType_JT_ROOM, "default")
ac2.Run(livekit.JobType_JT_ROOM, "default")
ac3.Run(livekit.JobType_JT_PUBLISHER, "default")
ac4.Run(livekit.JobType_JT_PUBLISHER, "default")
ac5.Run(livekit.JobType_JT_PARTICIPANT, "default")
ac6.Run(livekit.JobType_JT_PARTICIPANT, "default")
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 || ac5.registered.Load() != 1 || ac6.registered.Load() != 1 {
return "worker not registered"
}
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 || ac5.registered.Load() != 1 || ac6.registered.Load() != 1 {
return "worker not registered"
}
return ""
}, RegisterTimeout)
return ""
}, RegisterTimeout)
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", defaultServerPort, &testclient.Options{UseJoinRequestQueryParam: true})
waitUntilConnected(t, c1, c2)
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
c2 := createRTCClient("c2", defaultServerPort, pv, &testclient.Options{UseJoinRequestQueryParam: true})
waitUntilConnected(t, c1, c2)
// publish 2 tracks
t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
// publish 2 tracks
t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 {
return "room job not assigned"
}
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 {
return "room job not assigned"
}
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 1 {
return fmt.Sprintf("publisher jobs not assigned, ac3: %d, ac4: %d", ac3.publisherJobs.Load(), ac4.publisherJobs.Load())
}
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 1 {
return fmt.Sprintf("publisher jobs not assigned, ac3: %d, ac4: %d", ac3.publisherJobs.Load(), ac4.publisherJobs.Load())
}
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
return fmt.Sprintf("participant jobs not assigned, ac5: %d, ac6: %d", ac5.participantJobs.Load(), ac6.participantJobs.Load())
}
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
return fmt.Sprintf("participant jobs not assigned, ac5: %d, ac6: %d", ac5.participantJobs.Load(), ac6.participantJobs.Load())
}
return ""
}, 6*time.Second)
return ""
}, 6*time.Second)
// publish 2 tracks
t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro")
require.NoError(t, err)
defer t3.Stop()
t4, err := c2.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t4.Stop()
// publish 2 tracks
t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro")
require.NoError(t, err)
defer t3.Stop()
t4, err := c2.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t4.Stop()
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 {
return "room job must be assigned 1 time"
}
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 {
return "room job must be assigned 1 time"
}
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 2 {
return "2 publisher jobs must assigned"
}
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 2 {
return "2 publisher jobs must assigned"
}
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
return "2 participant jobs must assigned"
}
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
return "2 participant jobs must assigned"
}
return ""
}, AssignJobTimeout)
return ""
}, AssignJobTimeout)
})
}
}
func TestAgentNamespaces(t *testing.T) {
_, finish := setupSingleNodeTest("TestAgentNamespaces")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, finish := setupSingleNodeTest("TestAgentNamespaces")
defer finish()
ac1, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac2, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
ac1.Run(livekit.JobType_JT_ROOM, "namespace1")
ac2.Run(livekit.JobType_JT_ROOM, "namespace2")
ac1, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac2, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
ac1.Run(livekit.JobType_JT_ROOM, "namespace1")
ac2.Run(livekit.JobType_JT_ROOM, "namespace2")
_, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: testRoom,
Agents: []*livekit.RoomAgentDispatch{
{},
{
AgentName: "ag",
},
},
})
require.NoError(t, err)
_, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: testRoom,
Agents: []*livekit.RoomAgentDispatch{
{},
{
AgentName: "ag",
},
},
})
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {
return "worker not registered"
}
return ""
}, RegisterTimeout)
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {
return "worker not registered"
}
return ""
}, RegisterTimeout)
c1 := createRTCClient("c1", defaultServerPort, nil)
waitUntilConnected(t, c1)
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
waitUntilConnected(t, c1)
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load() != 1 || ac2.roomJobs.Load() != 1 {
return "room job not assigned"
}
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load() != 1 || ac2.roomJobs.Load() != 1 {
return "room job not assigned"
}
job1 := <-ac1.requestedJobs
job2 := <-ac2.requestedJobs
job1 := <-ac1.requestedJobs
job2 := <-ac2.requestedJobs
if job1.Namespace != "namespace1" {
return "namespace is not 'namespace'"
}
if job1.Namespace != "namespace1" {
return "namespace is not 'namespace'"
}
if job2.Namespace != "namespace2" {
return "namespace is not 'namespace2'"
}
if job2.Namespace != "namespace2" {
return "namespace is not 'namespace2'"
}
if job1.Id == job2.Id {
return "job ids are the same"
}
return ""
}, AssignJobTimeout)
if job1.Id == job2.Id {
return "job ids are the same"
}
return ""
}, AssignJobTimeout)
})
}
}
func TestAgentMultiNode(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestAgentMultiNode")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestAgentMultiNode")
defer finish()
ac1, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac2, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
ac1.Run(livekit.JobType_JT_ROOM, "default")
ac2.Run(livekit.JobType_JT_PUBLISHER, "default")
ac1, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac2, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
ac1.Run(livekit.JobType_JT_ROOM, "default")
ac2.Run(livekit.JobType_JT_PUBLISHER, "default")
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {
return "worker not registered"
}
return ""
}, RegisterTimeout)
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {
return "worker not registered"
}
return ""
}, RegisterTimeout)
c1 := createRTCClient("c1", secondServerPort, nil) // Create a room on the second node
waitUntilConnected(t, c1)
c1 := createRTCClient("c1", secondServerPort, pv, nil) // Create a room on the second node
waitUntilConnected(t, c1)
t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro")
require.NoError(t, err)
defer t1.Stop()
t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro")
require.NoError(t, err)
defer t1.Stop()
time.Sleep(time.Second * 10)
time.Sleep(time.Second * 10)
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load() != 1 {
return "room job not assigned"
}
testutils.WithTimeout(t, func() string {
if ac1.roomJobs.Load() != 1 {
return "room job not assigned"
}
if ac2.publisherJobs.Load() != 1 {
return "participant job not assigned"
}
if ac2.publisherJobs.Load() != 1 {
return "participant job not assigned"
}
return ""
}, AssignJobTimeout)
return ""
}, AssignJobTimeout)
})
}
}
func agentToken() string {

View File

@@ -126,7 +126,7 @@ type Options struct {
UseJoinRequestQueryParam bool
}
func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error) {
func NewWebSocketConn(host, token string, protocolVersion types.ProtocolVersion, opts *Options) (*websocket.Conn, error) {
u, err := url.Parse(host + "/rtc")
if err != nil {
return nil, err
@@ -139,7 +139,7 @@ func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error
clientInfo := &livekit.ClientInfo{
Os: runtime.GOOS,
Sdk: livekit.ClientInfo_GO,
Protocol: types.CurrentProtocol,
Protocol: int32(protocolVersion),
}
if opts.ClientInfo != nil {
clientInfo = opts.ClientInfo
@@ -164,7 +164,7 @@ func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error
}
}
} else {
connectUrl += fmt.Sprintf("?protocol=%d", types.CurrentProtocol)
connectUrl += fmt.Sprintf("?protocol=%d", protocolVersion)
sdk := "go"
if opts != nil {
@@ -202,11 +202,11 @@ func SetAuthorizationToken(header http.Header, token string) {
header.Set("Authorization", "Bearer "+token)
}
func NewRTCClient(conn *websocket.Conn, opts *Options) (*RTCClient, error) {
func NewRTCClient(conn *websocket.Conn, protocolVersion types.ProtocolVersion, opts *Options) (*RTCClient, error) {
var err error
c := &RTCClient{
protocolVersion: types.CurrentProtocol,
protocolVersion: protocolVersion,
conn: conn,
localTracks: make(map[string]webrtc.TrackLocal),
trackSenders: make(map[string]*webrtc.RTPSender),

View File

@@ -33,6 +33,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/pkg/testutils"
@@ -202,18 +203,18 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer {
}
// creates a client and runs against server
func createRTCClient(name string, port int, opts *testclient.Options) *testclient.RTCClient {
func createRTCClient(name string, port int, protocolVersion types.ProtocolVersion, opts *testclient.Options) *testclient.RTCClient {
var customizer func(token *auth.AccessToken, grants *auth.VideoGrant)
if opts != nil {
customizer = opts.TokenCustomizer
}
token := joinToken(testRoom, name, customizer)
ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, opts)
ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, protocolVersion, opts)
if err != nil {
panic(err)
}
c, err := testclient.NewRTCClient(ws, opts)
c, err := testclient.NewRTCClient(ws, protocolVersion, opts)
if err != nil {
panic(err)
}
@@ -224,13 +225,13 @@ func createRTCClient(name string, port int, opts *testclient.Options) *testclien
}
// creates a client and runs against server
func createRTCClientWithToken(token string, port int, opts *testclient.Options) *testclient.RTCClient {
ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, opts)
func createRTCClientWithToken(token string, port int, protocolVersion types.ProtocolVersion, opts *testclient.Options) *testclient.RTCClient {
ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, protocolVersion, opts)
if err != nil {
panic(err)
}
c, err := testclient.NewRTCClient(ws, opts)
c, err := testclient.NewRTCClient(ws, protocolVersion, opts)
if err != nil {
panic(err)
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/testutils"
)
@@ -61,24 +62,28 @@ func TestMultiNodeUpdateRoomMetadata(t *testing.T) {
})
t.Run("when room has a participant", func(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeUpdateRoomMetadata_with_participant")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeUpdateRoomMetadata_with_participant")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
waitUntilConnected(t, c1)
defer c1.Stop()
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
waitUntilConnected(t, c1)
defer c1.Stop()
_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: "emptyRoom",
})
require.NoError(t, err)
_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: "emptyRoom",
})
require.NoError(t, err)
rm, err := roomClient.UpdateRoomMetadata(contextWithToken(adminRoomToken("emptyRoom")), &livekit.UpdateRoomMetadataRequest{
Room: "emptyRoom",
Metadata: "updated metadata",
})
require.NoError(t, err)
require.Equal(t, "updated metadata", rm.Metadata)
rm, err := roomClient.UpdateRoomMetadata(contextWithToken(adminRoomToken("emptyRoom")), &livekit.UpdateRoomMetadataRequest{
Room: "emptyRoom",
Metadata: "updated metadata",
})
require.NoError(t, err)
require.Equal(t, "updated metadata", rm.Metadata)
})
}
})
}
@@ -89,85 +94,97 @@ func TestMultiNodeRemoveParticipant(t *testing.T) {
return
}
_, _, finish := setupMultiNodeTest("TestMultiNodeRemoveParticipant")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeRemoveParticipant")
defer finish()
c1 := createRTCClient("mn_remove_participant", defaultServerPort, nil)
defer c1.Stop()
waitUntilConnected(t, c1)
c1 := createRTCClient("mn_remove_participant", defaultServerPort, pv, nil)
defer c1.Stop()
waitUntilConnected(t, c1)
ctx := contextWithToken(adminRoomToken(testRoom))
_, err := roomClient.RemoveParticipant(ctx, &livekit.RoomParticipantIdentity{
Room: testRoom,
Identity: "mn_remove_participant",
})
require.NoError(t, err)
ctx := contextWithToken(adminRoomToken(testRoom))
_, err := roomClient.RemoveParticipant(ctx, &livekit.RoomParticipantIdentity{
Room: testRoom,
Identity: "mn_remove_participant",
})
require.NoError(t, err)
// participant list doesn't show the participant
listRes, err := roomClient.ListParticipants(ctx, &livekit.ListParticipantsRequest{
Room: testRoom,
})
require.NoError(t, err)
require.Len(t, listRes.Participants, 0)
// participant list doesn't show the participant
listRes, err := roomClient.ListParticipants(ctx, &livekit.ListParticipantsRequest{
Room: testRoom,
})
require.NoError(t, err)
require.Len(t, listRes.Participants, 0)
})
}
}
// update participant metadata
func TestMultiNodeUpdateParticipantMetadata(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeUpdateParticipantMetadata")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeUpdateParticipantMetadata")
defer finish()
c1 := createRTCClient("update_participant_metadata", defaultServerPort, nil)
defer c1.Stop()
waitUntilConnected(t, c1)
c1 := createRTCClient("update_participant_metadata", defaultServerPort, pv, nil)
defer c1.Stop()
waitUntilConnected(t, c1)
ctx := contextWithToken(adminRoomToken(testRoom))
res, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "update_participant_metadata",
Metadata: "the new metadata",
})
require.NoError(t, err)
require.Equal(t, "the new metadata", res.Metadata)
ctx := contextWithToken(adminRoomToken(testRoom))
res, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "update_participant_metadata",
Metadata: "the new metadata",
})
require.NoError(t, err)
require.Equal(t, "the new metadata", res.Metadata)
})
}
}
// admin mute published track
func TestMultiNodeMutePublishedTrack(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeMutePublishedTrack")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeMutePublishedTrack")
defer finish()
identity := "mute_published_track"
c1 := createRTCClient(identity, defaultServerPort, nil)
defer c1.Stop()
waitUntilConnected(t, c1)
identity := "mute_published_track"
c1 := createRTCClient(identity, defaultServerPort, pv, nil)
defer c1.Stop()
waitUntilConnected(t, c1)
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
trackIDs := c1.GetPublishedTrackIDs()
require.NotEmpty(t, trackIDs)
trackIDs := c1.GetPublishedTrackIDs()
require.NotEmpty(t, trackIDs)
ctx := contextWithToken(adminRoomToken(testRoom))
// wait for it to be published before
testutils.WithTimeout(t, func() string {
res, err := roomClient.GetParticipant(ctx, &livekit.RoomParticipantIdentity{
Room: testRoom,
Identity: identity,
ctx := contextWithToken(adminRoomToken(testRoom))
// wait for it to be published before
testutils.WithTimeout(t, func() string {
res, err := roomClient.GetParticipant(ctx, &livekit.RoomParticipantIdentity{
Room: testRoom,
Identity: identity,
})
require.NoError(t, err)
if len(res.Tracks) == 2 {
return ""
} else {
return fmt.Sprintf("expected 2 tracks to be published, actual: %d", len(res.Tracks))
}
})
res, err := roomClient.MutePublishedTrack(ctx, &livekit.MuteRoomTrackRequest{
Room: testRoom,
Identity: identity,
TrackSid: trackIDs[0],
Muted: true,
})
require.NoError(t, err)
require.Equal(t, trackIDs[0], res.Track.Sid)
require.True(t, res.Track.Muted)
})
require.NoError(t, err)
if len(res.Tracks) == 2 {
return ""
} else {
return fmt.Sprintf("expected 2 tracks to be published, actual: %d", len(res.Tracks))
}
})
res, err := roomClient.MutePublishedTrack(ctx, &livekit.MuteRoomTrackRequest{
Room: testRoom,
Identity: identity,
TrackSid: trackIDs[0],
Muted: true,
})
require.NoError(t, err)
require.Equal(t, trackIDs[0], res.Track.Sid)
require.True(t, res.Track.Muted)
}
}

View File

@@ -15,6 +15,7 @@
package test
import (
"fmt"
"testing"
"time"
@@ -24,6 +25,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/testutils"
"github.com/livekit/livekit-server/test/client"
)
@@ -33,6 +35,7 @@ func TestMultiNodeRouting(t *testing.T) {
t.SkipNow()
return
}
_, _, finish := setupMultiNodeTest("TestMultiNodeRouting")
defer finish()
@@ -42,35 +45,39 @@ func TestMultiNodeRouting(t *testing.T) {
})
require.NoError(t, err)
// one node connecting to node 1, and another connecting to node 2
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", secondServerPort, nil)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
// one node connecting to node 1, and another connecting to node 2
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
c2 := createRTCClient("c2", secondServerPort, pv, nil)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
// c1 publishing, and c2 receiving
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
if t1 != nil {
defer t1.Stop()
// c1 publishing, and c2 receiving
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
if t1 != nil {
defer t1.Stop()
}
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 received no tracks"
}
if len(c2.SubscribedTracks()[c1.ID()]) != 1 {
return "c2 didn't receive track published by c1"
}
tr1 := c2.SubscribedTracks()[c1.ID()][0]
streamID, _ := rtc.UnpackStreamID(tr1.StreamID())
require.Equal(t, c1.ID(), streamID)
return ""
})
remoteC1 := c2.GetRemoteParticipant(c1.ID())
require.Equal(t, "c1", remoteC1.Name)
require.Equal(t, "metadatac1", remoteC1.Metadata)
})
}
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 received no tracks"
}
if len(c2.SubscribedTracks()[c1.ID()]) != 1 {
return "c2 didn't receive track published by c1"
}
tr1 := c2.SubscribedTracks()[c1.ID()][0]
streamID, _ := rtc.UnpackStreamID(tr1.StreamID())
require.Equal(t, c1.ID(), streamID)
return ""
})
remoteC1 := c2.GetRemoteParticipant(c1.ID())
require.Equal(t, "c1", remoteC1.Name)
require.Equal(t, "metadatac1", remoteC1.Metadata)
}
func TestConnectWithoutCreation(t *testing.T) {
@@ -82,10 +89,14 @@ func TestConnectWithoutCreation(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestConnectWithoutCreation")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
waitUntilConnected(t, c1)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
waitUntilConnected(t, c1)
c1.Stop()
c1.Stop()
})
}
}
// testing multiple scenarios rooms
@@ -118,30 +129,34 @@ func TestMultinodeReconnectAfterNodeShutdown(t *testing.T) {
return
}
_, s2, finish := setupMultiNodeTest("TestMultinodeReconnectAfterNodeShutdown")
defer finish()
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
_, s2, finish := setupMultiNodeTest("TestMultinodeReconnectAfterNodeShutdown")
defer finish()
// creating room on node 1
_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: testRoom,
NodeId: s2.Node().Id,
})
require.NoError(t, err)
// creating room on node 1
_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: testRoom,
NodeId: s2.Node().Id,
})
require.NoError(t, err)
// one node connecting to node 1, and another connecting to node 2
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", secondServerPort, nil)
// one node connecting to node 1, and another connecting to node 2
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
c2 := createRTCClient("c2", secondServerPort, pv, nil)
waitUntilConnected(t, c1, c2)
stopClients(c1, c2)
waitUntilConnected(t, c1, c2)
stopClients(c1, c2)
// stop s2, and connect to room again
s2.Stop(true)
// stop s2, and connect to room again
s2.Stop(true)
time.Sleep(syncDelay)
time.Sleep(syncDelay)
c3 := createRTCClient("c3", defaultServerPort, nil)
waitUntilConnected(t, c3)
c3 := createRTCClient("c3", defaultServerPort, pv, nil)
waitUntilConnected(t, c3)
})
}
}
func TestMultinodeDataPublishing(t *testing.T) {
@@ -186,48 +201,52 @@ func TestMultiNodeRefreshToken(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeJoinAfterClose")
defer finish()
// a participant joining with full permissions
c1 := createRTCClient("c1", defaultServerPort, nil)
waitUntilConnected(t, c1)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
// a participant joining with full permissions
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
waitUntilConnected(t, c1)
// update permissions and metadata
ctx := contextWithToken(adminRoomToken(testRoom))
_, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "c1",
Permission: &livekit.ParticipantPermission{
CanPublish: false,
CanSubscribe: true,
},
Metadata: "metadata",
})
require.NoError(t, err)
// update permissions and metadata
ctx := contextWithToken(adminRoomToken(testRoom))
_, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "c1",
Permission: &livekit.ParticipantPermission{
CanPublish: false,
CanSubscribe: true,
},
Metadata: "metadata",
})
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
if c1.RefreshToken() == "" {
return "did not receive refresh token"
}
// parse token to ensure it's correct
verifier, err := auth.ParseAPIToken(c1.RefreshToken())
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
if c1.RefreshToken() == "" {
return "did not receive refresh token"
}
// parse token to ensure it's correct
verifier, err := auth.ParseAPIToken(c1.RefreshToken())
require.NoError(t, err)
grants, err := verifier.Verify(testApiSecret)
require.NoError(t, err)
grants, err := verifier.Verify(testApiSecret)
require.NoError(t, err)
if grants.Metadata != "metadata" {
return "metadata did not match"
}
if *grants.Video.CanPublish {
return "canPublish should be false"
}
if *grants.Video.CanPublishData {
return "canPublishData should be false"
}
if !*grants.Video.CanSubscribe {
return "canSubscribe should be true"
}
return ""
})
if grants.Metadata != "metadata" {
return "metadata did not match"
}
if *grants.Video.CanPublish {
return "canPublish should be false"
}
if *grants.Video.CanPublishData {
return "canPublishData should be false"
}
if !*grants.Video.CanSubscribe {
return "canSubscribe should be true"
}
return ""
})
})
}
}
// ensure that token accurately reflects out of band updates
@@ -240,158 +259,170 @@ func TestMultiNodeUpdateAttributes(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeUpdateAttributes")
defer finish()
c1 := createRTCClient("au1", defaultServerPort, &client.Options{
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
token.SetAttributes(map[string]string{
"mykey": "au1",
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("au1", defaultServerPort, pv, &client.Options{
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
token.SetAttributes(map[string]string{
"mykey": "au1",
})
},
})
},
})
c2 := createRTCClient("au2", secondServerPort, &client.Options{
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
token.SetAttributes(map[string]string{
"mykey": "au2",
c2 := createRTCClient("au2", secondServerPort, pv, &client.Options{
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
token.SetAttributes(map[string]string{
"mykey": "au2",
})
grants.SetCanUpdateOwnMetadata(true)
},
})
grants.SetCanUpdateOwnMetadata(true)
},
})
waitUntilConnected(t, c1, c2)
waitUntilConnected(t, c1, c2)
testutils.WithTimeout(t, func() string {
rc2 := c1.GetRemoteParticipant(c2.ID())
rc1 := c2.GetRemoteParticipant(c1.ID())
if rc2 == nil || rc1 == nil {
return "participants could not see each other"
}
if rc1.Attributes == nil || rc1.Attributes["mykey"] != "au1" {
return "rc1's initial attributes are incorrect"
}
if rc2.Attributes == nil || rc2.Attributes["mykey"] != "au2" {
return "rc2's initial attributes are incorrect"
}
return ""
})
testutils.WithTimeout(t, func() string {
rc2 := c1.GetRemoteParticipant(c2.ID())
rc1 := c2.GetRemoteParticipant(c1.ID())
if rc2 == nil || rc1 == nil {
return "participants could not see each other"
}
if rc1.Attributes == nil || rc1.Attributes["mykey"] != "au1" {
return "rc1's initial attributes are incorrect"
}
if rc2.Attributes == nil || rc2.Attributes["mykey"] != "au2" {
return "rc2's initial attributes are incorrect"
}
return ""
})
// this one should not go through
_ = c1.SetAttributes(map[string]string{"mykey": "shouldnotchange"})
_ = c2.SetAttributes(map[string]string{"secondkey": "au2"})
// this one should not go through
_ = c1.SetAttributes(map[string]string{"mykey": "shouldnotchange"})
_ = c2.SetAttributes(map[string]string{"secondkey": "au2"})
// updates using room API should succeed
_, err := roomClient.UpdateParticipant(contextWithToken(adminRoomToken(testRoom)), &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "au1",
Attributes: map[string]string{
"secondkey": "au1",
},
})
require.NoError(t, err)
// updates using room API should succeed
_, err := roomClient.UpdateParticipant(contextWithToken(adminRoomToken(testRoom)), &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "au1",
Attributes: map[string]string{
"secondkey": "au1",
},
})
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
rc1 := c2.GetRemoteParticipant(c1.ID())
rc2 := c1.GetRemoteParticipant(c2.ID())
if rc1.Attributes["secondkey"] != "au1" {
return "au1's attribute update failed"
}
if rc2.Attributes["secondkey"] != "au2" {
return "au2's attribute update failed"
}
if rc1.Attributes["mykey"] != "au1" {
return "au1's mykey should not change"
}
if rc2.Attributes["mykey"] != "au2" {
return "au2's mykey should not change"
}
return ""
})
testutils.WithTimeout(t, func() string {
rc1 := c2.GetRemoteParticipant(c1.ID())
rc2 := c1.GetRemoteParticipant(c2.ID())
if rc1.Attributes["secondkey"] != "au1" {
return "au1's attribute update failed"
}
if rc2.Attributes["secondkey"] != "au2" {
return "au2's attribute update failed"
}
if rc1.Attributes["mykey"] != "au1" {
return "au1's mykey should not change"
}
if rc2.Attributes["mykey"] != "au2" {
return "au2's mykey should not change"
}
return ""
})
})
}
}
func TestMultiNodeRevokePublishPermission(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeRevokePublishPermission")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", secondServerPort, nil)
waitUntilConnected(t, c1, c2)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
c2 := createRTCClient("c2", secondServerPort, pv, nil)
waitUntilConnected(t, c1, c2)
// c1 publishes a track for c2
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
// c1 publishes a track for c2
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 did not receive c1's tracks"
}
return ""
})
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 did not receive c1's tracks"
}
return ""
})
// revoke permission
ctx := contextWithToken(adminRoomToken(testRoom))
_, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "c1",
Permission: &livekit.ParticipantPermission{
CanPublish: false,
CanPublishData: true,
CanSubscribe: true,
},
})
require.NoError(t, err)
// revoke permission
ctx := contextWithToken(adminRoomToken(testRoom))
_, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "c1",
Permission: &livekit.ParticipantPermission{
CanPublish: false,
CanPublishData: true,
CanSubscribe: true,
},
})
require.NoError(t, err)
// ensure c1 no longer has track published, c2 no longer see track under C1
testutils.WithTimeout(t, func() string {
if len(c1.GetPublishedTrackIDs()) != 0 {
return "c1 did not unpublish tracks"
}
remoteC1 := c2.GetRemoteParticipant(c1.ID())
if remoteC1 == nil {
return "c2 doesn't know about c1"
}
if len(remoteC1.Tracks) != 0 {
return "c2 still has c1's tracks"
}
return ""
})
// ensure c1 no longer has track published, c2 no longer see track under C1
testutils.WithTimeout(t, func() string {
if len(c1.GetPublishedTrackIDs()) != 0 {
return "c1 did not unpublish tracks"
}
remoteC1 := c2.GetRemoteParticipant(c1.ID())
if remoteC1 == nil {
return "c2 doesn't know about c1"
}
if len(remoteC1.Tracks) != 0 {
return "c2 still has c1's tracks"
}
return ""
})
})
}
}
func TestCloseDisconnectedParticipantOnSignalClose(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestCloseDisconnectedParticipantOnSignalClose")
defer finish()
c1 := createRTCClient("c1", secondServerPort, nil)
waitUntilConnected(t, c1)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("c1", secondServerPort, pv, nil)
waitUntilConnected(t, c1)
c2 := createRTCClient("c2", defaultServerPort, &client.Options{
SignalRequestInterceptor: func(msg *livekit.SignalRequest, next client.SignalRequestHandler) error {
switch msg.Message.(type) {
case *livekit.SignalRequest_Offer, *livekit.SignalRequest_Answer, *livekit.SignalRequest_Leave:
return nil
default:
return next(msg)
}
},
SignalResponseInterceptor: func(msg *livekit.SignalResponse, next client.SignalResponseHandler) error {
switch msg.Message.(type) {
case *livekit.SignalResponse_Offer, *livekit.SignalResponse_Answer:
return nil
default:
return next(msg)
}
},
})
c2 := createRTCClient("c2", defaultServerPort, pv, &client.Options{
SignalRequestInterceptor: func(msg *livekit.SignalRequest, next client.SignalRequestHandler) error {
switch msg.Message.(type) {
case *livekit.SignalRequest_Offer, *livekit.SignalRequest_Answer, *livekit.SignalRequest_Leave:
return nil
default:
return next(msg)
}
},
SignalResponseInterceptor: func(msg *livekit.SignalResponse, next client.SignalResponseHandler) error {
switch msg.Message.(type) {
case *livekit.SignalResponse_Offer, *livekit.SignalResponse_Answer:
return nil
default:
return next(msg)
}
},
})
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) != 1 {
return "c1 did not see c2 join"
}
return ""
})
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) != 1 {
return "c1 did not see c2 join"
}
return ""
})
c2.Stop()
c2.Stop()
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) != 0 {
return "c1 did not see c2 removed"
}
return ""
})
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) != 0 {
return "c1 did not see c2 removed"
}
return ""
})
})
}
}

View File

@@ -25,185 +25,206 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/testutils"
testclient "github.com/livekit/livekit-server/test/client"
)
// a scenario with lots of clients connecting, publishing, and leaving at random periods
func scenarioPublishingUponJoining(t *testing.T) {
c1 := createRTCClient("puj_1", defaultServerPort, nil)
c2 := createRTCClient("puj_2", secondServerPort, &testclient.Options{AutoSubscribe: true})
c3 := createRTCClient("puj_3", defaultServerPort, &testclient.Options{AutoSubscribe: true})
defer stopClients(c1, c2, c3)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("puj_1", defaultServerPort, pv, nil)
c2 := createRTCClient("puj_2", secondServerPort, pv, &testclient.Options{AutoSubscribe: true})
c3 := createRTCClient("puj_3", defaultServerPort, pv, &testclient.Options{AutoSubscribe: true})
defer stopClients(c1, c2, c3)
waitUntilConnected(t, c1, c2, c3)
waitUntilConnected(t, c1, c2, c3)
// c1 and c2 publishing, c3 just receiving
writers := publishTracksForClients(t, c1, c2)
defer stopWriters(writers...)
// c1 and c2 publishing, c3 just receiving
writers := publishTracksForClients(t, c1, c2)
defer stopWriters(writers...)
logger.Infow("waiting to receive tracks from c1 and c2")
testutils.WithTimeout(t, func() string {
tracks := c3.SubscribedTracks()
if len(tracks[c1.ID()]) != 2 {
return "did not receive tracks from c1"
}
if len(tracks[c2.ID()]) != 2 {
return "did not receive tracks from c2"
}
return ""
})
logger.Infow("waiting to receive tracks from c1 and c2")
testutils.WithTimeout(t, func() string {
tracks := c3.SubscribedTracks()
if len(tracks[c1.ID()]) != 2 {
return "did not receive tracks from c1"
}
if len(tracks[c2.ID()]) != 2 {
return "did not receive tracks from c2"
}
return ""
})
// after a delay, c2 reconnects, then publishing
time.Sleep(syncDelay)
c2.Stop()
// after a delay, c2 reconnects, then publishing
time.Sleep(syncDelay)
c2.Stop()
logger.Infow("waiting for c2 tracks to be gone")
testutils.WithTimeout(t, func() string {
tracks := c3.SubscribedTracks()
logger.Infow("waiting for c2 tracks to be gone")
testutils.WithTimeout(t, func() string {
tracks := c3.SubscribedTracks()
if len(tracks[c1.ID()]) != 2 {
return fmt.Sprintf("c3 should be subscribed to 2 tracks from c1, actual: %d", len(tracks[c1.ID()]))
}
if len(tracks[c2.ID()]) != 0 {
return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(tracks[c2.ID()]))
}
if len(c1.SubscribedTracks()[c2.ID()]) != 0 {
return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()]))
}
return ""
})
if len(tracks[c1.ID()]) != 2 {
return fmt.Sprintf("c3 should be subscribed to 2 tracks from c1, actual: %d", len(tracks[c1.ID()]))
}
if len(tracks[c2.ID()]) != 0 {
return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(tracks[c2.ID()]))
}
if len(c1.SubscribedTracks()[c2.ID()]) != 0 {
return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()]))
}
return ""
})
logger.Infow("c2 reconnecting")
// connect to a diff port
c2 = createRTCClient("puj_2", defaultServerPort, nil)
defer c2.Stop()
waitUntilConnected(t, c2)
writers = publishTracksForClients(t, c2)
defer stopWriters(writers...)
logger.Infow("c2 reconnecting")
// connect to a diff port
c2 = createRTCClient("puj_2", defaultServerPort, pv, nil)
defer c2.Stop()
waitUntilConnected(t, c2)
writers = publishTracksForClients(t, c2)
defer stopWriters(writers...)
testutils.WithTimeout(t, func() string {
tracks := c3.SubscribedTracks()
// "new c2 tracks should be published again",
if len(tracks[c2.ID()]) != 2 {
return fmt.Sprintf("c3 should be subscribed to 2 tracks from c2, actual: %d", len(tracks[c2.ID()]))
}
if len(c1.SubscribedTracks()[c2.ID()]) != 2 {
return fmt.Sprintf("c1 should be subscribed to 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()]))
}
return ""
})
testutils.WithTimeout(t, func() string {
tracks := c3.SubscribedTracks()
// "new c2 tracks should be published again",
if len(tracks[c2.ID()]) != 2 {
return fmt.Sprintf("c3 should be subscribed to 2 tracks from c2, actual: %d", len(tracks[c2.ID()]))
}
if len(c1.SubscribedTracks()[c2.ID()]) != 2 {
return fmt.Sprintf("c1 should be subscribed to 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()]))
}
return ""
})
})
}
}
func scenarioReceiveBeforePublish(t *testing.T) {
c1 := createRTCClient("rbp_1", defaultServerPort, nil)
c2 := createRTCClient("rbp_2", defaultServerPort, nil)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("rbp_1", defaultServerPort, pv, nil)
c2 := createRTCClient("rbp_2", defaultServerPort, pv, nil)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
// c1 publishes
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
// c1 publishes
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
// c2 should see some bytes flowing through
testutils.WithTimeout(t, func() string {
if c2.BytesReceived() > 20 {
return ""
} else {
return fmt.Sprintf("c2 only received %d bytes", c2.BytesReceived())
}
})
// c2 should see some bytes flowing through
testutils.WithTimeout(t, func() string {
if c2.BytesReceived() > 20 {
return ""
} else {
return fmt.Sprintf("c2 only received %d bytes", c2.BytesReceived())
}
})
// now publish on C2
writers = publishTracksForClients(t, c2)
defer stopWriters(writers...)
// now publish on C2
writers = publishTracksForClients(t, c2)
defer stopWriters(writers...)
testutils.WithTimeout(t, func() string {
if len(c1.SubscribedTracks()[c2.ID()]) == 2 {
return ""
} else {
return fmt.Sprintf("expected c1 to receive 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()]))
}
})
testutils.WithTimeout(t, func() string {
if len(c1.SubscribedTracks()[c2.ID()]) == 2 {
return ""
} else {
return fmt.Sprintf("expected c1 to receive 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()]))
}
})
// now leave, and ensure that it's immediate
c2.Stop()
// now leave, and ensure that it's immediate
c2.Stop()
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) > 0 {
return fmt.Sprintf("expected no remote participants, actual: %v", c1.RemoteParticipants())
}
return ""
})
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) > 0 {
return fmt.Sprintf("expected no remote participants, actual: %v", c1.RemoteParticipants())
}
return ""
})
})
}
}
func scenarioDataPublish(t *testing.T) {
c1 := createRTCClient("dp1", defaultServerPort, nil)
c2 := createRTCClient("dp2", secondServerPort, nil)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("dp1", defaultServerPort, pv, nil)
c2 := createRTCClient("dp2", secondServerPort, pv, nil)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
payload := "test bytes"
payload := "test bytes"
received := atomic.NewBool(false)
c2.OnDataReceived = func(data []byte, sid string) {
if string(data) == payload && livekit.ParticipantID(sid) == c1.ID() {
received.Store(true)
}
received := atomic.NewBool(false)
c2.OnDataReceived = func(data []byte, sid string) {
if string(data) == payload && livekit.ParticipantID(sid) == c1.ID() {
received.Store(true)
}
}
require.NoError(t, c1.PublishData([]byte(payload), livekit.DataPacket_RELIABLE))
testutils.WithTimeout(t, func() string {
if received.Load() {
return ""
} else {
return "c2 did not receive published data"
}
})
})
}
require.NoError(t, c1.PublishData([]byte(payload), livekit.DataPacket_RELIABLE))
testutils.WithTimeout(t, func() string {
if received.Load() {
return ""
} else {
return "c2 did not receive published data"
}
})
}
func scenarioDataUnlabeledPublish(t *testing.T) {
c1 := createRTCClient("dp1", defaultServerPort, nil)
c2 := createRTCClient("dp2", secondServerPort, nil)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("dp1", defaultServerPort, pv, nil)
c2 := createRTCClient("dp2", secondServerPort, pv, nil)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
payload := "test unlabeled bytes"
payload := "test unlabeled bytes"
received := atomic.NewBool(false)
c2.OnDataReceived = func(data []byte, _sid string) {
if string(data) == payload {
received.Store(true)
}
received := atomic.NewBool(false)
c2.OnDataReceived = func(data []byte, _sid string) {
if string(data) == payload {
received.Store(true)
}
}
require.NoError(t, c1.PublishDataUnlabeled([]byte(payload)))
testutils.WithTimeout(t, func() string {
if received.Load() {
return ""
} else {
return "c2 did not receive published data unlabeled"
}
})
})
}
require.NoError(t, c1.PublishDataUnlabeled([]byte(payload)))
testutils.WithTimeout(t, func() string {
if received.Load() {
return ""
} else {
return "c2 did not receive published data unlabeled"
}
})
}
func scenarioJoinClosedRoom(t *testing.T) {
c1 := createRTCClient("jcr1", defaultServerPort, nil)
waitUntilConnected(t, c1)
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("jcr1", defaultServerPort, pv, nil)
waitUntilConnected(t, c1)
// close room with room client
_, err := roomClient.DeleteRoom(contextWithToken(createRoomToken()), &livekit.DeleteRoomRequest{
Room: testRoom,
})
require.NoError(t, err)
// close room with room client
_, err := roomClient.DeleteRoom(contextWithToken(createRoomToken()), &livekit.DeleteRoomRequest{
Room: testRoom,
})
require.NoError(t, err)
// now join again
c2 := createRTCClient("jcr2", defaultServerPort, nil)
waitUntilConnected(t, c2)
stopClients(c2)
// now join again
c2 := createRTCClient("jcr2", defaultServerPort, pv, nil)
waitUntilConnected(t, c2)
stopClients(c2)
})
}
}
// close a room that has been created, but no participant has joined

File diff suppressed because it is too large Load Diff

View File

@@ -45,78 +45,82 @@ func TestWebhooks(t *testing.T) {
require.NoError(t, err)
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
waitUntilConnected(t, c1)
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventRoomStarted) == nil {
return "did not receive RoomStarted"
}
if ts.GetEvent(webhook.EventParticipantJoined) == nil {
return "did not receive ParticipantJoined"
}
return ""
})
for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} {
t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, pv, nil)
waitUntilConnected(t, c1)
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventRoomStarted) == nil {
return "did not receive RoomStarted"
}
if ts.GetEvent(webhook.EventParticipantJoined) == nil {
return "did not receive ParticipantJoined"
}
return ""
})
// first participant join should have started the room
started := ts.GetEvent(webhook.EventRoomStarted)
require.Equal(t, testRoom, started.Room.Name)
require.NotEmpty(t, started.Id)
require.Greater(t, started.CreatedAt, time.Now().Unix()-100)
require.GreaterOrEqual(t, time.Now().Unix(), started.CreatedAt)
joined := ts.GetEvent(webhook.EventParticipantJoined)
require.Equal(t, "c1", joined.Participant.Identity)
ts.ClearEvents()
// first participant join should have started the room
started := ts.GetEvent(webhook.EventRoomStarted)
require.Equal(t, testRoom, started.Room.Name)
require.NotEmpty(t, started.Id)
require.Greater(t, started.CreatedAt, time.Now().Unix()-100)
require.GreaterOrEqual(t, time.Now().Unix(), started.CreatedAt)
joined := ts.GetEvent(webhook.EventParticipantJoined)
require.Equal(t, "c1", joined.Participant.Identity)
ts.ClearEvents()
// another participant joins
c2 := createRTCClient("c2", defaultServerPort, nil)
waitUntilConnected(t, c2)
defer c2.Stop()
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventParticipantJoined) == nil {
return "did not receive ParticipantJoined"
}
return ""
})
joined = ts.GetEvent(webhook.EventParticipantJoined)
require.Equal(t, "c2", joined.Participant.Identity)
ts.ClearEvents()
// another participant joins
c2 := createRTCClient("c2", defaultServerPort, pv, nil)
waitUntilConnected(t, c2)
defer c2.Stop()
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventParticipantJoined) == nil {
return "did not receive ParticipantJoined"
}
return ""
})
joined = ts.GetEvent(webhook.EventParticipantJoined)
require.Equal(t, "c2", joined.Participant.Identity)
ts.ClearEvents()
// track published
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
testutils.WithTimeout(t, func() string {
ev := ts.GetEvent(webhook.EventTrackPublished)
if ev == nil {
return "did not receive TrackPublished"
}
require.NotNil(t, ev.Track, "TrackPublished did not include trackInfo")
require.Equal(t, string(c1.ID()), ev.Participant.Sid)
return ""
})
ts.ClearEvents()
// track published
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
testutils.WithTimeout(t, func() string {
ev := ts.GetEvent(webhook.EventTrackPublished)
if ev == nil {
return "did not receive TrackPublished"
}
require.NotNil(t, ev.Track, "TrackPublished did not include trackInfo")
require.Equal(t, string(c1.ID()), ev.Participant.Sid)
return ""
})
ts.ClearEvents()
// first participant leaves
c1.Stop()
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventParticipantLeft) == nil {
return "did not receive ParticipantLeft"
}
return ""
})
left := ts.GetEvent(webhook.EventParticipantLeft)
require.Equal(t, "c1", left.Participant.Identity)
ts.ClearEvents()
// first participant leaves
c1.Stop()
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventParticipantLeft) == nil {
return "did not receive ParticipantLeft"
}
return ""
})
left := ts.GetEvent(webhook.EventParticipantLeft)
require.Equal(t, "c1", left.Participant.Identity)
ts.ClearEvents()
// room closed
rm := server.RoomManager().GetRoom(context.Background(), testRoom)
rm.Close(types.ParticipantCloseReasonNone)
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventRoomFinished) == nil {
return "did not receive RoomFinished"
}
return ""
})
require.Equal(t, testRoom, ts.GetEvent(webhook.EventRoomFinished).Room.Name)
// room closed
rm := server.RoomManager().GetRoom(context.Background(), testRoom)
rm.Close(types.ParticipantCloseReasonNone)
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventRoomFinished) == nil {
return "did not receive RoomFinished"
}
return ""
})
require.Equal(t, testRoom, ts.GetEvent(webhook.EventRoomFinished).Room.Name)
})
}
}
func setupServerWithWebhook() (server *service.LivekitServer, testServer *webhookTestServer, finishFunc func(), err error) {