// Copyright 2023 LiveKit, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package telemetry import ( "context" "go.uber.org/atomic" "google.golang.org/protobuf/types/known/timestamppb" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/observability/roomobs" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils/guid" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" ) //counterfeiter:generate . AnalyticsService type AnalyticsService interface { SendStats(ctx context.Context, stats []*livekit.AnalyticsStat) SendEvent(ctx context.Context, events *livekit.AnalyticsEvent) SendNodeRoomStates(ctx context.Context, nodeRooms *livekit.AnalyticsNodeRooms) RoomProjectReporter(ctx context.Context) roomobs.ProjectReporter } // ---------------------------- var _ AnalyticsService = &NullAnalyticService{} type NullAnalyticService struct{} func (n NullAnalyticService) SendStats(_ context.Context, _ []*livekit.AnalyticsStat) {} func (n NullAnalyticService) SendEvent(_ context.Context, _ *livekit.AnalyticsEvent) {} func (n NullAnalyticService) SendNodeRoomStates(_ context.Context, _ *livekit.AnalyticsNodeRooms) {} func (n NullAnalyticService) RoomProjectReporter(_ctx context.Context) roomobs.ProjectReporter { return nil } // ---------------------------- type analyticsService struct { analyticsKey string nodeID string sequenceNumber atomic.Uint64 events rpc.AnalyticsRecorderService_IngestEventsClient stats rpc.AnalyticsRecorderService_IngestStatsClient nodeRooms rpc.AnalyticsRecorderService_IngestNodeRoomStatesClient } func NewAnalyticsService(_ *config.Config, currentNode routing.LocalNode) AnalyticsService { return &analyticsService{ analyticsKey: "", // TODO: conf.AnalyticsKey nodeID: string(currentNode.NodeID()), } } func (a *analyticsService) SendStats(_ context.Context, stats []*livekit.AnalyticsStat) { if a.stats == nil { return } for _, stat := range stats { stat.Id = guid.New("AS_") stat.AnalyticsKey = a.analyticsKey stat.Node = a.nodeID } if err := a.stats.Send(&livekit.AnalyticsStats{Stats: stats}); err != nil { logger.Errorw("failed to send stats", err) } } func (a *analyticsService) SendEvent(_ context.Context, event *livekit.AnalyticsEvent) { if a.events == nil { return } event.Id = guid.New("AE_") event.NodeId = a.nodeID event.AnalyticsKey = a.analyticsKey if err := a.events.Send(&livekit.AnalyticsEvents{ Events: []*livekit.AnalyticsEvent{event}, }); err != nil { logger.Errorw("failed to send event", err, "eventType", event.Type.String()) } } func (a *analyticsService) SendNodeRoomStates(_ context.Context, nodeRooms *livekit.AnalyticsNodeRooms) { if a.nodeRooms == nil { return } nodeRooms.NodeId = a.nodeID nodeRooms.SequenceNumber = a.sequenceNumber.Add(1) nodeRooms.Timestamp = timestamppb.Now() if err := a.nodeRooms.Send(nodeRooms); err != nil { logger.Errorw("failed to send node room states", err) } } func (a *analyticsService) RoomProjectReporter(_ context.Context) roomobs.ProjectReporter { return roomobs.NewNoopProjectReporter() }