Telemetry and webhook improvements. (#535)

* Telemetry and webhook improvements.

* avoid blocking on telemetry channel - increase channel size and drop when full
* send ParticipantJoined webhook when fully joined (i.e. on ParticipantActive)
* send TrackPublished & TrackUnpublished webhooks
* increase number of parallel webhook workers to 50

* update protocol
This commit is contained in:
David Zhao
2022-03-18 23:20:33 -07:00
committed by GitHub
parent 5a9da8bee2
commit f14c452f8c
14 changed files with 213 additions and 126 deletions
+1 -1
View File
@@ -13,7 +13,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.11.14-0.20220316214650-51f5188bc422
github.com/livekit/protocol v0.11.14
github.com/mackerelio/go-osstat v0.2.1
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
+2 -2
View File
@@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.11.14-0.20220316214650-51f5188bc422 h1:PQ+YgSJxL/UmXi46ThYSVbAl7BWyTAYEy9sCmnyokPg=
github.com/livekit/protocol v0.11.14-0.20220316214650-51f5188bc422/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/livekit/protocol v0.11.14 h1:KmFPWNMtrKMhwhdPZHMQ9Dj2DFH4XLzdvv1gTJlJJKM=
github.com/livekit/protocol v0.11.14/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc=
github.com/mackerelio/go-osstat v0.2.1/go.mod h1:UzRL8dMCCTqG5WdRtsxbuljMpZt9PCAGXqxPst5QtaY=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
+7 -1
View File
@@ -10,7 +10,8 @@ import (
func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool) *typesfakes.FakeLocalParticipant {
p := &typesfakes.FakeLocalParticipant{}
p.IDReturns(livekit.ParticipantID(utils.NewGuid(utils.ParticipantPrefix)))
sid := utils.NewGuid(utils.ParticipantPrefix)
p.IDReturns(livekit.ParticipantID(sid))
p.IdentityReturns(identity)
p.StateReturns(livekit.ParticipantInfo_JOINED)
p.ProtocolVersionReturns(protocol)
@@ -18,6 +19,11 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro
p.CanPublishReturns(!hidden)
p.CanPublishDataReturns(!hidden)
p.HiddenReturns(hidden)
p.ToProtoReturns(&livekit.ParticipantInfo{
Sid: sid,
Identity: string(identity),
State: livekit.ParticipantInfo_JOINED,
})
p.SetMetadataStub = func(m string) {
var f func(participant types.LocalParticipant)
+13 -2
View File
@@ -171,12 +171,23 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
t.RemoveAllSubscribers()
t.MediaTrackReceiver.Close()
t.MediaTrackReceiver.ClearReceiver()
t.params.Telemetry.TrackUnpublished(context.Background(), t.PublisherID(), t.ToProto(), uint32(track.SSRC()))
t.params.Telemetry.TrackUnpublished(
context.Background(),
t.PublisherID(),
t.PublisherIdentity(),
t.ToProto(),
uint32(track.SSRC()),
)
})
wr.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) {
t.params.Telemetry.TrackStats(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), stat)
})
t.params.Telemetry.TrackPublished(context.Background(), t.PublisherID(), t.ToProto())
t.params.Telemetry.TrackPublished(
context.Background(),
t.PublisherID(),
t.PublisherIdentity(),
t.ToProto(),
)
t.buffer = buff
+2
View File
@@ -1,6 +1,7 @@
package rtc
import (
"context"
"math"
"sort"
"sync"
@@ -214,6 +215,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
// start the workers once connectivity is established
p.Start()
r.telemetry.ParticipantActive(context.Background(), r.Room, p.ToProto(), &livekit.AnalyticsClientMeta{ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds())})
} else if state == livekit.ParticipantInfo_DISCONNECTED {
// remove participant from room
go r.RemoveParticipant(p.Identity())
+3 -1
View File
@@ -5,7 +5,9 @@ import (
"testing"
"time"
"github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/webhook"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/config"
@@ -564,7 +566,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
UpdateInterval: audioUpdateInterval,
SmoothIntervals: opts.audioSmoothIntervals,
},
telemetry.NewTelemetryService(nil, nil),
telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}),
)
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i))
+13 -10
View File
@@ -63,11 +63,12 @@ type Stats struct {
// StatsWorker handles participant stats
type StatsWorker struct {
ctx context.Context
t TelemetryReporter
roomID livekit.RoomID
roomName livekit.RoomName
participantID livekit.ParticipantID
ctx context.Context
t TelemetryReporter
roomID livekit.RoomID
roomName livekit.RoomName
participantID livekit.ParticipantID
participantIdentity livekit.ParticipantIdentity
outgoingPerTrack map[livekit.TrackID]Stats
incomingPerTrack map[livekit.TrackID]Stats
@@ -82,13 +83,15 @@ func newStatsWorker(
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
identity livekit.ParticipantIdentity,
) *StatsWorker {
s := &StatsWorker{
ctx: ctx,
t: t,
roomID: roomID,
roomName: roomName,
participantID: participantID,
ctx: ctx,
t: t,
roomID: roomID,
roomName: roomName,
participantID: participantID,
participantIdentity: identity,
outgoingPerTrack: make(map[livekit.TrackID]Stats),
incomingPerTrack: make(map[livekit.TrackID]Stats),
@@ -22,12 +22,13 @@ type FakeTelemetryService struct {
arg1 context.Context
arg2 *livekit.EgressInfo
}
ParticipantActiveStub func(context.Context, livekit.ParticipantID, *livekit.AnalyticsClientMeta)
ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta)
participantActiveMutex sync.RWMutex
participantActiveArgsForCall []struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 *livekit.AnalyticsClientMeta
arg2 *livekit.Room
arg3 *livekit.ParticipantInfo
arg4 *livekit.AnalyticsClientMeta
}
ParticipantJoinedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta)
participantJoinedMutex sync.RWMutex
@@ -77,12 +78,13 @@ type FakeTelemetryService struct {
arg3 *livekit.TrackInfo
arg4 livekit.VideoQuality
}
TrackPublishedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo)
TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo)
trackPublishedMutex sync.RWMutex
trackPublishedArgsForCall []struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 *livekit.TrackInfo
arg3 livekit.ParticipantIdentity
arg4 *livekit.TrackInfo
}
TrackPublishedUpdateStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo)
trackPublishedUpdateMutex sync.RWMutex
@@ -107,13 +109,14 @@ type FakeTelemetryService struct {
arg3 *livekit.TrackInfo
arg4 *livekit.ParticipantInfo
}
TrackUnpublishedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, uint32)
TrackUnpublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, uint32)
trackUnpublishedMutex sync.RWMutex
trackUnpublishedArgsForCall []struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 *livekit.TrackInfo
arg4 uint32
arg3 livekit.ParticipantIdentity
arg4 *livekit.TrackInfo
arg5 uint32
}
TrackUnsubscribedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo)
trackUnsubscribedMutex sync.RWMutex
@@ -192,18 +195,19 @@ func (fake *FakeTelemetryService) EgressStartedArgsForCall(i int) (context.Conte
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.AnalyticsClientMeta) {
func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta) {
fake.participantActiveMutex.Lock()
fake.participantActiveArgsForCall = append(fake.participantActiveArgsForCall, struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 *livekit.AnalyticsClientMeta
}{arg1, arg2, arg3})
arg2 *livekit.Room
arg3 *livekit.ParticipantInfo
arg4 *livekit.AnalyticsClientMeta
}{arg1, arg2, arg3, arg4})
stub := fake.ParticipantActiveStub
fake.recordInvocation("ParticipantActive", []interface{}{arg1, arg2, arg3})
fake.recordInvocation("ParticipantActive", []interface{}{arg1, arg2, arg3, arg4})
fake.participantActiveMutex.Unlock()
if stub != nil {
fake.ParticipantActiveStub(arg1, arg2, arg3)
fake.ParticipantActiveStub(arg1, arg2, arg3, arg4)
}
}
@@ -213,17 +217,17 @@ func (fake *FakeTelemetryService) ParticipantActiveCallCount() int {
return len(fake.participantActiveArgsForCall)
}
func (fake *FakeTelemetryService) ParticipantActiveCalls(stub func(context.Context, livekit.ParticipantID, *livekit.AnalyticsClientMeta)) {
func (fake *FakeTelemetryService) ParticipantActiveCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta)) {
fake.participantActiveMutex.Lock()
defer fake.participantActiveMutex.Unlock()
fake.ParticipantActiveStub = stub
}
func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.Context, livekit.ParticipantID, *livekit.AnalyticsClientMeta) {
func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta) {
fake.participantActiveMutex.RLock()
defer fake.participantActiveMutex.RUnlock()
argsForCall := fake.participantActiveArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.ClientInfo, arg5 *livekit.AnalyticsClientMeta) {
@@ -463,18 +467,19 @@ func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQualityArgsForCall(i in
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) {
func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo) {
fake.trackPublishedMutex.Lock()
fake.trackPublishedArgsForCall = append(fake.trackPublishedArgsForCall, struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 *livekit.TrackInfo
}{arg1, arg2, arg3})
arg3 livekit.ParticipantIdentity
arg4 *livekit.TrackInfo
}{arg1, arg2, arg3, arg4})
stub := fake.TrackPublishedStub
fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3})
fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4})
fake.trackPublishedMutex.Unlock()
if stub != nil {
fake.TrackPublishedStub(arg1, arg2, arg3)
fake.TrackPublishedStub(arg1, arg2, arg3, arg4)
}
}
@@ -484,17 +489,17 @@ func (fake *FakeTelemetryService) TrackPublishedCallCount() int {
return len(fake.trackPublishedArgsForCall)
}
func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo)) {
func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo)) {
fake.trackPublishedMutex.Lock()
defer fake.trackPublishedMutex.Unlock()
fake.TrackPublishedStub = stub
}
func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, *livekit.TrackInfo) {
func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) {
fake.trackPublishedMutex.RLock()
defer fake.trackPublishedMutex.RUnlock()
argsForCall := fake.trackPublishedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeTelemetryService) TrackPublishedUpdate(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) {
@@ -601,19 +606,20 @@ func (fake *FakeTelemetryService) TrackSubscribedArgsForCall(i int) (context.Con
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeTelemetryService) TrackUnpublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 uint32) {
func (fake *FakeTelemetryService) TrackUnpublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo, arg5 uint32) {
fake.trackUnpublishedMutex.Lock()
fake.trackUnpublishedArgsForCall = append(fake.trackUnpublishedArgsForCall, struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 *livekit.TrackInfo
arg4 uint32
}{arg1, arg2, arg3, arg4})
arg3 livekit.ParticipantIdentity
arg4 *livekit.TrackInfo
arg5 uint32
}{arg1, arg2, arg3, arg4, arg5})
stub := fake.TrackUnpublishedStub
fake.recordInvocation("TrackUnpublished", []interface{}{arg1, arg2, arg3, arg4})
fake.recordInvocation("TrackUnpublished", []interface{}{arg1, arg2, arg3, arg4, arg5})
fake.trackUnpublishedMutex.Unlock()
if stub != nil {
fake.TrackUnpublishedStub(arg1, arg2, arg3, arg4)
fake.TrackUnpublishedStub(arg1, arg2, arg3, arg4, arg5)
}
}
@@ -623,17 +629,17 @@ func (fake *FakeTelemetryService) TrackUnpublishedCallCount() int {
return len(fake.trackUnpublishedArgsForCall)
}
func (fake *FakeTelemetryService) TrackUnpublishedCalls(stub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, uint32)) {
func (fake *FakeTelemetryService) TrackUnpublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, uint32)) {
fake.trackUnpublishedMutex.Lock()
defer fake.trackUnpublishedMutex.Unlock()
fake.TrackUnpublishedStub = stub
}
func (fake *FakeTelemetryService) TrackUnpublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, *livekit.TrackInfo, uint32) {
func (fake *FakeTelemetryService) TrackUnpublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, uint32) {
fake.trackUnpublishedMutex.RLock()
defer fake.trackUnpublishedMutex.RUnlock()
argsForCall := fake.trackUnpublishedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5
}
func (fake *FakeTelemetryService) TrackUnsubscribed(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) {
+56 -46
View File
@@ -5,6 +5,7 @@ import (
"time"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/webhook"
)
@@ -19,16 +20,16 @@ type TelemetryService interface {
RoomStarted(ctx context.Context, room *livekit.Room)
RoomEnded(ctx context.Context, room *livekit.Room)
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta)
ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta)
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo)
TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32)
TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)
TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32)
TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo)
TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality)
RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo)
RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo)
ParticipantActive(ctx context.Context, participantID livekit.ParticipantID, clientMeta *livekit.AnalyticsClientMeta)
EgressStarted(ctx context.Context, info *livekit.EgressInfo)
EgressEnded(ctx context.Context, info *livekit.EgressInfo)
}
@@ -40,7 +41,8 @@ type telemetryService struct {
jobQueue chan doWorkFunc
}
const jobQueueBufferSize = 100
// queue should be sufficiently large to avoid blocking
const jobQueueBufferSize = 10000
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
t := &telemetryService{
@@ -54,7 +56,6 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService)
}
func (t *telemetryService) run() {
ticker := time.NewTicker(updateFrequency)
for {
select {
@@ -68,99 +69,108 @@ func (t *telemetryService) run() {
}
}
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) {
t.jobQueue <- func() {
t.internalService.TrackStats(streamType, participantID, trackID, stats)
func (t *telemetryService) enqueue(f func()) {
select {
case t.jobQueue <- f:
return
default:
logger.Warnw("telemetry queue full, dropping message", nil)
}
}
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) {
t.enqueue(func() {
t.internalService.TrackStats(streamType, participantID, trackID, stats)
})
}
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.RoomStarted(ctx, room)
}
})
}
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.RoomEnded(ctx, room)
}
})
}
func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo,
clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.ParticipantJoined(ctx, room, participant, clientInfo, clientMeta)
}
})
}
func (t *telemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) {
t.enqueue(func() {
t.internalService.ParticipantActive(ctx, room, participant, clientMeta)
})
}
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.ParticipantLeft(ctx, room, participant)
}
})
}
func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.jobQueue <- func() {
t.internalService.TrackPublished(ctx, participantID, track)
}
func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
t.enqueue(func() {
t.internalService.TrackPublished(ctx, participantID, identity, track)
})
}
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) {
t.jobQueue <- func() {
t.internalService.TrackUnpublished(ctx, participantID, track, ssrc)
}
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) {
t.enqueue(func() {
t.internalService.TrackUnpublished(ctx, participantID, identity, track, ssrc)
})
}
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.TrackSubscribed(ctx, participantID, track, publisher)
}
})
}
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.TrackUnsubscribed(ctx, participantID, track)
}
})
}
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.RecordingStarted(ctx, ri)
}
})
}
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.RecordingEnded(ctx, ri)
}
})
}
func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.TrackPublishedUpdate(ctx, participantID, track)
}
}
func (t *telemetryService) ParticipantActive(ctx context.Context, participantID livekit.ParticipantID, clientMeta *livekit.AnalyticsClientMeta) {
t.jobQueue <- func() {
t.internalService.ParticipantActive(ctx, participantID, clientMeta)
}
})
}
func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, maxQuality)
}
})
}
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.EgressStarted(ctx, info)
}
})
}
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
t.jobQueue <- func() {
t.enqueue(func() {
t.internalService.EgressEnded(ctx, info)
}
})
}
+3 -2
View File
@@ -9,6 +9,8 @@ import (
"github.com/livekit/protocol/webhook"
)
const maxWebhookWorkers = 50
type TelemetryServiceInternal interface {
TelemetryService
SendAnalytics()
@@ -31,14 +33,13 @@ type telemetryServiceInternal struct {
func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal {
return &telemetryServiceInternal{
notifier: notifier,
webhookPool: workerpool.New(1),
webhookPool: workerpool.New(maxWebhookWorkers),
workers: make(map[livekit.ParticipantID]*StatsWorker),
analytics: analytics,
}
}
func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
direction := prometheus.Incoming
if streamType == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
+54 -24
View File
@@ -46,16 +46,17 @@ func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit.
func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *livekit.Room,
participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) {
t.workers[livekit.ParticipantID(participant.Sid)] = newStatsWorker(ctx, t, livekit.RoomID(room.Sid), livekit.RoomName(room.Name), livekit.ParticipantID(participant.Sid))
t.workers[livekit.ParticipantID(participant.Sid)] = newStatsWorker(
ctx,
t,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
)
prometheus.AddParticipant()
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventParticipantJoined,
Room: room,
Participant: participant,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED,
Timestamp: timestamppb.Now(),
@@ -68,6 +69,24 @@ func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *
})
}
func (t *telemetryServiceInternal) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) {
// consider participant joined only when they became active
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventParticipantJoined,
Room: room,
Participant: participant,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE,
Timestamp: timestamppb.Now(),
RoomId: room.Sid,
ParticipantId: participant.Sid,
Room: room,
ClientMeta: clientMeta,
})
}
func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
if w := t.workers[livekit.ParticipantID(participant.Sid)]; w != nil {
w.Close()
@@ -92,10 +111,23 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li
})
}
func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
prometheus.AddPublishedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventTrackPublished,
Room: &livekit.Room{
Sid: string(roomID),
Name: string(roomName),
},
Participant: &livekit.ParticipantInfo{
Sid: string(participantID),
Identity: string(identity),
},
Track: track,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_PUBLISHED,
Timestamp: timestamppb.Now(),
@@ -107,8 +139,6 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa
}
func (t *telemetryServiceInternal) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
prometheus.AddPublishedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE,
@@ -135,7 +165,7 @@ func (t *telemetryServiceInternal) TrackMaxSubscribedVideoQuality(ctx context.Co
})
}
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) {
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) {
roomID := livekit.RoomID("")
roomName := livekit.RoomName("")
w := t.workers[participantID]
@@ -147,6 +177,19 @@ func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, partici
prometheus.SubPublishedTrack(track.Type.String())
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventTrackUnpublished,
Room: &livekit.Room{
Sid: string(roomID),
Name: string(roomName),
},
Participant: &livekit.ParticipantInfo{
Sid: string(participantID),
Identity: string(identity),
},
Track: track,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_UNPUBLISHED,
Timestamp: timestamppb.Now(),
@@ -238,19 +281,6 @@ func (t *telemetryServiceInternal) notifyEvent(ctx context.Context, event *livek
})
}
func (t *telemetryServiceInternal) ParticipantActive(ctx context.Context, participantID livekit.ParticipantID, clientMeta *livekit.AnalyticsClientMeta) {
roomID, roomName := t.getRoomDetails(participantID)
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE,
Timestamp: timestamppb.Now(),
RoomId: string(roomID),
ParticipantId: string(participantID),
Room: &livekit.Room{Name: string(roomName)},
ClientMeta: clientMeta,
})
}
func (t *telemetryServiceInternal) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventEgressStarted,
@@ -151,7 +151,8 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) {
clientMetaConnect := &livekit.AnalyticsClientMeta{
ClientConnectTime: 420,
}
fixture.sut.ParticipantActive(context.Background(), livekit.ParticipantID(partSID), clientMetaConnect)
fixture.sut.ParticipantActive(context.Background(), room, participantInfo, clientMetaConnect)
require.Equal(t, 2, fixture.analytics.SendEventCallCount())
_, eventActive := fixture.analytics.SendEventArgsForCall(1)
+3 -2
View File
@@ -359,7 +359,8 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
// prepare
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
identity := livekit.ParticipantIdentity("part1Identity")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID), Identity: string(identity)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
// there should be bytes reported so that stats are sent
@@ -429,7 +430,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
require.True(t, found2)
// remove 1 track - track stats were flushed above, so no more calls to SendStats
fixture.sut.TrackUnpublished(context.Background(), partSID, &livekit.TrackInfo{Sid: string(trackID2)}, 0)
fixture.sut.TrackUnpublished(context.Background(), partSID, identity, &livekit.TrackInfo{Sid: string(trackID2)}, 0)
fixture.sut.SendAnalytics()
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
}
+14
View File
@@ -65,6 +65,20 @@ func TestWebhooks(t *testing.T) {
require.Equal(t, "c2", joined.Participant.Identity)
ts.ClearEvents()
// track published
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
testutils.WithTimeout(t, func() string {
ev := ts.GetEvent(webhook.EventTrackPublished)
if ev == nil {
return "did not receive TrackPublished"
}
require.NotNil(t, ev.Track, "TrackPublished did not include trackInfo")
require.Equal(t, string(c1.ID()), ev.Participant.Sid)
return ""
})
ts.ClearEvents()
// first participant leaves
c1.Stop()
testutils.WithTimeout(t, func() string {