From 56ebd521f99649d5332173ffbe21e71fe10956db Mon Sep 17 00:00:00 2001 From: shishirng Date: Mon, 24 Jan 2022 14:38:04 -0500 Subject: [PATCH] Telemetry capture published track updates (#367) * Telemetry capture published track updates Signed-off-by: shishir gowda * Updated OnVideoLayerUpdate to take slice of layers Signed-off-by: shishir gowda * Update proto dep Signed-off-by: shishir gowda --- go.mod | 2 +- go.sum | 6 ++-- pkg/rtc/mediatrack.go | 13 ++++++++ pkg/rtc/mediatrackreceiver.go | 16 ++++++--- pkg/telemetry/telemetryservice.go | 7 ++++ .../telemetryserviceinternalevents.go | 14 ++++++++ .../test/telemetry_service_events_test.go | 33 +++++++++++++++++++ 7 files changed, 84 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 40ff48f96..c7e97d2f5 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.11 + github.com/livekit/protocol v0.11.12-0.20220124192741-b94955852f2a github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 0d57beb05..8e994271c 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,10 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.11 h1:je6yFjRMtDULH1Ir6d6PhX3ii676NGH7bUru7xmqGZ0= -github.com/livekit/protocol v0.11.11/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= +github.com/livekit/protocol v0.11.12-0.20220124142501-c1f7afc1502f h1:gxMYQrYJjQSmIOgvPVKS8laL/QRMkkcblsNuompoGzk= +github.com/livekit/protocol v0.11.12-0.20220124142501-c1f7afc1502f/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= +github.com/livekit/protocol v0.11.12-0.20220124192741-b94955852f2a h1:iYvpBQ5a7IgVWZfl5A+VggN2H86MAwYOoAwwvq6Qeq4= +github.com/livekit/protocol v0.11.12-0.20220124192741-b94955852f2a/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 35577c51a..5ae117ec0 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -90,6 +90,19 @@ func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTra t.buffer.SetLastFractionLostReport(fractionalLoss) } }) + t.MediaTrackReceiver.OnVideoLayerUpdate(func(layers []*livekit.VideoLayer) { + for _, layer := range layers { + t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), + &livekit.TrackInfo{ + Sid: string(t.ID()), + Type: livekit.TrackType_VIDEO, + Muted: t.IsMuted(), + Width: layer.Width, + Height: layer.Height, + Simulcast: t.IsSimulcast(), + }) + } + }) // on close signal via closing channel to workers t.AddOnClose(t.closeChan) diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 8eb1dff7a..d14df48e7 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -33,10 +33,11 @@ type MediaTrackReceiver struct { layerDimensions sync.Map // livekit.VideoQuality => *livekit.VideoLayer // track audio fraction lost - downFracLostLock sync.Mutex - maxDownFracLost uint8 - maxDownFracLostTs time.Time - onMediaLossUpdate func(fractionalLoss uint8) + downFracLostLock sync.Mutex + maxDownFracLost uint8 + maxDownFracLostTs time.Time + onMediaLossUpdate func(fractionalLoss uint8) + onVideoLayerUpdate func(layers []*livekit.VideoLayer) onClose []func() @@ -93,6 +94,10 @@ func (t *MediaTrackReceiver) OnMediaLossUpdate(f func(fractionalLoss uint8)) { t.onMediaLossUpdate = f } +func (t *MediaTrackReceiver) OnVideoLayerUpdate(f func(layers []*livekit.VideoLayer)) { + t.onVideoLayerUpdate = f +} + func (t *MediaTrackReceiver) Close() { t.lock.Lock() t.receiver = nil @@ -214,6 +219,9 @@ func (t *MediaTrackReceiver) UpdateVideoLayers(layers []*livekit.VideoLayer) { } t.MediaTrackSubscriptions.UpdateVideoLayers() + if t.onVideoLayerUpdate != nil { + t.onVideoLayerUpdate(layers) + } // TODO: this might need to trigger a participant update for clients to pick up dimension change } diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 5208fd70e..a399ff213 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -28,6 +28,7 @@ type TelemetryService interface { TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) + TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) } @@ -144,3 +145,9 @@ func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.Recor t.internalService.RecordingEnded(ctx, ri) } } + +func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { + t.jobQueue <- func() { + t.internalService.TrackPublishedUpdate(ctx, participantID, track) + } +} diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index e42186bb3..990fcf963 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -104,6 +104,20 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa }) } +func (t *telemetryServiceInternal) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { + prometheus.AddPublishedTrack(track.Type.String()) + + roomID, roomName := t.getRoomDetails(participantID) + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + Track: track, + Room: &livekit.Room{Name: string(roomName)}, + }) +} + func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) { roomID := livekit.RoomID("") roomName := livekit.RoomName("") diff --git a/pkg/telemetry/test/telemetry_service_events_test.go b/pkg/telemetry/test/telemetry_service_events_test.go index 3f6ab52f7..9fcad999c 100644 --- a/pkg/telemetry/test/telemetry_service_events_test.go +++ b/pkg/telemetry/test/telemetry_service_events_test.go @@ -65,3 +65,36 @@ func Test_OnParticipantLeft_EventIsSent(t *testing.T) { require.Equal(t, room.Sid, event.RoomId) require.Equal(t, room, event.Room) } + +func Test_OnTrackUpdate_EventIsSent(t *testing.T) { + fixture := createFixture() + + // prepare + partID := "part1" + trackID := "track1" + width := uint32(360) + height := uint32(720) + trackInfo := &livekit.TrackInfo{ + Sid: trackID, + Type: livekit.TrackType_VIDEO, + Muted: false, + Width: width, + Height: height, + Simulcast: false, + DisableDtx: false, + } + + // do + fixture.sut.TrackPublishedUpdate(context.Background(), livekit.ParticipantID(partID), trackInfo) + + // test + require.Equal(t, 1, fixture.analytics.SendEventCallCount()) + _, event := fixture.analytics.SendEventArgsForCall(0) + require.Equal(t, livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE, event.Type) + require.Equal(t, partID, event.ParticipantId) + + require.Equal(t, trackID, event.Track.Sid) + require.Equal(t, width, event.Track.Width) + require.Equal(t, height, event.Track.Height) + +}