diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index caa7d1a0f..e7e9e1c2b 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -16,7 +16,8 @@ package types type ProtocolVersion int -const CurrentProtocol = 16 +const CurrentProtocol = 17 +const MaxProtocolDualPeerConnection = 16 func (v ProtocolVersion) SupportsPackedStreamId() bool { return v > 0 diff --git a/test/agent_test.go b/test/agent_test.go index d6a2571f2..fdd09f49e 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/testutils" testclient "github.com/livekit/livekit-server/test/client" "github.com/livekit/protocol/auth" @@ -33,194 +34,205 @@ var ( ) func TestAgents(t *testing.T) { - _, finish := setupSingleNodeTest("TestAgents") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, finish := setupSingleNodeTest("TestAgents") + defer finish() - ac1, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - ac2, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - ac3, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - ac4, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - ac5, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - ac6, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - defer ac1.close() - defer ac2.close() - defer ac3.close() - defer ac4.close() - defer ac5.close() - defer ac6.close() - ac1.Run(livekit.JobType_JT_ROOM, "default") - ac2.Run(livekit.JobType_JT_ROOM, "default") - ac3.Run(livekit.JobType_JT_PUBLISHER, "default") - ac4.Run(livekit.JobType_JT_PUBLISHER, "default") - ac5.Run(livekit.JobType_JT_PARTICIPANT, "default") - ac6.Run(livekit.JobType_JT_PARTICIPANT, "default") + ac1, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac2, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac3, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac4, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac5, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac6, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + defer ac1.close() + defer ac2.close() + defer ac3.close() + defer ac4.close() + defer ac5.close() + defer ac6.close() + ac1.Run(livekit.JobType_JT_ROOM, "default") + ac2.Run(livekit.JobType_JT_ROOM, "default") + ac3.Run(livekit.JobType_JT_PUBLISHER, "default") + ac4.Run(livekit.JobType_JT_PUBLISHER, "default") + ac5.Run(livekit.JobType_JT_PARTICIPANT, "default") + ac6.Run(livekit.JobType_JT_PARTICIPANT, "default") - testutils.WithTimeout(t, func() string { - if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 || ac5.registered.Load() != 1 || ac6.registered.Load() != 1 { - return "worker not registered" - } + testutils.WithTimeout(t, func() string { + if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 || ac5.registered.Load() != 1 || ac6.registered.Load() != 1 { + return "worker not registered" + } - return "" - }, RegisterTimeout) + return "" + }, RegisterTimeout) - c1 := createRTCClient("c1", defaultServerPort, nil) - c2 := createRTCClient("c2", defaultServerPort, &testclient.Options{UseJoinRequestQueryParam: true}) - waitUntilConnected(t, c1, c2) + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + c2 := createRTCClient("c2", defaultServerPort, pv, &testclient.Options{UseJoinRequestQueryParam: true}) + waitUntilConnected(t, c1, c2) - // publish 2 tracks - t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro") - require.NoError(t, err) - defer t1.Stop() - t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") - require.NoError(t, err) - defer t2.Stop() + // publish 2 tracks + t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro") + require.NoError(t, err) + defer t1.Stop() + t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t2.Stop() - testutils.WithTimeout(t, func() string { - if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 { - return "room job not assigned" - } + testutils.WithTimeout(t, func() string { + if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 { + return "room job not assigned" + } - if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 1 { - return fmt.Sprintf("publisher jobs not assigned, ac3: %d, ac4: %d", ac3.publisherJobs.Load(), ac4.publisherJobs.Load()) - } + if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 1 { + return fmt.Sprintf("publisher jobs not assigned, ac3: %d, ac4: %d", ac3.publisherJobs.Load(), ac4.publisherJobs.Load()) + } - if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 { - return fmt.Sprintf("participant jobs not assigned, ac5: %d, ac6: %d", ac5.participantJobs.Load(), ac6.participantJobs.Load()) - } + if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 { + return fmt.Sprintf("participant jobs not assigned, ac5: %d, ac6: %d", ac5.participantJobs.Load(), ac6.participantJobs.Load()) + } - return "" - }, 6*time.Second) + return "" + }, 6*time.Second) - // publish 2 tracks - t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro") - require.NoError(t, err) - defer t3.Stop() - t4, err := c2.AddStaticTrack("video/vp8", "video", "webcam") - require.NoError(t, err) - defer t4.Stop() + // publish 2 tracks + t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro") + require.NoError(t, err) + defer t3.Stop() + t4, err := c2.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t4.Stop() - testutils.WithTimeout(t, func() string { - if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 { - return "room job must be assigned 1 time" - } + testutils.WithTimeout(t, func() string { + if ac1.roomJobs.Load()+ac2.roomJobs.Load() != 1 { + return "room job must be assigned 1 time" + } - if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 2 { - return "2 publisher jobs must assigned" - } + if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 2 { + return "2 publisher jobs must assigned" + } - if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 { - return "2 participant jobs must assigned" - } + if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 { + return "2 participant jobs must assigned" + } - return "" - }, AssignJobTimeout) + return "" + }, AssignJobTimeout) + }) + } } func TestAgentNamespaces(t *testing.T) { - _, finish := setupSingleNodeTest("TestAgentNamespaces") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, finish := setupSingleNodeTest("TestAgentNamespaces") + defer finish() - ac1, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - ac2, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - defer ac1.close() - defer ac2.close() - ac1.Run(livekit.JobType_JT_ROOM, "namespace1") - ac2.Run(livekit.JobType_JT_ROOM, "namespace2") + ac1, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac2, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + defer ac1.close() + defer ac2.close() + ac1.Run(livekit.JobType_JT_ROOM, "namespace1") + ac2.Run(livekit.JobType_JT_ROOM, "namespace2") - _, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ - Name: testRoom, - Agents: []*livekit.RoomAgentDispatch{ - {}, - { - AgentName: "ag", - }, - }, - }) - require.NoError(t, err) + _, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ + Name: testRoom, + Agents: []*livekit.RoomAgentDispatch{ + {}, + { + AgentName: "ag", + }, + }, + }) + require.NoError(t, err) - testutils.WithTimeout(t, func() string { - if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 { - return "worker not registered" - } - return "" - }, RegisterTimeout) + testutils.WithTimeout(t, func() string { + if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 { + return "worker not registered" + } + return "" + }, RegisterTimeout) - c1 := createRTCClient("c1", defaultServerPort, nil) - waitUntilConnected(t, c1) + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) - testutils.WithTimeout(t, func() string { - if ac1.roomJobs.Load() != 1 || ac2.roomJobs.Load() != 1 { - return "room job not assigned" - } + testutils.WithTimeout(t, func() string { + if ac1.roomJobs.Load() != 1 || ac2.roomJobs.Load() != 1 { + return "room job not assigned" + } - job1 := <-ac1.requestedJobs - job2 := <-ac2.requestedJobs + job1 := <-ac1.requestedJobs + job2 := <-ac2.requestedJobs - if job1.Namespace != "namespace1" { - return "namespace is not 'namespace'" - } + if job1.Namespace != "namespace1" { + return "namespace is not 'namespace'" + } - if job2.Namespace != "namespace2" { - return "namespace is not 'namespace2'" - } + if job2.Namespace != "namespace2" { + return "namespace is not 'namespace2'" + } - if job1.Id == job2.Id { - return "job ids are the same" - } - - return "" - }, AssignJobTimeout) + if job1.Id == job2.Id { + return "job ids are the same" + } + return "" + }, AssignJobTimeout) + }) + } } func TestAgentMultiNode(t *testing.T) { - _, _, finish := setupMultiNodeTest("TestAgentMultiNode") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, _, finish := setupMultiNodeTest("TestAgentMultiNode") + defer finish() - ac1, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - ac2, err := newAgentClient(agentToken(), defaultServerPort) - require.NoError(t, err) - defer ac1.close() - defer ac2.close() - ac1.Run(livekit.JobType_JT_ROOM, "default") - ac2.Run(livekit.JobType_JT_PUBLISHER, "default") + ac1, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac2, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + defer ac1.close() + defer ac2.close() + ac1.Run(livekit.JobType_JT_ROOM, "default") + ac2.Run(livekit.JobType_JT_PUBLISHER, "default") - testutils.WithTimeout(t, func() string { - if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 { - return "worker not registered" - } - return "" - }, RegisterTimeout) + testutils.WithTimeout(t, func() string { + if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 { + return "worker not registered" + } + return "" + }, RegisterTimeout) - c1 := createRTCClient("c1", secondServerPort, nil) // Create a room on the second node - waitUntilConnected(t, c1) + c1 := createRTCClient("c1", secondServerPort, pv, nil) // Create a room on the second node + waitUntilConnected(t, c1) - t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro") - require.NoError(t, err) - defer t1.Stop() + t1, err := c1.AddStaticTrack("audio/opus", "audio", "micro") + require.NoError(t, err) + defer t1.Stop() - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 10) - testutils.WithTimeout(t, func() string { - if ac1.roomJobs.Load() != 1 { - return "room job not assigned" - } + testutils.WithTimeout(t, func() string { + if ac1.roomJobs.Load() != 1 { + return "room job not assigned" + } - if ac2.publisherJobs.Load() != 1 { - return "participant job not assigned" - } + if ac2.publisherJobs.Load() != 1 { + return "participant job not assigned" + } - return "" - }, AssignJobTimeout) + return "" + }, AssignJobTimeout) + }) + } } func agentToken() string { diff --git a/test/client/client.go b/test/client/client.go index d2ef5d2e8..4b24738b1 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -126,7 +126,7 @@ type Options struct { UseJoinRequestQueryParam bool } -func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error) { +func NewWebSocketConn(host, token string, protocolVersion types.ProtocolVersion, opts *Options) (*websocket.Conn, error) { u, err := url.Parse(host + "/rtc") if err != nil { return nil, err @@ -139,7 +139,7 @@ func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error clientInfo := &livekit.ClientInfo{ Os: runtime.GOOS, Sdk: livekit.ClientInfo_GO, - Protocol: types.CurrentProtocol, + Protocol: int32(protocolVersion), } if opts.ClientInfo != nil { clientInfo = opts.ClientInfo @@ -164,7 +164,7 @@ func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error } } } else { - connectUrl += fmt.Sprintf("?protocol=%d", types.CurrentProtocol) + connectUrl += fmt.Sprintf("?protocol=%d", protocolVersion) sdk := "go" if opts != nil { @@ -202,11 +202,11 @@ func SetAuthorizationToken(header http.Header, token string) { header.Set("Authorization", "Bearer "+token) } -func NewRTCClient(conn *websocket.Conn, opts *Options) (*RTCClient, error) { +func NewRTCClient(conn *websocket.Conn, protocolVersion types.ProtocolVersion, opts *Options) (*RTCClient, error) { var err error c := &RTCClient{ - protocolVersion: types.CurrentProtocol, + protocolVersion: protocolVersion, conn: conn, localTracks: make(map[string]webrtc.TrackLocal), trackSenders: make(map[string]*webrtc.RTPSender), diff --git a/test/integration_helpers.go b/test/integration_helpers.go index abb6bc149..51b107ded 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -33,6 +33,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/pkg/testutils" @@ -202,18 +203,18 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer { } // creates a client and runs against server -func createRTCClient(name string, port int, opts *testclient.Options) *testclient.RTCClient { +func createRTCClient(name string, port int, protocolVersion types.ProtocolVersion, opts *testclient.Options) *testclient.RTCClient { var customizer func(token *auth.AccessToken, grants *auth.VideoGrant) if opts != nil { customizer = opts.TokenCustomizer } token := joinToken(testRoom, name, customizer) - ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, opts) + ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, protocolVersion, opts) if err != nil { panic(err) } - c, err := testclient.NewRTCClient(ws, opts) + c, err := testclient.NewRTCClient(ws, protocolVersion, opts) if err != nil { panic(err) } @@ -224,13 +225,13 @@ func createRTCClient(name string, port int, opts *testclient.Options) *testclien } // creates a client and runs against server -func createRTCClientWithToken(token string, port int, opts *testclient.Options) *testclient.RTCClient { - ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, opts) +func createRTCClientWithToken(token string, port int, protocolVersion types.ProtocolVersion, opts *testclient.Options) *testclient.RTCClient { + ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, protocolVersion, opts) if err != nil { panic(err) } - c, err := testclient.NewRTCClient(ws, opts) + c, err := testclient.NewRTCClient(ws, protocolVersion, opts) if err != nil { panic(err) } diff --git a/test/multinode_roomservice_test.go b/test/multinode_roomservice_test.go index a1ec1cb8b..719475880 100644 --- a/test/multinode_roomservice_test.go +++ b/test/multinode_roomservice_test.go @@ -22,6 +22,7 @@ import ( "github.com/livekit/protocol/livekit" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/testutils" ) @@ -61,24 +62,28 @@ func TestMultiNodeUpdateRoomMetadata(t *testing.T) { }) t.Run("when room has a participant", func(t *testing.T) { - _, _, finish := setupMultiNodeTest("TestMultiNodeUpdateRoomMetadata_with_participant") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, _, finish := setupMultiNodeTest("TestMultiNodeUpdateRoomMetadata_with_participant") + defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - waitUntilConnected(t, c1) - defer c1.Stop() + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) + defer c1.Stop() - _, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ - Name: "emptyRoom", - }) - require.NoError(t, err) + _, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ + Name: "emptyRoom", + }) + require.NoError(t, err) - rm, err := roomClient.UpdateRoomMetadata(contextWithToken(adminRoomToken("emptyRoom")), &livekit.UpdateRoomMetadataRequest{ - Room: "emptyRoom", - Metadata: "updated metadata", - }) - require.NoError(t, err) - require.Equal(t, "updated metadata", rm.Metadata) + rm, err := roomClient.UpdateRoomMetadata(contextWithToken(adminRoomToken("emptyRoom")), &livekit.UpdateRoomMetadataRequest{ + Room: "emptyRoom", + Metadata: "updated metadata", + }) + require.NoError(t, err) + require.Equal(t, "updated metadata", rm.Metadata) + }) + } }) } @@ -89,85 +94,97 @@ func TestMultiNodeRemoveParticipant(t *testing.T) { return } - _, _, finish := setupMultiNodeTest("TestMultiNodeRemoveParticipant") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, _, finish := setupMultiNodeTest("TestMultiNodeRemoveParticipant") + defer finish() - c1 := createRTCClient("mn_remove_participant", defaultServerPort, nil) - defer c1.Stop() - waitUntilConnected(t, c1) + c1 := createRTCClient("mn_remove_participant", defaultServerPort, pv, nil) + defer c1.Stop() + waitUntilConnected(t, c1) - ctx := contextWithToken(adminRoomToken(testRoom)) - _, err := roomClient.RemoveParticipant(ctx, &livekit.RoomParticipantIdentity{ - Room: testRoom, - Identity: "mn_remove_participant", - }) - require.NoError(t, err) + ctx := contextWithToken(adminRoomToken(testRoom)) + _, err := roomClient.RemoveParticipant(ctx, &livekit.RoomParticipantIdentity{ + Room: testRoom, + Identity: "mn_remove_participant", + }) + require.NoError(t, err) - // participant list doesn't show the participant - listRes, err := roomClient.ListParticipants(ctx, &livekit.ListParticipantsRequest{ - Room: testRoom, - }) - require.NoError(t, err) - require.Len(t, listRes.Participants, 0) + // participant list doesn't show the participant + listRes, err := roomClient.ListParticipants(ctx, &livekit.ListParticipantsRequest{ + Room: testRoom, + }) + require.NoError(t, err) + require.Len(t, listRes.Participants, 0) + }) + } } // update participant metadata func TestMultiNodeUpdateParticipantMetadata(t *testing.T) { - _, _, finish := setupMultiNodeTest("TestMultiNodeUpdateParticipantMetadata") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, _, finish := setupMultiNodeTest("TestMultiNodeUpdateParticipantMetadata") + defer finish() - c1 := createRTCClient("update_participant_metadata", defaultServerPort, nil) - defer c1.Stop() - waitUntilConnected(t, c1) + c1 := createRTCClient("update_participant_metadata", defaultServerPort, pv, nil) + defer c1.Stop() + waitUntilConnected(t, c1) - ctx := contextWithToken(adminRoomToken(testRoom)) - res, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ - Room: testRoom, - Identity: "update_participant_metadata", - Metadata: "the new metadata", - }) - require.NoError(t, err) - require.Equal(t, "the new metadata", res.Metadata) + ctx := contextWithToken(adminRoomToken(testRoom)) + res, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ + Room: testRoom, + Identity: "update_participant_metadata", + Metadata: "the new metadata", + }) + require.NoError(t, err) + require.Equal(t, "the new metadata", res.Metadata) + }) + } } // admin mute published track func TestMultiNodeMutePublishedTrack(t *testing.T) { - _, _, finish := setupMultiNodeTest("TestMultiNodeMutePublishedTrack") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, _, finish := setupMultiNodeTest("TestMultiNodeMutePublishedTrack") + defer finish() - identity := "mute_published_track" - c1 := createRTCClient(identity, defaultServerPort, nil) - defer c1.Stop() - waitUntilConnected(t, c1) + identity := "mute_published_track" + c1 := createRTCClient(identity, defaultServerPort, pv, nil) + defer c1.Stop() + waitUntilConnected(t, c1) - writers := publishTracksForClients(t, c1) - defer stopWriters(writers...) + writers := publishTracksForClients(t, c1) + defer stopWriters(writers...) - trackIDs := c1.GetPublishedTrackIDs() - require.NotEmpty(t, trackIDs) + trackIDs := c1.GetPublishedTrackIDs() + require.NotEmpty(t, trackIDs) - ctx := contextWithToken(adminRoomToken(testRoom)) - // wait for it to be published before - testutils.WithTimeout(t, func() string { - res, err := roomClient.GetParticipant(ctx, &livekit.RoomParticipantIdentity{ - Room: testRoom, - Identity: identity, + ctx := contextWithToken(adminRoomToken(testRoom)) + // wait for it to be published before + testutils.WithTimeout(t, func() string { + res, err := roomClient.GetParticipant(ctx, &livekit.RoomParticipantIdentity{ + Room: testRoom, + Identity: identity, + }) + require.NoError(t, err) + if len(res.Tracks) == 2 { + return "" + } else { + return fmt.Sprintf("expected 2 tracks to be published, actual: %d", len(res.Tracks)) + } + }) + + res, err := roomClient.MutePublishedTrack(ctx, &livekit.MuteRoomTrackRequest{ + Room: testRoom, + Identity: identity, + TrackSid: trackIDs[0], + Muted: true, + }) + require.NoError(t, err) + require.Equal(t, trackIDs[0], res.Track.Sid) + require.True(t, res.Track.Muted) }) - require.NoError(t, err) - if len(res.Tracks) == 2 { - return "" - } else { - return fmt.Sprintf("expected 2 tracks to be published, actual: %d", len(res.Tracks)) - } - }) - - res, err := roomClient.MutePublishedTrack(ctx, &livekit.MuteRoomTrackRequest{ - Room: testRoom, - Identity: identity, - TrackSid: trackIDs[0], - Muted: true, - }) - require.NoError(t, err) - require.Equal(t, trackIDs[0], res.Track.Sid) - require.True(t, res.Track.Muted) + } } diff --git a/test/multinode_test.go b/test/multinode_test.go index e79c8316c..5a15a0542 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -15,6 +15,7 @@ package test import ( + "fmt" "testing" "time" @@ -24,6 +25,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/testutils" "github.com/livekit/livekit-server/test/client" ) @@ -33,6 +35,7 @@ func TestMultiNodeRouting(t *testing.T) { t.SkipNow() return } + _, _, finish := setupMultiNodeTest("TestMultiNodeRouting") defer finish() @@ -42,35 +45,39 @@ func TestMultiNodeRouting(t *testing.T) { }) require.NoError(t, err) - // one node connecting to node 1, and another connecting to node 2 - c1 := createRTCClient("c1", defaultServerPort, nil) - c2 := createRTCClient("c2", secondServerPort, nil) - waitUntilConnected(t, c1, c2) - defer stopClients(c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + // one node connecting to node 1, and another connecting to node 2 + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + c2 := createRTCClient("c2", secondServerPort, pv, nil) + waitUntilConnected(t, c1, c2) + defer stopClients(c1, c2) - // c1 publishing, and c2 receiving - t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") - require.NoError(t, err) - if t1 != nil { - defer t1.Stop() + // c1 publishing, and c2 receiving + t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") + require.NoError(t, err) + if t1 != nil { + defer t1.Stop() + } + + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()) == 0 { + return "c2 received no tracks" + } + if len(c2.SubscribedTracks()[c1.ID()]) != 1 { + return "c2 didn't receive track published by c1" + } + tr1 := c2.SubscribedTracks()[c1.ID()][0] + streamID, _ := rtc.UnpackStreamID(tr1.StreamID()) + require.Equal(t, c1.ID(), streamID) + return "" + }) + + remoteC1 := c2.GetRemoteParticipant(c1.ID()) + require.Equal(t, "c1", remoteC1.Name) + require.Equal(t, "metadatac1", remoteC1.Metadata) + }) } - - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()) == 0 { - return "c2 received no tracks" - } - if len(c2.SubscribedTracks()[c1.ID()]) != 1 { - return "c2 didn't receive track published by c1" - } - tr1 := c2.SubscribedTracks()[c1.ID()][0] - streamID, _ := rtc.UnpackStreamID(tr1.StreamID()) - require.Equal(t, c1.ID(), streamID) - return "" - }) - - remoteC1 := c2.GetRemoteParticipant(c1.ID()) - require.Equal(t, "c1", remoteC1.Name) - require.Equal(t, "metadatac1", remoteC1.Metadata) } func TestConnectWithoutCreation(t *testing.T) { @@ -82,10 +89,14 @@ func TestConnectWithoutCreation(t *testing.T) { _, _, finish := setupMultiNodeTest("TestConnectWithoutCreation") defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - waitUntilConnected(t, c1) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) - c1.Stop() + c1.Stop() + }) + } } // testing multiple scenarios rooms @@ -118,30 +129,34 @@ func TestMultinodeReconnectAfterNodeShutdown(t *testing.T) { return } - _, s2, finish := setupMultiNodeTest("TestMultinodeReconnectAfterNodeShutdown") - defer finish() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + _, s2, finish := setupMultiNodeTest("TestMultinodeReconnectAfterNodeShutdown") + defer finish() - // creating room on node 1 - _, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ - Name: testRoom, - NodeId: s2.Node().Id, - }) - require.NoError(t, err) + // creating room on node 1 + _, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ + Name: testRoom, + NodeId: s2.Node().Id, + }) + require.NoError(t, err) - // one node connecting to node 1, and another connecting to node 2 - c1 := createRTCClient("c1", defaultServerPort, nil) - c2 := createRTCClient("c2", secondServerPort, nil) + // one node connecting to node 1, and another connecting to node 2 + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + c2 := createRTCClient("c2", secondServerPort, pv, nil) - waitUntilConnected(t, c1, c2) - stopClients(c1, c2) + waitUntilConnected(t, c1, c2) + stopClients(c1, c2) - // stop s2, and connect to room again - s2.Stop(true) + // stop s2, and connect to room again + s2.Stop(true) - time.Sleep(syncDelay) + time.Sleep(syncDelay) - c3 := createRTCClient("c3", defaultServerPort, nil) - waitUntilConnected(t, c3) + c3 := createRTCClient("c3", defaultServerPort, pv, nil) + waitUntilConnected(t, c3) + }) + } } func TestMultinodeDataPublishing(t *testing.T) { @@ -186,48 +201,52 @@ func TestMultiNodeRefreshToken(t *testing.T) { _, _, finish := setupMultiNodeTest("TestMultiNodeJoinAfterClose") defer finish() - // a participant joining with full permissions - c1 := createRTCClient("c1", defaultServerPort, nil) - waitUntilConnected(t, c1) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + // a participant joining with full permissions + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) - // update permissions and metadata - ctx := contextWithToken(adminRoomToken(testRoom)) - _, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ - Room: testRoom, - Identity: "c1", - Permission: &livekit.ParticipantPermission{ - CanPublish: false, - CanSubscribe: true, - }, - Metadata: "metadata", - }) - require.NoError(t, err) + // update permissions and metadata + ctx := contextWithToken(adminRoomToken(testRoom)) + _, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ + Room: testRoom, + Identity: "c1", + Permission: &livekit.ParticipantPermission{ + CanPublish: false, + CanSubscribe: true, + }, + Metadata: "metadata", + }) + require.NoError(t, err) - testutils.WithTimeout(t, func() string { - if c1.RefreshToken() == "" { - return "did not receive refresh token" - } - // parse token to ensure it's correct - verifier, err := auth.ParseAPIToken(c1.RefreshToken()) - require.NoError(t, err) + testutils.WithTimeout(t, func() string { + if c1.RefreshToken() == "" { + return "did not receive refresh token" + } + // parse token to ensure it's correct + verifier, err := auth.ParseAPIToken(c1.RefreshToken()) + require.NoError(t, err) - grants, err := verifier.Verify(testApiSecret) - require.NoError(t, err) + grants, err := verifier.Verify(testApiSecret) + require.NoError(t, err) - if grants.Metadata != "metadata" { - return "metadata did not match" - } - if *grants.Video.CanPublish { - return "canPublish should be false" - } - if *grants.Video.CanPublishData { - return "canPublishData should be false" - } - if !*grants.Video.CanSubscribe { - return "canSubscribe should be true" - } - return "" - }) + if grants.Metadata != "metadata" { + return "metadata did not match" + } + if *grants.Video.CanPublish { + return "canPublish should be false" + } + if *grants.Video.CanPublishData { + return "canPublishData should be false" + } + if !*grants.Video.CanSubscribe { + return "canSubscribe should be true" + } + return "" + }) + }) + } } // ensure that token accurately reflects out of band updates @@ -240,158 +259,170 @@ func TestMultiNodeUpdateAttributes(t *testing.T) { _, _, finish := setupMultiNodeTest("TestMultiNodeUpdateAttributes") defer finish() - c1 := createRTCClient("au1", defaultServerPort, &client.Options{ - TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { - token.SetAttributes(map[string]string{ - "mykey": "au1", + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("au1", defaultServerPort, pv, &client.Options{ + TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { + token.SetAttributes(map[string]string{ + "mykey": "au1", + }) + }, }) - }, - }) - c2 := createRTCClient("au2", secondServerPort, &client.Options{ - TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { - token.SetAttributes(map[string]string{ - "mykey": "au2", + c2 := createRTCClient("au2", secondServerPort, pv, &client.Options{ + TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { + token.SetAttributes(map[string]string{ + "mykey": "au2", + }) + grants.SetCanUpdateOwnMetadata(true) + }, }) - grants.SetCanUpdateOwnMetadata(true) - }, - }) - waitUntilConnected(t, c1, c2) + waitUntilConnected(t, c1, c2) - testutils.WithTimeout(t, func() string { - rc2 := c1.GetRemoteParticipant(c2.ID()) - rc1 := c2.GetRemoteParticipant(c1.ID()) - if rc2 == nil || rc1 == nil { - return "participants could not see each other" - } - if rc1.Attributes == nil || rc1.Attributes["mykey"] != "au1" { - return "rc1's initial attributes are incorrect" - } - if rc2.Attributes == nil || rc2.Attributes["mykey"] != "au2" { - return "rc2's initial attributes are incorrect" - } - return "" - }) + testutils.WithTimeout(t, func() string { + rc2 := c1.GetRemoteParticipant(c2.ID()) + rc1 := c2.GetRemoteParticipant(c1.ID()) + if rc2 == nil || rc1 == nil { + return "participants could not see each other" + } + if rc1.Attributes == nil || rc1.Attributes["mykey"] != "au1" { + return "rc1's initial attributes are incorrect" + } + if rc2.Attributes == nil || rc2.Attributes["mykey"] != "au2" { + return "rc2's initial attributes are incorrect" + } + return "" + }) - // this one should not go through - _ = c1.SetAttributes(map[string]string{"mykey": "shouldnotchange"}) - _ = c2.SetAttributes(map[string]string{"secondkey": "au2"}) + // this one should not go through + _ = c1.SetAttributes(map[string]string{"mykey": "shouldnotchange"}) + _ = c2.SetAttributes(map[string]string{"secondkey": "au2"}) - // updates using room API should succeed - _, err := roomClient.UpdateParticipant(contextWithToken(adminRoomToken(testRoom)), &livekit.UpdateParticipantRequest{ - Room: testRoom, - Identity: "au1", - Attributes: map[string]string{ - "secondkey": "au1", - }, - }) - require.NoError(t, err) + // updates using room API should succeed + _, err := roomClient.UpdateParticipant(contextWithToken(adminRoomToken(testRoom)), &livekit.UpdateParticipantRequest{ + Room: testRoom, + Identity: "au1", + Attributes: map[string]string{ + "secondkey": "au1", + }, + }) + require.NoError(t, err) - testutils.WithTimeout(t, func() string { - rc1 := c2.GetRemoteParticipant(c1.ID()) - rc2 := c1.GetRemoteParticipant(c2.ID()) - if rc1.Attributes["secondkey"] != "au1" { - return "au1's attribute update failed" - } - if rc2.Attributes["secondkey"] != "au2" { - return "au2's attribute update failed" - } - if rc1.Attributes["mykey"] != "au1" { - return "au1's mykey should not change" - } - if rc2.Attributes["mykey"] != "au2" { - return "au2's mykey should not change" - } - return "" - }) + testutils.WithTimeout(t, func() string { + rc1 := c2.GetRemoteParticipant(c1.ID()) + rc2 := c1.GetRemoteParticipant(c2.ID()) + if rc1.Attributes["secondkey"] != "au1" { + return "au1's attribute update failed" + } + if rc2.Attributes["secondkey"] != "au2" { + return "au2's attribute update failed" + } + if rc1.Attributes["mykey"] != "au1" { + return "au1's mykey should not change" + } + if rc2.Attributes["mykey"] != "au2" { + return "au2's mykey should not change" + } + return "" + }) + }) + } } func TestMultiNodeRevokePublishPermission(t *testing.T) { _, _, finish := setupMultiNodeTest("TestMultiNodeRevokePublishPermission") defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - c2 := createRTCClient("c2", secondServerPort, nil) - waitUntilConnected(t, c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + c2 := createRTCClient("c2", secondServerPort, pv, nil) + waitUntilConnected(t, c1, c2) - // c1 publishes a track for c2 - writers := publishTracksForClients(t, c1) - defer stopWriters(writers...) + // c1 publishes a track for c2 + writers := publishTracksForClients(t, c1) + defer stopWriters(writers...) - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()[c1.ID()]) != 2 { - return "c2 did not receive c1's tracks" - } - return "" - }) + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()[c1.ID()]) != 2 { + return "c2 did not receive c1's tracks" + } + return "" + }) - // revoke permission - ctx := contextWithToken(adminRoomToken(testRoom)) - _, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ - Room: testRoom, - Identity: "c1", - Permission: &livekit.ParticipantPermission{ - CanPublish: false, - CanPublishData: true, - CanSubscribe: true, - }, - }) - require.NoError(t, err) + // revoke permission + ctx := contextWithToken(adminRoomToken(testRoom)) + _, err := roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ + Room: testRoom, + Identity: "c1", + Permission: &livekit.ParticipantPermission{ + CanPublish: false, + CanPublishData: true, + CanSubscribe: true, + }, + }) + require.NoError(t, err) - // ensure c1 no longer has track published, c2 no longer see track under C1 - testutils.WithTimeout(t, func() string { - if len(c1.GetPublishedTrackIDs()) != 0 { - return "c1 did not unpublish tracks" - } - remoteC1 := c2.GetRemoteParticipant(c1.ID()) - if remoteC1 == nil { - return "c2 doesn't know about c1" - } - if len(remoteC1.Tracks) != 0 { - return "c2 still has c1's tracks" - } - return "" - }) + // ensure c1 no longer has track published, c2 no longer see track under C1 + testutils.WithTimeout(t, func() string { + if len(c1.GetPublishedTrackIDs()) != 0 { + return "c1 did not unpublish tracks" + } + remoteC1 := c2.GetRemoteParticipant(c1.ID()) + if remoteC1 == nil { + return "c2 doesn't know about c1" + } + if len(remoteC1.Tracks) != 0 { + return "c2 still has c1's tracks" + } + return "" + }) + }) + } } func TestCloseDisconnectedParticipantOnSignalClose(t *testing.T) { _, _, finish := setupMultiNodeTest("TestCloseDisconnectedParticipantOnSignalClose") defer finish() - c1 := createRTCClient("c1", secondServerPort, nil) - waitUntilConnected(t, c1) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", secondServerPort, pv, nil) + waitUntilConnected(t, c1) - c2 := createRTCClient("c2", defaultServerPort, &client.Options{ - SignalRequestInterceptor: func(msg *livekit.SignalRequest, next client.SignalRequestHandler) error { - switch msg.Message.(type) { - case *livekit.SignalRequest_Offer, *livekit.SignalRequest_Answer, *livekit.SignalRequest_Leave: - return nil - default: - return next(msg) - } - }, - SignalResponseInterceptor: func(msg *livekit.SignalResponse, next client.SignalResponseHandler) error { - switch msg.Message.(type) { - case *livekit.SignalResponse_Offer, *livekit.SignalResponse_Answer: - return nil - default: - return next(msg) - } - }, - }) + c2 := createRTCClient("c2", defaultServerPort, pv, &client.Options{ + SignalRequestInterceptor: func(msg *livekit.SignalRequest, next client.SignalRequestHandler) error { + switch msg.Message.(type) { + case *livekit.SignalRequest_Offer, *livekit.SignalRequest_Answer, *livekit.SignalRequest_Leave: + return nil + default: + return next(msg) + } + }, + SignalResponseInterceptor: func(msg *livekit.SignalResponse, next client.SignalResponseHandler) error { + switch msg.Message.(type) { + case *livekit.SignalResponse_Offer, *livekit.SignalResponse_Answer: + return nil + default: + return next(msg) + } + }, + }) - testutils.WithTimeout(t, func() string { - if len(c1.RemoteParticipants()) != 1 { - return "c1 did not see c2 join" - } - return "" - }) + testutils.WithTimeout(t, func() string { + if len(c1.RemoteParticipants()) != 1 { + return "c1 did not see c2 join" + } + return "" + }) - c2.Stop() + c2.Stop() - testutils.WithTimeout(t, func() string { - if len(c1.RemoteParticipants()) != 0 { - return "c1 did not see c2 removed" - } - return "" - }) + testutils.WithTimeout(t, func() string { + if len(c1.RemoteParticipants()) != 0 { + return "c1 did not see c2 removed" + } + return "" + }) + }) + } } diff --git a/test/scenarios.go b/test/scenarios.go index 9a00776fd..05eea844c 100644 --- a/test/scenarios.go +++ b/test/scenarios.go @@ -25,185 +25,206 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/testutils" testclient "github.com/livekit/livekit-server/test/client" ) // a scenario with lots of clients connecting, publishing, and leaving at random periods func scenarioPublishingUponJoining(t *testing.T) { - c1 := createRTCClient("puj_1", defaultServerPort, nil) - c2 := createRTCClient("puj_2", secondServerPort, &testclient.Options{AutoSubscribe: true}) - c3 := createRTCClient("puj_3", defaultServerPort, &testclient.Options{AutoSubscribe: true}) - defer stopClients(c1, c2, c3) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("puj_1", defaultServerPort, pv, nil) + c2 := createRTCClient("puj_2", secondServerPort, pv, &testclient.Options{AutoSubscribe: true}) + c3 := createRTCClient("puj_3", defaultServerPort, pv, &testclient.Options{AutoSubscribe: true}) + defer stopClients(c1, c2, c3) - waitUntilConnected(t, c1, c2, c3) + waitUntilConnected(t, c1, c2, c3) - // c1 and c2 publishing, c3 just receiving - writers := publishTracksForClients(t, c1, c2) - defer stopWriters(writers...) + // c1 and c2 publishing, c3 just receiving + writers := publishTracksForClients(t, c1, c2) + defer stopWriters(writers...) - logger.Infow("waiting to receive tracks from c1 and c2") - testutils.WithTimeout(t, func() string { - tracks := c3.SubscribedTracks() - if len(tracks[c1.ID()]) != 2 { - return "did not receive tracks from c1" - } - if len(tracks[c2.ID()]) != 2 { - return "did not receive tracks from c2" - } - return "" - }) + logger.Infow("waiting to receive tracks from c1 and c2") + testutils.WithTimeout(t, func() string { + tracks := c3.SubscribedTracks() + if len(tracks[c1.ID()]) != 2 { + return "did not receive tracks from c1" + } + if len(tracks[c2.ID()]) != 2 { + return "did not receive tracks from c2" + } + return "" + }) - // after a delay, c2 reconnects, then publishing - time.Sleep(syncDelay) - c2.Stop() + // after a delay, c2 reconnects, then publishing + time.Sleep(syncDelay) + c2.Stop() - logger.Infow("waiting for c2 tracks to be gone") - testutils.WithTimeout(t, func() string { - tracks := c3.SubscribedTracks() + logger.Infow("waiting for c2 tracks to be gone") + testutils.WithTimeout(t, func() string { + tracks := c3.SubscribedTracks() - if len(tracks[c1.ID()]) != 2 { - return fmt.Sprintf("c3 should be subscribed to 2 tracks from c1, actual: %d", len(tracks[c1.ID()])) - } - if len(tracks[c2.ID()]) != 0 { - return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(tracks[c2.ID()])) - } - if len(c1.SubscribedTracks()[c2.ID()]) != 0 { - return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()])) - } - return "" - }) + if len(tracks[c1.ID()]) != 2 { + return fmt.Sprintf("c3 should be subscribed to 2 tracks from c1, actual: %d", len(tracks[c1.ID()])) + } + if len(tracks[c2.ID()]) != 0 { + return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(tracks[c2.ID()])) + } + if len(c1.SubscribedTracks()[c2.ID()]) != 0 { + return fmt.Sprintf("c3 should be subscribed to 0 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()])) + } + return "" + }) - logger.Infow("c2 reconnecting") - // connect to a diff port - c2 = createRTCClient("puj_2", defaultServerPort, nil) - defer c2.Stop() - waitUntilConnected(t, c2) - writers = publishTracksForClients(t, c2) - defer stopWriters(writers...) + logger.Infow("c2 reconnecting") + // connect to a diff port + c2 = createRTCClient("puj_2", defaultServerPort, pv, nil) + defer c2.Stop() + waitUntilConnected(t, c2) + writers = publishTracksForClients(t, c2) + defer stopWriters(writers...) - testutils.WithTimeout(t, func() string { - tracks := c3.SubscribedTracks() - // "new c2 tracks should be published again", - if len(tracks[c2.ID()]) != 2 { - return fmt.Sprintf("c3 should be subscribed to 2 tracks from c2, actual: %d", len(tracks[c2.ID()])) - } - if len(c1.SubscribedTracks()[c2.ID()]) != 2 { - return fmt.Sprintf("c1 should be subscribed to 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()])) - } - return "" - }) + testutils.WithTimeout(t, func() string { + tracks := c3.SubscribedTracks() + // "new c2 tracks should be published again", + if len(tracks[c2.ID()]) != 2 { + return fmt.Sprintf("c3 should be subscribed to 2 tracks from c2, actual: %d", len(tracks[c2.ID()])) + } + if len(c1.SubscribedTracks()[c2.ID()]) != 2 { + return fmt.Sprintf("c1 should be subscribed to 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()])) + } + return "" + }) + }) + } } func scenarioReceiveBeforePublish(t *testing.T) { - c1 := createRTCClient("rbp_1", defaultServerPort, nil) - c2 := createRTCClient("rbp_2", defaultServerPort, nil) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("rbp_1", defaultServerPort, pv, nil) + c2 := createRTCClient("rbp_2", defaultServerPort, pv, nil) - waitUntilConnected(t, c1, c2) - defer stopClients(c1, c2) + waitUntilConnected(t, c1, c2) + defer stopClients(c1, c2) - // c1 publishes - writers := publishTracksForClients(t, c1) - defer stopWriters(writers...) + // c1 publishes + writers := publishTracksForClients(t, c1) + defer stopWriters(writers...) - // c2 should see some bytes flowing through - testutils.WithTimeout(t, func() string { - if c2.BytesReceived() > 20 { - return "" - } else { - return fmt.Sprintf("c2 only received %d bytes", c2.BytesReceived()) - } - }) + // c2 should see some bytes flowing through + testutils.WithTimeout(t, func() string { + if c2.BytesReceived() > 20 { + return "" + } else { + return fmt.Sprintf("c2 only received %d bytes", c2.BytesReceived()) + } + }) - // now publish on C2 - writers = publishTracksForClients(t, c2) - defer stopWriters(writers...) + // now publish on C2 + writers = publishTracksForClients(t, c2) + defer stopWriters(writers...) - testutils.WithTimeout(t, func() string { - if len(c1.SubscribedTracks()[c2.ID()]) == 2 { - return "" - } else { - return fmt.Sprintf("expected c1 to receive 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()])) - } - }) + testutils.WithTimeout(t, func() string { + if len(c1.SubscribedTracks()[c2.ID()]) == 2 { + return "" + } else { + return fmt.Sprintf("expected c1 to receive 2 tracks from c2, actual: %d", len(c1.SubscribedTracks()[c2.ID()])) + } + }) - // now leave, and ensure that it's immediate - c2.Stop() + // now leave, and ensure that it's immediate + c2.Stop() - testutils.WithTimeout(t, func() string { - if len(c1.RemoteParticipants()) > 0 { - return fmt.Sprintf("expected no remote participants, actual: %v", c1.RemoteParticipants()) - } - return "" - }) + testutils.WithTimeout(t, func() string { + if len(c1.RemoteParticipants()) > 0 { + return fmt.Sprintf("expected no remote participants, actual: %v", c1.RemoteParticipants()) + } + return "" + }) + }) + } } func scenarioDataPublish(t *testing.T) { - c1 := createRTCClient("dp1", defaultServerPort, nil) - c2 := createRTCClient("dp2", secondServerPort, nil) - waitUntilConnected(t, c1, c2) - defer stopClients(c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("dp1", defaultServerPort, pv, nil) + c2 := createRTCClient("dp2", secondServerPort, pv, nil) + waitUntilConnected(t, c1, c2) + defer stopClients(c1, c2) - payload := "test bytes" + payload := "test bytes" - received := atomic.NewBool(false) - c2.OnDataReceived = func(data []byte, sid string) { - if string(data) == payload && livekit.ParticipantID(sid) == c1.ID() { - received.Store(true) - } + received := atomic.NewBool(false) + c2.OnDataReceived = func(data []byte, sid string) { + if string(data) == payload && livekit.ParticipantID(sid) == c1.ID() { + received.Store(true) + } + } + + require.NoError(t, c1.PublishData([]byte(payload), livekit.DataPacket_RELIABLE)) + + testutils.WithTimeout(t, func() string { + if received.Load() { + return "" + } else { + return "c2 did not receive published data" + } + }) + }) } - - require.NoError(t, c1.PublishData([]byte(payload), livekit.DataPacket_RELIABLE)) - - testutils.WithTimeout(t, func() string { - if received.Load() { - return "" - } else { - return "c2 did not receive published data" - } - }) } func scenarioDataUnlabeledPublish(t *testing.T) { - c1 := createRTCClient("dp1", defaultServerPort, nil) - c2 := createRTCClient("dp2", secondServerPort, nil) - waitUntilConnected(t, c1, c2) - defer stopClients(c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("dp1", defaultServerPort, pv, nil) + c2 := createRTCClient("dp2", secondServerPort, pv, nil) + waitUntilConnected(t, c1, c2) + defer stopClients(c1, c2) - payload := "test unlabeled bytes" + payload := "test unlabeled bytes" - received := atomic.NewBool(false) - c2.OnDataReceived = func(data []byte, _sid string) { - if string(data) == payload { - received.Store(true) - } + received := atomic.NewBool(false) + c2.OnDataReceived = func(data []byte, _sid string) { + if string(data) == payload { + received.Store(true) + } + } + + require.NoError(t, c1.PublishDataUnlabeled([]byte(payload))) + + testutils.WithTimeout(t, func() string { + if received.Load() { + return "" + } else { + return "c2 did not receive published data unlabeled" + } + }) + }) } - - require.NoError(t, c1.PublishDataUnlabeled([]byte(payload))) - - testutils.WithTimeout(t, func() string { - if received.Load() { - return "" - } else { - return "c2 did not receive published data unlabeled" - } - }) } func scenarioJoinClosedRoom(t *testing.T) { - c1 := createRTCClient("jcr1", defaultServerPort, nil) - waitUntilConnected(t, c1) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("jcr1", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) - // close room with room client - _, err := roomClient.DeleteRoom(contextWithToken(createRoomToken()), &livekit.DeleteRoomRequest{ - Room: testRoom, - }) - require.NoError(t, err) + // close room with room client + _, err := roomClient.DeleteRoom(contextWithToken(createRoomToken()), &livekit.DeleteRoomRequest{ + Room: testRoom, + }) + require.NoError(t, err) - // now join again - c2 := createRTCClient("jcr2", defaultServerPort, nil) - waitUntilConnected(t, c2) - stopClients(c2) + // now join again + c2 := createRTCClient("jcr2", defaultServerPort, pv, nil) + waitUntilConnected(t, c2) + stopClients(c2) + }) + } } // close a room that has been created, but no participant has joined diff --git a/test/singlenode_test.go b/test/singlenode_test.go index e2ebe6532..e74827427 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -38,6 +38,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu/datachannel" "github.com/livekit/livekit-server/pkg/sfu/mime" "github.com/livekit/livekit-server/pkg/testutils" @@ -58,20 +59,24 @@ func TestClientCouldConnect(t *testing.T) { _, finish := setupSingleNodeTest("TestClientCouldConnect") defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - c2 := createRTCClient("c2", defaultServerPort, nil) - waitUntilConnected(t, c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + c2 := createRTCClient("c2", defaultServerPort, pv, nil) + waitUntilConnected(t, c1, c2) - // ensure they both see each other - testutils.WithTimeout(t, func() string { - if len(c1.RemoteParticipants()) == 0 { - return "c1 did not see c2" - } - if len(c2.RemoteParticipants()) == 0 { - return "c2 did not see c1" - } - return "" - }) + // ensure they both see each other + testutils.WithTimeout(t, func() string { + if len(c1.RemoteParticipants()) == 0 { + return "c1 did not see c2" + } + if len(c2.RemoteParticipants()) == 0 { + return "c2 did not see c1" + } + return "" + }) + }) + } } func TestClientConnectDuplicate(t *testing.T) { @@ -83,65 +88,68 @@ func TestClientConnectDuplicate(t *testing.T) { _, finish := setupSingleNodeTest("TestClientConnectDuplicate") defer finish() - grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom} - grant.SetCanPublish(true) - grant.SetCanSubscribe(true) - token := joinTokenWithGrant("c1", grant) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom} + grant.SetCanPublish(true) + grant.SetCanSubscribe(true) + token := joinTokenWithGrant("c1", grant) + c1 := createRTCClientWithToken(token, defaultServerPort, pv, nil) - c1 := createRTCClientWithToken(token, defaultServerPort, nil) + // publish 2 tracks + t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") + require.NoError(t, err) + defer t1.Stop() + t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t2.Stop() - // publish 2 tracks - t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") - require.NoError(t, err) - defer t1.Stop() - t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") - require.NoError(t, err) - defer t2.Stop() + c2 := createRTCClient("c2", defaultServerPort, pv, nil) + waitUntilConnected(t, c1, c2) - c2 := createRTCClient("c2", defaultServerPort, nil) - waitUntilConnected(t, c1, c2) + opts := &testclient.Options{ + Publish: "duplicate_connection", + } + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()) == 0 { + return "c2 didn't subscribe to anything" + } + // should have received three tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 2 { + return "c2 didn't subscribe to both tracks from c1" + } - opts := &testclient.Options{ - Publish: "duplicate_connection", + // participant ID can be appended with '#..' . but should contain orig id as prefix + tr1 := c2.SubscribedTracks()[c1.ID()][0] + participantId1, _ := rtc.UnpackStreamID(tr1.StreamID()) + require.Equal(t, c1.ID(), participantId1) + tr2 := c2.SubscribedTracks()[c1.ID()][1] + participantId2, _ := rtc.UnpackStreamID(tr2.StreamID()) + require.Equal(t, c1.ID(), participantId2) + return "" + }) + + c1Dup := createRTCClientWithToken(token, defaultServerPort, pv, opts) + + waitUntilConnected(t, c1Dup) + + t3, err := c1Dup.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t3.Stop() + + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()[c1Dup.ID()]) != 1 { + return "c2 was not subscribed to track from duplicated c1" + } + + tr3 := c2.SubscribedTracks()[c1Dup.ID()][0] + participantId3, _ := rtc.UnpackStreamID(tr3.StreamID()) + require.Contains(t, c1Dup.ID(), participantId3) + + return "" + }) + }) } - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()) == 0 { - return "c2 didn't subscribe to anything" - } - // should have received three tracks - if len(c2.SubscribedTracks()[c1.ID()]) != 2 { - return "c2 didn't subscribe to both tracks from c1" - } - - // participant ID can be appended with '#..' . but should contain orig id as prefix - tr1 := c2.SubscribedTracks()[c1.ID()][0] - participantId1, _ := rtc.UnpackStreamID(tr1.StreamID()) - require.Equal(t, c1.ID(), participantId1) - tr2 := c2.SubscribedTracks()[c1.ID()][1] - participantId2, _ := rtc.UnpackStreamID(tr2.StreamID()) - require.Equal(t, c1.ID(), participantId2) - return "" - }) - - c1Dup := createRTCClientWithToken(token, defaultServerPort, opts) - - waitUntilConnected(t, c1Dup) - - t3, err := c1Dup.AddStaticTrack("video/vp8", "video", "webcam") - require.NoError(t, err) - defer t3.Stop() - - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()[c1Dup.ID()]) != 1 { - return "c2 was not subscribed to track from duplicated c1" - } - - tr3 := c2.SubscribedTracks()[c1Dup.ID()][0] - participantId3, _ := rtc.UnpackStreamID(tr3.StreamID()) - require.Contains(t, c1Dup.ID(), participantId3) - - return "" - }) } func TestSinglePublisher(t *testing.T) { @@ -153,76 +161,80 @@ func TestSinglePublisher(t *testing.T) { s, finish := setupSingleNodeTest("TestSinglePublisher") defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - c2 := createRTCClient("c2", defaultServerPort, nil) - waitUntilConnected(t, c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + c2 := createRTCClient("c2", defaultServerPort, pv, nil) + waitUntilConnected(t, c1, c2) - // publish a track and ensure clients receive it ok - t1, err := c1.AddStaticTrack("audio/OPUS", "audio", "webcamaudio") - require.NoError(t, err) - defer t1.Stop() - t2, err := c1.AddStaticTrack("video/vp8", "video", "webcamvideo") - require.NoError(t, err) - defer t2.Stop() + // publish a track and ensure clients receive it ok + t1, err := c1.AddStaticTrack("audio/OPUS", "audio", "webcamaudio") + require.NoError(t, err) + defer t1.Stop() + t2, err := c1.AddStaticTrack("video/vp8", "video", "webcamvideo") + require.NoError(t, err) + defer t2.Stop() - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()) == 0 { - return "c2 was not subscribed to anything" - } - // should have received two tracks - if len(c2.SubscribedTracks()[c1.ID()]) != 2 { - return "c2 didn't subscribe to both tracks from c1" - } + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()) == 0 { + return "c2 was not subscribed to anything" + } + // should have received two tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 2 { + return "c2 didn't subscribe to both tracks from c1" + } - tr1 := c2.SubscribedTracks()[c1.ID()][0] - participantId, _ := rtc.UnpackStreamID(tr1.StreamID()) - require.Equal(t, c1.ID(), participantId) - return "" - }) - // ensure mime type is received - remoteC1 := c2.GetRemoteParticipant(c1.ID()) - audioTrack := funk.Find(remoteC1.Tracks, func(ti *livekit.TrackInfo) bool { - return ti.Name == "webcamaudio" - }).(*livekit.TrackInfo) - require.Equal(t, "audio/opus", audioTrack.MimeType) + tr1 := c2.SubscribedTracks()[c1.ID()][0] + participantId, _ := rtc.UnpackStreamID(tr1.StreamID()) + require.Equal(t, c1.ID(), participantId) + return "" + }) + // ensure mime type is received + remoteC1 := c2.GetRemoteParticipant(c1.ID()) + audioTrack := funk.Find(remoteC1.Tracks, func(ti *livekit.TrackInfo) bool { + return ti.Name == "webcamaudio" + }).(*livekit.TrackInfo) + require.Equal(t, "audio/opus", audioTrack.MimeType) - // a new client joins and should get the initial stream - c3 := createRTCClient("c3", defaultServerPort, nil) + // a new client joins and should get the initial stream + c3 := createRTCClient("c3", defaultServerPort, pv, nil) - // ensure that new client that has joined also received tracks - waitUntilConnected(t, c3) - testutils.WithTimeout(t, func() string { - if len(c3.SubscribedTracks()) == 0 { - return "c3 didn't subscribe to anything" - } - // should have received two tracks - if len(c3.SubscribedTracks()[c1.ID()]) != 2 { - return "c3 didn't subscribe to tracks from c1" - } - return "" - }) + // ensure that new client that has joined also received tracks + waitUntilConnected(t, c3) + testutils.WithTimeout(t, func() string { + if len(c3.SubscribedTracks()) == 0 { + return "c3 didn't subscribe to anything" + } + // should have received two tracks + if len(c3.SubscribedTracks()[c1.ID()]) != 2 { + return "c3 didn't subscribe to tracks from c1" + } + return "" + }) - // ensure that the track ids are generated by server - tracks := c3.SubscribedTracks()[c1.ID()] - for _, tr := range tracks { - require.True(t, strings.HasPrefix(tr.ID(), "TR_"), "track should begin with TR") - } - - // when c3 disconnects, ensure subscriber is cleaned up correctly - c3.Stop() - - testutils.WithTimeout(t, func() string { - room := s.RoomManager().GetRoom(context.Background(), testRoom) - p := room.GetParticipant("c1") - require.NotNil(t, p) - - for _, t := range p.GetPublishedTracks() { - if t.IsSubscriber(c3.ID()) { - return "c3 was not a subscriber of c1's tracks" + // ensure that the track ids are generated by server + tracks := c3.SubscribedTracks()[c1.ID()] + for _, tr := range tracks { + require.True(t, strings.HasPrefix(tr.ID(), "TR_"), "track should begin with TR") } - } - return "" - }) + + // when c3 disconnects, ensure subscriber is cleaned up correctly + c3.Stop() + + testutils.WithTimeout(t, func() string { + room := s.RoomManager().GetRoom(context.Background(), testRoom) + p := room.GetParticipant("c1") + require.NotNil(t, p) + + for _, t := range p.GetPublishedTracks() { + if t.IsSubscriber(c3.ID()) { + return "c3 was not a subscriber of c1's tracks" + } + } + return "" + }) + }) + } } func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks(t *testing.T) { @@ -234,20 +246,24 @@ func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks( _, finish := setupSingleNodeTest("Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks") defer finish() - opts := testclient.Options{AutoSubscribe: false} - publisher := createRTCClient("publisher", defaultServerPort, &opts) - client := createRTCClient("client", defaultServerPort, &opts) - defer publisher.Stop() - defer client.Stop() - waitUntilConnected(t, publisher, client) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + opts := testclient.Options{AutoSubscribe: false} + publisher := createRTCClient("publisher", defaultServerPort, pv, &opts) + client := createRTCClient("client", defaultServerPort, pv, &opts) + defer publisher.Stop() + defer client.Stop() + waitUntilConnected(t, publisher, client) - track, err := publisher.AddStaticTrack("audio/opus", "audio", "webcam") - require.NoError(t, err) - defer track.Stop() + track, err := publisher.AddStaticTrack("audio/opus", "audio", "webcam") + require.NoError(t, err) + defer track.Stop() - time.Sleep(syncDelay) + time.Sleep(syncDelay) - require.Empty(t, client.SubscribedTracks()[publisher.ID()]) + require.Empty(t, client.SubscribedTracks()[publisher.ID()]) + }) + } } func Test_RenegotiationWithDifferentCodecs(t *testing.T) { @@ -259,71 +275,75 @@ func Test_RenegotiationWithDifferentCodecs(t *testing.T) { _, finish := setupSingleNodeTest("TestRenegotiationWithDifferentCodecs") defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - c2 := createRTCClient("c2", defaultServerPort, nil) - waitUntilConnected(t, c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + c2 := createRTCClient("c2", defaultServerPort, pv, nil) + waitUntilConnected(t, c1, c2) - // publish a vp8 video track and ensure clients receive it ok - t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") - require.NoError(t, err) - defer t1.Stop() - t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") - require.NoError(t, err) - defer t2.Stop() + // publish a vp8 video track and ensure clients receive it ok + t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") + require.NoError(t, err) + defer t1.Stop() + t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t2.Stop() - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()) == 0 { - return "c2 was not subscribed to anything" - } - // should have received two tracks - if len(c2.SubscribedTracks()[c1.ID()]) != 2 { - return "c2 was not subscribed to tracks from c1" - } + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()) == 0 { + return "c2 was not subscribed to anything" + } + // should have received two tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 2 { + return "c2 was not subscribed to tracks from c1" + } - tracks := c2.SubscribedTracks()[c1.ID()] - for _, t := range tracks { - if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { + return "" + + } + } + return "did not receive track with vp8" + }) + + t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{ + MimeType: "video/h264", + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", + }, "videoscreen", "screen") + defer t3.Stop() + require.NoError(t, err) + + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()) == 0 { + return "c2's not subscribed to anything" + } + // should have received three tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 3 { + return "c2's not subscribed to 3 tracks from c1" + } + + var vp8Found, h264Found bool + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { + vp8Found = true + } else if mime.IsMimeTypeStringH264(t.Codec().MimeType) { + h264Found = true + } + } + if !vp8Found { + return "did not receive track with vp8" + } + if !h264Found { + return "did not receive track with h264" + } return "" - - } - } - return "did not receive track with vp8" - }) - - t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{ - MimeType: "video/h264", - ClockRate: 90000, - SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", - }, "videoscreen", "screen") - defer t3.Stop() - require.NoError(t, err) - - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()) == 0 { - return "c2's not subscribed to anything" - } - // should have received three tracks - if len(c2.SubscribedTracks()[c1.ID()]) != 3 { - return "c2's not subscribed to 3 tracks from c1" - } - - var vp8Found, h264Found bool - tracks := c2.SubscribedTracks()[c1.ID()] - for _, t := range tracks { - if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { - vp8Found = true - } else if mime.IsMimeTypeStringH264(t.Codec().MimeType) { - h264Found = true - } - } - if !vp8Found { - return "did not receive track with vp8" - } - if !h264Found { - return "did not receive track with h264" - } - return "" - }) + }) + }) + } } func TestSingleNodeRoomList(t *testing.T) { @@ -401,13 +421,17 @@ func TestPingPong(t *testing.T) { _, finish := setupSingleNodeTest("TestPingPong") defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - waitUntilConnected(t, c1) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) - require.NoError(t, c1.SendPing()) - require.Eventually(t, func() bool { - return c1.PongReceivedAt() > 0 - }, time.Second, 10*time.Millisecond) + require.NoError(t, c1.SendPing()) + require.Eventually(t, func() bool { + return c1.PongReceivedAt() > 0 + }, time.Second, 10*time.Millisecond) + }) + } } func TestSingleNodeJoinAfterClose(t *testing.T) { @@ -453,14 +477,18 @@ func TestAutoCreate(t *testing.T) { waitForServerToStart(s) - token := joinToken(testRoom, "start-before-create", nil) - _, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil) - require.Error(t, err) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + token := joinToken(testRoom, "start-before-create", nil) + _, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, pv, nil) + require.Error(t, err) - // second join should also fail - token = joinToken(testRoom, "start-before-create-2", nil) - _, err = testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil) - require.Error(t, err) + // second join should also fail + token = joinToken(testRoom, "start-before-create-2", nil) + _, err = testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, pv, nil) + require.Error(t, err) + }) + } }) t.Run("join with explicit createRoom", func(t *testing.T) { @@ -478,10 +506,14 @@ func TestAutoCreate(t *testing.T) { _, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{Name: testRoom}) require.NoError(t, err) - c1 := createRTCClient("join-after-create", defaultServerPort, nil) - waitUntilConnected(t, c1) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("join-after-create", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) - c1.Stop() + c1.Stop() + }) + } }) } @@ -494,54 +526,58 @@ func TestSingleNodeUpdateSubscriptionPermissions(t *testing.T) { _, finish := setupSingleNodeTest("TestSingleNodeUpdateSubscriptionPermissions") defer finish() - pub := createRTCClient("pub", defaultServerPort, nil) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + pub := createRTCClient("pub", defaultServerPort, pv, nil) - grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom} - grant.SetCanSubscribe(false) - at := auth.NewAccessToken(testApiKey, testApiSecret). - AddGrant(grant). - SetIdentity("sub") - token, err := at.ToJWT() - require.NoError(t, err) - sub := createRTCClientWithToken(token, defaultServerPort, nil) + grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom} + grant.SetCanSubscribe(false) + at := auth.NewAccessToken(testApiKey, testApiSecret). + AddGrant(grant). + SetIdentity("sub") + token, err := at.ToJWT() + require.NoError(t, err) + sub := createRTCClientWithToken(token, defaultServerPort, pv, nil) - waitUntilConnected(t, pub, sub) + waitUntilConnected(t, pub, sub) - writers := publishTracksForClients(t, pub) - defer stopWriters(writers...) + writers := publishTracksForClients(t, pub) + defer stopWriters(writers...) - // wait sub receives tracks - testutils.WithTimeout(t, func() string { - pubRemote := sub.GetRemoteParticipant(pub.ID()) - if pubRemote == nil { - return "could not find remote publisher" - } - if len(pubRemote.Tracks) != 2 { - return "did not receive metadata for published tracks" - } - return "" - }) + // wait sub receives tracks + testutils.WithTimeout(t, func() string { + pubRemote := sub.GetRemoteParticipant(pub.ID()) + if pubRemote == nil { + return "could not find remote publisher" + } + if len(pubRemote.Tracks) != 2 { + return "did not receive metadata for published tracks" + } + return "" + }) - // set permissions out of band - ctx := contextWithToken(adminRoomToken(testRoom)) - _, err = roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ - Room: testRoom, - Identity: "sub", - Permission: &livekit.ParticipantPermission{ - CanSubscribe: true, - CanPublish: true, - }, - }) - require.NoError(t, err) + // set permissions out of band + ctx := contextWithToken(adminRoomToken(testRoom)) + _, err = roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ + Room: testRoom, + Identity: "sub", + Permission: &livekit.ParticipantPermission{ + CanSubscribe: true, + CanPublish: true, + }, + }) + require.NoError(t, err) - testutils.WithTimeout(t, func() string { - tracks := sub.SubscribedTracks()[pub.ID()] - if len(tracks) == 2 { - return "" - } else { - return fmt.Sprintf("expected 2 tracks subscribed, actual: %d", len(tracks)) - } - }) + testutils.WithTimeout(t, func() string { + tracks := sub.SubscribedTracks()[pub.ID()] + if len(tracks) == 2 { + return "" + } else { + return fmt.Sprintf("expected 2 tracks subscribed, actual: %d", len(tracks)) + } + }) + }) + } } func TestSingleNodeAttributes(t *testing.T) { @@ -552,49 +588,53 @@ func TestSingleNodeAttributes(t *testing.T) { _, finish := setupSingleNodeTest("TestSingleNodeAttributes") defer finish() - pub := createRTCClient("pub", defaultServerPort, &testclient.Options{ - Attributes: map[string]string{ - "b": "2", - "c": "3", - }, - TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { - T := true - grants.CanUpdateOwnMetadata = &T - token.SetAttributes(map[string]string{ - "a": "0", - "b": "1", + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + pub := createRTCClient("pub", defaultServerPort, pv, &testclient.Options{ + Attributes: map[string]string{ + "b": "2", + "c": "3", + }, + TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { + T := true + grants.CanUpdateOwnMetadata = &T + token.SetAttributes(map[string]string{ + "a": "0", + "b": "1", + }) + }, + UseJoinRequestQueryParam: true, }) - }, - UseJoinRequestQueryParam: true, - }) - grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom} - grant.SetCanSubscribe(false) - at := auth.NewAccessToken(testApiKey, testApiSecret). - SetVideoGrant(grant). - SetIdentity("sub") - token, err := at.ToJWT() - require.NoError(t, err) - sub := createRTCClientWithToken(token, defaultServerPort, nil) + grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom} + grant.SetCanSubscribe(false) + at := auth.NewAccessToken(testApiKey, testApiSecret). + SetVideoGrant(grant). + SetIdentity("sub") + token, err := at.ToJWT() + require.NoError(t, err) + sub := createRTCClientWithToken(token, defaultServerPort, pv, nil) - waitUntilConnected(t, pub, sub) + waitUntilConnected(t, pub, sub) - // wait sub receives initial attributes - testutils.WithTimeout(t, func() string { - pubRemote := sub.GetRemoteParticipant(pub.ID()) - if pubRemote == nil { - return "could not find remote publisher" - } - attrs := pubRemote.Attributes - if !reflect.DeepEqual(attrs, map[string]string{ - "a": "0", - "b": "2", - "c": "3", - }) { - return fmt.Sprintf("did not receive expected attributes: %v", attrs) - } - return "" - }) + // wait sub receives initial attributes + testutils.WithTimeout(t, func() string { + pubRemote := sub.GetRemoteParticipant(pub.ID()) + if pubRemote == nil { + return "could not find remote publisher" + } + attrs := pubRemote.Attributes + if !reflect.DeepEqual(attrs, map[string]string{ + "a": "0", + "b": "2", + "c": "3", + }) { + return fmt.Sprintf("did not receive expected attributes: %v", attrs) + } + return "" + }) + }) + } } // TestDeviceCodecOverride checks that codecs that are incompatible with a device is not @@ -608,65 +648,69 @@ func TestDeviceCodecOverride(t *testing.T) { _, finish := setupSingleNodeTest("TestDeviceCodecOverride") defer finish() - // simulate device that isn't compatible with H.264 - c1 := createRTCClient("c1", defaultServerPort, &testclient.Options{ - ClientInfo: &livekit.ClientInfo{ - Os: "android", - DeviceModel: "Xiaomi 2201117TI", - }, - }) - defer c1.Stop() - waitUntilConnected(t, c1) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + // simulate device that isn't compatible with H.264 + c1 := createRTCClient("c1", defaultServerPort, pv, &testclient.Options{ + ClientInfo: &livekit.ClientInfo{ + Os: "android", + DeviceModel: "Xiaomi 2201117TI", + }, + }) + defer c1.Stop() + waitUntilConnected(t, c1) - // it doesn't really matter what the codec set here is, uses default Pion MediaEngine codecs - tw, err := c1.AddStaticTrack("video/h264", "video", "webcam") - require.NoError(t, err) - defer stopWriters(tw) + // it doesn't really matter what the codec set here is, uses default Pion MediaEngine codecs + tw, err := c1.AddStaticTrack("video/h264", "video", "webcam") + require.NoError(t, err) + defer stopWriters(tw) - var sd *webrtc.SessionDescription - // wait for server to receive track - if !c1.ProtocolVersion().SupportsSinglePeerConnection() { - require.Eventually(t, func() bool { - return c1.LastAnswer() != nil - }, waitTimeout, waitTick, "did not receive answer") + var sd *webrtc.SessionDescription + // wait for server to receive track + if !c1.ProtocolVersion().SupportsSinglePeerConnection() { + require.Eventually(t, func() bool { + return c1.LastAnswer() != nil + }, waitTimeout, waitTick, "did not receive answer") - sd = &webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: c1.LastAnswer().SDP, - } - } else { - require.Eventually(t, func() bool { - return c1.LastOffer() != nil - }, waitTimeout, waitTick, "did not receive offer") + sd = &webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: c1.LastAnswer().SDP, + } + } else { + require.Eventually(t, func() bool { + return c1.LastOffer() != nil + }, waitTimeout, waitTick, "did not receive offer") - sd = &webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: c1.LastOffer().SDP, - } - } - marshaled, err := sd.Unmarshal() - require.NoError(t, err) - - // video and data channel - require.Len(t, marshaled.MediaDescriptions, 2) - var desc *sdp.MediaDescription - for _, md := range marshaled.MediaDescriptions { - if md.MediaName.Media == "video" { - desc = md - break - } - } - require.NotNil(t, desc) - hasSeenVP8 := false - for _, a := range desc.Attributes { - if a.Key == "rtpmap" { - require.NotContains(t, a.Value, mime.MimeTypeCodecH264.String(), "should not contain H264 codec") - if strings.Contains(a.Value, mime.MimeTypeCodecVP8.String()) { - hasSeenVP8 = true + sd = &webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: c1.LastOffer().SDP, + } } - } + marshaled, err := sd.Unmarshal() + require.NoError(t, err) + + // video and data channel + require.Len(t, marshaled.MediaDescriptions, 2) + var desc *sdp.MediaDescription + for _, md := range marshaled.MediaDescriptions { + if md.MediaName.Media == "video" { + desc = md + break + } + } + require.NotNil(t, desc) + hasSeenVP8 := false + for _, a := range desc.Attributes { + if a.Key == "rtpmap" { + require.NotContains(t, a.Value, mime.MimeTypeCodecH264.String(), "should not contain H264 codec") + if strings.Contains(a.Value, mime.MimeTypeCodecVP8.String()) { + hasSeenVP8 = true + } + } + } + require.True(t, hasSeenVP8, "should have seen VP8 codec in SDP") + }) } - require.True(t, hasSeenVP8, "should have seen VP8 codec in SDP") } func TestSubscribeToCodecUnsupported(t *testing.T) { @@ -678,104 +722,108 @@ func TestSubscribeToCodecUnsupported(t *testing.T) { _, finish := setupSingleNodeTest("TestSubscribeToCodecUnsupported") defer finish() - c1 := createRTCClient("c1", defaultServerPort, &testclient.Options{ - UseJoinRequestQueryParam: true, - }) - // create a client that doesn't support H264 - c2 := createRTCClient("c2", defaultServerPort, &testclient.Options{ - AutoSubscribe: true, - DisabledCodecs: []webrtc.RTPCodecCapability{ - {MimeType: "video/H264"}, - }, - }) - waitUntilConnected(t, c1, c2) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, &testclient.Options{ + UseJoinRequestQueryParam: true, + }) + // create a client that doesn't support H264 + c2 := createRTCClient("c2", defaultServerPort, pv, &testclient.Options{ + AutoSubscribe: true, + DisabledCodecs: []webrtc.RTPCodecCapability{ + {MimeType: "video/H264"}, + }, + }) + waitUntilConnected(t, c1, c2) - // publish a vp8 video track and ensure c2 receives it ok - t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") - require.NoError(t, err) - defer t1.Stop() - t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") - require.NoError(t, err) - defer t2.Stop() + // publish a vp8 video track and ensure c2 receives it ok + t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") + require.NoError(t, err) + defer t1.Stop() + t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t2.Stop() - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()) == 0 { - return "c2 was not subscribed to anything" - } - // should have received two tracks - if len(c2.SubscribedTracks()[c1.ID()]) != 2 { - return "c2 was not subscribed to tracks from c1" - } + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()) == 0 { + return "c2 was not subscribed to anything" + } + // should have received two tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 2 { + return "c2 was not subscribed to tracks from c1" + } - tracks := c2.SubscribedTracks()[c1.ID()] - for _, t := range tracks { - if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { - return "" - } - } - return "did not receive track with vp8" - }) - require.Nil(t, c2.GetSubscriptionResponseAndClear()) + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { + return "" + } + } + return "did not receive track with vp8" + }) + require.Nil(t, c2.GetSubscriptionResponseAndClear()) - // publish a h264 track and ensure c2 got subscription error - t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{ - MimeType: "video/h264", - ClockRate: 90000, - SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", - }, "videoscreen", "screen") - defer t3.Stop() - require.NoError(t, err) + // publish a h264 track and ensure c2 got subscription error + t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{ + MimeType: "video/h264", + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", + }, "videoscreen", "screen") + defer t3.Stop() + require.NoError(t, err) - var h264TrackID string - require.Eventually(t, func() bool { - remoteC1 := c2.GetRemoteParticipant(c1.ID()) - require.NotNil(t, remoteC1) - for _, track := range remoteC1.Tracks { - if mime.IsMimeTypeStringH264(track.MimeType) { - h264TrackID = track.Sid + var h264TrackID string + require.Eventually(t, func() bool { + remoteC1 := c2.GetRemoteParticipant(c1.ID()) + require.NotNil(t, remoteC1) + for _, track := range remoteC1.Tracks { + if mime.IsMimeTypeStringH264(track.MimeType) { + h264TrackID = track.Sid + return true + } + } + return false + }, time.Second, 10*time.Millisecond, "did not receive track info with h264") + + require.Eventually(t, func() bool { + sr := c2.GetSubscriptionResponseAndClear() + if sr == nil { + return false + } + require.Equal(t, h264TrackID, sr.TrackSid) + require.Equal(t, livekit.SubscriptionError_SE_CODEC_UNSUPPORTED, sr.Err) return true - } - } - return false - }, time.Second, 10*time.Millisecond, "did not receive track info with h264") + }, 5*time.Second, 10*time.Millisecond, "did not receive subscription response") - require.Eventually(t, func() bool { - sr := c2.GetSubscriptionResponseAndClear() - if sr == nil { - return false - } - require.Equal(t, h264TrackID, sr.TrackSid) - require.Equal(t, livekit.SubscriptionError_SE_CODEC_UNSUPPORTED, sr.Err) - return true - }, 5*time.Second, 10*time.Millisecond, "did not receive subscription response") + // publish another vp8 track again, ensure the transport recovered by sfu and c2 can receive it + t4, err := c1.AddStaticTrack("video/vp8", "video2", "webcam2") + require.NoError(t, err) + defer t4.Stop() - // publish another vp8 track again, ensure the transport recovered by sfu and c2 can receive it - t4, err := c1.AddStaticTrack("video/vp8", "video2", "webcam2") - require.NoError(t, err) - defer t4.Stop() + testutils.WithTimeout(t, func() string { + if len(c2.SubscribedTracks()) == 0 { + return "c2 was not subscribed to anything" + } + // should have received two tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 3 { + return "c2 was not subscribed to tracks from c1" + } - testutils.WithTimeout(t, func() string { - if len(c2.SubscribedTracks()) == 0 { - return "c2 was not subscribed to anything" - } - // should have received two tracks - if len(c2.SubscribedTracks()[c1.ID()]) != 3 { - return "c2 was not subscribed to tracks from c1" - } - - var vp8Count int - tracks := c2.SubscribedTracks()[c1.ID()] - for _, t := range tracks { - if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { - vp8Count++ - } - } - if vp8Count == 2 { - return "" - } - return "did not 2 receive track with vp8" - }) - require.Nil(t, c2.GetSubscriptionResponseAndClear()) + var vp8Count int + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if mime.IsMimeTypeStringVP8(t.Codec().MimeType) { + vp8Count++ + } + } + if vp8Count == 2 { + return "" + } + return "did not 2 receive track with vp8" + }) + require.Nil(t, c2.GetSubscriptionResponseAndClear()) + }) + } } func TestDataPublishSlowSubscriber(t *testing.T) { @@ -803,97 +851,101 @@ func TestDataPublishSlowSubscriber(t *testing.T) { logger.Infow("----------------FINISHING TEST----------------", "test", t.Name()) }() - pub := createRTCClient("pub", defaultServerPort, nil) - fastSub := createRTCClient("fastSub", defaultServerPort, nil) - slowSubNotDrop := createRTCClient("slowSubNotDrop", defaultServerPort, nil) - slowSubDrop := createRTCClient("slowSubDrop", defaultServerPort, nil) - waitUntilConnected(t, pub, fastSub, slowSubDrop, slowSubNotDrop) - defer func() { - pub.Stop() - fastSub.Stop() - slowSubNotDrop.Stop() - slowSubDrop.Stop() - }() + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + pub := createRTCClient("pub", defaultServerPort, pv, nil) + fastSub := createRTCClient("fastSub", defaultServerPort, pv, nil) + slowSubNotDrop := createRTCClient("slowSubNotDrop", defaultServerPort, pv, nil) + slowSubDrop := createRTCClient("slowSubDrop", defaultServerPort, pv, nil) + waitUntilConnected(t, pub, fastSub, slowSubDrop, slowSubNotDrop) + defer func() { + pub.Stop() + fastSub.Stop() + slowSubNotDrop.Stop() + slowSubDrop.Stop() + }() - // no data should be dropped for fast subscriber - var fastDataIndex atomic.Uint64 - fastSub.OnDataReceived = func(data []byte, sid string) { - idx := binary.BigEndian.Uint64(data[len(data)-8:]) - require.Equal(t, fastDataIndex.Load()+1, idx) - fastDataIndex.Store(idx) - } + // no data should be dropped for fast subscriber + var fastDataIndex atomic.Uint64 + fastSub.OnDataReceived = func(data []byte, sid string) { + idx := binary.BigEndian.Uint64(data[len(data)-8:]) + require.Equal(t, fastDataIndex.Load()+1, idx) + fastDataIndex.Store(idx) + } - // no data should be dropped for slow subscriber that is above threshold - var slowNoDropDataIndex atomic.Uint64 - var drainSlowSubNotDrop atomic.Bool - slowNoDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold * 2) - slowSubNotDrop.OnDataReceived = func(data []byte, sid string) { - idx := binary.BigEndian.Uint64(data[len(data)-8:]) - require.Equal(t, slowNoDropDataIndex.Load()+1, idx) - slowNoDropDataIndex.Store(idx) - if !drainSlowSubNotDrop.Load() { - slowNoDropReader.Read(data, sid) - } - } - - // data should be dropped for slow subscriber that is below threshold - var slowDropDataIndex atomic.Uint64 - dropped := make(chan struct{}) - slowDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold / 2) - slowSubDrop.OnDataReceived = func(data []byte, sid string) { - select { - case <-dropped: - return - default: - } - idx := binary.BigEndian.Uint64(data[len(data)-8:]) - if idx != slowDropDataIndex.Load()+1 { - close(dropped) - } - slowDropDataIndex.Store(idx) - slowDropReader.Read(data, sid) - } - - // publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold - var ( - blocked atomic.Bool - stopWrite atomic.Bool - writeIdx atomic.Uint64 - ) - writeStopped := make(chan struct{}) - go func() { - defer close(writeStopped) - var i int - buf := make([]byte, 100) - for !stopWrite.Load() { - i++ - binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i)) - if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil { - if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) { - blocked.Store(true) - i-- - continue - } else { - t.Log("error writing", err) - break + // no data should be dropped for slow subscriber that is above threshold + var slowNoDropDataIndex atomic.Uint64 + var drainSlowSubNotDrop atomic.Bool + slowNoDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold * 2) + slowSubNotDrop.OnDataReceived = func(data []byte, sid string) { + idx := binary.BigEndian.Uint64(data[len(data)-8:]) + require.Equal(t, slowNoDropDataIndex.Load()+1, idx) + slowNoDropDataIndex.Store(idx) + if !drainSlowSubNotDrop.Load() { + slowNoDropReader.Read(data, sid) } } - writeIdx.Store(uint64(i)) - } - }() - <-dropped + // data should be dropped for slow subscriber that is below threshold + var slowDropDataIndex atomic.Uint64 + dropped := make(chan struct{}) + slowDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold / 2) + slowSubDrop.OnDataReceived = func(data []byte, sid string) { + select { + case <-dropped: + return + default: + } + idx := binary.BigEndian.Uint64(data[len(data)-8:]) + if idx != slowDropDataIndex.Load()+1 { + close(dropped) + } + slowDropDataIndex.Store(idx) + slowDropReader.Read(data, sid) + } - time.Sleep(time.Second) - blocked.Store(false) - require.Eventually(t, func() bool { return blocked.Load() }, 30*time.Second, 100*time.Millisecond) - stopWrite.Store(true) - <-writeStopped - drainSlowSubNotDrop.Store(true) - require.Eventually(t, func() bool { - return writeIdx.Load() == fastDataIndex.Load() && - writeIdx.Load() == slowNoDropDataIndex.Load() - }, 10*time.Second, 50*time.Millisecond, "writeIdx %d, fast %d, slowNoDrop %d", writeIdx.Load(), fastDataIndex.Load(), slowNoDropDataIndex.Load()) + // publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold + var ( + blocked atomic.Bool + stopWrite atomic.Bool + writeIdx atomic.Uint64 + ) + writeStopped := make(chan struct{}) + go func() { + defer close(writeStopped) + var i int + buf := make([]byte, 100) + for !stopWrite.Load() { + i++ + binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i)) + if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil { + if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) { + blocked.Store(true) + i-- + continue + } else { + t.Log("error writing", err) + break + } + } + writeIdx.Store(uint64(i)) + } + }() + + <-dropped + + time.Sleep(time.Second) + blocked.Store(false) + require.Eventually(t, func() bool { return blocked.Load() }, 30*time.Second, 100*time.Millisecond) + stopWrite.Store(true) + <-writeStopped + drainSlowSubNotDrop.Store(true) + require.Eventually(t, func() bool { + return writeIdx.Load() == fastDataIndex.Load() && + writeIdx.Load() == slowNoDropDataIndex.Load() + }, 10*time.Second, 50*time.Millisecond, "writeIdx %d, fast %d, slowNoDrop %d", writeIdx.Load(), fastDataIndex.Load(), slowNoDropDataIndex.Load()) + }) + } } func TestFireTrackBySdp(t *testing.T) { @@ -925,15 +977,70 @@ func TestFireTrackBySdp(t *testing.T) { for _, c := range cases { codecs, sdk := c.codecs, c.pubSDK t.Run(c.name, func(t *testing.T) { - c1 := createRTCClient(c.name+"_c1", defaultServerPort, &testclient.Options{ + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient(c.name+"_c1", defaultServerPort, pv, &testclient.Options{ + ClientInfo: &livekit.ClientInfo{ + Sdk: sdk, + }, + }) + c2 := createRTCClient(c.name+"_c2", defaultServerPort, pv, &testclient.Options{ + AutoSubscribe: true, + ClientInfo: &livekit.ClientInfo{ + Sdk: livekit.ClientInfo_JS, + }, + }) + waitUntilConnected(t, c1, c2) + defer func() { + c1.Stop() + c2.Stop() + }() + + // publish tracks and don't write any packets + for _, codec := range codecs { + _, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType, testclient.AddTrackNoWriter()) + require.NoError(t, err) + } + + require.Eventually(t, func() bool { + return len(c2.SubscribedTracks()[c1.ID()]) == len(codecs) + }, 5*time.Second, 10*time.Millisecond) + + var found int + for _, pubTrack := range c1.GetPublishedTrackIDs() { + t.Log("pub track", pubTrack) + tracks := c2.SubscribedTracks()[c1.ID()] + for _, track := range tracks { + t.Log("sub track", track.ID(), track.Codec()) + if track.Codec().PayloadType == 0 && track.ID() == pubTrack { + found++ + break + } + } + } + require.Equal(t, len(codecs), found) + }) + } + }) + } +} + +// SINGLE-PEER-CONNECTION-TODO: delete this test and make all other tests for both two peer connections and one peer connection +func TestSinglePeerConnection(t *testing.T) { + _, finish := setupSingleNodeTest("TestSinglePeerConnection") + defer finish() + + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, &testclient.Options{ ClientInfo: &livekit.ClientInfo{ - Sdk: sdk, + Sdk: livekit.ClientInfo_GO, }, }) - c2 := createRTCClient(c.name+"_c2", defaultServerPort, &testclient.Options{ + c2 := createRTCClient("c2", defaultServerPort, pv, &testclient.Options{ AutoSubscribe: true, ClientInfo: &livekit.ClientInfo{ - Sdk: livekit.ClientInfo_JS, + Sdk: livekit.ClientInfo_GO, }, }) waitUntilConnected(t, c1, c2) @@ -942,9 +1049,12 @@ func TestFireTrackBySdp(t *testing.T) { c2.Stop() }() - // publish tracks and don't write any packets + codecs := []webrtc.RTPCodecCapability{ + {MimeType: mime.MimeTypeH264.String()}, + {MimeType: mime.MimeTypeOpus.String()}, + } for _, codec := range codecs { - _, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType, testclient.AddTrackNoWriter()) + _, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType) require.NoError(t, err) } @@ -958,7 +1068,7 @@ func TestFireTrackBySdp(t *testing.T) { tracks := c2.SubscribedTracks()[c1.ID()] for _, track := range tracks { t.Log("sub track", track.ID(), track.Codec()) - if track.Codec().PayloadType == 0 && track.ID() == pubTrack { + if track.ID() == pubTrack { found++ break } @@ -968,53 +1078,3 @@ func TestFireTrackBySdp(t *testing.T) { }) } } - -// SINGLE-PEER-CONNECTION-TODO: delete this test and make all other tests for both two peer connections and one peer connection -func TestSinglePeerConnection(t *testing.T) { - _, finish := setupSingleNodeTest("TestSinglePeerConnection") - defer finish() - - c1 := createRTCClient("c1", defaultServerPort, &testclient.Options{ - ClientInfo: &livekit.ClientInfo{ - Sdk: livekit.ClientInfo_GO, - }, - }) - c2 := createRTCClient("c2", defaultServerPort, &testclient.Options{ - AutoSubscribe: true, - ClientInfo: &livekit.ClientInfo{ - Sdk: livekit.ClientInfo_GO, - }, - }) - waitUntilConnected(t, c1, c2) - defer func() { - c1.Stop() - c2.Stop() - }() - - codecs := []webrtc.RTPCodecCapability{ - {MimeType: mime.MimeTypeH264.String()}, - {MimeType: mime.MimeTypeOpus.String()}, - } - for _, codec := range codecs { - _, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType) - require.NoError(t, err) - } - - require.Eventually(t, func() bool { - return len(c2.SubscribedTracks()[c1.ID()]) == len(codecs) - }, 5*time.Second, 10*time.Millisecond) - - var found int - for _, pubTrack := range c1.GetPublishedTrackIDs() { - t.Log("pub track", pubTrack) - tracks := c2.SubscribedTracks()[c1.ID()] - for _, track := range tracks { - t.Log("sub track", track.ID(), track.Codec()) - if track.ID() == pubTrack { - found++ - break - } - } - } - require.Equal(t, len(codecs), found) -} diff --git a/test/webhook_test.go b/test/webhook_test.go index e0469a4e0..a6ce1a8ef 100644 --- a/test/webhook_test.go +++ b/test/webhook_test.go @@ -45,78 +45,82 @@ func TestWebhooks(t *testing.T) { require.NoError(t, err) defer finish() - c1 := createRTCClient("c1", defaultServerPort, nil) - waitUntilConnected(t, c1) - testutils.WithTimeout(t, func() string { - if ts.GetEvent(webhook.EventRoomStarted) == nil { - return "did not receive RoomStarted" - } - if ts.GetEvent(webhook.EventParticipantJoined) == nil { - return "did not receive ParticipantJoined" - } - return "" - }) + for _, pv := range []types.ProtocolVersion{types.MaxProtocolDualPeerConnection, types.CurrentProtocol} { + t.Run(fmt.Sprintf("protocolVersion=%d", pv), func(t *testing.T) { + c1 := createRTCClient("c1", defaultServerPort, pv, nil) + waitUntilConnected(t, c1) + testutils.WithTimeout(t, func() string { + if ts.GetEvent(webhook.EventRoomStarted) == nil { + return "did not receive RoomStarted" + } + if ts.GetEvent(webhook.EventParticipantJoined) == nil { + return "did not receive ParticipantJoined" + } + return "" + }) - // first participant join should have started the room - started := ts.GetEvent(webhook.EventRoomStarted) - require.Equal(t, testRoom, started.Room.Name) - require.NotEmpty(t, started.Id) - require.Greater(t, started.CreatedAt, time.Now().Unix()-100) - require.GreaterOrEqual(t, time.Now().Unix(), started.CreatedAt) - joined := ts.GetEvent(webhook.EventParticipantJoined) - require.Equal(t, "c1", joined.Participant.Identity) - ts.ClearEvents() + // first participant join should have started the room + started := ts.GetEvent(webhook.EventRoomStarted) + require.Equal(t, testRoom, started.Room.Name) + require.NotEmpty(t, started.Id) + require.Greater(t, started.CreatedAt, time.Now().Unix()-100) + require.GreaterOrEqual(t, time.Now().Unix(), started.CreatedAt) + joined := ts.GetEvent(webhook.EventParticipantJoined) + require.Equal(t, "c1", joined.Participant.Identity) + ts.ClearEvents() - // another participant joins - c2 := createRTCClient("c2", defaultServerPort, nil) - waitUntilConnected(t, c2) - defer c2.Stop() - testutils.WithTimeout(t, func() string { - if ts.GetEvent(webhook.EventParticipantJoined) == nil { - return "did not receive ParticipantJoined" - } - return "" - }) - joined = ts.GetEvent(webhook.EventParticipantJoined) - require.Equal(t, "c2", joined.Participant.Identity) - ts.ClearEvents() + // another participant joins + c2 := createRTCClient("c2", defaultServerPort, pv, nil) + waitUntilConnected(t, c2) + defer c2.Stop() + testutils.WithTimeout(t, func() string { + if ts.GetEvent(webhook.EventParticipantJoined) == nil { + return "did not receive ParticipantJoined" + } + return "" + }) + joined = ts.GetEvent(webhook.EventParticipantJoined) + require.Equal(t, "c2", joined.Participant.Identity) + ts.ClearEvents() - // track published - writers := publishTracksForClients(t, c1) - defer stopWriters(writers...) - testutils.WithTimeout(t, func() string { - ev := ts.GetEvent(webhook.EventTrackPublished) - if ev == nil { - return "did not receive TrackPublished" - } - require.NotNil(t, ev.Track, "TrackPublished did not include trackInfo") - require.Equal(t, string(c1.ID()), ev.Participant.Sid) - return "" - }) - ts.ClearEvents() + // track published + writers := publishTracksForClients(t, c1) + defer stopWriters(writers...) + testutils.WithTimeout(t, func() string { + ev := ts.GetEvent(webhook.EventTrackPublished) + if ev == nil { + return "did not receive TrackPublished" + } + require.NotNil(t, ev.Track, "TrackPublished did not include trackInfo") + require.Equal(t, string(c1.ID()), ev.Participant.Sid) + return "" + }) + ts.ClearEvents() - // first participant leaves - c1.Stop() - testutils.WithTimeout(t, func() string { - if ts.GetEvent(webhook.EventParticipantLeft) == nil { - return "did not receive ParticipantLeft" - } - return "" - }) - left := ts.GetEvent(webhook.EventParticipantLeft) - require.Equal(t, "c1", left.Participant.Identity) - ts.ClearEvents() + // first participant leaves + c1.Stop() + testutils.WithTimeout(t, func() string { + if ts.GetEvent(webhook.EventParticipantLeft) == nil { + return "did not receive ParticipantLeft" + } + return "" + }) + left := ts.GetEvent(webhook.EventParticipantLeft) + require.Equal(t, "c1", left.Participant.Identity) + ts.ClearEvents() - // room closed - rm := server.RoomManager().GetRoom(context.Background(), testRoom) - rm.Close(types.ParticipantCloseReasonNone) - testutils.WithTimeout(t, func() string { - if ts.GetEvent(webhook.EventRoomFinished) == nil { - return "did not receive RoomFinished" - } - return "" - }) - require.Equal(t, testRoom, ts.GetEvent(webhook.EventRoomFinished).Room.Name) + // room closed + rm := server.RoomManager().GetRoom(context.Background(), testRoom) + rm.Close(types.ParticipantCloseReasonNone) + testutils.WithTimeout(t, func() string { + if ts.GetEvent(webhook.EventRoomFinished) == nil { + return "did not receive RoomFinished" + } + return "" + }) + require.Equal(t, testRoom, ts.GetEvent(webhook.EventRoomFinished).Room.Name) + }) + } } func setupServerWithWebhook() (server *service.LivekitServer, testServer *webhookTestServer, finishFunc func(), err error) {