mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 15:49:55 +00:00
1103 lines
32 KiB
Go
1103 lines
32 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pion/sdp/v3"
|
|
"github.com/pion/webrtc/v4"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/thoas/go-funk"
|
|
"github.com/twitchtv/twirp"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/livekit/protocol/auth"
|
|
"github.com/livekit/protocol/codecs/mime"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/rtc"
|
|
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
|
|
"github.com/livekit/livekit-server/pkg/testutils"
|
|
testclient "github.com/livekit/livekit-server/test/client"
|
|
)
|
|
|
|
// Polling parameters passed to require.Eventually as its wait and tick arguments.
const (
	// waitTick is the pause between two consecutive evaluations of a condition.
	waitTick = 10 * time.Millisecond
	// waitTimeout is the longest a condition is polled before the check fails.
	waitTimeout = 5 * time.Second
)
|
|
|
|
func TestClientCouldConnect(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("TestClientCouldConnect")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
|
|
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1, c2)
|
|
|
|
// ensure they both see each other
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c1.RemoteParticipants()) == 0 {
|
|
return "c1 did not see c2"
|
|
}
|
|
if len(c2.RemoteParticipants()) == 0 {
|
|
return "c2 did not see c1"
|
|
}
|
|
return ""
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientConnectDuplicate(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("TestClientConnectDuplicate")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
|
|
grant.SetCanPublish(true)
|
|
grant.SetCanSubscribe(true)
|
|
token := joinTokenWithGrant("c1", grant)
|
|
c1 := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)
|
|
|
|
// publish 2 tracks
|
|
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
|
|
require.NoError(t, err)
|
|
defer t1.Stop()
|
|
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
|
|
require.NoError(t, err)
|
|
defer t2.Stop()
|
|
|
|
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1, c2)
|
|
|
|
opts := &testclient.Options{
|
|
Publish: "duplicate_connection",
|
|
}
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedTracks()) == 0 {
|
|
return "c2 didn't subscribe to anything"
|
|
}
|
|
// should have received two tracks
|
|
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
|
|
return "c2 didn't subscribe to both tracks from c1"
|
|
}
|
|
|
|
// participant ID can be appended with '#..' . but should contain orig id as prefix
|
|
tr1 := c2.SubscribedTracks()[c1.ID()][0]
|
|
participantId1, _ := rtc.UnpackStreamID(tr1.StreamID())
|
|
require.Equal(t, c1.ID(), participantId1)
|
|
tr2 := c2.SubscribedTracks()[c1.ID()][1]
|
|
participantId2, _ := rtc.UnpackStreamID(tr2.StreamID())
|
|
require.Equal(t, c1.ID(), participantId2)
|
|
return ""
|
|
})
|
|
|
|
c1Dup := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, opts)
|
|
|
|
waitUntilConnected(t, c1Dup)
|
|
|
|
t3, err := c1Dup.AddStaticTrack("video/vp8", "video", "webcam")
|
|
require.NoError(t, err)
|
|
defer t3.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedTracks()[c1Dup.ID()]) != 1 {
|
|
return "c2 was not subscribed to track from duplicated c1"
|
|
}
|
|
|
|
tr3 := c2.SubscribedTracks()[c1Dup.ID()][0]
|
|
participantId3, _ := rtc.UnpackStreamID(tr3.StreamID())
|
|
require.Contains(t, c1Dup.ID(), participantId3)
|
|
|
|
return ""
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSinglePublisher(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
s, finish := setupSingleNodeTest("TestSinglePublisher")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
|
|
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1, c2)
|
|
|
|
// publish an audio and video track and ensure clients receive it ok
|
|
t1, err := c1.AddStaticTrack("audio/OPUS", "audio", "webcamaudio")
|
|
require.NoError(t, err)
|
|
defer t1.Stop()
|
|
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcamvideo")
|
|
require.NoError(t, err)
|
|
defer t2.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedTracks()) == 0 {
|
|
return "c2 was not subscribed to anything"
|
|
}
|
|
// should have received two tracks
|
|
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
|
|
return "c2 didn't subscribe to both tracks from c1"
|
|
}
|
|
|
|
tr1 := c2.SubscribedTracks()[c1.ID()][0]
|
|
participantId, _ := rtc.UnpackStreamID(tr1.StreamID())
|
|
require.Equal(t, c1.ID(), participantId)
|
|
return ""
|
|
})
|
|
// ensure mime type is received
|
|
remoteC1 := c2.GetRemoteParticipant(c1.ID())
|
|
audioTrack := funk.Find(remoteC1.Tracks, func(ti *livekit.TrackInfo) bool {
|
|
return ti.Name == "webcamaudio"
|
|
}).(*livekit.TrackInfo)
|
|
require.Equal(t, "audio/opus", audioTrack.MimeType)
|
|
|
|
// a new client joins and should get the initial stream
|
|
c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, nil)
|
|
|
|
// ensure that new client that has joined also received tracks
|
|
waitUntilConnected(t, c3)
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c3.SubscribedTracks()) == 0 {
|
|
return "c3 didn't subscribe to anything"
|
|
}
|
|
// should have received two tracks
|
|
if len(c3.SubscribedTracks()[c1.ID()]) != 2 {
|
|
return "c3 didn't subscribe to tracks from c1"
|
|
}
|
|
return ""
|
|
})
|
|
|
|
// ensure that the track ids are generated by server
|
|
tracks := c3.SubscribedTracks()[c1.ID()]
|
|
for _, tr := range tracks {
|
|
require.True(t, strings.HasPrefix(tr.ID(), "TR_"), "track should begin with TR")
|
|
}
|
|
|
|
// when c3 disconnects, ensure subscriber is cleaned up correctly
|
|
c3.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
room := s.RoomManager().GetRoom(context.Background(), testRoom)
|
|
p := room.GetParticipant("c1")
|
|
require.NotNil(t, p)
|
|
|
|
for _, t := range p.GetPublishedTracks() {
|
|
if t.IsSubscriber(c3.ID()) {
|
|
return "c3 was not a subscriber of c1's tracks"
|
|
}
|
|
}
|
|
return ""
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
opts := testclient.Options{AutoSubscribe: false}
|
|
publisher := createRTCClient("publisher", defaultServerPort, testRTCServicePath, &opts)
|
|
client := createRTCClient("client", defaultServerPort, testRTCServicePath, &opts)
|
|
defer publisher.Stop()
|
|
defer client.Stop()
|
|
waitUntilConnected(t, publisher, client)
|
|
|
|
track, err := publisher.AddStaticTrack("audio/opus", "audio", "webcam")
|
|
require.NoError(t, err)
|
|
defer track.Stop()
|
|
|
|
time.Sleep(syncDelay)
|
|
|
|
require.Empty(t, client.SubscribedTracks()[publisher.ID()])
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_RenegotiationWithDifferentCodecs(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("TestRenegotiationWithDifferentCodecs")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
|
|
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1, c2)
|
|
|
|
// publish a vp8 video track and ensure clients receive it ok
|
|
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
|
|
require.NoError(t, err)
|
|
defer t1.Stop()
|
|
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
|
|
require.NoError(t, err)
|
|
defer t2.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedTracks()) == 0 {
|
|
return "c2 was not subscribed to anything"
|
|
}
|
|
// should have received two tracks
|
|
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
|
|
return "c2 was not subscribed to tracks from c1"
|
|
}
|
|
|
|
tracks := c2.SubscribedTracks()[c1.ID()]
|
|
for _, t := range tracks {
|
|
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
|
|
return ""
|
|
|
|
}
|
|
}
|
|
return "did not receive track with vp8"
|
|
})
|
|
|
|
t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
|
|
MimeType: "video/h264",
|
|
ClockRate: 90000,
|
|
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
|
|
}, "videoscreen", "screen")
|
|
defer t3.Stop()
|
|
require.NoError(t, err)
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedTracks()) == 0 {
|
|
return "c2's not subscribed to anything"
|
|
}
|
|
// should have received three tracks
|
|
if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
|
|
return "c2's not subscribed to 3 tracks from c1"
|
|
}
|
|
|
|
var vp8Found, h264Found bool
|
|
tracks := c2.SubscribedTracks()[c1.ID()]
|
|
for _, t := range tracks {
|
|
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
|
|
vp8Found = true
|
|
} else if mime.IsMimeTypeStringH264(t.Codec().MimeType) {
|
|
h264Found = true
|
|
}
|
|
}
|
|
if !vp8Found {
|
|
return "did not receive track with vp8"
|
|
}
|
|
if !h264Found {
|
|
return "did not receive track with h264"
|
|
}
|
|
return ""
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSingleNodeRoomList(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
|
|
defer finish()
|
|
|
|
roomServiceListRoom(t)
|
|
}
|
|
|
|
func TestSingleNodeUpdateParticipant(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
|
|
defer finish()
|
|
|
|
adminCtx := contextWithToken(adminRoomToken(testRoom))
|
|
t.Run("update nonexistent participant", func(t *testing.T) {
|
|
_, err := roomClient.UpdateParticipant(adminCtx, &livekit.UpdateParticipantRequest{
|
|
Room: testRoom,
|
|
Identity: "nonexistent",
|
|
Permission: &livekit.ParticipantPermission{
|
|
CanPublish: true,
|
|
},
|
|
})
|
|
require.Error(t, err)
|
|
var twErr twirp.Error
|
|
require.True(t, errors.As(err, &twErr))
|
|
require.Equal(t, twirp.NotFound, twErr.Code())
|
|
})
|
|
}
|
|
|
|
// Ensure that CORS headers are returned
|
|
func TestSingleNodeCORS(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
s, finish := setupSingleNodeTest("TestSingleNodeCORS")
|
|
defer finish()
|
|
|
|
req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%d", s.HTTPPort()), nil)
|
|
require.NoError(t, err)
|
|
req.Header.Set("Authorization", "bearer xyz")
|
|
req.Header.Set("Origin", "testhost.com")
|
|
res, err := http.DefaultClient.Do(req)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "testhost.com", res.Header.Get("Access-Control-Allow-Origin"))
|
|
}
|
|
|
|
func TestSingleNodeDoubleSlash(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
s, finish := setupSingleNodeTest("TestSingleNodeDoubleSlash")
|
|
defer finish()
|
|
// client contains trailing slash in URL, causing path to contain double //
|
|
// without our middleware, this would cause a 302 redirect
|
|
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d/", s.HTTPPort()), &http.Client{})
|
|
_, err := roomClient.ListRooms(contextWithToken(listRoomToken()), &livekit.ListRoomsRequest{})
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestPingPong(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
_, finish := setupSingleNodeTest("TestPingPong")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1)
|
|
|
|
require.NoError(t, c1.SendPing())
|
|
require.Eventually(t, func() bool {
|
|
return c1.PongReceivedAt() > 0
|
|
}, time.Second, 10*time.Millisecond)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSingleNodeJoinAfterClose(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("TestJoinAfterClose")
|
|
defer finish()
|
|
|
|
scenarioJoinClosedRoom(t)
|
|
}
|
|
|
|
func TestSingleNodeCloseNonRTCRoom(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("closeNonRTCRoom")
|
|
defer finish()
|
|
|
|
closeNonRTCRoom(t)
|
|
}
|
|
|
|
func TestAutoCreate(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
disableAutoCreate := func(conf *config.Config) {
|
|
conf.Room.AutoCreate = false
|
|
}
|
|
t.Run("cannot join if room isn't created", func(t *testing.T) {
|
|
s := createSingleNodeServer(disableAutoCreate)
|
|
go func() {
|
|
if err := s.Start(); err != nil {
|
|
logger.Errorw("server returned error", err)
|
|
}
|
|
}()
|
|
defer s.Stop(true)
|
|
|
|
waitForServerToStart(s)
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
token := joinToken(testRoom, "start-before-create", nil)
|
|
opts := &testclient.Options{}
|
|
testRTCServicePathToTestClientOptions(testRTCServicePath, opts)
|
|
_, err := testclient.NewWebSocketConn(
|
|
fmt.Sprintf("ws://localhost:%d", defaultServerPort),
|
|
token,
|
|
opts,
|
|
)
|
|
require.Error(t, err)
|
|
|
|
// second join should also fail
|
|
token = joinToken(testRoom, "start-before-create-2", nil)
|
|
_, err = testclient.NewWebSocketConn(
|
|
fmt.Sprintf("ws://localhost:%d", defaultServerPort),
|
|
token,
|
|
opts,
|
|
)
|
|
require.Error(t, err)
|
|
})
|
|
}
|
|
})
|
|
|
|
t.Run("join with explicit createRoom", func(t *testing.T) {
|
|
s := createSingleNodeServer(disableAutoCreate)
|
|
go func() {
|
|
if err := s.Start(); err != nil {
|
|
logger.Errorw("server returned error", err)
|
|
}
|
|
}()
|
|
defer s.Stop(true)
|
|
|
|
waitForServerToStart(s)
|
|
|
|
// explicitly create
|
|
_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{Name: testRoom})
|
|
require.NoError(t, err)
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient("join-after-create", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, c1)
|
|
|
|
c1.Stop()
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
// don't give user subscribe permissions initially, and ensure autosubscribe is triggered afterwards
|
|
func TestSingleNodeUpdateSubscriptionPermissions(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
_, finish := setupSingleNodeTest("TestSingleNodeUpdateSubscriptionPermissions")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, nil)
|
|
|
|
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
|
|
grant.SetCanSubscribe(false)
|
|
at := auth.NewAccessToken(testApiKey, testApiSecret).
|
|
AddGrant(grant).
|
|
SetIdentity("sub")
|
|
token, err := at.ToJWT()
|
|
require.NoError(t, err)
|
|
sub := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)
|
|
|
|
waitUntilConnected(t, pub, sub)
|
|
|
|
writers := publishTracksForClients(t, pub)
|
|
defer stopWriters(writers...)
|
|
|
|
// wait sub receives tracks
|
|
testutils.WithTimeout(t, func() string {
|
|
pubRemote := sub.GetRemoteParticipant(pub.ID())
|
|
if pubRemote == nil {
|
|
return "could not find remote publisher"
|
|
}
|
|
if len(pubRemote.Tracks) != 2 {
|
|
return "did not receive metadata for published tracks"
|
|
}
|
|
return ""
|
|
})
|
|
|
|
// set permissions out of band
|
|
ctx := contextWithToken(adminRoomToken(testRoom))
|
|
_, err = roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
|
|
Room: testRoom,
|
|
Identity: "sub",
|
|
Permission: &livekit.ParticipantPermission{
|
|
CanSubscribe: true,
|
|
CanPublish: true,
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
tracks := sub.SubscribedTracks()[pub.ID()]
|
|
if len(tracks) == 2 {
|
|
return ""
|
|
} else {
|
|
return fmt.Sprintf("expected 2 tracks subscribed, actual: %d", len(tracks))
|
|
}
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSingleNodeAttributes(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
_, finish := setupSingleNodeTest("TestSingleNodeAttributes")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, &testclient.Options{
|
|
Attributes: map[string]string{
|
|
"b": "2",
|
|
"c": "3",
|
|
},
|
|
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
|
|
T := true
|
|
grants.CanUpdateOwnMetadata = &T
|
|
token.SetAttributes(map[string]string{
|
|
"a": "0",
|
|
"b": "1",
|
|
})
|
|
},
|
|
})
|
|
|
|
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
|
|
grant.SetCanSubscribe(false)
|
|
at := auth.NewAccessToken(testApiKey, testApiSecret).
|
|
SetVideoGrant(grant).
|
|
SetIdentity("sub")
|
|
token, err := at.ToJWT()
|
|
require.NoError(t, err)
|
|
sub := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)
|
|
|
|
waitUntilConnected(t, pub, sub)
|
|
|
|
// wait sub receives initial attributes
|
|
testutils.WithTimeout(t, func() string {
|
|
pubRemote := sub.GetRemoteParticipant(pub.ID())
|
|
if pubRemote == nil {
|
|
return "could not find remote publisher"
|
|
}
|
|
attrs := pubRemote.Attributes
|
|
if !reflect.DeepEqual(attrs, map[string]string{
|
|
"a": "0",
|
|
"b": "2",
|
|
"c": "3",
|
|
}) {
|
|
return fmt.Sprintf("did not receive expected attributes: %v", attrs)
|
|
}
|
|
return ""
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestDeviceCodecOverride checks that codecs that are incompatible with a device is not
|
|
// negotiated by the server
|
|
func TestDeviceCodecOverride(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("TestDeviceCodecOverride")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
// simulate device that isn't compatible with H.264
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, &testclient.Options{
|
|
ClientInfo: &livekit.ClientInfo{
|
|
Os: "android",
|
|
DeviceModel: "Xiaomi 2201117TI",
|
|
},
|
|
})
|
|
defer c1.Stop()
|
|
waitUntilConnected(t, c1)
|
|
|
|
// it doesn't really matter what the codec set here is, uses default Pion MediaEngine codecs
|
|
tw, err := c1.AddStaticTrack("video/h264", "video", "webcam")
|
|
require.NoError(t, err)
|
|
defer stopWriters(tw)
|
|
|
|
var desc *sdp.MediaDescription
|
|
require.Eventually(t, func() bool {
|
|
lastAnswer := c1.LastAnswer()
|
|
if lastAnswer == nil {
|
|
return false
|
|
}
|
|
|
|
sd := webrtc.SessionDescription{
|
|
Type: webrtc.SDPTypeAnswer,
|
|
SDP: lastAnswer.SDP,
|
|
}
|
|
answer, err := sd.Unmarshal()
|
|
require.NoError(t, err)
|
|
|
|
// video and data channel
|
|
if len(answer.MediaDescriptions) < 2 {
|
|
return false
|
|
}
|
|
|
|
for _, md := range answer.MediaDescriptions {
|
|
if md.MediaName.Media == "video" {
|
|
desc = md
|
|
break
|
|
}
|
|
}
|
|
return desc != nil
|
|
}, waitTimeout, waitTick, "did not receive answer")
|
|
|
|
hasSeenVP8 := false
|
|
for _, a := range desc.Attributes {
|
|
if a.Key == "rtpmap" {
|
|
require.NotContains(t, a.Value, mime.MimeTypeCodecH264.String(), "should not contain H264 codec")
|
|
if strings.Contains(a.Value, mime.MimeTypeCodecVP8.String()) {
|
|
hasSeenVP8 = true
|
|
}
|
|
}
|
|
}
|
|
require.True(t, hasSeenVP8, "should have seen VP8 codec in SDP")
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSubscribeToCodecUnsupported(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
_, finish := setupSingleNodeTest("TestSubscribeToCodecUnsupported")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
|
|
// create a client that doesn't support H264
|
|
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, &testclient.Options{
|
|
AutoSubscribe: true,
|
|
DisabledCodecs: []webrtc.RTPCodecCapability{
|
|
{MimeType: "video/H264"},
|
|
},
|
|
})
|
|
waitUntilConnected(t, c1, c2)
|
|
|
|
// publish a vp8 video track and ensure c2 receives it ok
|
|
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
|
|
require.NoError(t, err)
|
|
defer t1.Stop()
|
|
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
|
|
require.NoError(t, err)
|
|
defer t2.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedTracks()) == 0 {
|
|
return "c2 was not subscribed to anything"
|
|
}
|
|
// should have received two tracks
|
|
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
|
|
return "c2 was not subscribed to tracks from c1"
|
|
}
|
|
|
|
tracks := c2.SubscribedTracks()[c1.ID()]
|
|
for _, t := range tracks {
|
|
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
|
|
return ""
|
|
}
|
|
}
|
|
return "did not receive track with vp8"
|
|
})
|
|
require.Nil(t, c2.GetSubscriptionResponseAndClear())
|
|
|
|
// publish a h264 track and ensure c2 got subscription error
|
|
t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
|
|
MimeType: "video/h264",
|
|
ClockRate: 90000,
|
|
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
|
|
}, "videoscreen", "screen")
|
|
defer t3.Stop()
|
|
require.NoError(t, err)
|
|
|
|
var h264TrackID string
|
|
require.Eventually(t, func() bool {
|
|
remoteC1 := c2.GetRemoteParticipant(c1.ID())
|
|
require.NotNil(t, remoteC1)
|
|
for _, track := range remoteC1.Tracks {
|
|
if mime.IsMimeTypeStringH264(track.MimeType) {
|
|
h264TrackID = track.Sid
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}, time.Second, 10*time.Millisecond, "did not receive track info with h264")
|
|
|
|
require.Eventually(t, func() bool {
|
|
sr := c2.GetSubscriptionResponseAndClear()
|
|
if sr == nil {
|
|
return false
|
|
}
|
|
require.Equal(t, h264TrackID, sr.TrackSid)
|
|
require.Equal(t, livekit.SubscriptionError_SE_CODEC_UNSUPPORTED, sr.Err)
|
|
return true
|
|
}, 5*time.Second, 10*time.Millisecond, "did not receive subscription response")
|
|
|
|
// publish another vp8 track again, ensure the transport recovered by sfu and c2 can receive it
|
|
t4, err := c1.AddStaticTrack("video/vp8", "video2", "webcam2")
|
|
require.NoError(t, err)
|
|
defer t4.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedTracks()) == 0 {
|
|
return "c2 was not subscribed to anything"
|
|
}
|
|
// should have received two tracks
|
|
if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
|
|
return "c2 was not subscribed to tracks from c1"
|
|
}
|
|
|
|
var vp8Count int
|
|
tracks := c2.SubscribedTracks()[c1.ID()]
|
|
for _, t := range tracks {
|
|
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
|
|
vp8Count++
|
|
}
|
|
}
|
|
if vp8Count == 2 {
|
|
return ""
|
|
}
|
|
return "did not 2 receive track with vp8"
|
|
})
|
|
require.Nil(t, c2.GetSubscriptionResponseAndClear())
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestDataPublishSlowSubscriber(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
dataChannelSlowThreshold := 21024
|
|
|
|
logger.Infow("----------------STARTING TEST----------------", "test", t.Name())
|
|
s := createSingleNodeServer(func(c *config.Config) {
|
|
c.RTC.DatachannelSlowThreshold = dataChannelSlowThreshold
|
|
})
|
|
go func() {
|
|
if err := s.Start(); err != nil {
|
|
logger.Errorw("server returned error", err)
|
|
}
|
|
}()
|
|
|
|
waitForServerToStart(s)
|
|
|
|
defer func() {
|
|
s.Stop(true)
|
|
logger.Infow("----------------FINISHING TEST----------------", "test", t.Name())
|
|
}()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, nil)
|
|
fastSub := createRTCClient("fastSub", defaultServerPort, testRTCServicePath, nil)
|
|
slowSubNotDrop := createRTCClient("slowSubNotDrop", defaultServerPort, testRTCServicePath, nil)
|
|
slowSubDrop := createRTCClient("slowSubDrop", defaultServerPort, testRTCServicePath, nil)
|
|
waitUntilConnected(t, pub, fastSub, slowSubDrop, slowSubNotDrop)
|
|
defer func() {
|
|
pub.Stop()
|
|
fastSub.Stop()
|
|
slowSubNotDrop.Stop()
|
|
slowSubDrop.Stop()
|
|
}()
|
|
|
|
// no data should be dropped for fast subscriber
|
|
var fastDataIndex atomic.Uint64
|
|
fastSub.OnDataReceived = func(data []byte, sid string) {
|
|
idx := binary.BigEndian.Uint64(data[len(data)-8:])
|
|
require.Equal(t, fastDataIndex.Load()+1, idx)
|
|
fastDataIndex.Store(idx)
|
|
}
|
|
|
|
// no data should be dropped for slow subscriber that is above threshold
|
|
var slowNoDropDataIndex atomic.Uint64
|
|
var drainSlowSubNotDrop atomic.Bool
|
|
slowNoDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold * 2)
|
|
slowSubNotDrop.OnDataReceived = func(data []byte, sid string) {
|
|
idx := binary.BigEndian.Uint64(data[len(data)-8:])
|
|
require.Equal(t, slowNoDropDataIndex.Load()+1, idx)
|
|
slowNoDropDataIndex.Store(idx)
|
|
if !drainSlowSubNotDrop.Load() {
|
|
slowNoDropReader.Read(data, sid)
|
|
}
|
|
}
|
|
|
|
// data should be dropped for slow subscriber that is below threshold
|
|
var slowDropDataIndex atomic.Uint64
|
|
dropped := make(chan struct{})
|
|
slowDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold / 2)
|
|
slowSubDrop.OnDataReceived = func(data []byte, sid string) {
|
|
select {
|
|
case <-dropped:
|
|
return
|
|
default:
|
|
}
|
|
idx := binary.BigEndian.Uint64(data[len(data)-8:])
|
|
if idx != slowDropDataIndex.Load()+1 {
|
|
close(dropped)
|
|
}
|
|
slowDropDataIndex.Store(idx)
|
|
slowDropReader.Read(data, sid)
|
|
}
|
|
|
|
// publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold
|
|
var (
|
|
blocked atomic.Bool
|
|
stopWrite atomic.Bool
|
|
writeIdx atomic.Uint64
|
|
)
|
|
writeStopped := make(chan struct{})
|
|
go func() {
|
|
defer close(writeStopped)
|
|
var i int
|
|
buf := make([]byte, 100)
|
|
for !stopWrite.Load() {
|
|
i++
|
|
binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i))
|
|
if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil {
|
|
if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) {
|
|
blocked.Store(true)
|
|
i--
|
|
continue
|
|
} else {
|
|
t.Log("error writing", err)
|
|
break
|
|
}
|
|
}
|
|
writeIdx.Store(uint64(i))
|
|
}
|
|
}()
|
|
|
|
<-dropped
|
|
|
|
time.Sleep(time.Second)
|
|
blocked.Store(false)
|
|
require.Eventually(t, func() bool { return blocked.Load() }, 30*time.Second, 100*time.Millisecond)
|
|
stopWrite.Store(true)
|
|
<-writeStopped
|
|
drainSlowSubNotDrop.Store(true)
|
|
require.Eventually(t, func() bool {
|
|
return writeIdx.Load() == fastDataIndex.Load() &&
|
|
writeIdx.Load() == slowNoDropDataIndex.Load()
|
|
}, 10*time.Second, 50*time.Millisecond, "writeIdx %d, fast %d, slowNoDrop %d", writeIdx.Load(), fastDataIndex.Load(), slowNoDropDataIndex.Load())
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFireTrackBySdp(t *testing.T) {
|
|
_, finish := setupSingleNodeTest("TestFireTrackBySdp")
|
|
defer finish()
|
|
|
|
var cases = []struct {
|
|
name string
|
|
codecs []webrtc.RTPCodecCapability
|
|
pubSDK livekit.ClientInfo_SDK
|
|
}{
|
|
{
|
|
name: "js client could pub a/v tracks",
|
|
codecs: []webrtc.RTPCodecCapability{
|
|
{MimeType: mime.MimeTypeH264.String()},
|
|
{MimeType: mime.MimeTypeOpus.String()},
|
|
},
|
|
pubSDK: livekit.ClientInfo_JS,
|
|
},
|
|
{
|
|
name: "go client could pub audio tracks",
|
|
codecs: []webrtc.RTPCodecCapability{
|
|
{MimeType: "audio/opus"},
|
|
},
|
|
pubSDK: livekit.ClientInfo_GO,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
codecs, sdk := c.codecs, c.pubSDK
|
|
t.Run(c.name, func(t *testing.T) {
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient(c.name+"_c1", defaultServerPort, testRTCServicePath, &testclient.Options{
|
|
ClientInfo: &livekit.ClientInfo{
|
|
Sdk: sdk,
|
|
},
|
|
})
|
|
c2 := createRTCClient(c.name+"_c2", defaultServerPort, testRTCServicePath, &testclient.Options{
|
|
AutoSubscribe: true,
|
|
ClientInfo: &livekit.ClientInfo{
|
|
Sdk: livekit.ClientInfo_JS,
|
|
},
|
|
})
|
|
waitUntilConnected(t, c1, c2)
|
|
defer func() {
|
|
c1.Stop()
|
|
c2.Stop()
|
|
}()
|
|
|
|
// publish tracks and don't write any packets
|
|
for _, codec := range codecs {
|
|
_, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType, testclient.AddTrackNoWriter())
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
require.Eventually(t, func() bool {
|
|
return len(c2.SubscribedTracks()[c1.ID()]) == len(codecs)
|
|
}, 5*time.Second, 10*time.Millisecond)
|
|
|
|
var found int
|
|
for _, pubTrack := range c1.GetPublishedTrackIDs() {
|
|
t.Log("pub track", pubTrack)
|
|
tracks := c2.SubscribedTracks()[c1.ID()]
|
|
for _, track := range tracks {
|
|
t.Log("sub track", track.ID(), track.Codec())
|
|
if track.Codec().PayloadType == 0 && track.ID() == pubTrack {
|
|
found++
|
|
break
|
|
}
|
|
}
|
|
}
|
|
require.Equal(t, len(codecs), found)
|
|
})
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSinglePublisherDataTrack(t *testing.T) {
|
|
if testing.Short() {
|
|
t.SkipNow()
|
|
return
|
|
}
|
|
|
|
s, finish := setupSingleNodeTest("TestSinglePublisherDataTrack")
|
|
defer finish()
|
|
|
|
for _, testRTCServicePath := range testRTCServicePaths {
|
|
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
|
|
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
|
|
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
|
|
waitUntilConnected(t, c1, c2)
|
|
|
|
// publish a couple of data tracks and ensure clients receive it ok
|
|
dt1, err := c1.PublishDataTrack()
|
|
require.NoError(t, err)
|
|
defer dt1.Stop()
|
|
|
|
dt2, err := c1.PublishDataTrack()
|
|
require.NoError(t, err)
|
|
defer dt2.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c2.SubscribedDataTracks()) == 0 {
|
|
return "c2 was not subscribed to any data tracks"
|
|
}
|
|
// should have received two data tracks
|
|
if len(c2.SubscribedDataTracks()[c1.ID()]) != 2 {
|
|
return "c2 didn't subscribe to both data tracks from c1"
|
|
}
|
|
return ""
|
|
})
|
|
|
|
// a new client joins and should get the initial stream
|
|
c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
|
|
|
|
// ensure that new client that has joined also received data tracks
|
|
waitUntilConnected(t, c3)
|
|
testutils.WithTimeout(t, func() string {
|
|
if len(c3.SubscribedDataTracks()) == 0 {
|
|
return "c3 didn't subscribe to any data tracks"
|
|
}
|
|
// should have received two data tracks
|
|
if len(c3.SubscribedDataTracks()[c1.ID()]) != 2 {
|
|
return "c3 didn't subscribe to tracks from c1"
|
|
}
|
|
return ""
|
|
})
|
|
|
|
// ensure that the data track ids are generated by server
|
|
tracks := c3.SubscribedDataTracks()[c1.ID()]
|
|
for _, tr := range tracks {
|
|
require.True(t, strings.HasPrefix(string(tr.ID()), "DTR_"), "data track should begin with DTR")
|
|
}
|
|
|
|
// when c3 disconnects, ensure subscriber is cleaned up correctly
|
|
c3.Stop()
|
|
|
|
testutils.WithTimeout(t, func() string {
|
|
room := s.RoomManager().GetRoom(context.Background(), testRoom)
|
|
p := room.GetParticipant("c1")
|
|
require.NotNil(t, p)
|
|
|
|
for _, t := range p.GetPublishedDataTracks() {
|
|
if t.IsSubscriber(c3.ID()) {
|
|
return "c3 was not a subscriber of c1's data tracks"
|
|
}
|
|
}
|
|
return ""
|
|
})
|
|
})
|
|
}
|
|
}
|