Files
livekit/test/singlenode_test.go
T
Raja Subramanian 1ab1e072d1 test: verify upstream and downstream connection stats end-to-end (#4508)
* test: verify upstream and downstream connection stats

Adds TestConnectionStats integration test where two clients connect,
each publishes audio + video, and the test asserts that both
publisher-side (LocalMediaTrack.GetTrackStats) and subscriber-side
(DownTrack.GetTrackStats) report non-zero packets and bytes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* sfu: add DownTrack.OnStatsUpdate hook and use it in stats test

Adds a public OnStatsUpdate setter on DownTrack mirroring the existing
pattern on WebRTCReceiver. The new callback fires alongside the
configured DownTrackListener (production path is unaffected) and is
intended for tests/observers to validate the AnalyticsStat data flowing
through the listener.

Augments TestConnectionStats to:
- hook WebRTCReceiver.OnStatsUpdate for each published track and assert
  the captured AnalyticsStat has non-zero packets/bytes (upstream).
- hook the new DownTrack.OnStatsUpdate for each subscribed track and
  make the same assertion (downstream).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 14:52:30 +05:30

1352 lines
40 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"context"
"encoding/binary"
"errors"
"fmt"
"net/http"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v4"
"github.com/stretchr/testify/require"
"github.com/thoas/go-funk"
"github.com/twitchtv/twirp"
"go.uber.org/atomic"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/codecs/mime"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
"github.com/livekit/livekit-server/pkg/testutils"
testclient "github.com/livekit/livekit-server/test/client"
)
const (
waitTick = 10 * time.Millisecond
waitTimeout = 5 * time.Second
)
func TestClientCouldConnect(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestClientCouldConnect")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, c1, c2)
// ensure they both see each other
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) == 0 {
return "c1 did not see c2"
}
if len(c2.RemoteParticipants()) == 0 {
return "c2 did not see c1"
}
return ""
})
})
}
}
func TestClientConnectDuplicate(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestClientConnectDuplicate")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
grant.SetCanPublish(true)
grant.SetCanSubscribe(true)
token := joinTokenWithGrant("c1", grant)
c1 := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)
// publish 2 tracks
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, c1, c2)
opts := &testclient.Options{
Publish: "duplicate_connection",
}
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 didn't subscribe to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 didn't subscribe to both tracks from c1"
}
// participant ID can be appended with '#..' . but should contain orig id as prefix
tr1 := c2.SubscribedTracks()[c1.ID()][0]
participantId1, _ := rtc.UnpackStreamID(tr1.StreamID())
require.Equal(t, c1.ID(), participantId1)
tr2 := c2.SubscribedTracks()[c1.ID()][1]
participantId2, _ := rtc.UnpackStreamID(tr2.StreamID())
require.Equal(t, c1.ID(), participantId2)
return ""
})
c1Dup := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, opts)
waitUntilConnected(t, c1Dup)
t3, err := c1Dup.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t3.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()[c1Dup.ID()]) != 1 {
return "c2 was not subscribed to track from duplicated c1"
}
tr3 := c2.SubscribedTracks()[c1Dup.ID()][0]
participantId3, _ := rtc.UnpackStreamID(tr3.StreamID())
require.Contains(t, c1Dup.ID(), participantId3)
return ""
})
})
}
}
func TestSinglePublisher(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestSinglePublisher")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, c1, c2)
// publish an audio and video track and ensure clients receive it ok
t1, err := c1.AddStaticTrack("audio/OPUS", "audio", "webcamaudio")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcamvideo")
require.NoError(t, err)
defer t2.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 didn't subscribe to both tracks from c1"
}
tr1 := c2.SubscribedTracks()[c1.ID()][0]
participantId, _ := rtc.UnpackStreamID(tr1.StreamID())
require.Equal(t, c1.ID(), participantId)
return ""
})
// ensure mime type is received
remoteC1 := c2.GetRemoteParticipant(c1.ID())
audioTrack := funk.Find(remoteC1.Tracks, func(ti *livekit.TrackInfo) bool {
return ti.Name == "webcamaudio"
}).(*livekit.TrackInfo)
require.Equal(t, "audio/opus", audioTrack.MimeType)
// a new client joins and should get the initial stream
c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, nil)
// ensure that new client that has joined also received tracks
waitUntilConnected(t, c3)
testutils.WithTimeout(t, func() string {
if len(c3.SubscribedTracks()) == 0 {
return "c3 didn't subscribe to anything"
}
// should have received two tracks
if len(c3.SubscribedTracks()[c1.ID()]) != 2 {
return "c3 didn't subscribe to tracks from c1"
}
return ""
})
// ensure that the track ids are generated by server
tracks := c3.SubscribedTracks()[c1.ID()]
for _, tr := range tracks {
require.True(t, strings.HasPrefix(tr.ID(), "TR_"), "track should begin with TR")
}
// when c3 disconnects, ensure subscriber is cleaned up correctly
c3.Stop()
testutils.WithTimeout(t, func() string {
room := s.RoomManager().GetRoom(context.Background(), testRoom)
p := room.GetParticipant("c1")
require.NotNil(t, p)
for _, t := range p.GetPublishedTracks() {
if t.IsSubscriber(c3.ID()) {
return "c3 was not a subscriber of c1's tracks"
}
}
return ""
})
})
}
}
func TestConnectionStats(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestConnectionStats")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, c1, c2)
defer func() {
c1.Stop()
c2.Stop()
}()
// both clients publish audio + video
t1, err := c1.AddStaticTrack("audio/opus", "audio", "c1audio")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "c1video")
require.NoError(t, err)
defer t2.Stop()
t3, err := c2.AddStaticTrack("audio/opus", "audio", "c2audio")
require.NoError(t, err)
defer t3.Stop()
t4, err := c2.AddStaticTrack("video/vp8", "video", "c2video")
require.NoError(t, err)
defer t4.Stop()
// wait for cross-subscriptions: each client should receive 2 tracks from the other
testutils.WithTimeout(t, func() string {
if len(c1.SubscribedTracks()[c2.ID()]) != 2 {
return "c1 did not subscribe to both tracks from c2"
}
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 did not subscribe to both tracks from c1"
}
return ""
})
room := s.RoomManager().GetRoom(context.Background(), testRoom)
require.NotNil(t, room)
// hook the upstream WebRTCReceiver.OnStatsUpdate and downstream DownTrack.OnStatsUpdate
// callbacks so we can verify the AnalyticsStat delivered through each carries valid
// delta data. MediaTrack.Receivers() returns one entry per potential codec; only those
// matching the actually published codec are *sfu.WebRTCReceiver, the rest are
// placeholder *rtc.DummyReceiver instances that we skip.
type statCapture struct {
lock sync.Mutex
stat *livekit.AnalyticsStat
}
receiverCaptures := make(map[livekit.TrackID]*statCapture)
downTrackCaptures := make(map[livekit.ParticipantIdentity]map[livekit.TrackID]*statCapture)
for _, identity := range []livekit.ParticipantIdentity{"c1", "c2"} {
p := room.GetParticipant(identity)
require.NotNil(t, p, "participant %s not found", identity)
for _, mt := range p.GetPublishedTracks() {
rc := &statCapture{}
receiverCaptures[mt.ID()] = rc
var hooked int
for _, r := range mt.Receivers() {
if dr, ok := r.(*rtc.DummyReceiver); ok {
underlying := dr.Receiver()
if underlying == nil {
continue
}
r = underlying
}
wr, ok := r.(*sfu.WebRTCReceiver)
if !ok {
continue
}
wr.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) {
rc.lock.Lock()
rc.stat = stat
rc.lock.Unlock()
})
hooked++
}
require.Greater(t, hooked, 0, "no live WebRTCReceiver found for published track %s", mt.ID())
}
dtCaps := make(map[livekit.TrackID]*statCapture)
downTrackCaptures[identity] = dtCaps
for _, st := range p.GetSubscribedTracks() {
dt := st.DownTrack()
require.NotNil(t, dt, "subscribed track %s has no DownTrack", st.ID())
dc := &statCapture{}
dtCaps[st.ID()] = dc
dt.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) {
dc.lock.Lock()
dc.stat = stat
dc.lock.Unlock()
})
}
}
validateAnalyticsStat := func(stat *livekit.AnalyticsStat) string {
if stat == nil {
return "stat nil"
}
if len(stat.Streams) == 0 {
return "stat has no streams"
}
var totalPackets uint32
var totalBytes uint64
for _, s := range stat.Streams {
totalPackets += s.PrimaryPackets
totalBytes += s.PrimaryBytes
}
if totalPackets == 0 {
return "stat has no packets across streams"
}
if totalBytes == 0 {
return "stat has no bytes across streams"
}
return ""
}
// wait for cumulative + delta + OnStatsUpdate-derived stats. the
// connection-quality update interval is 5s, so allow plenty of time for
// the receiver OnStatsUpdate callback to fire at least once and for
// the downstream connection-quality scorer to compute a real score.
testutils.WithTimeout(t, func() string {
for _, identity := range []livekit.ParticipantIdentity{"c1", "c2"} {
p := room.GetParticipant(identity)
if p == nil {
return fmt.Sprintf("participant %s not found", identity)
}
// upstream (publisher) cumulative stats
published := p.GetPublishedTracks()
if len(published) != 2 {
return fmt.Sprintf("%s expected 2 published tracks, got %d", identity, len(published))
}
for _, mt := range published {
lmt, ok := mt.(types.LocalMediaTrack)
if !ok {
return fmt.Sprintf("%s published track %s is not a LocalMediaTrack", identity, mt.ID())
}
stats := lmt.GetTrackStats()
if stats == nil {
return fmt.Sprintf("%s upstream cumulative stats nil for track %s", identity, mt.ID())
}
if stats.Packets == 0 {
return fmt.Sprintf("%s upstream cumulative stats has no packets for track %s", identity, mt.ID())
}
if stats.Bytes == 0 {
return fmt.Sprintf("%s upstream cumulative stats has no bytes for track %s", identity, mt.ID())
}
// upstream delta stats fed into the receiver OnStatsUpdate path
rc, ok := receiverCaptures[mt.ID()]
if !ok {
return fmt.Sprintf("%s missing receiver capture for track %s", identity, mt.ID())
}
rc.lock.Lock()
stat := rc.stat
rc.lock.Unlock()
if msg := validateAnalyticsStat(stat); msg != "" {
return fmt.Sprintf("%s upstream OnStatsUpdate %s for track %s", identity, msg, mt.ID())
}
}
// downstream (subscriber) cumulative stats and DownTrack OnStatsUpdate
// delta stats captured from the listener path
subscribed := p.GetSubscribedTracks()
if len(subscribed) != 2 {
return fmt.Sprintf("%s expected 2 subscribed tracks, got %d", identity, len(subscribed))
}
for _, st := range subscribed {
dt := st.DownTrack()
if dt == nil {
return fmt.Sprintf("%s subscribed track %s has no DownTrack", identity, st.ID())
}
stats := dt.GetTrackStats()
if stats == nil {
return fmt.Sprintf("%s downstream cumulative stats nil for track %s", identity, st.ID())
}
if stats.Packets == 0 {
return fmt.Sprintf("%s downstream cumulative stats has no packets for track %s", identity, st.ID())
}
if stats.Bytes == 0 {
return fmt.Sprintf("%s downstream cumulative stats has no bytes for track %s", identity, st.ID())
}
// downstream delta stats fed into the DownTrack OnStatsUpdate path
dc, ok := downTrackCaptures[identity][st.ID()]
if !ok {
return fmt.Sprintf("%s missing DownTrack capture for track %s", identity, st.ID())
}
dc.lock.Lock()
stat := dc.stat
dc.lock.Unlock()
if msg := validateAnalyticsStat(stat); msg != "" {
return fmt.Sprintf("%s downstream OnStatsUpdate %s for track %s", identity, msg, st.ID())
}
}
}
return ""
}, 15*time.Second)
})
}
}
func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
opts := testclient.Options{AutoSubscribe: false}
publisher := createRTCClient("publisher", defaultServerPort, testRTCServicePath, &opts)
client := createRTCClient("client", defaultServerPort, testRTCServicePath, &opts)
defer publisher.Stop()
defer client.Stop()
waitUntilConnected(t, publisher, client)
track, err := publisher.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer track.Stop()
time.Sleep(syncDelay)
require.Empty(t, client.SubscribedTracks()[publisher.ID()])
})
}
}
func Test_RenegotiationWithDifferentCodecs(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestRenegotiationWithDifferentCodecs")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, c1, c2)
// publish a vp8 video track and ensure clients receive it ok
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 was not subscribed to tracks from c1"
}
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
return ""
}
}
return "did not receive track with vp8"
})
t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
MimeType: "video/h264",
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
}, "videoscreen", "screen")
defer t3.Stop()
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2's not subscribed to anything"
}
// should have received three tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
return "c2's not subscribed to 3 tracks from c1"
}
var vp8Found, h264Found bool
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
vp8Found = true
} else if mime.IsMimeTypeStringH264(t.Codec().MimeType) {
h264Found = true
}
}
if !vp8Found {
return "did not receive track with vp8"
}
if !h264Found {
return "did not receive track with h264"
}
return ""
})
})
}
}
func TestSingleNodeRoomList(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
defer finish()
roomServiceListRoom(t)
}
func TestSingleNodeUpdateParticipant(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
defer finish()
adminCtx := contextWithToken(adminRoomToken(testRoom))
t.Run("update nonexistent participant", func(t *testing.T) {
_, err := roomClient.UpdateParticipant(adminCtx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "nonexistent",
Permission: &livekit.ParticipantPermission{
CanPublish: true,
},
})
require.Error(t, err)
var twErr twirp.Error
require.True(t, errors.As(err, &twErr))
require.Equal(t, twirp.NotFound, twErr.Code())
})
}
// Ensure that CORS headers are returned
func TestSingleNodeCORS(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestSingleNodeCORS")
defer finish()
req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%d", s.HTTPPort()), nil)
require.NoError(t, err)
req.Header.Set("Authorization", "bearer xyz")
req.Header.Set("Origin", "testhost.com")
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, "testhost.com", res.Header.Get("Access-Control-Allow-Origin"))
}
func TestSingleNodeDoubleSlash(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestSingleNodeDoubleSlash")
defer finish()
// client contains trailing slash in URL, causing path to contain double //
// without our middleware, this would cause a 302 redirect
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d/", s.HTTPPort()), &http.Client{})
_, err := roomClient.ListRooms(contextWithToken(listRoomToken()), &livekit.ListRoomsRequest{})
require.NoError(t, err)
}
func TestPingPong(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestPingPong")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, c1)
require.NoError(t, c1.SendPing())
require.Eventually(t, func() bool {
return c1.PongReceivedAt() > 0
}, time.Second, 10*time.Millisecond)
})
}
}
func TestSingleNodeJoinAfterClose(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestJoinAfterClose")
defer finish()
scenarioJoinClosedRoom(t)
}
func TestSingleNodeCloseNonRTCRoom(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("closeNonRTCRoom")
defer finish()
closeNonRTCRoom(t)
}
func TestAutoCreate(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
disableAutoCreate := func(conf *config.Config) {
conf.Room.AutoCreate = false
}
t.Run("cannot join if room isn't created", func(t *testing.T) {
s := createSingleNodeServer(disableAutoCreate)
go func() {
if err := s.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
defer s.Stop(true)
waitForServerToStart(s)
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
token := joinToken(testRoom, "start-before-create", nil)
opts := &testclient.Options{}
testRTCServicePathToTestClientOptions(testRTCServicePath, opts)
_, err := testclient.NewWebSocketConn(
fmt.Sprintf("ws://localhost:%d", defaultServerPort),
token,
opts,
)
require.Error(t, err)
// second join should also fail
token = joinToken(testRoom, "start-before-create-2", nil)
_, err = testclient.NewWebSocketConn(
fmt.Sprintf("ws://localhost:%d", defaultServerPort),
token,
opts,
)
require.Error(t, err)
})
}
})
t.Run("join with explicit createRoom", func(t *testing.T) {
s := createSingleNodeServer(disableAutoCreate)
go func() {
if err := s.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
defer s.Stop(true)
waitForServerToStart(s)
// explicitly create
_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{Name: testRoom})
require.NoError(t, err)
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("join-after-create", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, c1)
c1.Stop()
})
}
})
}
// don't give user subscribe permissions initially, and ensure autosubscribe is triggered afterwards
func TestSingleNodeUpdateSubscriptionPermissions(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSingleNodeUpdateSubscriptionPermissions")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, nil)
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
grant.SetCanSubscribe(false)
at := auth.NewAccessToken(testApiKey, testApiSecret).
AddGrant(grant).
SetIdentity("sub")
token, err := at.ToJWT()
require.NoError(t, err)
sub := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, pub, sub)
writers := publishTracksForClients(t, pub)
defer stopWriters(writers...)
// wait sub receives tracks
testutils.WithTimeout(t, func() string {
pubRemote := sub.GetRemoteParticipant(pub.ID())
if pubRemote == nil {
return "could not find remote publisher"
}
if len(pubRemote.Tracks) != 2 {
return "did not receive metadata for published tracks"
}
return ""
})
// set permissions out of band
ctx := contextWithToken(adminRoomToken(testRoom))
_, err = roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "sub",
Permission: &livekit.ParticipantPermission{
CanSubscribe: true,
CanPublish: true,
},
})
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
tracks := sub.SubscribedTracks()[pub.ID()]
if len(tracks) == 2 {
return ""
} else {
return fmt.Sprintf("expected 2 tracks subscribed, actual: %d", len(tracks))
}
})
})
}
}
func TestSingleNodeAttributes(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSingleNodeAttributes")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, &testclient.Options{
Attributes: map[string]string{
"b": "2",
"c": "3",
},
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
T := true
grants.CanUpdateOwnMetadata = &T
token.SetAttributes(map[string]string{
"a": "0",
"b": "1",
})
},
})
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
grant.SetCanSubscribe(false)
at := auth.NewAccessToken(testApiKey, testApiSecret).
SetVideoGrant(grant).
SetIdentity("sub")
token, err := at.ToJWT()
require.NoError(t, err)
sub := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, pub, sub)
// wait sub receives initial attributes
testutils.WithTimeout(t, func() string {
pubRemote := sub.GetRemoteParticipant(pub.ID())
if pubRemote == nil {
return "could not find remote publisher"
}
attrs := pubRemote.Attributes
if !reflect.DeepEqual(attrs, map[string]string{
"a": "0",
"b": "2",
"c": "3",
}) {
return fmt.Sprintf("did not receive expected attributes: %v", attrs)
}
return ""
})
})
}
}
// TestDeviceCodecOverride checks that codecs that are incompatible with a device is not
// negotiated by the server
func TestDeviceCodecOverride(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestDeviceCodecOverride")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
// simulate device that isn't compatible with H.264
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, &testclient.Options{
ClientInfo: &livekit.ClientInfo{
Os: "android",
DeviceModel: "Xiaomi 2201117TI",
},
})
defer c1.Stop()
waitUntilConnected(t, c1)
// it doesn't really matter what the codec set here is, uses default Pion MediaEngine codecs
tw, err := c1.AddStaticTrack("video/h264", "video", "webcam")
require.NoError(t, err)
defer stopWriters(tw)
var desc *sdp.MediaDescription
require.Eventually(t, func() bool {
lastAnswer := c1.LastAnswer()
if lastAnswer == nil {
return false
}
sd := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: lastAnswer.SDP,
}
answer, err := sd.Unmarshal()
require.NoError(t, err)
// video and data channel
if len(answer.MediaDescriptions) < 2 {
return false
}
for _, md := range answer.MediaDescriptions {
if md.MediaName.Media == "video" {
desc = md
break
}
}
return desc != nil
}, waitTimeout, waitTick, "did not receive answer")
hasSeenVP8 := false
for _, a := range desc.Attributes {
if a.Key == "rtpmap" {
require.NotContains(t, a.Value, mime.MimeTypeCodecH264.String(), "should not contain H264 codec")
if strings.Contains(a.Value, mime.MimeTypeCodecVP8.String()) {
hasSeenVP8 = true
}
}
}
require.True(t, hasSeenVP8, "should have seen VP8 codec in SDP")
})
}
}
func TestSubscribeToCodecUnsupported(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSubscribeToCodecUnsupported")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
// create a client that doesn't support H264
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, &testclient.Options{
AutoSubscribe: true,
DisabledCodecs: []webrtc.RTPCodecCapability{
{MimeType: "video/H264"},
},
})
waitUntilConnected(t, c1, c2)
// publish a vp8 video track and ensure c2 receives it ok
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 was not subscribed to tracks from c1"
}
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
return ""
}
}
return "did not receive track with vp8"
})
require.Nil(t, c2.GetSubscriptionResponseAndClear())
// publish a h264 track and ensure c2 got subscription error
t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
MimeType: "video/h264",
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
}, "videoscreen", "screen")
defer t3.Stop()
require.NoError(t, err)
var h264TrackID string
require.Eventually(t, func() bool {
remoteC1 := c2.GetRemoteParticipant(c1.ID())
require.NotNil(t, remoteC1)
for _, track := range remoteC1.Tracks {
if mime.IsMimeTypeStringH264(track.MimeType) {
h264TrackID = track.Sid
return true
}
}
return false
}, time.Second, 10*time.Millisecond, "did not receive track info with h264")
require.Eventually(t, func() bool {
sr := c2.GetSubscriptionResponseAndClear()
if sr == nil {
return false
}
require.Equal(t, h264TrackID, sr.TrackSid)
require.Equal(t, livekit.SubscriptionError_SE_CODEC_UNSUPPORTED, sr.Err)
return true
}, 5*time.Second, 10*time.Millisecond, "did not receive subscription response")
// publish another vp8 track again, ensure the transport recovered by sfu and c2 can receive it
t4, err := c1.AddStaticTrack("video/vp8", "video2", "webcam2")
require.NoError(t, err)
defer t4.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
return "c2 was not subscribed to tracks from c1"
}
var vp8Count int
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
vp8Count++
}
}
if vp8Count == 2 {
return ""
}
return "did not 2 receive track with vp8"
})
require.Nil(t, c2.GetSubscriptionResponseAndClear())
})
}
}
func TestDataPublishSlowSubscriber(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
dataChannelSlowThreshold := 21024
logger.Infow("----------------STARTING TEST----------------", "test", t.Name())
s := createSingleNodeServer(func(c *config.Config) {
c.RTC.DatachannelSlowThreshold = dataChannelSlowThreshold
})
go func() {
if err := s.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
waitForServerToStart(s)
defer func() {
s.Stop(true)
logger.Infow("----------------FINISHING TEST----------------", "test", t.Name())
}()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, nil)
fastSub := createRTCClient("fastSub", defaultServerPort, testRTCServicePath, nil)
slowSubNotDrop := createRTCClient("slowSubNotDrop", defaultServerPort, testRTCServicePath, nil)
slowSubDrop := createRTCClient("slowSubDrop", defaultServerPort, testRTCServicePath, nil)
waitUntilConnected(t, pub, fastSub, slowSubDrop, slowSubNotDrop)
defer func() {
pub.Stop()
fastSub.Stop()
slowSubNotDrop.Stop()
slowSubDrop.Stop()
}()
// no data should be dropped for fast subscriber
var fastDataIndex atomic.Uint64
fastSub.OnDataReceived = func(data []byte, sid string) {
idx := binary.BigEndian.Uint64(data[len(data)-8:])
require.Equal(t, fastDataIndex.Load()+1, idx)
fastDataIndex.Store(idx)
}
// no data should be dropped for slow subscriber that is above threshold
var slowNoDropDataIndex atomic.Uint64
var drainSlowSubNotDrop atomic.Bool
slowNoDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold * 2)
slowSubNotDrop.OnDataReceived = func(data []byte, sid string) {
idx := binary.BigEndian.Uint64(data[len(data)-8:])
require.Equal(t, slowNoDropDataIndex.Load()+1, idx)
slowNoDropDataIndex.Store(idx)
if !drainSlowSubNotDrop.Load() {
slowNoDropReader.Read(data, sid)
}
}
// data should be dropped for slow subscriber that is below threshold
var slowDropDataIndex atomic.Uint64
dropped := make(chan struct{})
slowDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold / 2)
slowSubDrop.OnDataReceived = func(data []byte, sid string) {
select {
case <-dropped:
return
default:
}
idx := binary.BigEndian.Uint64(data[len(data)-8:])
if idx != slowDropDataIndex.Load()+1 {
close(dropped)
}
slowDropDataIndex.Store(idx)
slowDropReader.Read(data, sid)
}
// publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold
var (
blocked atomic.Bool
stopWrite atomic.Bool
writeIdx atomic.Uint64
)
writeStopped := make(chan struct{})
go func() {
defer close(writeStopped)
var i int
buf := make([]byte, 100)
for !stopWrite.Load() {
i++
binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i))
if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil {
if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) {
blocked.Store(true)
i--
continue
} else {
t.Log("error writing", err)
break
}
}
writeIdx.Store(uint64(i))
}
}()
<-dropped
time.Sleep(time.Second)
blocked.Store(false)
require.Eventually(t, func() bool { return blocked.Load() }, 30*time.Second, 100*time.Millisecond)
stopWrite.Store(true)
<-writeStopped
drainSlowSubNotDrop.Store(true)
require.Eventually(t, func() bool {
return writeIdx.Load() == fastDataIndex.Load() &&
writeIdx.Load() == slowNoDropDataIndex.Load()
}, 10*time.Second, 50*time.Millisecond, "writeIdx %d, fast %d, slowNoDrop %d", writeIdx.Load(), fastDataIndex.Load(), slowNoDropDataIndex.Load())
})
}
}
func TestFireTrackBySdp(t *testing.T) {
_, finish := setupSingleNodeTest("TestFireTrackBySdp")
defer finish()
var cases = []struct {
name string
codecs []webrtc.RTPCodecCapability
pubSDK livekit.ClientInfo_SDK
}{
{
name: "js client could pub a/v tracks",
codecs: []webrtc.RTPCodecCapability{
{MimeType: mime.MimeTypeH264.String()},
{MimeType: mime.MimeTypeOpus.String()},
},
pubSDK: livekit.ClientInfo_JS,
},
{
name: "go client could pub audio tracks",
codecs: []webrtc.RTPCodecCapability{
{MimeType: "audio/opus"},
},
pubSDK: livekit.ClientInfo_GO,
},
}
for _, c := range cases {
codecs, sdk := c.codecs, c.pubSDK
t.Run(c.name, func(t *testing.T) {
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient(c.name+"_c1", defaultServerPort, testRTCServicePath, &testclient.Options{
ClientInfo: &livekit.ClientInfo{
Sdk: sdk,
},
})
c2 := createRTCClient(c.name+"_c2", defaultServerPort, testRTCServicePath, &testclient.Options{
AutoSubscribe: true,
ClientInfo: &livekit.ClientInfo{
Sdk: livekit.ClientInfo_JS,
},
})
waitUntilConnected(t, c1, c2)
defer func() {
c1.Stop()
c2.Stop()
}()
// publish tracks and don't write any packets
for _, codec := range codecs {
_, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType, testclient.AddTrackNoWriter())
require.NoError(t, err)
}
require.Eventually(t, func() bool {
return len(c2.SubscribedTracks()[c1.ID()]) == len(codecs)
}, 5*time.Second, 10*time.Millisecond)
var found int
for _, pubTrack := range c1.GetPublishedTrackIDs() {
t.Log("pub track", pubTrack)
tracks := c2.SubscribedTracks()[c1.ID()]
for _, track := range tracks {
t.Log("sub track", track.ID(), track.Codec())
if track.Codec().PayloadType == 0 && track.ID() == pubTrack {
found++
break
}
}
}
require.Equal(t, len(codecs), found)
})
}
})
}
}
func TestSinglePublisherDataTrack(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestSinglePublisherDataTrack")
defer finish()
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
waitUntilConnected(t, c1, c2)
// publish a couple of data tracks and ensure clients receive it ok
dt1, err := c1.PublishDataTrack()
require.NoError(t, err)
defer dt1.Stop()
dt2, err := c1.PublishDataTrack()
require.NoError(t, err)
defer dt2.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedDataTracks()) == 0 {
return "c2 was not subscribed to any data tracks"
}
// should have received two data tracks
if len(c2.SubscribedDataTracks()[c1.ID()]) != 2 {
return "c2 didn't subscribe to both data tracks from c1"
}
return ""
})
// a new client joins and should get the initial stream
c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
// ensure that new client that has joined also received data tracks
waitUntilConnected(t, c3)
testutils.WithTimeout(t, func() string {
if len(c3.SubscribedDataTracks()) == 0 {
return "c3 didn't subscribe to any data tracks"
}
// should have received two data tracks
if len(c3.SubscribedDataTracks()[c1.ID()]) != 2 {
return "c3 didn't subscribe to tracks from c1"
}
return ""
})
// ensure that the data track ids are generated by server
tracks := c3.SubscribedDataTracks()[c1.ID()]
for _, tr := range tracks {
require.True(t, strings.HasPrefix(string(tr.ID()), "DTR_"), "data track should begin with DTR")
}
// when c3 disconnects, ensure subscriber is cleaned up correctly
c3.Stop()
testutils.WithTimeout(t, func() string {
room := s.RoomManager().GetRoom(context.Background(), testRoom)
p := room.GetParticipant("c1")
require.NotNil(t, p)
for _, t := range p.GetPublishedDataTracks() {
if t.IsSubscriber(c3.ID()) {
return "c3 was not a subscriber of c1's data tracks"
}
}
return ""
})
})
}
}
func TestTurnRelay(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s := createSingleNodeServer(func(c *config.Config) {
c.TURN.Enabled = true
c.TURN.UDPPort = 3478
})
go func() {
if err := s.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
defer s.Stop(true)
waitForServerToStart(s)
c1 := createRTCClient("relay_c1", defaultServerPort, testRTCServicePathv0, &testclient.Options{
AutoSubscribe: true,
ForceRelay: true,
})
defer c1.Stop()
waitUntilConnected(t, c1)
testutils.WithTimeout(t, func() string {
if !c1.IsLocalCandidateRelaySelected() {
return "expected local candidate to be relay"
}
return ""
})
}