Files
livekit/test/singlenode_test.go
Raja Subramanian 9551c52c85 Try 2 to consolidate mime type (#3407)
* Normalize mime type and add utilities.

An attempt to normalize mime type and avoid string compares remembering
to do case insensitive search.

Not the best solution. Open to ideas. But, define our own mime types
(just in case Pion changes things and Pion also does not have red mime
type defined which should be easy to add though) and tried to use it everywhere.
But, as we get a bunch of callbacks and info from Pion, needed conversion in
more places than I anticipated. And also makes it necessary to carry
that cognitive load of what comes from Pion and needing to process it
properly.

* more locations

* test

* Paul feedback

* MimeType type

* more consolidation

* Remove unused

* test

* test

* mime type as int

* use string method

* Pass error details and timeouts. (#3402)

* go mod tidy (#3408)

* Rename CHANGELOG to CHANGELOG.md (#3391)

Enables markdown features in this otherwise already markdown'ish formatted document

* Update config.go to properly process bool env vars (#3382)

Fixes issue https://github.com/livekit/livekit/issues/3381

* fix(deps): update go deps (#3341)

Generated by renovateBot

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* Use a Twirp server hook to send API call details to telemetry. (#3401)

* Use a Twirp server hook to send API call details to telemetry.

* mage generate and clean up

* Add project_id

* deps

* - Redact requests
- Do not store responses
- Extract top level fields room_name, room_id, participant_identity,
  participant_id, track_id as appropriate
- Store status as int

* deps

* Update pkg/sfu/mime/mimetype.go

* Fix prefer codec test

* handle down track mime changes

---------

Co-authored-by: Denys Smirnov <dennwc@pm.me>
Co-authored-by: Philzen <Philzen@users.noreply.github.com>
Co-authored-by: Pablo Fuente Pérez <pablofuenteperez@gmail.com>
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Paul Wells <paulwe@gmail.com>
Co-authored-by: cnderrauber <zengjie9004@gmail.com>
2025-02-10 10:44:15 +05:30

904 lines
25 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"context"
"encoding/binary"
"errors"
"fmt"
"net/http"
"strings"
"testing"
"time"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v4"
"github.com/stretchr/testify/require"
"github.com/thoas/go-funk"
"github.com/twitchtv/twirp"
"go.uber.org/atomic"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/livekit-server/pkg/testutils"
testclient "github.com/livekit/livekit-server/test/client"
)
const (
waitTick = 10 * time.Millisecond
waitTimeout = 5 * time.Second
)
func TestClientCouldConnect(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestClientCouldConnect")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", defaultServerPort, nil)
waitUntilConnected(t, c1, c2)
// ensure they both see each other
testutils.WithTimeout(t, func() string {
if len(c1.RemoteParticipants()) == 0 {
return "c1 did not see c2"
}
if len(c2.RemoteParticipants()) == 0 {
return "c2 did not see c1"
}
return ""
})
}
func TestClientConnectDuplicate(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestClientCouldConnect")
defer finish()
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
grant.SetCanPublish(true)
grant.SetCanSubscribe(true)
token := joinTokenWithGrant("c1", grant)
c1 := createRTCClientWithToken(token, defaultServerPort, nil)
// publish 2 tracks
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
c2 := createRTCClient("c2", defaultServerPort, nil)
waitUntilConnected(t, c1, c2)
opts := &testclient.Options{
Publish: "duplicate_connection",
}
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 didn't subscribe to anything"
}
// should have received three tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 didn't subscribe to both tracks from c1"
}
// participant ID can be appended with '#..' . but should contain orig id as prefix
tr1 := c2.SubscribedTracks()[c1.ID()][0]
participantId1, _ := rtc.UnpackStreamID(tr1.StreamID())
require.Equal(t, c1.ID(), participantId1)
tr2 := c2.SubscribedTracks()[c1.ID()][1]
participantId2, _ := rtc.UnpackStreamID(tr2.StreamID())
require.Equal(t, c1.ID(), participantId2)
return ""
})
c1Dup := createRTCClientWithToken(token, defaultServerPort, opts)
waitUntilConnected(t, c1Dup)
t3, err := c1Dup.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t3.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()[c1Dup.ID()]) != 1 {
return "c2 was not subscribed to track from duplicated c1"
}
tr3 := c2.SubscribedTracks()[c1Dup.ID()][0]
participantId3, _ := rtc.UnpackStreamID(tr3.StreamID())
require.Contains(t, c1Dup.ID(), participantId3)
return ""
})
}
func TestSinglePublisher(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestSinglePublisher")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", defaultServerPort, nil)
waitUntilConnected(t, c1, c2)
// publish a track and ensure clients receive it ok
t1, err := c1.AddStaticTrack("audio/OPUS", "audio", "webcamaudio")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcamvideo")
require.NoError(t, err)
defer t2.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 didn't subscribe to both tracks from c1"
}
tr1 := c2.SubscribedTracks()[c1.ID()][0]
participantId, _ := rtc.UnpackStreamID(tr1.StreamID())
require.Equal(t, c1.ID(), participantId)
return ""
})
// ensure mime type is received
remoteC1 := c2.GetRemoteParticipant(c1.ID())
audioTrack := funk.Find(remoteC1.Tracks, func(ti *livekit.TrackInfo) bool {
return ti.Name == "webcamaudio"
}).(*livekit.TrackInfo)
require.Equal(t, "audio/opus", audioTrack.MimeType)
// a new client joins and should get the initial stream
c3 := createRTCClient("c3", defaultServerPort, nil)
// ensure that new client that has joined also received tracks
waitUntilConnected(t, c3)
testutils.WithTimeout(t, func() string {
if len(c3.SubscribedTracks()) == 0 {
return "c3 didn't subscribe to anything"
}
// should have received two tracks
if len(c3.SubscribedTracks()[c1.ID()]) != 2 {
return "c3 didn't subscribe to tracks from c1"
}
return ""
})
// ensure that the track ids are generated by server
tracks := c3.SubscribedTracks()[c1.ID()]
for _, tr := range tracks {
require.True(t, strings.HasPrefix(tr.ID(), "TR_"), "track should begin with TR")
}
// when c3 disconnects, ensure subscriber is cleaned up correctly
c3.Stop()
testutils.WithTimeout(t, func() string {
room := s.RoomManager().GetRoom(context.Background(), testRoom)
p := room.GetParticipant("c1")
require.NotNil(t, p)
for _, t := range p.GetPublishedTracks() {
if t.IsSubscriber(c3.ID()) {
return "c3 was not a subscriber of c1's tracks"
}
}
return ""
})
}
func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks")
defer finish()
opts := testclient.Options{AutoSubscribe: false}
publisher := createRTCClient("publisher", defaultServerPort, &opts)
client := createRTCClient("client", defaultServerPort, &opts)
defer publisher.Stop()
defer client.Stop()
waitUntilConnected(t, publisher, client)
track, err := publisher.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer track.Stop()
time.Sleep(syncDelay)
require.Empty(t, client.SubscribedTracks()[publisher.ID()])
}
func Test_RenegotiationWithDifferentCodecs(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestRenegotiationWithDifferentCodecs")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", defaultServerPort, nil)
waitUntilConnected(t, c1, c2)
// publish a vp8 video track and ensure clients receive it ok
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 was not subscribed to tracks from c1"
}
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
return ""
}
}
return "did not receive track with vp8"
})
t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
MimeType: "video/h264",
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
}, "videoscreen", "screen")
defer t3.Stop()
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2's not subscribed to anything"
}
// should have received three tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
return "c2's not subscribed to 3 tracks from c1"
}
var vp8Found, h264Found bool
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
vp8Found = true
} else if mime.IsMimeTypeStringH264(t.Codec().MimeType) {
h264Found = true
}
}
if !vp8Found {
return "did not receive track with vp8"
}
if !h264Found {
return "did not receive track with h264"
}
return ""
})
}
func TestSingleNodeRoomList(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
defer finish()
roomServiceListRoom(t)
}
func TestSingleNodeUpdateParticipant(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
defer finish()
adminCtx := contextWithToken(adminRoomToken(testRoom))
t.Run("update nonexistent participant", func(t *testing.T) {
_, err := roomClient.UpdateParticipant(adminCtx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "nonexistent",
Permission: &livekit.ParticipantPermission{
CanPublish: true,
},
})
require.Error(t, err)
var twErr twirp.Error
require.True(t, errors.As(err, &twErr))
// Note: for Cloud this would return 404, currently we are not able to differentiate between
// non-existent participant vs server being unavailable in OSS
require.Equal(t, twirp.Unavailable, twErr.Code())
})
}
// Ensure that CORS headers are returned
func TestSingleNodeCORS(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestSingleNodeCORS")
defer finish()
req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%d", s.HTTPPort()), nil)
require.NoError(t, err)
req.Header.Set("Authorization", "bearer xyz")
req.Header.Set("Origin", "testhost.com")
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, "testhost.com", res.Header.Get("Access-Control-Allow-Origin"))
}
func TestSingleNodeDoubleSlash(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
s, finish := setupSingleNodeTest("TestSingleNodeDoubleSlash")
defer finish()
// client contains trailing slash in URL, causing path to contain double //
// without our middleware, this would cause a 302 redirect
roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d/", s.HTTPPort()), &http.Client{})
_, err := roomClient.ListRooms(contextWithToken(listRoomToken()), &livekit.ListRoomsRequest{})
require.NoError(t, err)
}
func TestPingPong(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestPingPong")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
waitUntilConnected(t, c1)
require.NoError(t, c1.SendPing())
require.Eventually(t, func() bool {
return c1.PongReceivedAt() > 0
}, time.Second, 10*time.Millisecond)
}
func TestSingleNodeJoinAfterClose(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestJoinAfterClose")
defer finish()
scenarioJoinClosedRoom(t)
}
func TestSingleNodeCloseNonRTCRoom(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("closeNonRTCRoom")
defer finish()
closeNonRTCRoom(t)
}
func TestAutoCreate(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
disableAutoCreate := func(conf *config.Config) {
conf.Room.AutoCreate = false
}
t.Run("cannot join if room isn't created", func(t *testing.T) {
s := createSingleNodeServer(disableAutoCreate)
go func() {
if err := s.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
defer s.Stop(true)
waitForServerToStart(s)
token := joinToken(testRoom, "start-before-create", nil)
_, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil)
require.Error(t, err)
// second join should also fail
token = joinToken(testRoom, "start-before-create-2", nil)
_, err = testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil)
require.Error(t, err)
})
t.Run("join with explicit createRoom", func(t *testing.T) {
s := createSingleNodeServer(disableAutoCreate)
go func() {
if err := s.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
defer s.Stop(true)
waitForServerToStart(s)
// explicitly create
_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{Name: testRoom})
require.NoError(t, err)
c1 := createRTCClient("join-after-create", defaultServerPort, nil)
waitUntilConnected(t, c1)
c1.Stop()
})
}
// don't give user subscribe permissions initially, and ensure autosubscribe is triggered afterwards
func TestSingleNodeUpdateSubscriptionPermissions(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSingleNodeUpdateSubscriptionPermissions")
defer finish()
pub := createRTCClient("pub", defaultServerPort, nil)
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
grant.SetCanSubscribe(false)
at := auth.NewAccessToken(testApiKey, testApiSecret).
AddGrant(grant).
SetIdentity("sub")
token, err := at.ToJWT()
require.NoError(t, err)
sub := createRTCClientWithToken(token, defaultServerPort, nil)
waitUntilConnected(t, pub, sub)
writers := publishTracksForClients(t, pub)
defer stopWriters(writers...)
// wait sub receives tracks
testutils.WithTimeout(t, func() string {
pubRemote := sub.GetRemoteParticipant(pub.ID())
if pubRemote == nil {
return "could not find remote publisher"
}
if len(pubRemote.Tracks) != 2 {
return "did not receive metadata for published tracks"
}
return ""
})
// set permissions out of band
ctx := contextWithToken(adminRoomToken(testRoom))
_, err = roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "sub",
Permission: &livekit.ParticipantPermission{
CanSubscribe: true,
CanPublish: true,
},
})
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
tracks := sub.SubscribedTracks()[pub.ID()]
if len(tracks) == 2 {
return ""
} else {
return fmt.Sprintf("expected 2 tracks subscribed, actual: %d", len(tracks))
}
})
}
// TestDeviceCodecOverride checks that codecs that are incompatible with a device is not
// negotiated by the server
func TestDeviceCodecOverride(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestDeviceCodecOverride")
defer finish()
// simulate device that isn't compatible with H.264
c1 := createRTCClient("c1", defaultServerPort, &testclient.Options{
ClientInfo: &livekit.ClientInfo{
Os: "android",
DeviceModel: "Xiaomi 2201117TI",
},
})
defer c1.Stop()
waitUntilConnected(t, c1)
// it doesn't really matter what the codec set here is, uses default Pion MediaEngine codecs
tw, err := c1.AddStaticTrack("video/h264", "video", "webcam")
require.NoError(t, err)
defer stopWriters(tw)
// wait for server to receive track
require.Eventually(t, func() bool {
return c1.LastAnswer() != nil
}, waitTimeout, waitTick, "did not receive answer")
sd := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: c1.LastAnswer().SDP,
}
answer, err := sd.Unmarshal()
require.NoError(t, err)
// video and data channel
require.Len(t, answer.MediaDescriptions, 2)
var desc *sdp.MediaDescription
for _, md := range answer.MediaDescriptions {
if md.MediaName.Media == "video" {
desc = md
break
}
}
require.NotNil(t, desc)
hasSeenVP8 := false
for _, a := range desc.Attributes {
if a.Key == "rtpmap" {
require.NotContains(t, a.Value, mime.MimeTypeCodecH264.String(), "should not contain H264 codec")
if strings.Contains(a.Value, mime.MimeTypeCodecVP8.String()) {
hasSeenVP8 = true
}
}
}
require.True(t, hasSeenVP8, "should have seen VP8 codec in SDP")
}
func TestSubscribeToCodecUnsupported(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("TestSubscribeToCodecUnsupported")
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
// create a client that doesn't support H264
c2 := createRTCClient("c2", defaultServerPort, &testclient.Options{
AutoSubscribe: true,
DisabledCodecs: []webrtc.RTPCodecCapability{
{MimeType: "video/H264"},
},
})
waitUntilConnected(t, c1, c2)
// publish a vp8 video track and ensure c2 receives it ok
t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
require.NoError(t, err)
defer t1.Stop()
t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
require.NoError(t, err)
defer t2.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
return "c2 was not subscribed to tracks from c1"
}
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
return ""
}
}
return "did not receive track with vp8"
})
require.Nil(t, c2.GetSubscriptionResponseAndClear())
// publish a h264 track and ensure c2 got subscription error
t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
MimeType: "video/h264",
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
}, "videoscreen", "screen")
defer t3.Stop()
require.NoError(t, err)
var h264TrackID string
require.Eventually(t, func() bool {
remoteC1 := c2.GetRemoteParticipant(c1.ID())
require.NotNil(t, remoteC1)
for _, track := range remoteC1.Tracks {
if mime.IsMimeTypeStringH264(track.MimeType) {
h264TrackID = track.Sid
return true
}
}
return false
}, time.Second, 10*time.Millisecond, "did not receive track info with h264")
require.Eventually(t, func() bool {
sr := c2.GetSubscriptionResponseAndClear()
if sr == nil {
return false
}
require.Equal(t, h264TrackID, sr.TrackSid)
require.Equal(t, livekit.SubscriptionError_SE_CODEC_UNSUPPORTED, sr.Err)
return true
}, 5*time.Second, 10*time.Millisecond, "did not receive subscription response")
// publish another vp8 track again, ensure the transport recovered by sfu and c2 can receive it
t4, err := c1.AddStaticTrack("video/vp8", "video2", "webcam2")
require.NoError(t, err)
defer t4.Stop()
testutils.WithTimeout(t, func() string {
if len(c2.SubscribedTracks()) == 0 {
return "c2 was not subscribed to anything"
}
// should have received two tracks
if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
return "c2 was not subscribed to tracks from c1"
}
var vp8Count int
tracks := c2.SubscribedTracks()[c1.ID()]
for _, t := range tracks {
if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
vp8Count++
}
}
if vp8Count == 2 {
return ""
}
return "did not 2 receive track with vp8"
})
require.Nil(t, c2.GetSubscriptionResponseAndClear())
}
func TestDataPublishSlowSubscriber(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
dataChannelSlowThreshold := 21024
logger.Infow("----------------STARTING TEST----------------", "test", t.Name())
s := createSingleNodeServer(func(c *config.Config) {
c.RTC.DatachannelSlowThreshold = dataChannelSlowThreshold
})
go func() {
if err := s.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
waitForServerToStart(s)
defer func() {
s.Stop(true)
logger.Infow("----------------FINISHING TEST----------------", "test", t.Name())
}()
pub := createRTCClient("pub", defaultServerPort, nil)
fastSub := createRTCClient("fastSub", defaultServerPort, nil)
slowSubNotDrop := createRTCClient("slowSubNotDrop", defaultServerPort, nil)
slowSubDrop := createRTCClient("slowSubDrop", defaultServerPort, nil)
waitUntilConnected(t, pub, fastSub, slowSubDrop, slowSubNotDrop)
defer func() {
pub.Stop()
fastSub.Stop()
slowSubNotDrop.Stop()
slowSubDrop.Stop()
}()
// no data should be dropped for fast subscriber
var fastDataIndex atomic.Uint64
fastSub.OnDataReceived = func(data []byte, sid string) {
idx := binary.BigEndian.Uint64(data[len(data)-8:])
require.Equal(t, fastDataIndex.Load()+1, idx)
fastDataIndex.Store(idx)
}
// no data should be dropped for slow subscriber that is above threshold
var slowNoDropDataIndex atomic.Uint64
var drainSlowSubNotDrop atomic.Bool
slowNoDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold * 2)
slowSubNotDrop.OnDataReceived = func(data []byte, sid string) {
idx := binary.BigEndian.Uint64(data[len(data)-8:])
require.Equal(t, slowNoDropDataIndex.Load()+1, idx)
slowNoDropDataIndex.Store(idx)
if !drainSlowSubNotDrop.Load() {
slowNoDropReader.Read(data, sid)
}
}
// data should be dropped for slow subscriber that is below threshold
var slowDropDataIndex atomic.Uint64
dropped := make(chan struct{})
slowDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold / 2)
slowSubDrop.OnDataReceived = func(data []byte, sid string) {
select {
case <-dropped:
return
default:
}
idx := binary.BigEndian.Uint64(data[len(data)-8:])
if idx != slowDropDataIndex.Load()+1 {
close(dropped)
}
slowDropDataIndex.Store(idx)
slowDropReader.Read(data, sid)
}
// publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold
var (
blocked atomic.Bool
stopWrite atomic.Bool
writeIdx atomic.Uint64
)
writeStopped := make(chan struct{})
go func() {
defer close(writeStopped)
var i int
buf := make([]byte, 100)
for !stopWrite.Load() {
i++
binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i))
if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil {
if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) {
blocked.Store(true)
i--
continue
} else {
t.Log("error writing", err)
break
}
}
writeIdx.Store(uint64(i))
}
}()
<-dropped
time.Sleep(time.Second)
blocked.Store(false)
require.Eventually(t, func() bool { return blocked.Load() }, 30*time.Second, 100*time.Millisecond)
stopWrite.Store(true)
<-writeStopped
drainSlowSubNotDrop.Store(true)
require.Eventually(t, func() bool {
return writeIdx.Load() == fastDataIndex.Load() &&
writeIdx.Load() == slowNoDropDataIndex.Load()
}, 10*time.Second, 50*time.Millisecond, "writeIdx %d, fast %d, slowNoDrop %d", writeIdx.Load(), fastDataIndex.Load(), slowNoDropDataIndex.Load())
}
func TestFireTrackBySdp(t *testing.T) {
_, finish := setupSingleNodeTest("TestFireTrackBySdp")
defer finish()
var cases = []struct {
name string
codecs []webrtc.RTPCodecCapability
pubSDK livekit.ClientInfo_SDK
}{
{
name: "js client could pub a/v tracks",
codecs: []webrtc.RTPCodecCapability{
{MimeType: "video/H264"},
{MimeType: "audio/opus"},
},
pubSDK: livekit.ClientInfo_JS,
},
{
name: "go client could pub audio tracks",
codecs: []webrtc.RTPCodecCapability{
{MimeType: "audio/opus"},
},
pubSDK: livekit.ClientInfo_GO,
},
}
for _, c := range cases {
codecs, sdk := c.codecs, c.pubSDK
t.Run(c.name, func(t *testing.T) {
c1 := createRTCClient(c.name+"_c1", defaultServerPort, &testclient.Options{
ClientInfo: &livekit.ClientInfo{
Sdk: sdk,
},
})
c2 := createRTCClient(c.name+"_c2", defaultServerPort, &testclient.Options{
AutoSubscribe: true,
ClientInfo: &livekit.ClientInfo{
Sdk: livekit.ClientInfo_JS,
},
})
waitUntilConnected(t, c1, c2)
defer func() {
c1.Stop()
c2.Stop()
}()
// publish tracks and don't write any packets
for _, codec := range codecs {
_, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType, testclient.AddTrackNoWriter())
require.NoError(t, err)
}
require.Eventually(t, func() bool {
return len(c2.SubscribedTracks()[c1.ID()]) == len(codecs)
}, 5*time.Second, 10*time.Millisecond)
var found int
for _, pubTrack := range c1.GetPublishedTrackIDs() {
t.Log("pub track", pubTrack)
tracks := c2.SubscribedTracks()[c1.ID()]
for _, track := range tracks {
t.Log("sub track", track.ID(), track.Codec())
if track.Codec().PayloadType == 0 && track.ID() == pubTrack {
found++
break
}
}
}
require.Equal(t, len(codecs), found)
})
}
}