diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index cb5604e2c..04d4a6bd8 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -110,6 +110,7 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref logger.Errorw("could not write ingress info", err) return nil, err } + s.telemetry.IngressCreated(ctx, info) return info, nil } @@ -254,5 +255,8 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI } info.State.Status = livekit.IngressState_ENDPOINT_INACTIVE + + s.telemetry.IngressDeleted(ctx, info) + return info, nil } diff --git a/pkg/service/ioinfo.go b/pkg/service/ioinfo.go index 529eec8f9..41ff4541c 100644 --- a/pkg/service/ioinfo.go +++ b/pkg/service/ioinfo.go @@ -109,10 +109,37 @@ func (s *IOInfoService) loadIngressFromInfoRequest(req *rpc.GetIngressInfoReques } func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) { + info, err := s.is.LoadIngress(ctx, req.IngressId) + if err != nil { + return nil, err + } + if err := s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { logger.Errorw("could not update ingress", err) return nil, err } + + if info.State.Status != req.State.Status { + info.State = req.State + + switch req.State.Status { + case livekit.IngressState_ENDPOINT_ERROR, + livekit.IngressState_ENDPOINT_INACTIVE: + s.telemetry.IngressEnded(ctx, info) + + if req.State.Error != "" { + logger.Infow("ingress failed", "error", req.State.Error, "ingressID", req.IngressId) + } else { + logger.Infow("ingress ended", "ingressID", req.IngressId) + } + + case livekit.IngressState_ENDPOINT_PUBLISHING: + s.telemetry.IngressStarted(ctx, info) + + logger.Infow("ingress started", "ingressID", req.IngressId) + } + } + return &emptypb.Empty{}, nil } diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index c75e4d828..1e831b9b5 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -429,6 +429,40 @@ func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.Egress }) } +func (t *telemetryService) IngressCreated(ctx context.Context, info *livekit.IngressInfo) { + t.enqueue(func() { + t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_CREATED, info)) + }) +} + +func (t *telemetryService) IngressDeleted(ctx context.Context, info *livekit.IngressInfo) { + t.enqueue(func() { + t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_DELETED, info)) + }) +} + +func (t *telemetryService) IngressStarted(ctx context.Context, info *livekit.IngressInfo) { + t.enqueue(func() { + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventIngressStarted, + IngressInfo: info, + }) + + t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_STARTED, info)) + }) +} + +func (t *telemetryService) IngressEnded(ctx context.Context, info *livekit.IngressInfo) { + t.enqueue(func() { + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventIngressEnded, + IngressInfo: info, + }) + + t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_ENDED, info)) + }) +} + // returns a livekit.Room with only name and sid filled out // returns nil if room is not found func (t *telemetryService) getRoomDetails(participantID livekit.ParticipantID) *livekit.Room { @@ -483,3 +517,12 @@ func newEgressEvent(event livekit.AnalyticsEventType, egress *livekit.EgressInfo Egress: egress, } } + +func newIngressEvent(event livekit.AnalyticsEventType, ingress *livekit.IngressInfo) *livekit.AnalyticsEvent { + return &livekit.AnalyticsEvent{ + Type: event, + Timestamp: timestamppb.Now(), + IngressId: ingress.IngressId, + Ingress: ingress, + } +} diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 923916164..800160fb7 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -32,6 +32,30 @@ type FakeTelemetryService struct { flushStatsMutex sync.RWMutex flushStatsArgsForCall []struct { } + IngressCreatedStub func(context.Context, *livekit.IngressInfo) + ingressCreatedMutex sync.RWMutex + ingressCreatedArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + IngressDeletedStub func(context.Context, *livekit.IngressInfo) + ingressDeletedMutex sync.RWMutex + ingressDeletedArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + IngressEndedStub func(context.Context, *livekit.IngressInfo) + ingressEndedMutex sync.RWMutex + ingressEndedArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + IngressStartedStub func(context.Context, *livekit.IngressInfo) + ingressStartedMutex sync.RWMutex + ingressStartedArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } NotifyEventStub func(context.Context, *livekit.WebhookEvent) notifyEventMutex sync.RWMutex notifyEventArgsForCall []struct { @@ -337,6 +361,138 @@ func (fake *FakeTelemetryService) FlushStatsCalls(stub func()) { fake.FlushStatsStub = stub } +func (fake *FakeTelemetryService) IngressCreated(arg1 context.Context, arg2 *livekit.IngressInfo) { + fake.ingressCreatedMutex.Lock() + fake.ingressCreatedArgsForCall = append(fake.ingressCreatedArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.IngressCreatedStub + fake.recordInvocation("IngressCreated", []interface{}{arg1, arg2}) + fake.ingressCreatedMutex.Unlock() + if stub != nil { + fake.IngressCreatedStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) IngressCreatedCallCount() int { + fake.ingressCreatedMutex.RLock() + defer fake.ingressCreatedMutex.RUnlock() + return len(fake.ingressCreatedArgsForCall) +} + +func (fake *FakeTelemetryService) IngressCreatedCalls(stub func(context.Context, *livekit.IngressInfo)) { + fake.ingressCreatedMutex.Lock() + defer fake.ingressCreatedMutex.Unlock() + fake.IngressCreatedStub = stub +} + +func (fake *FakeTelemetryService) IngressCreatedArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.ingressCreatedMutex.RLock() + defer fake.ingressCreatedMutex.RUnlock() + argsForCall := fake.ingressCreatedArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeTelemetryService) IngressDeleted(arg1 context.Context, arg2 *livekit.IngressInfo) { + fake.ingressDeletedMutex.Lock() + fake.ingressDeletedArgsForCall = append(fake.ingressDeletedArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.IngressDeletedStub + fake.recordInvocation("IngressDeleted", []interface{}{arg1, arg2}) + fake.ingressDeletedMutex.Unlock() + if stub != nil { + fake.IngressDeletedStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) IngressDeletedCallCount() int { + fake.ingressDeletedMutex.RLock() + defer fake.ingressDeletedMutex.RUnlock() + return len(fake.ingressDeletedArgsForCall) +} + +func (fake *FakeTelemetryService) IngressDeletedCalls(stub func(context.Context, *livekit.IngressInfo)) { + fake.ingressDeletedMutex.Lock() + defer fake.ingressDeletedMutex.Unlock() + fake.IngressDeletedStub = stub +} + +func (fake *FakeTelemetryService) IngressDeletedArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.ingressDeletedMutex.RLock() + defer fake.ingressDeletedMutex.RUnlock() + argsForCall := fake.ingressDeletedArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeTelemetryService) IngressEnded(arg1 context.Context, arg2 *livekit.IngressInfo) { + fake.ingressEndedMutex.Lock() + fake.ingressEndedArgsForCall = append(fake.ingressEndedArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.IngressEndedStub + fake.recordInvocation("IngressEnded", []interface{}{arg1, arg2}) + fake.ingressEndedMutex.Unlock() + if stub != nil { + fake.IngressEndedStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) IngressEndedCallCount() int { + fake.ingressEndedMutex.RLock() + defer fake.ingressEndedMutex.RUnlock() + return len(fake.ingressEndedArgsForCall) +} + +func (fake *FakeTelemetryService) IngressEndedCalls(stub func(context.Context, *livekit.IngressInfo)) { + fake.ingressEndedMutex.Lock() + defer fake.ingressEndedMutex.Unlock() + fake.IngressEndedStub = stub +} + +func (fake *FakeTelemetryService) IngressEndedArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.ingressEndedMutex.RLock() + defer fake.ingressEndedMutex.RUnlock() + argsForCall := fake.ingressEndedArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeTelemetryService) IngressStarted(arg1 context.Context, arg2 *livekit.IngressInfo) { + fake.ingressStartedMutex.Lock() + fake.ingressStartedArgsForCall = append(fake.ingressStartedArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.IngressStartedStub + fake.recordInvocation("IngressStarted", []interface{}{arg1, arg2}) + fake.ingressStartedMutex.Unlock() + if stub != nil { + fake.IngressStartedStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) IngressStartedCallCount() int { + fake.ingressStartedMutex.RLock() + defer fake.ingressStartedMutex.RUnlock() + return len(fake.ingressStartedArgsForCall) +} + +func (fake *FakeTelemetryService) IngressStartedCalls(stub func(context.Context, *livekit.IngressInfo)) { + fake.ingressStartedMutex.Lock() + defer fake.ingressStartedMutex.Unlock() + fake.IngressStartedStub = stub +} + +func (fake *FakeTelemetryService) IngressStartedArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.ingressStartedMutex.RLock() + defer fake.ingressStartedMutex.RUnlock() + argsForCall := fake.ingressStartedArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeTelemetryService) NotifyEvent(arg1 context.Context, arg2 *livekit.WebhookEvent) { fake.notifyEventMutex.Lock() fake.notifyEventArgsForCall = append(fake.notifyEventArgsForCall, struct { @@ -1152,6 +1308,14 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.egressUpdatedMutex.RUnlock() fake.flushStatsMutex.RLock() defer fake.flushStatsMutex.RUnlock() + fake.ingressCreatedMutex.RLock() + defer fake.ingressCreatedMutex.RUnlock() + fake.ingressDeletedMutex.RLock() + defer fake.ingressDeletedMutex.RUnlock() + fake.ingressEndedMutex.RLock() + defer fake.ingressEndedMutex.RUnlock() + fake.ingressStartedMutex.RLock() + defer fake.ingressStartedMutex.RUnlock() fake.notifyEventMutex.RLock() defer fake.notifyEventMutex.RUnlock() fake.participantActiveMutex.RLock() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index fdc6ff052..d3aaf1715 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -54,6 +54,10 @@ type TelemetryService interface { EgressStarted(ctx context.Context, info *livekit.EgressInfo) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) EgressEnded(ctx context.Context, info *livekit.EgressInfo) + IngressCreated(ctx context.Context, info *livekit.IngressInfo) + IngressDeleted(ctx context.Context, info *livekit.IngressInfo) + IngressStarted(ctx context.Context, info *livekit.IngressInfo) + IngressEnded(ctx context.Context, info *livekit.IngressInfo) // helpers AnalyticsService