diff --git a/go.mod b/go.mod index 456926050..7bd70560e 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.14-0.20220316214650-51f5188bc422 + github.com/livekit/protocol v0.11.14 github.com/mackerelio/go-osstat v0.2.1 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 diff --git a/go.sum b/go.sum index acb92a0e2..93db648f3 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.14-0.20220316214650-51f5188bc422 h1:PQ+YgSJxL/UmXi46ThYSVbAl7BWyTAYEy9sCmnyokPg= -github.com/livekit/protocol v0.11.14-0.20220316214650-51f5188bc422/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= +github.com/livekit/protocol v0.11.14 h1:KmFPWNMtrKMhwhdPZHMQ9Dj2DFH4XLzdvv1gTJlJJKM= +github.com/livekit/protocol v0.11.14/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc= github.com/mackerelio/go-osstat v0.2.1/go.mod h1:UzRL8dMCCTqG5WdRtsxbuljMpZt9PCAGXqxPst5QtaY= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index 78b2d761b..d1a07ba61 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -10,7 +10,8 @@ import ( func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool) *typesfakes.FakeLocalParticipant { p := &typesfakes.FakeLocalParticipant{} - p.IDReturns(livekit.ParticipantID(utils.NewGuid(utils.ParticipantPrefix))) + sid := utils.NewGuid(utils.ParticipantPrefix) + p.IDReturns(livekit.ParticipantID(sid)) p.IdentityReturns(identity) p.StateReturns(livekit.ParticipantInfo_JOINED) p.ProtocolVersionReturns(protocol) @@ -18,6 +19,11 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro p.CanPublishReturns(!hidden) p.CanPublishDataReturns(!hidden) p.HiddenReturns(hidden) + p.ToProtoReturns(&livekit.ParticipantInfo{ + Sid: sid, + Identity: string(identity), + State: livekit.ParticipantInfo_JOINED, + }) p.SetMetadataStub = func(m string) { var f func(participant types.LocalParticipant) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 73f644201..60e1be0b0 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -171,12 +171,23 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.RemoveAllSubscribers() t.MediaTrackReceiver.Close() t.MediaTrackReceiver.ClearReceiver() - t.params.Telemetry.TrackUnpublished(context.Background(), t.PublisherID(), t.ToProto(), uint32(track.SSRC())) + t.params.Telemetry.TrackUnpublished( + context.Background(), + t.PublisherID(), + t.PublisherIdentity(), + t.ToProto(), + uint32(track.SSRC()), + ) }) wr.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) { t.params.Telemetry.TrackStats(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), stat) }) - t.params.Telemetry.TrackPublished(context.Background(), t.PublisherID(), t.ToProto()) + t.params.Telemetry.TrackPublished( + context.Background(), + t.PublisherID(), + t.PublisherIdentity(), + t.ToProto(), + ) t.buffer = buff diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index c74c8cc0c..bfdd876a1 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1,6 +1,7 @@ package rtc import ( + "context" "math" "sort" "sync" @@ -214,6 +215,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions // start the workers once connectivity is established p.Start() + r.telemetry.ParticipantActive(context.Background(), r.Room, p.ToProto(), &livekit.AnalyticsClientMeta{ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds())}) } else if state == livekit.ParticipantInfo_DISCONNECTED { // remove participant from room go r.RemoveParticipant(p.Identity()) diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index babf748e6..3ab60b931 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -5,7 +5,9 @@ import ( "testing" "time" + "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/webhook" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/config" @@ -564,7 +566,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { UpdateInterval: audioUpdateInterval, SmoothIntervals: opts.audioSmoothIntervals, }, - telemetry.NewTelemetryService(nil, nil), + telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}), ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i)) diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index bfb60ca36..035cbaf76 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -63,11 +63,12 @@ type Stats struct { // StatsWorker handles participant stats type StatsWorker struct { - ctx context.Context - t TelemetryReporter - roomID livekit.RoomID - roomName livekit.RoomName - participantID livekit.ParticipantID + ctx context.Context + t TelemetryReporter + roomID livekit.RoomID + roomName livekit.RoomName + participantID livekit.ParticipantID + participantIdentity livekit.ParticipantIdentity outgoingPerTrack map[livekit.TrackID]Stats incomingPerTrack map[livekit.TrackID]Stats @@ -82,13 +83,15 @@ func newStatsWorker( roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, + identity livekit.ParticipantIdentity, ) *StatsWorker { s := &StatsWorker{ - ctx: ctx, - t: t, - roomID: roomID, - roomName: roomName, - participantID: participantID, + ctx: ctx, + t: t, + roomID: roomID, + roomName: roomName, + participantID: participantID, + participantIdentity: identity, outgoingPerTrack: make(map[livekit.TrackID]Stats), incomingPerTrack: make(map[livekit.TrackID]Stats), diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 5956ca6f9..ed354f374 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -22,12 +22,13 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.EgressInfo } - ParticipantActiveStub func(context.Context, livekit.ParticipantID, *livekit.AnalyticsClientMeta) + ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta) participantActiveMutex sync.RWMutex participantActiveArgsForCall []struct { arg1 context.Context - arg2 livekit.ParticipantID - arg3 *livekit.AnalyticsClientMeta + arg2 *livekit.Room + arg3 *livekit.ParticipantInfo + arg4 *livekit.AnalyticsClientMeta } ParticipantJoinedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta) participantJoinedMutex sync.RWMutex @@ -77,12 +78,13 @@ type FakeTelemetryService struct { arg3 *livekit.TrackInfo arg4 livekit.VideoQuality } - TrackPublishedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo) + TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) trackPublishedMutex sync.RWMutex trackPublishedArgsForCall []struct { arg1 context.Context arg2 livekit.ParticipantID - arg3 *livekit.TrackInfo + arg3 livekit.ParticipantIdentity + arg4 *livekit.TrackInfo } TrackPublishedUpdateStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo) trackPublishedUpdateMutex sync.RWMutex @@ -107,13 +109,14 @@ type FakeTelemetryService struct { arg3 *livekit.TrackInfo arg4 *livekit.ParticipantInfo } - TrackUnpublishedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, uint32) + TrackUnpublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, uint32) trackUnpublishedMutex sync.RWMutex trackUnpublishedArgsForCall []struct { arg1 context.Context arg2 livekit.ParticipantID - arg3 *livekit.TrackInfo - arg4 uint32 + arg3 livekit.ParticipantIdentity + arg4 *livekit.TrackInfo + arg5 uint32 } TrackUnsubscribedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo) trackUnsubscribedMutex sync.RWMutex @@ -192,18 +195,19 @@ func (fake *FakeTelemetryService) EgressStartedArgsForCall(i int) (context.Conte return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.AnalyticsClientMeta) { +func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta) { fake.participantActiveMutex.Lock() fake.participantActiveArgsForCall = append(fake.participantActiveArgsForCall, struct { arg1 context.Context - arg2 livekit.ParticipantID - arg3 *livekit.AnalyticsClientMeta - }{arg1, arg2, arg3}) + arg2 *livekit.Room + arg3 *livekit.ParticipantInfo + arg4 *livekit.AnalyticsClientMeta + }{arg1, arg2, arg3, arg4}) stub := fake.ParticipantActiveStub - fake.recordInvocation("ParticipantActive", []interface{}{arg1, arg2, arg3}) + fake.recordInvocation("ParticipantActive", []interface{}{arg1, arg2, arg3, arg4}) fake.participantActiveMutex.Unlock() if stub != nil { - fake.ParticipantActiveStub(arg1, arg2, arg3) + fake.ParticipantActiveStub(arg1, arg2, arg3, arg4) } } @@ -213,17 +217,17 @@ func (fake *FakeTelemetryService) ParticipantActiveCallCount() int { return len(fake.participantActiveArgsForCall) } -func (fake *FakeTelemetryService) ParticipantActiveCalls(stub func(context.Context, livekit.ParticipantID, *livekit.AnalyticsClientMeta)) { +func (fake *FakeTelemetryService) ParticipantActiveCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta)) { fake.participantActiveMutex.Lock() defer fake.participantActiveMutex.Unlock() fake.ParticipantActiveStub = stub } -func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.Context, livekit.ParticipantID, *livekit.AnalyticsClientMeta) { +func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta) { fake.participantActiveMutex.RLock() defer fake.participantActiveMutex.RUnlock() argsForCall := fake.participantActiveArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.ClientInfo, arg5 *livekit.AnalyticsClientMeta) { @@ -463,18 +467,19 @@ func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQualityArgsForCall(i in return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } -func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) { +func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo) { fake.trackPublishedMutex.Lock() fake.trackPublishedArgsForCall = append(fake.trackPublishedArgsForCall, struct { arg1 context.Context arg2 livekit.ParticipantID - arg3 *livekit.TrackInfo - }{arg1, arg2, arg3}) + arg3 livekit.ParticipantIdentity + arg4 *livekit.TrackInfo + }{arg1, arg2, arg3, arg4}) stub := fake.TrackPublishedStub - fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3}) + fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4}) fake.trackPublishedMutex.Unlock() if stub != nil { - fake.TrackPublishedStub(arg1, arg2, arg3) + fake.TrackPublishedStub(arg1, arg2, arg3, arg4) } } @@ -484,17 +489,17 @@ func (fake *FakeTelemetryService) TrackPublishedCallCount() int { return len(fake.trackPublishedArgsForCall) } -func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo)) { +func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo)) { fake.trackPublishedMutex.Lock() defer fake.trackPublishedMutex.Unlock() fake.TrackPublishedStub = stub } -func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, *livekit.TrackInfo) { +func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) { fake.trackPublishedMutex.RLock() defer fake.trackPublishedMutex.RUnlock() argsForCall := fake.trackPublishedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } func (fake *FakeTelemetryService) TrackPublishedUpdate(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) { @@ -601,19 +606,20 @@ func (fake *FakeTelemetryService) TrackSubscribedArgsForCall(i int) (context.Con return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } -func (fake *FakeTelemetryService) TrackUnpublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 uint32) { +func (fake *FakeTelemetryService) TrackUnpublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo, arg5 uint32) { fake.trackUnpublishedMutex.Lock() fake.trackUnpublishedArgsForCall = append(fake.trackUnpublishedArgsForCall, struct { arg1 context.Context arg2 livekit.ParticipantID - arg3 *livekit.TrackInfo - arg4 uint32 - }{arg1, arg2, arg3, arg4}) + arg3 livekit.ParticipantIdentity + arg4 *livekit.TrackInfo + arg5 uint32 + }{arg1, arg2, arg3, arg4, arg5}) stub := fake.TrackUnpublishedStub - fake.recordInvocation("TrackUnpublished", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("TrackUnpublished", []interface{}{arg1, arg2, arg3, arg4, arg5}) fake.trackUnpublishedMutex.Unlock() if stub != nil { - fake.TrackUnpublishedStub(arg1, arg2, arg3, arg4) + fake.TrackUnpublishedStub(arg1, arg2, arg3, arg4, arg5) } } @@ -623,17 +629,17 @@ func (fake *FakeTelemetryService) TrackUnpublishedCallCount() int { return len(fake.trackUnpublishedArgsForCall) } -func (fake *FakeTelemetryService) TrackUnpublishedCalls(stub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, uint32)) { +func (fake *FakeTelemetryService) TrackUnpublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, uint32)) { fake.trackUnpublishedMutex.Lock() defer fake.trackUnpublishedMutex.Unlock() fake.TrackUnpublishedStub = stub } -func (fake *FakeTelemetryService) TrackUnpublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, *livekit.TrackInfo, uint32) { +func (fake *FakeTelemetryService) TrackUnpublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, uint32) { fake.trackUnpublishedMutex.RLock() defer fake.trackUnpublishedMutex.RUnlock() argsForCall := fake.trackUnpublishedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeTelemetryService) TrackUnsubscribed(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index fb0cdcee5..b5e9bdd8f 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -5,6 +5,7 @@ import ( "time" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" ) @@ -19,16 +20,16 @@ type TelemetryService interface { RoomStarted(ctx context.Context, room *livekit.Room) RoomEnded(ctx context.Context, room *livekit.Room) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) + ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) - TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) - TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) + TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) + TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) - ParticipantActive(ctx context.Context, participantID livekit.ParticipantID, clientMeta *livekit.AnalyticsClientMeta) EgressStarted(ctx context.Context, info *livekit.EgressInfo) EgressEnded(ctx context.Context, info *livekit.EgressInfo) } @@ -40,7 +41,8 @@ type telemetryService struct { jobQueue chan doWorkFunc } -const jobQueueBufferSize = 100 +// queue should be sufficiently large to avoid blocking +const jobQueueBufferSize = 10000 func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService { t := &telemetryService{ @@ -54,7 +56,6 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) } func (t *telemetryService) run() { - ticker := time.NewTicker(updateFrequency) for { select { @@ -68,99 +69,108 @@ func (t *telemetryService) run() { } } -func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) { - t.jobQueue <- func() { - t.internalService.TrackStats(streamType, participantID, trackID, stats) +func (t *telemetryService) enqueue(f func()) { + select { + case t.jobQueue <- f: + return + default: + logger.Warnw("telemetry queue full, dropping message", nil) } } +func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) { + t.enqueue(func() { + t.internalService.TrackStats(streamType, participantID, trackID, stats) + }) +} + func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.RoomStarted(ctx, room) - } + }) } func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.RoomEnded(ctx, room) - } + }) } func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.ParticipantJoined(ctx, room, participant, clientInfo, clientMeta) - } + }) +} + +func (t *telemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) { + t.enqueue(func() { + t.internalService.ParticipantActive(ctx, room, participant, clientMeta) + }) } func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.ParticipantLeft(ctx, room, participant) - } + }) } -func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - t.jobQueue <- func() { - t.internalService.TrackPublished(ctx, participantID, track) - } +func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) { + t.enqueue(func() { + t.internalService.TrackPublished(ctx, participantID, identity, track) + }) } -func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) { - t.jobQueue <- func() { - t.internalService.TrackUnpublished(ctx, participantID, track, ssrc) - } +func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) { + t.enqueue(func() { + t.internalService.TrackUnpublished(ctx, participantID, identity, track, ssrc) + }) } func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.TrackSubscribed(ctx, participantID, track, publisher) - } + }) } func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.TrackUnsubscribed(ctx, participantID, track) - } + }) } func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.RecordingStarted(ctx, ri) - } + }) } func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.RecordingEnded(ctx, ri) - } + }) } func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.TrackPublishedUpdate(ctx, participantID, track) - } -} - -func (t *telemetryService) ParticipantActive(ctx context.Context, participantID livekit.ParticipantID, clientMeta *livekit.AnalyticsClientMeta) { - t.jobQueue <- func() { - t.internalService.ParticipantActive(ctx, participantID, clientMeta) - } + }) } func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, maxQuality) - } + }) } func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.EgressStarted(ctx, info) - } + }) } func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) { - t.jobQueue <- func() { + t.enqueue(func() { t.internalService.EgressEnded(ctx, info) - } + }) } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index fe8856c38..663ba4459 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -9,6 +9,8 @@ import ( "github.com/livekit/protocol/webhook" ) +const maxWebhookWorkers = 50 + type TelemetryServiceInternal interface { TelemetryService SendAnalytics() @@ -31,14 +33,13 @@ type telemetryServiceInternal struct { func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal { return &telemetryServiceInternal{ notifier: notifier, - webhookPool: workerpool.New(1), + webhookPool: workerpool.New(maxWebhookWorkers), workers: make(map[livekit.ParticipantID]*StatsWorker), analytics: analytics, } } func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) { - direction := prometheus.Incoming if streamType == livekit.StreamType_DOWNSTREAM { direction = prometheus.Outgoing diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 35c7854f7..aa44bf250 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -46,16 +46,17 @@ func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit. func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) { - t.workers[livekit.ParticipantID(participant.Sid)] = newStatsWorker(ctx, t, livekit.RoomID(room.Sid), livekit.RoomName(room.Name), livekit.ParticipantID(participant.Sid)) + t.workers[livekit.ParticipantID(participant.Sid)] = newStatsWorker( + ctx, + t, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + ) prometheus.AddParticipant() - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventParticipantJoined, - Room: room, - Participant: participant, - }) - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, Timestamp: timestamppb.Now(), @@ -68,6 +69,24 @@ func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room * }) } +func (t *telemetryServiceInternal) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) { + // consider participant joined only when they became active + t.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventParticipantJoined, + Room: room, + Participant: participant, + }) + + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, + Timestamp: timestamppb.Now(), + RoomId: room.Sid, + ParticipantId: participant.Sid, + Room: room, + ClientMeta: clientMeta, + }) +} + func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { if w := t.workers[livekit.ParticipantID(participant.Sid)]; w != nil { w.Close() @@ -92,10 +111,23 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li }) } -func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { +func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) { prometheus.AddPublishedTrack(track.Type.String()) roomID, roomName := t.getRoomDetails(participantID) + t.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventTrackPublished, + Room: &livekit.Room{ + Sid: string(roomID), + Name: string(roomName), + }, + Participant: &livekit.ParticipantInfo{ + Sid: string(participantID), + Identity: string(identity), + }, + Track: track, + }) + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_TRACK_PUBLISHED, Timestamp: timestamppb.Now(), @@ -107,8 +139,6 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa } func (t *telemetryServiceInternal) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - prometheus.AddPublishedTrack(track.Type.String()) - roomID, roomName := t.getRoomDetails(participantID) t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE, @@ -135,7 +165,7 @@ func (t *telemetryServiceInternal) TrackMaxSubscribedVideoQuality(ctx context.Co }) } -func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) { +func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) { roomID := livekit.RoomID("") roomName := livekit.RoomName("") w := t.workers[participantID] @@ -147,6 +177,19 @@ func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, partici prometheus.SubPublishedTrack(track.Type.String()) + t.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventTrackUnpublished, + Room: &livekit.Room{ + Sid: string(roomID), + Name: string(roomName), + }, + Participant: &livekit.ParticipantInfo{ + Sid: string(participantID), + Identity: string(identity), + }, + Track: track, + }) + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_TRACK_UNPUBLISHED, Timestamp: timestamppb.Now(), @@ -238,19 +281,6 @@ func (t *telemetryServiceInternal) notifyEvent(ctx context.Context, event *livek }) } -func (t *telemetryServiceInternal) ParticipantActive(ctx context.Context, participantID livekit.ParticipantID, clientMeta *livekit.AnalyticsClientMeta) { - roomID, roomName := t.getRoomDetails(participantID) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, - Timestamp: timestamppb.Now(), - RoomId: string(roomID), - ParticipantId: string(participantID), - Room: &livekit.Room{Name: string(roomName)}, - ClientMeta: clientMeta, - }) -} - func (t *telemetryServiceInternal) EgressStarted(ctx context.Context, info *livekit.EgressInfo) { t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventEgressStarted, diff --git a/pkg/telemetry/test/telemetry_service_events_test.go b/pkg/telemetry/test/telemetry_service_events_test.go index 5851b1516..e8d0fed48 100644 --- a/pkg/telemetry/test/telemetry_service_events_test.go +++ b/pkg/telemetry/test/telemetry_service_events_test.go @@ -151,7 +151,8 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) { clientMetaConnect := &livekit.AnalyticsClientMeta{ ClientConnectTime: 420, } - fixture.sut.ParticipantActive(context.Background(), livekit.ParticipantID(partSID), clientMetaConnect) + + fixture.sut.ParticipantActive(context.Background(), room, participantInfo, clientMetaConnect) require.Equal(t, 2, fixture.analytics.SendEventCallCount()) _, eventActive := fixture.analytics.SendEventArgsForCall(1) diff --git a/pkg/telemetry/test/telemetry_service_test.go b/pkg/telemetry/test/telemetry_service_test.go index aa42e2b66..130c7d1f2 100644 --- a/pkg/telemetry/test/telemetry_service_test.go +++ b/pkg/telemetry/test/telemetry_service_test.go @@ -359,7 +359,8 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { // prepare room := &livekit.Room{} partSID := livekit.ParticipantID("part1") - participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} + identity := livekit.ParticipantIdentity("part1Identity") + participantInfo := &livekit.ParticipantInfo{Sid: string(partSID), Identity: string(identity)} fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) // there should be bytes reported so that stats are sent @@ -429,7 +430,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { require.True(t, found2) // remove 1 track - track stats were flushed above, so no more calls to SendStats - fixture.sut.TrackUnpublished(context.Background(), partSID, &livekit.TrackInfo{Sid: string(trackID2)}, 0) + fixture.sut.TrackUnpublished(context.Background(), partSID, identity, &livekit.TrackInfo{Sid: string(trackID2)}, 0) fixture.sut.SendAnalytics() require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) } diff --git a/test/webhook_test.go b/test/webhook_test.go index c1fa7a3ea..e0a69c020 100644 --- a/test/webhook_test.go +++ b/test/webhook_test.go @@ -65,6 +65,20 @@ func TestWebhooks(t *testing.T) { require.Equal(t, "c2", joined.Participant.Identity) ts.ClearEvents() + // track published + writers := publishTracksForClients(t, c1) + defer stopWriters(writers...) + testutils.WithTimeout(t, func() string { + ev := ts.GetEvent(webhook.EventTrackPublished) + if ev == nil { + return "did not receive TrackPublished" + } + require.NotNil(t, ev.Track, "TrackPublished did not include trackInfo") + require.Equal(t, string(c1.ID()), ev.Participant.Sid) + return "" + }) + ts.ClearEvents() + // first participant leaves c1.Stop() testutils.WithTimeout(t, func() string {