mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 13:25:42 +00:00
Some checks failed
Test / test (push) Failing after 17s
Release to Docker / docker (push) Failing after 3m42s
* Key telemetry stats work using combination of roomID, participantID With forwarded participant, the same participantID can existing in two rooms. NOTE: This does not yet allow a participant session to report its events/track stats into multiple rooms. That would require regitering multiple listeners (from rooms a participant is forwarded to). * missed file * data channel stats * PR comments + pass in room name so that telemetry events have proper room name also
633 lines
21 KiB
Go
633 lines
21 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package telemetry_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/livekit/livekit-server/pkg/telemetry"
|
|
"github.com/livekit/protocol/livekit"
|
|
|
|
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
|
"github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes"
|
|
)
|
|
|
|
func init() {
|
|
prometheus.Init("test", livekit.NodeType_SERVER)
|
|
}
|
|
|
|
type telemetryServiceFixture struct {
|
|
sut telemetry.TelemetryService
|
|
analytics *telemetryfakes.FakeAnalyticsService
|
|
}
|
|
|
|
func createFixture() *telemetryServiceFixture {
|
|
fixture := &telemetryServiceFixture{}
|
|
fixture.analytics = &telemetryfakes.FakeAnalyticsService{}
|
|
fixture.sut = telemetry.NewTelemetryService(nil, fixture.analytics)
|
|
return fixture
|
|
}
|
|
|
|
func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
clientInfo := &livekit.ClientInfo{Sdk: 2}
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true, guard)
|
|
|
|
// do
|
|
packet := 33
|
|
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet)}}}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, ""), stat)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind)
|
|
require.Equal(t, string(partSID), stats[0].ParticipantId)
|
|
require.Equal(t, room.Sid, stats[0].RoomId)
|
|
require.Equal(t, room.Name, stats[0].RoomName)
|
|
}
|
|
|
|
func Test_OnDownstreamPackets(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
clientInfo := &livekit.ClientInfo{Sdk: 2}
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true, guard)
|
|
|
|
// do
|
|
packets := []int{33, 23}
|
|
totalBytes := packets[0] + packets[1]
|
|
totalPackets := len(packets)
|
|
trackID := livekit.TrackID("trackID")
|
|
for i := range packets {
|
|
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packets[i]), PrimaryPackets: uint32(1)}}}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat)
|
|
}
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind)
|
|
require.Equal(t, totalBytes, int(stats[0].Streams[0].PrimaryBytes))
|
|
require.Equal(t, totalPackets, int(stats[0].Streams[0].PrimaryPackets))
|
|
require.Equal(t, string(trackID), stats[0].TrackId)
|
|
}
|
|
|
|
func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
clientInfo := &livekit.ClientInfo{Sdk: 2}
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true, guard)
|
|
|
|
// do
|
|
packet1 := 33
|
|
trackID1 := livekit.TrackID("trackID1")
|
|
stat1 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet1), PrimaryPackets: 1}}}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1)
|
|
|
|
packet2 := 23
|
|
trackID2 := livekit.TrackID("trackID2")
|
|
stat2 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet2), PrimaryPackets: 1}}}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat2)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 2, len(stats))
|
|
|
|
found1 := false
|
|
found2 := false
|
|
for _, sentStat := range stats {
|
|
if livekit.TrackID(sentStat.TrackId) == trackID1 {
|
|
found1 = true
|
|
require.Equal(t, packet1, int(sentStat.Streams[0].PrimaryBytes))
|
|
require.Equal(t, 1, int(sentStat.Streams[0].PrimaryPackets))
|
|
} else if livekit.TrackID(sentStat.TrackId) == trackID2 {
|
|
found2 = true
|
|
require.Equal(t, packet2, int(sentStat.Streams[0].PrimaryBytes))
|
|
require.Equal(t, 1, int(sentStat.Streams[0].PrimaryPackets))
|
|
}
|
|
}
|
|
require.True(t, found1)
|
|
require.True(t, found2)
|
|
}
|
|
|
|
func Test_OnDownStreamStat(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
stat1 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 1,
|
|
PrimaryPackets: 1,
|
|
PacketsLost: 3,
|
|
Nacks: 1,
|
|
Plis: 1,
|
|
Rtt: 23,
|
|
Jitter: 3,
|
|
},
|
|
},
|
|
}
|
|
trackID := livekit.TrackID("trackID1")
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1)
|
|
|
|
stat2 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 2,
|
|
PrimaryPackets: 2,
|
|
PacketsLost: 4,
|
|
Nacks: 1,
|
|
Plis: 1,
|
|
Firs: 1,
|
|
Rtt: 10,
|
|
Jitter: 5,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind)
|
|
require.Equal(t, 2, int(stats[0].Streams[0].Nacks))
|
|
require.Equal(t, 2, int(stats[0].Streams[0].Plis))
|
|
require.Equal(t, 1, int(stats[0].Streams[0].Firs))
|
|
require.Equal(t, 23, int(stats[0].Streams[0].Rtt)) // max of RTT
|
|
require.Equal(t, 5, int(stats[0].Streams[0].Jitter)) // max of jitter
|
|
require.Equal(t, 7, int(stats[0].Streams[0].PacketsLost)) // coalesced delta packet losses
|
|
require.Equal(t, string(trackID), stats[0].TrackId)
|
|
}
|
|
|
|
func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
trackID := livekit.TrackID("trackID1")
|
|
stat1 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 1,
|
|
PrimaryPackets: 1,
|
|
PacketsLost: 1,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) // there should be bytes reported so that stats are sent
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
stat2 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 2,
|
|
PrimaryPackets: 2,
|
|
PacketsLost: 4,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 2, fixture.analytics.SendStatsCallCount()) // 2 calls to fixture.sut.FlushStats()
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind)
|
|
require.Equal(t, 1, int(stats[0].Streams[0].PacketsLost)) // see pkts1
|
|
|
|
_, stats = fixture.analytics.SendStatsArgsForCall(1)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind)
|
|
require.Equal(t, 4, int(stats[0].Streams[0].PacketsLost)) // delta loss should be sent as is
|
|
}
|
|
|
|
func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
trackID1 := livekit.TrackID("trackID1")
|
|
trackID2 := livekit.TrackID("trackID2")
|
|
stat1 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 1,
|
|
PrimaryPackets: 1,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) // there should be bytes reported so that stats are sent
|
|
|
|
stat2 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 2,
|
|
PrimaryPackets: 2,
|
|
Nacks: 1,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat2)
|
|
|
|
stat3 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 3,
|
|
PrimaryPackets: 3,
|
|
Firs: 1,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat3)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 2, len(stats))
|
|
|
|
found1 := false
|
|
found2 := false
|
|
for _, sentStat := range stats {
|
|
if livekit.TrackID(sentStat.TrackId) == trackID1 {
|
|
found1 = true
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, sentStat.Kind)
|
|
require.Equal(t, 1, int(sentStat.Streams[0].Nacks)) // see pkts1 above
|
|
} else if livekit.TrackID(sentStat.TrackId) == trackID2 {
|
|
found2 = true
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, sentStat.Kind)
|
|
require.Equal(t, 1, int(sentStat.Streams[0].Firs)) // see pkts2 above
|
|
}
|
|
}
|
|
require.True(t, found1)
|
|
require.True(t, found2)
|
|
}
|
|
|
|
func Test_OnUpstreamStat(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
stat1 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 1,
|
|
PrimaryPackets: 1,
|
|
PacketsLost: 3,
|
|
Nacks: 1,
|
|
Plis: 1,
|
|
Firs: 1,
|
|
Rtt: 13,
|
|
Jitter: 5,
|
|
},
|
|
},
|
|
}
|
|
trackID := livekit.TrackID("trackID")
|
|
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
|
|
|
|
stat2 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 2,
|
|
PrimaryPackets: 2,
|
|
PacketsLost: 4,
|
|
Nacks: 1,
|
|
Plis: 1,
|
|
Firs: 1,
|
|
Rtt: 33,
|
|
Jitter: 2,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat2)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind)
|
|
require.Equal(t, 2, int(stats[0].Streams[0].Nacks))
|
|
require.Equal(t, 2, int(stats[0].Streams[0].Plis))
|
|
require.Equal(t, 2, int(stats[0].Streams[0].Firs))
|
|
require.Equal(t, 33, int(stats[0].Streams[0].Rtt)) // max of RTT
|
|
require.Equal(t, 5, int(stats[0].Streams[0].Jitter)) // max of jitter
|
|
require.Equal(t, 7, int(stats[0].Streams[0].PacketsLost)) // coalesced delta packet losses
|
|
require.Equal(t, string(trackID), stats[0].TrackId)
|
|
}
|
|
|
|
func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
identity := livekit.ParticipantIdentity("part1Identity")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID), Identity: string(identity)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// there should be bytes reported so that stats are sent
|
|
totalBytes := 1
|
|
totalPackets := 1
|
|
trackID1 := livekit.TrackID("trackID1")
|
|
trackID2 := livekit.TrackID("trackID2")
|
|
|
|
stat1 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: uint64(totalBytes),
|
|
PrimaryPackets: uint32(totalPackets),
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID1), stat1)
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID2), stat1) // using same buffer is not correct but for test it is fine
|
|
|
|
// do
|
|
totalBytes++
|
|
totalPackets++
|
|
stat2 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: uint64(totalBytes),
|
|
PrimaryPackets: uint32(totalPackets),
|
|
Nacks: 1,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID1), stat2)
|
|
|
|
stat3 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: uint64(totalBytes),
|
|
PrimaryPackets: uint32(totalPackets),
|
|
Firs: 1,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID2), stat3)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 2, len(stats))
|
|
|
|
found1 := false
|
|
found2 := false
|
|
for _, sentStat := range stats {
|
|
if livekit.TrackID(sentStat.TrackId) == trackID1 {
|
|
found1 = true
|
|
require.Equal(t, livekit.StreamType_UPSTREAM, sentStat.Kind)
|
|
require.Equal(t, 1, int(sentStat.Streams[0].Nacks)) // see pkts1 above
|
|
} else if livekit.TrackID(sentStat.TrackId) == trackID2 {
|
|
found2 = true
|
|
require.Equal(t, livekit.StreamType_UPSTREAM, sentStat.Kind)
|
|
require.Equal(t, 1, int(sentStat.Streams[0].Firs)) // see pkts2 above
|
|
}
|
|
require.Equal(t, 3, int(sentStat.Streams[0].PrimaryBytes))
|
|
require.Equal(t, 3, int(sentStat.Streams[0].PrimaryPackets))
|
|
}
|
|
require.True(t, found1)
|
|
require.True(t, found2)
|
|
|
|
// remove 1 track - track stats were flushed above, so no more calls to SendStats
|
|
fixture.sut.TrackUnpublished(context.Background(), livekit.RoomID(room.Sid), livekit.RoomName(room.Name), partSID, identity, &livekit.TrackInfo{Sid: string(trackID2)}, true)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
}
|
|
|
|
func Test_AnalyticsSentWhenParticipantLeaves(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{}
|
|
partSID := "part1"
|
|
participantInfo := &livekit.ParticipantInfo{Sid: partSID}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
fixture.sut.ParticipantLeft(context.Background(), room, participantInfo, true, guard)
|
|
|
|
// should not be called if there are no track stats
|
|
time.Sleep(time.Millisecond * 500)
|
|
require.Equal(t, 0, fixture.analytics.SendStatsCallCount())
|
|
}
|
|
|
|
func Test_AddUpTrack(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
var totalBytes uint64 = 3
|
|
var totalPackets uint32 = 3
|
|
|
|
stat := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: totalBytes,
|
|
PrimaryPackets: totalPackets,
|
|
},
|
|
},
|
|
}
|
|
trackID := livekit.TrackID("trackID")
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind)
|
|
require.Equal(t, totalBytes, stats[0].Streams[0].PrimaryBytes)
|
|
require.Equal(t, totalPackets, stats[0].Streams[0].PrimaryPackets)
|
|
require.Equal(t, string(trackID), stats[0].TrackId)
|
|
}
|
|
|
|
func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
trackID := livekit.TrackID("trackID")
|
|
stat1 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 1,
|
|
PrimaryPackets: 1,
|
|
},
|
|
{
|
|
PrimaryBytes: 2,
|
|
PrimaryPackets: 2,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 1, len(stats))
|
|
require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind)
|
|
// should be a consolidated stream
|
|
require.Equal(t, stat1.Streams[0].PrimaryBytes+stat1.Streams[1].PrimaryBytes, stats[0].Streams[0].PrimaryBytes)
|
|
require.Equal(t, stat1.Streams[0].PrimaryPackets+stat1.Streams[1].PrimaryPackets, stats[0].Streams[0].PrimaryPackets)
|
|
require.Equal(t, string(trackID), stats[0].TrackId)
|
|
}
|
|
|
|
func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
|
|
fixture := createFixture()
|
|
|
|
// prepare
|
|
room := &livekit.Room{Sid: "RoomSid", Name: "RoomName"}
|
|
partSID := livekit.ParticipantID("part1")
|
|
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
|
|
guard := &telemetry.ReferenceGuard{}
|
|
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true, guard)
|
|
|
|
// do
|
|
// upstream bytes
|
|
stat1 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 3,
|
|
PrimaryPackets: 3,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, "trackID"), stat1)
|
|
// downstream bytes
|
|
stat2 := &livekit.AnalyticsStat{
|
|
Streams: []*livekit.AnalyticsStream{
|
|
{
|
|
PrimaryBytes: 1,
|
|
PrimaryPackets: 1,
|
|
},
|
|
},
|
|
}
|
|
fixture.sut.TrackStats(livekit.RoomID(room.Sid), livekit.RoomName(room.Name), telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, "trackID1"), stat2)
|
|
|
|
// flush
|
|
fixture.flush()
|
|
|
|
// test
|
|
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
|
|
_, stats := fixture.analytics.SendStatsArgsForCall(0)
|
|
require.Equal(t, 2, len(stats))
|
|
require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind)
|
|
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[1].Kind)
|
|
}
|
|
|
|
func (f *telemetryServiceFixture) flush() {
|
|
time.Sleep(time.Millisecond * 500)
|
|
f.sut.FlushStats()
|
|
}
|