telemetry interfaces (#210)

* telemetry interfaces

* move AddUptrack under stats

* regenerate

* a space

* consistency

* fix test
This commit is contained in:
David Colburn
2021-11-24 17:58:04 -08:00
committed by GitHub
parent f4c96449ae
commit 991c334d2d
14 changed files with 152 additions and 98 deletions
+6 -5
View File
@@ -1,6 +1,7 @@
package rtc
import (
"context"
"errors"
"sync"
"sync/atomic"
@@ -74,7 +75,7 @@ type MediaTrackParams struct {
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
AudioConfig config.AudioConfig
Telemetry *telemetry.TelemetryService
Telemetry telemetry.TelemetryService
Logger logger.Logger
}
@@ -249,7 +250,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
delete(t.subscribedTracks, sub.ID())
t.lock.Unlock()
t.params.Telemetry.TrackUnsubscribed(sub.ID(), t.ToProto())
t.params.Telemetry.TrackUnsubscribed(context.Background(), sub.ID(), t.ToProto())
// ignore if the subscribing sub is not connected
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
@@ -299,7 +300,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
sub.Negotiate()
}()
t.params.Telemetry.TrackSubscribed(sub.ID(), t.ToProto())
t.params.Telemetry.TrackSubscribed(context.Background(), sub.ID(), t.ToProto())
return nil
}
@@ -380,12 +381,12 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
onclose := t.onClose
t.lock.Unlock()
t.RemoveAllSubscribers()
t.params.Telemetry.TrackUnpublished(t.params.ParticipantID, t.ToProto(), uint32(track.SSRC()))
t.params.Telemetry.TrackUnpublished(context.Background(), t.params.ParticipantID, t.ToProto(), uint32(track.SSRC()))
if onclose != nil {
onclose()
}
})
t.params.Telemetry.TrackPublished(t.params.ParticipantID, t.ToProto())
t.params.Telemetry.TrackPublished(context.Background(), t.params.ParticipantID, t.ToProto())
if t.Kind() == livekit.TrackType_AUDIO {
t.buffer = buff
}
+1 -1
View File
@@ -39,7 +39,7 @@ type ParticipantParams struct {
Sink routing.MessageSink
AudioConfig config.AudioConfig
ProtocolVersion types.ProtocolVersion
Telemetry *telemetry.TelemetryService
Telemetry telemetry.TelemetryService
ThrottleConfig config.PLIThrottleConfig
EnabledCodecs []*livekit.Codec
Hidden bool
+2 -2
View File
@@ -34,7 +34,7 @@ type Room struct {
config WebRTCConfig
audioConfig *config.AudioConfig
telemetry *telemetry.TelemetryService
telemetry telemetry.TelemetryService
// map of identity -> Participant
participants map[string]types.Participant
@@ -57,7 +57,7 @@ type ParticipantOptions struct {
AutoSubscribe bool
}
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry *telemetry.TelemetryService) *Room {
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room {
r := &Room{
Room: proto.Clone(room).(*livekit.Room),
Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)),
+1 -1
View File
@@ -552,7 +552,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
UpdateInterval: audioUpdateInterval,
SmoothIntervals: opts.audioSmoothIntervals,
},
telemetry.NewTelemetryService(nil),
telemetry.NewTelemetryService(nil, nil),
)
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := fmt.Sprintf("p%d", i)
+1 -1
View File
@@ -45,7 +45,7 @@ type TransportParams struct {
ParticipantIdentity string
Target livekit.SignalTarget
Config *WebRTCConfig
Telemetry *telemetry.TelemetryService
Telemetry telemetry.TelemetryService
EnabledCodecs []*livekit.Codec
Logger logger.Logger
}
+3 -3
View File
@@ -17,11 +17,11 @@ import (
type RecordingService struct {
bus utils.MessageBus
telemetry *telemetry.TelemetryService
telemetry telemetry.TelemetryService
shutdown chan struct{}
}
func NewRecordingService(mb utils.MessageBus, telemetry *telemetry.TelemetryService) *RecordingService {
func NewRecordingService(mb utils.MessageBus, telemetry telemetry.TelemetryService) *RecordingService {
return &RecordingService{
bus: mb,
telemetry: telemetry,
@@ -158,7 +158,7 @@ func (s *RecordingService) resultsWorker() {
}
logger.Debugw("recording ended", values...)
s.telemetry.RecordingEnded(res)
s.telemetry.RecordingEnded(context.Background(), res)
case <-s.shutdown:
_ = sub.Close()
return
+2 -2
View File
@@ -30,7 +30,7 @@ type RoomManager struct {
currentNode routing.LocalNode
router routing.Router
roomStore RoomStore
telemetry *telemetry.TelemetryService
telemetry telemetry.TelemetryService
rooms map[string]*rtc.Room
}
@@ -40,7 +40,7 @@ func NewLocalRoomManager(
roomStore RoomStore,
currentNode routing.LocalNode,
router routing.Router,
telemetry *telemetry.TelemetryService,
telemetry telemetry.TelemetryService,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
+1
View File
@@ -32,6 +32,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
createWebhookNotifier,
routing.CreateRouter,
wire.Bind(new(routing.MessageRouter), new(routing.Router)),
telemetry.NewAnalyticsService,
telemetry.NewTelemetryService,
NewRecordingService,
NewRoomAllocator,
+3 -3
View File
@@ -1,8 +1,7 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//go:build !wireinject
// +build !wireinject
//+build !wireinject
package service
@@ -47,7 +46,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
telemetryService := telemetry.NewTelemetryService(notifier)
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(notifier, analyticsService)
recordingService := NewRecordingService(messageBus, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, currentNode)
roomManager, err := NewLocalRoomManager(conf, roomStore, currentNode, router, telemetryService)
+58
View File
@@ -0,0 +1,58 @@
package telemetry
import (
"context"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
)
type AnalyticsService interface {
SendStats(ctx context.Context, stats []*livekit.AnalyticsStat)
SendEvent(ctx context.Context, events *livekit.AnalyticsEvent)
}
type analyticsService struct {
analyticsKey string
nodeID string
events livekit.AnalyticsRecorderService_IngestEventsClient
stats livekit.AnalyticsRecorderService_IngestStatsClient
}
func NewAnalyticsService(conf *config.Config, currentNode routing.LocalNode) AnalyticsService {
return &analyticsService{
analyticsKey: "", // TODO: conf.AnalyticsKey
nodeID: currentNode.Id,
}
}
func (a *analyticsService) SendStats(ctx context.Context, stats []*livekit.AnalyticsStat) {
if a.stats == nil {
return
}
for _, stat := range stats {
stat.AnalyticsKey = a.analyticsKey
stat.Node = a.nodeID
}
if err := a.stats.Send(&livekit.AnalyticsStats{Stats: stats}); err != nil {
logger.Errorw("failed to send stats", err)
}
}
func (a *analyticsService) SendEvent(ctx context.Context, event *livekit.AnalyticsEvent) {
if a.events == nil {
return
}
event.AnalyticsKey = a.analyticsKey
if err := a.events.Send(&livekit.AnalyticsEvents{
Events: []*livekit.AnalyticsEvent{event},
}); err != nil {
logger.Errorw("failed to send event", err, "eventType", event.Type.String())
}
}
+23 -43
View File
@@ -9,11 +9,10 @@ import (
"github.com/livekit/protocol/webhook"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
func (t *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
prometheus.RoomStarted()
t.notifyEvent(ctx, &livekit.WebhookEvent{
@@ -21,14 +20,14 @@ func (t *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room)
Room: room,
})
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_ROOM_CREATED,
Timestamp: &timestamppb.Timestamp{Seconds: room.CreationTime},
Room: room,
})
}
func (t *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
prometheus.RoomEnded(time.Unix(room.CreationTime, 0))
t.notifyEvent(ctx, &livekit.WebhookEvent{
@@ -36,16 +35,16 @@ func (t *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
Room: room,
})
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_ROOM_ENDED,
Timestamp: timestamppb.Now(),
RoomSid: room.Sid,
})
}
func (t *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
t.Lock()
t.workers[participant.Sid] = NewStatsWorker(t, room.Sid, participant.Sid, room.Name)
t.workers[participant.Sid] = newStatsWorker(ctx, t, room.Sid, participant.Sid, room.Name)
t.Unlock()
prometheus.AddParticipant()
@@ -56,14 +55,14 @@ func (t *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.
Participant: participant,
})
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED,
Timestamp: timestamppb.Now(),
Participant: participant,
})
}
func (t *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
t.Lock()
if w := t.workers[participant.Sid]; w != nil {
w.Close()
@@ -79,17 +78,17 @@ func (t *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro
Participant: participant,
})
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_PARTICIPANT_LEFT,
Timestamp: timestamppb.Now(),
ParticipantId: participant.Sid,
})
}
func (t *TelemetryService) TrackPublished(participantID string, track *livekit.TrackInfo) {
func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) {
prometheus.AddPublishedTrack(track.Type.String())
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_PUBLISHED,
Timestamp: timestamppb.Now(),
ParticipantId: participantID,
@@ -97,16 +96,7 @@ func (t *TelemetryService) TrackPublished(participantID string, track *livekit.T
})
}
func (t *TelemetryService) AddUpTrack(participantID string, buff *buffer.Buffer) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.AddBuffer(buff)
}
}
func (t *TelemetryService) TrackUnpublished(participantID string, track *livekit.TrackInfo, ssrc uint32) {
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
@@ -116,7 +106,7 @@ func (t *TelemetryService) TrackUnpublished(participantID string, track *livekit
prometheus.SubPublishedTrack(track.Type.String())
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_UNPUBLISHED,
Timestamp: timestamppb.Now(),
ParticipantId: participantID,
@@ -124,10 +114,10 @@ func (t *TelemetryService) TrackUnpublished(participantID string, track *livekit
})
}
func (t *TelemetryService) TrackSubscribed(participantID string, track *livekit.TrackInfo) {
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
prometheus.AddSubscribedTrack(track.Type.String())
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_SUBSCRIBED,
Timestamp: timestamppb.Now(),
ParticipantId: participantID,
@@ -135,10 +125,10 @@ func (t *TelemetryService) TrackSubscribed(participantID string, track *livekit.
})
}
func (t *TelemetryService) TrackUnsubscribed(participantID string, track *livekit.TrackInfo) {
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
prometheus.SubSubscribedTrack(track.Type.String())
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED,
Timestamp: timestamppb.Now(),
ParticipantId: participantID,
@@ -146,7 +136,7 @@ func (t *TelemetryService) TrackUnsubscribed(participantID string, track *liveki
})
}
func (t *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) {
func (t *telemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingStarted,
RecordingInfo: &livekit.RecordingInfo{
@@ -155,27 +145,27 @@ func (t *TelemetryService) RecordingStarted(ctx context.Context, recordingID str
},
})
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_RECORDING_STARTED,
Timestamp: timestamppb.Now(),
RecordingId: recordingID,
})
}
func (t *TelemetryService) RecordingEnded(res *livekit.RecordingResult) {
t.notifyEvent(context.Background(), &livekit.WebhookEvent{
func (t *telemetryService) RecordingEnded(ctx context.Context, res *livekit.RecordingResult) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingFinished,
RecordingResult: res,
})
t.sendEvent(&livekit.AnalyticsEvent{
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_RECORDING_ENDED,
Timestamp: timestamppb.Now(),
RecordingId: res.Id,
})
}
func (t *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
func (t *telemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
if t.notifier == nil {
return
}
@@ -186,13 +176,3 @@ func (t *TelemetryService) notifyEvent(ctx context.Context, event *livekit.Webho
}
})
}
func (t *TelemetryService) sendEvent(event *livekit.AnalyticsEvent) {
if t.analyticsEnabled {
if err := t.events.Send(&livekit.AnalyticsEvents{
Events: []*livekit.AnalyticsEvent{event},
}); err != nil {
logger.Errorw("failed to send event", err, "eventType", event.Type.String())
}
}
}
+3 -3
View File
@@ -6,7 +6,7 @@ import (
"github.com/pion/rtcp"
)
func (t *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
func (t *telemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
return &StatsInterceptorFactory{
t: t,
participantID: participantID,
@@ -15,7 +15,7 @@ func (t *TelemetryService) NewStatsInterceptorFactory(participantID, identity st
}
type StatsInterceptorFactory struct {
t *TelemetryService
t TelemetryService
participantID string
identity string
}
@@ -31,7 +31,7 @@ func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interce
type StatsInterceptor struct {
interceptor.NoOp
t *TelemetryService
t TelemetryService
participantID string
identity string
}
+41 -30
View File
@@ -1,18 +1,40 @@
package telemetry
import (
"context"
"sync"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/webhook"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
type TelemetryService struct {
type TelemetryService interface {
// stats
NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory
AddUpTrack(participantID string, buff *buffer.Buffer)
OnDownstreamPacket(participantID string, bytes int)
HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet)
Report(ctx context.Context, stats []*livekit.AnalyticsStat)
// events
RoomStarted(ctx context.Context, room *livekit.Room)
RoomEnded(ctx context.Context, room *livekit.Room)
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo)
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo)
TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo)
TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32)
TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo)
TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo)
RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest)
RecordingEnded(ctx context.Context, res *livekit.RecordingResult)
}
type telemetryService struct {
notifier webhook.Notifier
webhookPool *workerpool.WorkerPool
@@ -20,26 +42,28 @@ type TelemetryService struct {
// one worker per participant
workers map[string]*StatsWorker
analyticsEnabled bool
analyticsKey string
nodeID string
events livekit.AnalyticsRecorderService_IngestEventsClient
stats livekit.AnalyticsRecorderService_IngestStatsClient
analytics AnalyticsService
}
func NewTelemetryService(notifier webhook.Notifier) *TelemetryService {
return &TelemetryService{
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
return &telemetryService{
notifier: notifier,
webhookPool: workerpool.New(1),
workers: make(map[string]*StatsWorker),
analyticsEnabled: false, // TODO
analyticsKey: "",
nodeID: "",
analytics: analytics,
}
}
func (t *TelemetryService) OnDownstreamPacket(participantID string, bytes int) {
func (t *telemetryService) AddUpTrack(participantID string, buff *buffer.Buffer) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.AddBuffer(buff)
}
}
func (t *telemetryService) OnDownstreamPacket(participantID string, bytes int) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
@@ -48,7 +72,7 @@ func (t *TelemetryService) OnDownstreamPacket(participantID string, bytes int) {
}
}
func (t *TelemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) {
func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) {
stats := &livekit.AnalyticsStat{}
for _, pkt := range pkts {
switch pkt := pkt.(type) {
@@ -86,7 +110,7 @@ func (t *TelemetryService) HandleRTCP(streamType livekit.StreamType, participant
}
}
func (t *TelemetryService) Report(stats []*livekit.AnalyticsStat) {
func (t *telemetryService) Report(ctx context.Context, stats []*livekit.AnalyticsStat) {
for _, stat := range stats {
direction := prometheus.Incoming
if stat.Kind == livekit.StreamType_DOWNSTREAM {
@@ -97,18 +121,5 @@ func (t *TelemetryService) Report(stats []*livekit.AnalyticsStat) {
prometheus.IncrementBytes(direction, stat.TotalBytes)
}
t.sendStats(stats)
}
func (t *TelemetryService) sendStats(stats []*livekit.AnalyticsStat) {
if t.analyticsEnabled {
for _, stat := range stats {
stat.AnalyticsKey = t.analyticsKey
stat.Node = t.nodeID
}
if err := t.stats.Send(&livekit.AnalyticsStats{Stats: stats}); err != nil {
logger.Errorw("failed to send stats", err)
}
}
t.analytics.SendStats(ctx, stats)
}
+7 -4
View File
@@ -1,6 +1,7 @@
package telemetry
import (
"context"
"sync"
"time"
@@ -14,7 +15,8 @@ const updateFrequency = time.Second * 10
// StatsWorker handles participant stats
type StatsWorker struct {
t *TelemetryService
ctx context.Context
t TelemetryService
roomID string
roomName string
participantID string
@@ -38,12 +40,13 @@ type Stats struct {
prevBytes uint64
}
func NewStatsWorker(t *TelemetryService, roomID, participantID, roomName string) *StatsWorker {
func newStatsWorker(ctx context.Context, t TelemetryService, roomID, roomName, participantID string) *StatsWorker {
s := &StatsWorker{
ctx: ctx,
t: t,
roomID: roomID,
participantID: participantID,
roomName: roomName,
participantID: participantID,
buffers: make(map[uint32]*buffer.Buffer),
drain: make(map[uint32]bool),
@@ -151,7 +154,7 @@ func (s *StatsWorker) Update() {
stats = append(stats, downstream)
}
s.t.Report(stats)
s.t.Report(s.ctx, stats)
}
func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.AnalyticsStat {