mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 15:35:41 +00:00
Analytics: send local node room state/info (#2335)
* Analytics: send local node room state/info Signed-off-by: shishir gowda <shishir@livekit.io>
This commit is contained in:
6
go.mod
6
go.mod
@@ -18,7 +18,7 @@ require (
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
|
||||
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f
|
||||
github.com/livekit/protocol v1.9.4-0.20231221100346-e9e5fcd7d371
|
||||
github.com/livekit/protocol v1.9.4-0.20231222234445-80e8b5a2d1fa
|
||||
github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9
|
||||
github.com/mackerelio/go-osstat v0.2.4
|
||||
github.com/magefile/mage v1.15.0
|
||||
@@ -48,7 +48,7 @@ require (
|
||||
go.uber.org/atomic v1.11.0
|
||||
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848
|
||||
golang.org/x/sync v0.5.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
google.golang.org/protobuf v1.32.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@@ -61,7 +61,7 @@ require (
|
||||
github.com/eapache/channels v1.1.0 // indirect
|
||||
github.com/eapache/queue v1.1.0 // indirect
|
||||
github.com/go-jose/go-jose/v3 v3.0.1 // indirect
|
||||
github.com/go-logr/logr v1.3.0 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/go-cmp v0.5.9 // indirect
|
||||
github.com/google/subcommands v1.2.0 // indirect
|
||||
|
||||
12
go.sum
12
go.sum
@@ -42,8 +42,8 @@ github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44
|
||||
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
|
||||
github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA=
|
||||
github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
|
||||
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
|
||||
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
@@ -126,8 +126,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f h1:XHrwGwLNGQB3ZqolH1YdMH/22hgXKr4vm+2M7JKMMGg=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
|
||||
github.com/livekit/protocol v1.9.4-0.20231221100346-e9e5fcd7d371 h1:mF2FpLIPs2CDGQ1QB99Z2USItNhDd+tg4m5+flt4I+Y=
|
||||
github.com/livekit/protocol v1.9.4-0.20231221100346-e9e5fcd7d371/go.mod h1:pdyn8m58RfiRl/0nA612rAD9eat5/kJI91UFqB/rYEU=
|
||||
github.com/livekit/protocol v1.9.4-0.20231222234445-80e8b5a2d1fa h1:qIzXYbpCR01Czwe/j2HYFzHp3j3VSCavdR+R2+qSmS4=
|
||||
github.com/livekit/protocol v1.9.4-0.20231222234445-80e8b5a2d1fa/go.mod h1:qL0J9HZaUrimDs29b/uRARvWn1cqjbvXUhayZ02RF9U=
|
||||
github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9 h1:kXXV/NLVDHZ+Gn7xrR+UPpdwbH48n7WReBjLHAzqzhY=
|
||||
github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
|
||||
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
|
||||
@@ -433,8 +433,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
||||
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
|
||||
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
@@ -17,6 +17,9 @@ package telemetry
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
|
||||
@@ -28,14 +31,17 @@ import (
|
||||
type AnalyticsService interface {
|
||||
SendStats(ctx context.Context, stats []*livekit.AnalyticsStat)
|
||||
SendEvent(ctx context.Context, events *livekit.AnalyticsEvent)
|
||||
SendNodeRoomStates(ctx context.Context, nodeRooms *livekit.AnalyticsNodeRooms)
|
||||
}
|
||||
|
||||
type analyticsService struct {
|
||||
analyticsKey string
|
||||
nodeID string
|
||||
analyticsKey string
|
||||
nodeID string
|
||||
sequenceNumber atomic.Uint64
|
||||
|
||||
events livekit.AnalyticsRecorderService_IngestEventsClient
|
||||
stats livekit.AnalyticsRecorderService_IngestStatsClient
|
||||
events livekit.AnalyticsRecorderService_IngestEventsClient
|
||||
stats livekit.AnalyticsRecorderService_IngestStatsClient
|
||||
nodeRooms livekit.AnalyticsRecorderService_IngestNodeRoomStatesClient
|
||||
}
|
||||
|
||||
func NewAnalyticsService(_ *config.Config, currentNode routing.LocalNode) AnalyticsService {
|
||||
@@ -71,3 +77,16 @@ func (a *analyticsService) SendEvent(_ context.Context, event *livekit.Analytics
|
||||
logger.Errorw("failed to send event", err, "eventType", event.Type.String())
|
||||
}
|
||||
}
|
||||
|
||||
func (a *analyticsService) SendNodeRoomStates(_ context.Context, nodeRooms *livekit.AnalyticsNodeRooms) {
|
||||
if a.nodeRooms == nil {
|
||||
return
|
||||
}
|
||||
|
||||
nodeRooms.NodeId = a.nodeID
|
||||
nodeRooms.SequenceNumber = a.sequenceNumber.Add(1)
|
||||
nodeRooms.Timestamp = timestamppb.Now()
|
||||
if err := a.nodeRooms.Send(nodeRooms); err != nil {
|
||||
logger.Errorw("failed to send node room states", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,12 @@ type FakeAnalyticsService struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsEvent
|
||||
}
|
||||
SendNodeRoomStatesStub func(context.Context, *livekit.AnalyticsNodeRooms)
|
||||
sendNodeRoomStatesMutex sync.RWMutex
|
||||
sendNodeRoomStatesArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsNodeRooms
|
||||
}
|
||||
SendStatsStub func(context.Context, []*livekit.AnalyticsStat)
|
||||
sendStatsMutex sync.RWMutex
|
||||
sendStatsArgsForCall []struct {
|
||||
@@ -59,6 +65,39 @@ func (fake *FakeAnalyticsService) SendEventArgsForCall(i int) (context.Context,
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeAnalyticsService) SendNodeRoomStates(arg1 context.Context, arg2 *livekit.AnalyticsNodeRooms) {
|
||||
fake.sendNodeRoomStatesMutex.Lock()
|
||||
fake.sendNodeRoomStatesArgsForCall = append(fake.sendNodeRoomStatesArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsNodeRooms
|
||||
}{arg1, arg2})
|
||||
stub := fake.SendNodeRoomStatesStub
|
||||
fake.recordInvocation("SendNodeRoomStates", []interface{}{arg1, arg2})
|
||||
fake.sendNodeRoomStatesMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.SendNodeRoomStatesStub(arg1, arg2)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeAnalyticsService) SendNodeRoomStatesCallCount() int {
|
||||
fake.sendNodeRoomStatesMutex.RLock()
|
||||
defer fake.sendNodeRoomStatesMutex.RUnlock()
|
||||
return len(fake.sendNodeRoomStatesArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeAnalyticsService) SendNodeRoomStatesCalls(stub func(context.Context, *livekit.AnalyticsNodeRooms)) {
|
||||
fake.sendNodeRoomStatesMutex.Lock()
|
||||
defer fake.sendNodeRoomStatesMutex.Unlock()
|
||||
fake.SendNodeRoomStatesStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeAnalyticsService) SendNodeRoomStatesArgsForCall(i int) (context.Context, *livekit.AnalyticsNodeRooms) {
|
||||
fake.sendNodeRoomStatesMutex.RLock()
|
||||
defer fake.sendNodeRoomStatesMutex.RUnlock()
|
||||
argsForCall := fake.sendNodeRoomStatesArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeAnalyticsService) SendStats(arg1 context.Context, arg2 []*livekit.AnalyticsStat) {
|
||||
var arg2Copy []*livekit.AnalyticsStat
|
||||
if arg2 != nil {
|
||||
@@ -102,6 +141,8 @@ func (fake *FakeAnalyticsService) Invocations() map[string][][]interface{} {
|
||||
defer fake.invocationsMutex.RUnlock()
|
||||
fake.sendEventMutex.RLock()
|
||||
defer fake.sendEventMutex.RUnlock()
|
||||
fake.sendNodeRoomStatesMutex.RLock()
|
||||
defer fake.sendNodeRoomStatesMutex.RUnlock()
|
||||
fake.sendStatsMutex.RLock()
|
||||
defer fake.sendStatsMutex.RUnlock()
|
||||
copiedInvocations := map[string][][]interface{}{}
|
||||
|
||||
@@ -62,6 +62,12 @@ type FakeTelemetryService struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.IngressInfo
|
||||
}
|
||||
LocalRoomStateStub func(context.Context, *livekit.AnalyticsNodeRooms)
|
||||
localRoomStateMutex sync.RWMutex
|
||||
localRoomStateArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsNodeRooms
|
||||
}
|
||||
NotifyEventStub func(context.Context, *livekit.WebhookEvent)
|
||||
notifyEventMutex sync.RWMutex
|
||||
notifyEventArgsForCall []struct {
|
||||
@@ -122,6 +128,12 @@ type FakeTelemetryService struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsEvent
|
||||
}
|
||||
SendNodeRoomStatesStub func(context.Context, *livekit.AnalyticsNodeRooms)
|
||||
sendNodeRoomStatesMutex sync.RWMutex
|
||||
sendNodeRoomStatesArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsNodeRooms
|
||||
}
|
||||
SendStatsStub func(context.Context, []*livekit.AnalyticsStat)
|
||||
sendStatsMutex sync.RWMutex
|
||||
sendStatsArgsForCall []struct {
|
||||
@@ -533,6 +545,39 @@ func (fake *FakeTelemetryService) IngressUpdatedArgsForCall(i int) (context.Cont
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) LocalRoomState(arg1 context.Context, arg2 *livekit.AnalyticsNodeRooms) {
|
||||
fake.localRoomStateMutex.Lock()
|
||||
fake.localRoomStateArgsForCall = append(fake.localRoomStateArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsNodeRooms
|
||||
}{arg1, arg2})
|
||||
stub := fake.LocalRoomStateStub
|
||||
fake.recordInvocation("LocalRoomState", []interface{}{arg1, arg2})
|
||||
fake.localRoomStateMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.LocalRoomStateStub(arg1, arg2)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) LocalRoomStateCallCount() int {
|
||||
fake.localRoomStateMutex.RLock()
|
||||
defer fake.localRoomStateMutex.RUnlock()
|
||||
return len(fake.localRoomStateArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) LocalRoomStateCalls(stub func(context.Context, *livekit.AnalyticsNodeRooms)) {
|
||||
fake.localRoomStateMutex.Lock()
|
||||
defer fake.localRoomStateMutex.Unlock()
|
||||
fake.LocalRoomStateStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) LocalRoomStateArgsForCall(i int) (context.Context, *livekit.AnalyticsNodeRooms) {
|
||||
fake.localRoomStateMutex.RLock()
|
||||
defer fake.localRoomStateMutex.RUnlock()
|
||||
argsForCall := fake.localRoomStateArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) NotifyEvent(arg1 context.Context, arg2 *livekit.WebhookEvent) {
|
||||
fake.notifyEventMutex.Lock()
|
||||
fake.notifyEventArgsForCall = append(fake.notifyEventArgsForCall, struct {
|
||||
@@ -809,6 +854,39 @@ func (fake *FakeTelemetryService) SendEventArgsForCall(i int) (context.Context,
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) SendNodeRoomStates(arg1 context.Context, arg2 *livekit.AnalyticsNodeRooms) {
|
||||
fake.sendNodeRoomStatesMutex.Lock()
|
||||
fake.sendNodeRoomStatesArgsForCall = append(fake.sendNodeRoomStatesArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.AnalyticsNodeRooms
|
||||
}{arg1, arg2})
|
||||
stub := fake.SendNodeRoomStatesStub
|
||||
fake.recordInvocation("SendNodeRoomStates", []interface{}{arg1, arg2})
|
||||
fake.sendNodeRoomStatesMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.SendNodeRoomStatesStub(arg1, arg2)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) SendNodeRoomStatesCallCount() int {
|
||||
fake.sendNodeRoomStatesMutex.RLock()
|
||||
defer fake.sendNodeRoomStatesMutex.RUnlock()
|
||||
return len(fake.sendNodeRoomStatesArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) SendNodeRoomStatesCalls(stub func(context.Context, *livekit.AnalyticsNodeRooms)) {
|
||||
fake.sendNodeRoomStatesMutex.Lock()
|
||||
defer fake.sendNodeRoomStatesMutex.Unlock()
|
||||
fake.SendNodeRoomStatesStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) SendNodeRoomStatesArgsForCall(i int) (context.Context, *livekit.AnalyticsNodeRooms) {
|
||||
fake.sendNodeRoomStatesMutex.RLock()
|
||||
defer fake.sendNodeRoomStatesMutex.RUnlock()
|
||||
argsForCall := fake.sendNodeRoomStatesArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeTelemetryService) SendStats(arg1 context.Context, arg2 []*livekit.AnalyticsStat) {
|
||||
var arg2Copy []*livekit.AnalyticsStat
|
||||
if arg2 != nil {
|
||||
@@ -1359,6 +1437,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} {
|
||||
defer fake.ingressStartedMutex.RUnlock()
|
||||
fake.ingressUpdatedMutex.RLock()
|
||||
defer fake.ingressUpdatedMutex.RUnlock()
|
||||
fake.localRoomStateMutex.RLock()
|
||||
defer fake.localRoomStateMutex.RUnlock()
|
||||
fake.notifyEventMutex.RLock()
|
||||
defer fake.notifyEventMutex.RUnlock()
|
||||
fake.participantActiveMutex.RLock()
|
||||
@@ -1375,6 +1455,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} {
|
||||
defer fake.roomStartedMutex.RUnlock()
|
||||
fake.sendEventMutex.RLock()
|
||||
defer fake.sendEventMutex.RUnlock()
|
||||
fake.sendNodeRoomStatesMutex.RLock()
|
||||
defer fake.sendNodeRoomStatesMutex.RUnlock()
|
||||
fake.sendStatsMutex.RLock()
|
||||
defer fake.sendStatsMutex.RUnlock()
|
||||
fake.trackMaxSubscribedVideoQualityMutex.RLock()
|
||||
|
||||
@@ -73,6 +73,7 @@ type TelemetryService interface {
|
||||
IngressStarted(ctx context.Context, info *livekit.IngressInfo)
|
||||
IngressUpdated(ctx context.Context, info *livekit.IngressInfo)
|
||||
IngressEnded(ctx context.Context, info *livekit.IngressInfo)
|
||||
LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms)
|
||||
|
||||
// helpers
|
||||
AnalyticsService
|
||||
@@ -187,3 +188,9 @@ func (t *telemetryService) cleanupWorkers() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *telemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) {
|
||||
t.enqueue(func() {
|
||||
t.SendNodeRoomStates(ctx, info)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user