From 3770fbce6412cfd9bc7f7b629f9af3544c4dfa1a Mon Sep 17 00:00:00 2001 From: shishirng Date: Fri, 22 Dec 2023 18:59:04 -0500 Subject: [PATCH] Analytics: send local node room state/info (#2335) * Analytics: send local node room state/info Signed-off-by: shishir gowda --- go.mod | 6 +- go.sum | 12 +-- pkg/telemetry/analyticsservice.go | 27 +++++- .../telemetryfakes/fake_analytics_service.go | 41 ++++++++++ .../telemetryfakes/fake_telemetry_service.go | 82 +++++++++++++++++++ pkg/telemetry/telemetryservice.go | 7 ++ 6 files changed, 162 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 5e88b072b..0b61459ab 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f - github.com/livekit/protocol v1.9.4-0.20231221100346-e9e5fcd7d371 + github.com/livekit/protocol v1.9.4-0.20231222234445-80e8b5a2d1fa github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -48,7 +48,7 @@ require ( go.uber.org/atomic v1.11.0 golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 golang.org/x/sync v0.5.0 - google.golang.org/protobuf v1.31.0 + google.golang.org/protobuf v1.32.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -61,7 +61,7 @@ require ( github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/go-jose/go-jose/v3 v3.0.1 // indirect - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/subcommands v1.2.0 // indirect diff --git a/go.sum b/go.sum index ae4de7f61..abcfd8611 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44 github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA= github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -126,8 +126,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f h1:XHrwGwLNGQB3ZqolH1YdMH/22hgXKr4vm+2M7JKMMGg= github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= -github.com/livekit/protocol v1.9.4-0.20231221100346-e9e5fcd7d371 h1:mF2FpLIPs2CDGQ1QB99Z2USItNhDd+tg4m5+flt4I+Y= -github.com/livekit/protocol v1.9.4-0.20231221100346-e9e5fcd7d371/go.mod h1:pdyn8m58RfiRl/0nA612rAD9eat5/kJI91UFqB/rYEU= +github.com/livekit/protocol v1.9.4-0.20231222234445-80e8b5a2d1fa h1:qIzXYbpCR01Czwe/j2HYFzHp3j3VSCavdR+R2+qSmS4= +github.com/livekit/protocol v1.9.4-0.20231222234445-80e8b5a2d1fa/go.mod h1:qL0J9HZaUrimDs29b/uRARvWn1cqjbvXUhayZ02RF9U= github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9 h1:kXXV/NLVDHZ+Gn7xrR+UPpdwbH48n7WReBjLHAzqzhY= github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -433,8 +433,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/telemetry/analyticsservice.go b/pkg/telemetry/analyticsservice.go index 8611337f1..3f1955873 100644 --- a/pkg/telemetry/analyticsservice.go +++ b/pkg/telemetry/analyticsservice.go @@ -17,6 +17,9 @@ package telemetry import ( "context" + "go.uber.org/atomic" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -28,14 +31,17 @@ import ( type AnalyticsService interface { SendStats(ctx context.Context, stats []*livekit.AnalyticsStat) SendEvent(ctx context.Context, events *livekit.AnalyticsEvent) + SendNodeRoomStates(ctx context.Context, nodeRooms *livekit.AnalyticsNodeRooms) } type analyticsService struct { - analyticsKey string - nodeID string + analyticsKey string + nodeID string + sequenceNumber atomic.Uint64 - events livekit.AnalyticsRecorderService_IngestEventsClient - stats livekit.AnalyticsRecorderService_IngestStatsClient + events livekit.AnalyticsRecorderService_IngestEventsClient + stats livekit.AnalyticsRecorderService_IngestStatsClient + nodeRooms livekit.AnalyticsRecorderService_IngestNodeRoomStatesClient } func NewAnalyticsService(_ *config.Config, currentNode routing.LocalNode) AnalyticsService { @@ -71,3 +77,16 @@ func (a *analyticsService) SendEvent(_ context.Context, event *livekit.Analytics logger.Errorw("failed to send event", err, "eventType", event.Type.String()) } } + +func (a *analyticsService) SendNodeRoomStates(_ context.Context, nodeRooms *livekit.AnalyticsNodeRooms) { + if a.nodeRooms == nil { + return + } + + nodeRooms.NodeId = a.nodeID + nodeRooms.SequenceNumber = a.sequenceNumber.Add(1) + nodeRooms.Timestamp = timestamppb.Now() + if err := a.nodeRooms.Send(nodeRooms); err != nil { + logger.Errorw("failed to send node room states", err) + } +} diff --git a/pkg/telemetry/telemetryfakes/fake_analytics_service.go b/pkg/telemetry/telemetryfakes/fake_analytics_service.go index 5af3a3513..21b2bb6a1 100644 --- a/pkg/telemetry/telemetryfakes/fake_analytics_service.go +++ b/pkg/telemetry/telemetryfakes/fake_analytics_service.go @@ -16,6 +16,12 @@ type FakeAnalyticsService struct { arg1 context.Context arg2 *livekit.AnalyticsEvent } + SendNodeRoomStatesStub func(context.Context, *livekit.AnalyticsNodeRooms) + sendNodeRoomStatesMutex sync.RWMutex + sendNodeRoomStatesArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AnalyticsNodeRooms + } SendStatsStub func(context.Context, []*livekit.AnalyticsStat) sendStatsMutex sync.RWMutex sendStatsArgsForCall []struct { @@ -59,6 +65,39 @@ func (fake *FakeAnalyticsService) SendEventArgsForCall(i int) (context.Context, return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeAnalyticsService) SendNodeRoomStates(arg1 context.Context, arg2 *livekit.AnalyticsNodeRooms) { + fake.sendNodeRoomStatesMutex.Lock() + fake.sendNodeRoomStatesArgsForCall = append(fake.sendNodeRoomStatesArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AnalyticsNodeRooms + }{arg1, arg2}) + stub := fake.SendNodeRoomStatesStub + fake.recordInvocation("SendNodeRoomStates", []interface{}{arg1, arg2}) + fake.sendNodeRoomStatesMutex.Unlock() + if stub != nil { + fake.SendNodeRoomStatesStub(arg1, arg2) + } +} + +func (fake *FakeAnalyticsService) SendNodeRoomStatesCallCount() int { + fake.sendNodeRoomStatesMutex.RLock() + defer fake.sendNodeRoomStatesMutex.RUnlock() + return len(fake.sendNodeRoomStatesArgsForCall) +} + +func (fake *FakeAnalyticsService) SendNodeRoomStatesCalls(stub func(context.Context, *livekit.AnalyticsNodeRooms)) { + fake.sendNodeRoomStatesMutex.Lock() + defer fake.sendNodeRoomStatesMutex.Unlock() + fake.SendNodeRoomStatesStub = stub +} + +func (fake *FakeAnalyticsService) SendNodeRoomStatesArgsForCall(i int) (context.Context, *livekit.AnalyticsNodeRooms) { + fake.sendNodeRoomStatesMutex.RLock() + defer fake.sendNodeRoomStatesMutex.RUnlock() + argsForCall := fake.sendNodeRoomStatesArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeAnalyticsService) SendStats(arg1 context.Context, arg2 []*livekit.AnalyticsStat) { var arg2Copy []*livekit.AnalyticsStat if arg2 != nil { @@ -102,6 +141,8 @@ func (fake *FakeAnalyticsService) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.sendEventMutex.RLock() defer fake.sendEventMutex.RUnlock() + fake.sendNodeRoomStatesMutex.RLock() + defer fake.sendNodeRoomStatesMutex.RUnlock() fake.sendStatsMutex.RLock() defer fake.sendStatsMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index fd3ae6ff5..3be71676b 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -62,6 +62,12 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.IngressInfo } + LocalRoomStateStub func(context.Context, *livekit.AnalyticsNodeRooms) + localRoomStateMutex sync.RWMutex + localRoomStateArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AnalyticsNodeRooms + } NotifyEventStub func(context.Context, *livekit.WebhookEvent) notifyEventMutex sync.RWMutex notifyEventArgsForCall []struct { @@ -122,6 +128,12 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.AnalyticsEvent } + SendNodeRoomStatesStub func(context.Context, *livekit.AnalyticsNodeRooms) + sendNodeRoomStatesMutex sync.RWMutex + sendNodeRoomStatesArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AnalyticsNodeRooms + } SendStatsStub func(context.Context, []*livekit.AnalyticsStat) sendStatsMutex sync.RWMutex sendStatsArgsForCall []struct { @@ -533,6 +545,39 @@ func (fake *FakeTelemetryService) IngressUpdatedArgsForCall(i int) (context.Cont return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeTelemetryService) LocalRoomState(arg1 context.Context, arg2 *livekit.AnalyticsNodeRooms) { + fake.localRoomStateMutex.Lock() + fake.localRoomStateArgsForCall = append(fake.localRoomStateArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AnalyticsNodeRooms + }{arg1, arg2}) + stub := fake.LocalRoomStateStub + fake.recordInvocation("LocalRoomState", []interface{}{arg1, arg2}) + fake.localRoomStateMutex.Unlock() + if stub != nil { + fake.LocalRoomStateStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) LocalRoomStateCallCount() int { + fake.localRoomStateMutex.RLock() + defer fake.localRoomStateMutex.RUnlock() + return len(fake.localRoomStateArgsForCall) +} + +func (fake *FakeTelemetryService) LocalRoomStateCalls(stub func(context.Context, *livekit.AnalyticsNodeRooms)) { + fake.localRoomStateMutex.Lock() + defer fake.localRoomStateMutex.Unlock() + fake.LocalRoomStateStub = stub +} + +func (fake *FakeTelemetryService) LocalRoomStateArgsForCall(i int) (context.Context, *livekit.AnalyticsNodeRooms) { + fake.localRoomStateMutex.RLock() + defer fake.localRoomStateMutex.RUnlock() + argsForCall := fake.localRoomStateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeTelemetryService) NotifyEvent(arg1 context.Context, arg2 *livekit.WebhookEvent) { fake.notifyEventMutex.Lock() fake.notifyEventArgsForCall = append(fake.notifyEventArgsForCall, struct { @@ -809,6 +854,39 @@ func (fake *FakeTelemetryService) SendEventArgsForCall(i int) (context.Context, return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeTelemetryService) SendNodeRoomStates(arg1 context.Context, arg2 *livekit.AnalyticsNodeRooms) { + fake.sendNodeRoomStatesMutex.Lock() + fake.sendNodeRoomStatesArgsForCall = append(fake.sendNodeRoomStatesArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AnalyticsNodeRooms + }{arg1, arg2}) + stub := fake.SendNodeRoomStatesStub + fake.recordInvocation("SendNodeRoomStates", []interface{}{arg1, arg2}) + fake.sendNodeRoomStatesMutex.Unlock() + if stub != nil { + fake.SendNodeRoomStatesStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) SendNodeRoomStatesCallCount() int { + fake.sendNodeRoomStatesMutex.RLock() + defer fake.sendNodeRoomStatesMutex.RUnlock() + return len(fake.sendNodeRoomStatesArgsForCall) +} + +func (fake *FakeTelemetryService) SendNodeRoomStatesCalls(stub func(context.Context, *livekit.AnalyticsNodeRooms)) { + fake.sendNodeRoomStatesMutex.Lock() + defer fake.sendNodeRoomStatesMutex.Unlock() + fake.SendNodeRoomStatesStub = stub +} + +func (fake *FakeTelemetryService) SendNodeRoomStatesArgsForCall(i int) (context.Context, *livekit.AnalyticsNodeRooms) { + fake.sendNodeRoomStatesMutex.RLock() + defer fake.sendNodeRoomStatesMutex.RUnlock() + argsForCall := fake.sendNodeRoomStatesArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeTelemetryService) SendStats(arg1 context.Context, arg2 []*livekit.AnalyticsStat) { var arg2Copy []*livekit.AnalyticsStat if arg2 != nil { @@ -1359,6 +1437,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.ingressStartedMutex.RUnlock() fake.ingressUpdatedMutex.RLock() defer fake.ingressUpdatedMutex.RUnlock() + fake.localRoomStateMutex.RLock() + defer fake.localRoomStateMutex.RUnlock() fake.notifyEventMutex.RLock() defer fake.notifyEventMutex.RUnlock() fake.participantActiveMutex.RLock() @@ -1375,6 +1455,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.roomStartedMutex.RUnlock() fake.sendEventMutex.RLock() defer fake.sendEventMutex.RUnlock() + fake.sendNodeRoomStatesMutex.RLock() + defer fake.sendNodeRoomStatesMutex.RUnlock() fake.sendStatsMutex.RLock() defer fake.sendStatsMutex.RUnlock() fake.trackMaxSubscribedVideoQualityMutex.RLock() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index c74619eda..572bba118 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -73,6 +73,7 @@ type TelemetryService interface { IngressStarted(ctx context.Context, info *livekit.IngressInfo) IngressUpdated(ctx context.Context, info *livekit.IngressInfo) IngressEnded(ctx context.Context, info *livekit.IngressInfo) + LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) // helpers AnalyticsService @@ -187,3 +188,9 @@ func (t *telemetryService) cleanupWorkers() { } } } + +func (t *telemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) { + t.enqueue(func() { + t.SendNodeRoomStates(ctx, info) + }) +}