Telemetry capture published track updates (#367)

* Telemetry capture published track updates

Signed-off-by: shishir gowda <shishir@livekit.io>

* Updated OnVideoLayerUpdate to take slice of layers

Signed-off-by: shishir gowda <shishir@livekit.io>

* Update proto dep

Signed-off-by: shishir gowda <shishir@livekit.io>
This commit is contained in:
shishirng
2022-01-24 14:38:04 -05:00
committed by GitHub
parent 90b97137c9
commit 56ebd521f9
7 changed files with 84 additions and 7 deletions

2
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.11.11
github.com/livekit/protocol v0.11.12-0.20220124192741-b94955852f2a
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

6
go.sum
View File

@@ -132,8 +132,10 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.11.11 h1:je6yFjRMtDULH1Ir6d6PhX3ii676NGH7bUru7xmqGZ0=
github.com/livekit/protocol v0.11.11/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/livekit/protocol v0.11.12-0.20220124142501-c1f7afc1502f h1:gxMYQrYJjQSmIOgvPVKS8laL/QRMkkcblsNuompoGzk=
github.com/livekit/protocol v0.11.12-0.20220124142501-c1f7afc1502f/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/livekit/protocol v0.11.12-0.20220124192741-b94955852f2a h1:iYvpBQ5a7IgVWZfl5A+VggN2H86MAwYOoAwwvq6Qeq4=
github.com/livekit/protocol v0.11.12-0.20220124192741-b94955852f2a/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=

View File

@@ -90,6 +90,19 @@ func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTra
t.buffer.SetLastFractionLostReport(fractionalLoss)
}
})
t.MediaTrackReceiver.OnVideoLayerUpdate(func(layers []*livekit.VideoLayer) {
for _, layer := range layers {
t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(),
&livekit.TrackInfo{
Sid: string(t.ID()),
Type: livekit.TrackType_VIDEO,
Muted: t.IsMuted(),
Width: layer.Width,
Height: layer.Height,
Simulcast: t.IsSimulcast(),
})
}
})
// on close signal via closing channel to workers
t.AddOnClose(t.closeChan)

View File

@@ -33,10 +33,11 @@ type MediaTrackReceiver struct {
layerDimensions sync.Map // livekit.VideoQuality => *livekit.VideoLayer
// track audio fraction lost
downFracLostLock sync.Mutex
maxDownFracLost uint8
maxDownFracLostTs time.Time
onMediaLossUpdate func(fractionalLoss uint8)
downFracLostLock sync.Mutex
maxDownFracLost uint8
maxDownFracLostTs time.Time
onMediaLossUpdate func(fractionalLoss uint8)
onVideoLayerUpdate func(layers []*livekit.VideoLayer)
onClose []func()
@@ -93,6 +94,10 @@ func (t *MediaTrackReceiver) OnMediaLossUpdate(f func(fractionalLoss uint8)) {
t.onMediaLossUpdate = f
}
func (t *MediaTrackReceiver) OnVideoLayerUpdate(f func(layers []*livekit.VideoLayer)) {
t.onVideoLayerUpdate = f
}
func (t *MediaTrackReceiver) Close() {
t.lock.Lock()
t.receiver = nil
@@ -214,6 +219,9 @@ func (t *MediaTrackReceiver) UpdateVideoLayers(layers []*livekit.VideoLayer) {
}
t.MediaTrackSubscriptions.UpdateVideoLayers()
if t.onVideoLayerUpdate != nil {
t.onVideoLayerUpdate(layers)
}
// TODO: this might need to trigger a participant update for clients to pick up dimension change
}

View File

@@ -28,6 +28,7 @@ type TelemetryService interface {
TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32)
TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo)
RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo)
}
@@ -144,3 +145,9 @@ func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.Recor
t.internalService.RecordingEnded(ctx, ri)
}
}
func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.jobQueue <- func() {
t.internalService.TrackPublishedUpdate(ctx, participantID, track)
}
}

View File

@@ -104,6 +104,20 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa
})
}
func (t *telemetryServiceInternal) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
prometheus.AddPublishedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE,
Timestamp: timestamppb.Now(),
RoomId: string(roomID),
ParticipantId: string(participantID),
Track: track,
Room: &livekit.Room{Name: string(roomName)},
})
}
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) {
roomID := livekit.RoomID("")
roomName := livekit.RoomName("")

View File

@@ -65,3 +65,36 @@ func Test_OnParticipantLeft_EventIsSent(t *testing.T) {
require.Equal(t, room.Sid, event.RoomId)
require.Equal(t, room, event.Room)
}
func Test_OnTrackUpdate_EventIsSent(t *testing.T) {
fixture := createFixture()
// prepare
partID := "part1"
trackID := "track1"
width := uint32(360)
height := uint32(720)
trackInfo := &livekit.TrackInfo{
Sid: trackID,
Type: livekit.TrackType_VIDEO,
Muted: false,
Width: width,
Height: height,
Simulcast: false,
DisableDtx: false,
}
// do
fixture.sut.TrackPublishedUpdate(context.Background(), livekit.ParticipantID(partID), trackInfo)
// test
require.Equal(t, 1, fixture.analytics.SendEventCallCount())
_, event := fixture.analytics.SendEventArgsForCall(0)
require.Equal(t, livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE, event.Type)
require.Equal(t, partID, event.ParticipantId)
require.Equal(t, trackID, event.Track.Sid)
require.Equal(t, width, event.Track.Width)
require.Equal(t, height, event.Track.Height)
}