diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 97fa4e2d2..996578cd9 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -1,6 +1,7 @@ package rtc import ( + "context" "errors" "sync" "sync/atomic" @@ -74,7 +75,7 @@ type MediaTrackParams struct { BufferFactory *buffer.Factory ReceiverConfig ReceiverConfig AudioConfig config.AudioConfig - Telemetry *telemetry.TelemetryService + Telemetry telemetry.TelemetryService Logger logger.Logger } @@ -249,7 +250,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { delete(t.subscribedTracks, sub.ID()) t.lock.Unlock() - t.params.Telemetry.TrackUnsubscribed(sub.ID(), t.ToProto()) + t.params.Telemetry.TrackUnsubscribed(context.Background(), sub.ID(), t.ToProto()) // ignore if the subscribing sub is not connected if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { @@ -299,7 +300,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { sub.Negotiate() }() - t.params.Telemetry.TrackSubscribed(sub.ID(), t.ToProto()) + t.params.Telemetry.TrackSubscribed(context.Background(), sub.ID(), t.ToProto()) return nil } @@ -380,12 +381,12 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra onclose := t.onClose t.lock.Unlock() t.RemoveAllSubscribers() - t.params.Telemetry.TrackUnpublished(t.params.ParticipantID, t.ToProto(), uint32(track.SSRC())) + t.params.Telemetry.TrackUnpublished(context.Background(), t.params.ParticipantID, t.ToProto(), uint32(track.SSRC())) if onclose != nil { onclose() } }) - t.params.Telemetry.TrackPublished(t.params.ParticipantID, t.ToProto()) + t.params.Telemetry.TrackPublished(context.Background(), t.params.ParticipantID, t.ToProto()) if t.Kind() == livekit.TrackType_AUDIO { t.buffer = buff } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7931b8e24..34d17519f 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -39,7 +39,7 @@ type ParticipantParams struct { Sink routing.MessageSink AudioConfig config.AudioConfig ProtocolVersion types.ProtocolVersion - Telemetry *telemetry.TelemetryService + Telemetry telemetry.TelemetryService ThrottleConfig config.PLIThrottleConfig EnabledCodecs []*livekit.Codec Hidden bool diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index e3ec822db..5346ebbff 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -34,7 +34,7 @@ type Room struct { config WebRTCConfig audioConfig *config.AudioConfig - telemetry *telemetry.TelemetryService + telemetry telemetry.TelemetryService // map of identity -> Participant participants map[string]types.Participant @@ -57,7 +57,7 @@ type ParticipantOptions struct { AutoSubscribe bool } -func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry *telemetry.TelemetryService) *Room { +func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room { r := &Room{ Room: proto.Clone(room).(*livekit.Room), Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)), diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 098be3cbb..3e47211ff 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -552,7 +552,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { UpdateInterval: audioUpdateInterval, SmoothIntervals: opts.audioSmoothIntervals, }, - telemetry.NewTelemetryService(nil), + telemetry.NewTelemetryService(nil, nil), ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := fmt.Sprintf("p%d", i) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 7018d4b66..a580ecf63 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -45,7 +45,7 @@ type TransportParams struct { ParticipantIdentity string Target livekit.SignalTarget Config *WebRTCConfig - Telemetry *telemetry.TelemetryService + Telemetry telemetry.TelemetryService EnabledCodecs []*livekit.Codec Logger logger.Logger } diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go index 8028a4a74..a39bbf162 100644 --- a/pkg/service/recordingservice.go +++ b/pkg/service/recordingservice.go @@ -17,11 +17,11 @@ import ( type RecordingService struct { bus utils.MessageBus - telemetry *telemetry.TelemetryService + telemetry telemetry.TelemetryService shutdown chan struct{} } -func NewRecordingService(mb utils.MessageBus, telemetry *telemetry.TelemetryService) *RecordingService { +func NewRecordingService(mb utils.MessageBus, telemetry telemetry.TelemetryService) *RecordingService { return &RecordingService{ bus: mb, telemetry: telemetry, @@ -158,7 +158,7 @@ func (s *RecordingService) resultsWorker() { } logger.Debugw("recording ended", values...) - s.telemetry.RecordingEnded(res) + s.telemetry.RecordingEnded(context.Background(), res) case <-s.shutdown: _ = sub.Close() return diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index dd9559c6c..813b6a3ce 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -30,7 +30,7 @@ type RoomManager struct { currentNode routing.LocalNode router routing.Router roomStore RoomStore - telemetry *telemetry.TelemetryService + telemetry telemetry.TelemetryService rooms map[string]*rtc.Room } @@ -40,7 +40,7 @@ func NewLocalRoomManager( roomStore RoomStore, currentNode routing.LocalNode, router routing.Router, - telemetry *telemetry.TelemetryService, + telemetry telemetry.TelemetryService, ) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index df0bbc4be..46249a15a 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -32,6 +32,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live createWebhookNotifier, routing.CreateRouter, wire.Bind(new(routing.MessageRouter), new(routing.Router)), + telemetry.NewAnalyticsService, telemetry.NewTelemetryService, NewRecordingService, NewRoomAllocator, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 5f34f2921..fafb4ac46 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,8 +1,7 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//go:build !wireinject -// +build !wireinject +//+build !wireinject package service @@ -47,7 +46,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - telemetryService := telemetry.NewTelemetryService(notifier) + analyticsService := telemetry.NewAnalyticsService(conf, currentNode) + telemetryService := telemetry.NewTelemetryService(notifier, analyticsService) recordingService := NewRecordingService(messageBus, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, currentNode) roomManager, err := NewLocalRoomManager(conf, roomStore, currentNode, router, telemetryService) diff --git a/pkg/telemetry/analytics.go b/pkg/telemetry/analytics.go new file mode 100644 index 000000000..4106a7cdb --- /dev/null +++ b/pkg/telemetry/analytics.go @@ -0,0 +1,58 @@ +package telemetry + +import ( + "context" + + "github.com/livekit/protocol/logger" + livekit "github.com/livekit/protocol/proto" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/routing" +) + +type AnalyticsService interface { + SendStats(ctx context.Context, stats []*livekit.AnalyticsStat) + SendEvent(ctx context.Context, events *livekit.AnalyticsEvent) +} + +type analyticsService struct { + analyticsKey string + nodeID string + + events livekit.AnalyticsRecorderService_IngestEventsClient + stats livekit.AnalyticsRecorderService_IngestStatsClient +} + +func NewAnalyticsService(conf *config.Config, currentNode routing.LocalNode) AnalyticsService { + return &analyticsService{ + analyticsKey: "", // TODO: conf.AnalyticsKey + nodeID: currentNode.Id, + } +} + +func (a *analyticsService) SendStats(ctx context.Context, stats []*livekit.AnalyticsStat) { + if a.stats == nil { + return + } + + for _, stat := range stats { + stat.AnalyticsKey = a.analyticsKey + stat.Node = a.nodeID + } + if err := a.stats.Send(&livekit.AnalyticsStats{Stats: stats}); err != nil { + logger.Errorw("failed to send stats", err) + } +} + +func (a *analyticsService) SendEvent(ctx context.Context, event *livekit.AnalyticsEvent) { + if a.events == nil { + return + } + + event.AnalyticsKey = a.analyticsKey + if err := a.events.Send(&livekit.AnalyticsEvents{ + Events: []*livekit.AnalyticsEvent{event}, + }); err != nil { + logger.Errorw("failed to send event", err, "eventType", event.Type.String()) + } +} diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 7fe219696..c91df289c 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -9,11 +9,10 @@ import ( "github.com/livekit/protocol/webhook" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -func (t *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { +func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { prometheus.RoomStarted() t.notifyEvent(ctx, &livekit.WebhookEvent{ @@ -21,14 +20,14 @@ func (t *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) Room: room, }) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_ROOM_CREATED, Timestamp: ×tamppb.Timestamp{Seconds: room.CreationTime}, Room: room, }) } -func (t *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { +func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { prometheus.RoomEnded(time.Unix(room.CreationTime, 0)) t.notifyEvent(ctx, &livekit.WebhookEvent{ @@ -36,16 +35,16 @@ func (t *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { Room: room, }) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_ROOM_ENDED, Timestamp: timestamppb.Now(), RoomSid: room.Sid, }) } -func (t *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { +func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { t.Lock() - t.workers[participant.Sid] = NewStatsWorker(t, room.Sid, participant.Sid, room.Name) + t.workers[participant.Sid] = newStatsWorker(ctx, t, room.Sid, participant.Sid, room.Name) t.Unlock() prometheus.AddParticipant() @@ -56,14 +55,14 @@ func (t *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit. Participant: participant, }) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, Timestamp: timestamppb.Now(), Participant: participant, }) } -func (t *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { +func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { t.Lock() if w := t.workers[participant.Sid]; w != nil { w.Close() @@ -79,17 +78,17 @@ func (t *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro Participant: participant, }) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_PARTICIPANT_LEFT, Timestamp: timestamppb.Now(), ParticipantId: participant.Sid, }) } -func (t *TelemetryService) TrackPublished(participantID string, track *livekit.TrackInfo) { +func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) { prometheus.AddPublishedTrack(track.Type.String()) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_TRACK_PUBLISHED, Timestamp: timestamppb.Now(), ParticipantId: participantID, @@ -97,16 +96,7 @@ func (t *TelemetryService) TrackPublished(participantID string, track *livekit.T }) } -func (t *TelemetryService) AddUpTrack(participantID string, buff *buffer.Buffer) { - t.RLock() - w := t.workers[participantID] - t.RUnlock() - if w != nil { - w.AddBuffer(buff) - } -} - -func (t *TelemetryService) TrackUnpublished(participantID string, track *livekit.TrackInfo, ssrc uint32) { +func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { t.RLock() w := t.workers[participantID] t.RUnlock() @@ -116,7 +106,7 @@ func (t *TelemetryService) TrackUnpublished(participantID string, track *livekit prometheus.SubPublishedTrack(track.Type.String()) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_TRACK_UNPUBLISHED, Timestamp: timestamppb.Now(), ParticipantId: participantID, @@ -124,10 +114,10 @@ func (t *TelemetryService) TrackUnpublished(participantID string, track *livekit }) } -func (t *TelemetryService) TrackSubscribed(participantID string, track *livekit.TrackInfo) { +func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { prometheus.AddSubscribedTrack(track.Type.String()) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_TRACK_SUBSCRIBED, Timestamp: timestamppb.Now(), ParticipantId: participantID, @@ -135,10 +125,10 @@ func (t *TelemetryService) TrackSubscribed(participantID string, track *livekit. }) } -func (t *TelemetryService) TrackUnsubscribed(participantID string, track *livekit.TrackInfo) { +func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { prometheus.SubSubscribedTrack(track.Type.String()) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED, Timestamp: timestamppb.Now(), ParticipantId: participantID, @@ -146,7 +136,7 @@ func (t *TelemetryService) TrackUnsubscribed(participantID string, track *liveki }) } -func (t *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) { +func (t *telemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) { t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventRecordingStarted, RecordingInfo: &livekit.RecordingInfo{ @@ -155,27 +145,27 @@ func (t *TelemetryService) RecordingStarted(ctx context.Context, recordingID str }, }) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_RECORDING_STARTED, Timestamp: timestamppb.Now(), RecordingId: recordingID, }) } -func (t *TelemetryService) RecordingEnded(res *livekit.RecordingResult) { - t.notifyEvent(context.Background(), &livekit.WebhookEvent{ +func (t *telemetryService) RecordingEnded(ctx context.Context, res *livekit.RecordingResult) { + t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventRecordingFinished, RecordingResult: res, }) - t.sendEvent(&livekit.AnalyticsEvent{ + t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_RECORDING_ENDED, Timestamp: timestamppb.Now(), RecordingId: res.Id, }) } -func (t *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { +func (t *telemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { if t.notifier == nil { return } @@ -186,13 +176,3 @@ func (t *TelemetryService) notifyEvent(ctx context.Context, event *livekit.Webho } }) } - -func (t *TelemetryService) sendEvent(event *livekit.AnalyticsEvent) { - if t.analyticsEnabled { - if err := t.events.Send(&livekit.AnalyticsEvents{ - Events: []*livekit.AnalyticsEvent{event}, - }); err != nil { - logger.Errorw("failed to send event", err, "eventType", event.Type.String()) - } - } -} diff --git a/pkg/telemetry/interceptor.go b/pkg/telemetry/interceptor.go index 08fba08f8..28b347219 100644 --- a/pkg/telemetry/interceptor.go +++ b/pkg/telemetry/interceptor.go @@ -6,7 +6,7 @@ import ( "github.com/pion/rtcp" ) -func (t *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { +func (t *telemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { return &StatsInterceptorFactory{ t: t, participantID: participantID, @@ -15,7 +15,7 @@ func (t *TelemetryService) NewStatsInterceptorFactory(participantID, identity st } type StatsInterceptorFactory struct { - t *TelemetryService + t TelemetryService participantID string identity string } @@ -31,7 +31,7 @@ func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interce type StatsInterceptor struct { interceptor.NoOp - t *TelemetryService + t TelemetryService participantID string identity string } diff --git a/pkg/telemetry/service.go b/pkg/telemetry/service.go index 79789ea39..213c8a1b5 100644 --- a/pkg/telemetry/service.go +++ b/pkg/telemetry/service.go @@ -1,18 +1,40 @@ package telemetry import ( + "context" "sync" "github.com/gammazero/workerpool" - "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" "github.com/livekit/protocol/webhook" "github.com/pion/rtcp" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -type TelemetryService struct { +type TelemetryService interface { + // stats + NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory + AddUpTrack(participantID string, buff *buffer.Buffer) + OnDownstreamPacket(participantID string, bytes int) + HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) + Report(ctx context.Context, stats []*livekit.AnalyticsStat) + + // events + RoomStarted(ctx context.Context, room *livekit.Room) + RoomEnded(ctx context.Context, room *livekit.Room) + ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) + ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) + TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) + TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) + TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) + TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) + RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) + RecordingEnded(ctx context.Context, res *livekit.RecordingResult) +} + +type telemetryService struct { notifier webhook.Notifier webhookPool *workerpool.WorkerPool @@ -20,26 +42,28 @@ type TelemetryService struct { // one worker per participant workers map[string]*StatsWorker - analyticsEnabled bool - analyticsKey string - nodeID string - events livekit.AnalyticsRecorderService_IngestEventsClient - stats livekit.AnalyticsRecorderService_IngestStatsClient + analytics AnalyticsService } -func NewTelemetryService(notifier webhook.Notifier) *TelemetryService { - return &TelemetryService{ +func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService { + return &telemetryService{ notifier: notifier, webhookPool: workerpool.New(1), workers: make(map[string]*StatsWorker), - - analyticsEnabled: false, // TODO - analyticsKey: "", - nodeID: "", + analytics: analytics, } } -func (t *TelemetryService) OnDownstreamPacket(participantID string, bytes int) { +func (t *telemetryService) AddUpTrack(participantID string, buff *buffer.Buffer) { + t.RLock() + w := t.workers[participantID] + t.RUnlock() + if w != nil { + w.AddBuffer(buff) + } +} + +func (t *telemetryService) OnDownstreamPacket(participantID string, bytes int) { t.RLock() w := t.workers[participantID] t.RUnlock() @@ -48,7 +72,7 @@ func (t *TelemetryService) OnDownstreamPacket(participantID string, bytes int) { } } -func (t *TelemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) { +func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) { stats := &livekit.AnalyticsStat{} for _, pkt := range pkts { switch pkt := pkt.(type) { @@ -86,7 +110,7 @@ func (t *TelemetryService) HandleRTCP(streamType livekit.StreamType, participant } } -func (t *TelemetryService) Report(stats []*livekit.AnalyticsStat) { +func (t *telemetryService) Report(ctx context.Context, stats []*livekit.AnalyticsStat) { for _, stat := range stats { direction := prometheus.Incoming if stat.Kind == livekit.StreamType_DOWNSTREAM { @@ -97,18 +121,5 @@ func (t *TelemetryService) Report(stats []*livekit.AnalyticsStat) { prometheus.IncrementBytes(direction, stat.TotalBytes) } - t.sendStats(stats) -} - -func (t *TelemetryService) sendStats(stats []*livekit.AnalyticsStat) { - if t.analyticsEnabled { - for _, stat := range stats { - stat.AnalyticsKey = t.analyticsKey - stat.Node = t.nodeID - } - - if err := t.stats.Send(&livekit.AnalyticsStats{Stats: stats}); err != nil { - logger.Errorw("failed to send stats", err) - } - } + t.analytics.SendStats(ctx, stats) } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index a95dc4a1f..9316ecf47 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -1,6 +1,7 @@ package telemetry import ( + "context" "sync" "time" @@ -14,7 +15,8 @@ const updateFrequency = time.Second * 10 // StatsWorker handles participant stats type StatsWorker struct { - t *TelemetryService + ctx context.Context + t TelemetryService roomID string roomName string participantID string @@ -38,12 +40,13 @@ type Stats struct { prevBytes uint64 } -func NewStatsWorker(t *TelemetryService, roomID, participantID, roomName string) *StatsWorker { +func newStatsWorker(ctx context.Context, t TelemetryService, roomID, roomName, participantID string) *StatsWorker { s := &StatsWorker{ + ctx: ctx, t: t, roomID: roomID, - participantID: participantID, roomName: roomName, + participantID: participantID, buffers: make(map[uint32]*buffer.Buffer), drain: make(map[uint32]bool), @@ -151,7 +154,7 @@ func (s *StatsWorker) Update() { stats = append(stats, downstream) } - s.t.Report(stats) + s.t.Report(s.ctx, stats) } func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.AnalyticsStat {