From 3e7fae96eae2cd5f291fc5c633cd6bb6f758ff8f Mon Sep 17 00:00:00 2001 From: shishirng Date: Tue, 22 Feb 2022 19:08:49 -0500 Subject: [PATCH] Add telemetry method to capture max video_quality (#457) * Add telemetry method to capture max video_quality Signed-off-by: shishir gowda * Telemetry fakes Signed-off-by: shishir gowda * Update go mod dep Signed-off-by: shishir gowda --- go.mod | 2 +- go.sum | 4 +- pkg/rtc/participant.go | 5 ++- .../telemetryfakes/fake_telemetry_service.go | 45 +++++++++++++++++++ pkg/telemetry/telemetryservice.go | 7 +++ .../telemetryserviceinternalevents.go | 15 +++++++ 6 files changed, 74 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 1c841aaa3..923ff06f2 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.14-0.20220221030410-28685be0639d + github.com/livekit/protocol v0.11.14-0.20220223000017-946db440fa31 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 4a0d1c723..29d04e612 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.14-0.20220221030410-28685be0639d h1:j1r2zNSzRM12ZbQye9B/HpszTmNv+cBYJ2dPmMFqkf0= -github.com/livekit/protocol v0.11.14-0.20220221030410-28685be0639d/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= +github.com/livekit/protocol v0.11.14-0.20220223000017-946db440fa31 h1:hKxQgugr7nDA28kY+XEKELL/c4fD5kaoUHkTSPQyXwQ= +github.com/livekit/protocol v0.11.14-0.20220223000017-946db440fa31/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index b262a79a1..8dd3f55eb 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1,6 +1,7 @@ package rtc import ( + "context" "fmt" "io" "strings" @@ -1302,7 +1303,7 @@ func (p *ParticipantImpl) onStreamStateChange(update *sfu.StreamStateUpdate) err }) } -func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, _maxSubscribedQuality livekit.VideoQuality) error { +func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxSubscribedQuality livekit.VideoQuality) error { if len(subscribedQualities) == 0 { return nil } @@ -1312,6 +1313,8 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, SubscribedQualities: subscribedQualities, } + p.params.Telemetry.TrackMaxSubscribedVideoQuality(context.Background(), p.ID(), &livekit.TrackInfo{Sid: string(trackID)}, maxSubscribedQuality) + return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_SubscribedQualityUpdate{ SubscribedQualityUpdate: subscribedQualityUpdate, diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 93375106a..ef6a1a08e 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -57,6 +57,14 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.Room } + TrackMaxSubscribedVideoQualityStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, livekit.VideoQuality) + trackMaxSubscribedVideoQualityMutex sync.RWMutex + trackMaxSubscribedVideoQualityArgsForCall []struct { + arg1 context.Context + arg2 livekit.ParticipantID + arg3 *livekit.TrackInfo + arg4 livekit.VideoQuality + } TrackPublishedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo) trackPublishedMutex sync.RWMutex trackPublishedArgsForCall []struct { @@ -341,6 +349,41 @@ func (fake *FakeTelemetryService) RoomStartedArgsForCall(i int) (context.Context return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQuality(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 livekit.VideoQuality) { + fake.trackMaxSubscribedVideoQualityMutex.Lock() + fake.trackMaxSubscribedVideoQualityArgsForCall = append(fake.trackMaxSubscribedVideoQualityArgsForCall, struct { + arg1 context.Context + arg2 livekit.ParticipantID + arg3 *livekit.TrackInfo + arg4 livekit.VideoQuality + }{arg1, arg2, arg3, arg4}) + stub := fake.TrackMaxSubscribedVideoQualityStub + fake.recordInvocation("TrackMaxSubscribedVideoQuality", []interface{}{arg1, arg2, arg3, arg4}) + fake.trackMaxSubscribedVideoQualityMutex.Unlock() + if stub != nil { + fake.TrackMaxSubscribedVideoQualityStub(arg1, arg2, arg3, arg4) + } +} + +func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQualityCallCount() int { + fake.trackMaxSubscribedVideoQualityMutex.RLock() + defer fake.trackMaxSubscribedVideoQualityMutex.RUnlock() + return len(fake.trackMaxSubscribedVideoQualityArgsForCall) +} + +func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQualityCalls(stub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, livekit.VideoQuality)) { + fake.trackMaxSubscribedVideoQualityMutex.Lock() + defer fake.trackMaxSubscribedVideoQualityMutex.Unlock() + fake.TrackMaxSubscribedVideoQualityStub = stub +} + +func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQualityArgsForCall(i int) (context.Context, livekit.ParticipantID, *livekit.TrackInfo, livekit.VideoQuality) { + fake.trackMaxSubscribedVideoQualityMutex.RLock() + defer fake.trackMaxSubscribedVideoQualityMutex.RUnlock() + argsForCall := fake.trackMaxSubscribedVideoQualityArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) { fake.trackPublishedMutex.Lock() fake.trackPublishedArgsForCall = append(fake.trackPublishedArgsForCall, struct { @@ -564,6 +607,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.roomEndedMutex.RUnlock() fake.roomStartedMutex.RLock() defer fake.roomStartedMutex.RUnlock() + fake.trackMaxSubscribedVideoQualityMutex.RLock() + defer fake.trackMaxSubscribedVideoQualityMutex.RUnlock() fake.trackPublishedMutex.RLock() defer fake.trackPublishedMutex.RUnlock() fake.trackPublishedUpdateMutex.RLock() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 0c8df42cd..60be5a805 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -25,6 +25,7 @@ type TelemetryService interface { TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) + TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) ParticipantActive(ctx context.Context, participantID livekit.ParticipantID, clientMeta *livekit.AnalyticsClientMeta) @@ -143,3 +144,9 @@ func (t *telemetryService) ParticipantActive(ctx context.Context, participantID t.internalService.ParticipantActive(ctx, participantID, clientMeta) } } + +func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) { + t.jobQueue <- func() { + t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, maxQuality) + } +} diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 5f27fb5bf..4e504ce37 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -119,6 +119,21 @@ func (t *telemetryServiceInternal) TrackPublishedUpdate(ctx context.Context, par }) } +func (t *telemetryServiceInternal) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, + maxQuality livekit.VideoQuality) { + + roomID, roomName := t.getRoomDetails(participantID) + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + Track: track, + Room: &livekit.Room{Name: string(roomName)}, + MaxSubscribedVideoQuality: maxQuality, + }) +} + func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) { roomID := livekit.RoomID("") roomName := livekit.RoomName("")