diff --git a/go.mod b/go.mod index eede62fef..fd744cac4 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20250310153736-45596af895b6 - github.com/livekit/protocol v1.36.2-0.20250409063348-18e676b49301 + github.com/livekit/protocol v1.36.2-0.20250409225025-9c8b99db90f5 github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index b979da6eb..909599b62 100644 --- a/go.sum +++ b/go.sum @@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20250310153736-45596af895b6 h1:6ZhtnY9I9knfm3ieIPpznQSEU2rDECO8yliW/ANLQ7U= github.com/livekit/mediatransportutil v0.0.0-20250310153736-45596af895b6/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY= -github.com/livekit/protocol v1.36.2-0.20250409063348-18e676b49301 h1:XiI1VuokjisdmPhU2z7GJa4Q+cv6Z4fqNBl363L4bwA= -github.com/livekit/protocol v1.36.2-0.20250409063348-18e676b49301/go.mod h1:WrT/CYRxtMNOVUjnIPm5OjWtEkmreffTeE1PRZwlRg4= +github.com/livekit/protocol v1.36.2-0.20250409225025-9c8b99db90f5 h1:N0yFqrRS8obhf+/wyRW/OpdayMzGs5rbSmDVs/dB+mM= +github.com/livekit/protocol v1.36.2-0.20250409225025-9c8b99db90f5/go.mod h1:WrT/CYRxtMNOVUjnIPm5OjWtEkmreffTeE1PRZwlRg4= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/rtc/egress.go b/pkg/rtc/egress.go index b3fc017a3..06ff4eaf2 100644 --- a/pkg/rtc/egress.go +++ b/pkg/rtc/egress.go @@ -42,16 +42,17 @@ func StartParticipantEgress( ) error { if req, err := startParticipantEgress(ctx, launcher, opts, identity, roomName, roomID); err != nil { // send egress failed webhook - ts.NotifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventEgressEnded, - EgressInfo: &livekit.EgressInfo{ - RoomId: string(roomID), - RoomName: string(roomName), - Status: livekit.EgressStatus_EGRESS_FAILED, - Error: err.Error(), - Request: &livekit.EgressInfo_Participant{Participant: req}, - }, - }) + + info := &livekit.EgressInfo{ + RoomId: string(roomID), + RoomName: string(roomName), + Status: livekit.EgressStatus_EGRESS_FAILED, + Error: err.Error(), + Request: &livekit.EgressInfo_Participant{Participant: req}, + } + + ts.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info) + return err } return nil @@ -103,16 +104,16 @@ func StartTrackEgress( ) error { if req, err := startTrackEgress(ctx, launcher, opts, track, roomName, roomID); err != nil { // send egress failed webhook - ts.NotifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventEgressEnded, - EgressInfo: &livekit.EgressInfo{ - RoomId: string(roomID), - RoomName: string(roomName), - Status: livekit.EgressStatus_EGRESS_FAILED, - Error: err.Error(), - Request: &livekit.EgressInfo_Track{Track: req}, - }, - }) + + info := &livekit.EgressInfo{ + RoomId: string(roomID), + RoomName: string(roomName), + Status: livekit.EgressStatus_EGRESS_FAILED, + Error: err.Error(), + Request: &livekit.EgressInfo_Track{Track: req}, + } + ts.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info) + return err } return nil diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 83bcc563d..a294baccb 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "github.com/livekit/protocol/auth/authfakes" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" @@ -797,6 +798,12 @@ type testRoomOpts struct { } func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { + kp := &authfakes.FakeKeyProvider{} + kp.GetSecretReturns("testkey") + + n, err := webhook.NewDefaultNotifier(webhook.DefaultWebHookConfig, kp) + require.NoError(t, err) + rm := NewRoom( &livekit.Room{Name: "room"}, nil, @@ -818,7 +825,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { NodeId: "testnode", Region: "testregion", }, - telemetry.NewTelemetryService(webhook.NewDefaultNotifier(webhook.DefaultWebHookConfig, ""), &telemetryfakes.FakeAnalyticsService{}), + telemetry.NewTelemetryService(n, &telemetryfakes.FakeAnalyticsService{}), nil, nil, nil, ) for i := 0; i < opts.num+opts.numHidden; i++ { diff --git a/pkg/service/wire.go b/pkg/service/wire.go index dcef42908..7e1f78d11 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -159,15 +159,13 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) { wc := conf.WebHook - if len(wc.URLs) == 0 { - return nil, nil - } + secret := provider.GetSecret(wc.APIKey) - if secret == "" { + if secret == "" && len(wc.URLs) > 0 { return nil, ErrWebHookMissingAPIKey } - return webhook.NewDefaultNotifier(wc, secret), nil + return webhook.NewDefaultNotifier(wc, provider) } func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 5afe6d959..9558733a1 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -219,15 +219,13 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) { wc := conf.WebHook - if len(wc.URLs) == 0 { - return nil, nil - } + secret := provider.GetSecret(wc.APIKey) - if secret == "" { + if secret == "" && len(wc.URLs) > 0 { return nil, ErrWebHookMissingAPIKey } - return webhook.NewDefaultNotifier(wc, secret), nil + return webhook.NewDefaultNotifier(wc, provider) } func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index d0ba9389a..a1ca53c1f 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -28,7 +28,7 @@ import ( "github.com/livekit/protocol/webhook" ) -func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent) { +func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent, opts ...webhook.NotifyOption) { if t.notifier == nil { return } @@ -36,7 +36,7 @@ func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.Webho event.CreatedAt = time.Now().Unix() event.Id = guid.New("EV_") - if err := t.notifier.QueueNotify(ctx, event); err != nil { + if err := t.notifier.QueueNotify(ctx, event, opts...); err != nil { logger.Warnw("failed to notify webhook", err, "event", event.Event) } } @@ -428,12 +428,19 @@ func (t *telemetryService) TrackSubscribeRTPStats( }) } +func (t *telemetryService) NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) { + opts := getEgressNotifyOptions(info) + + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: event, + EgressInfo: info, + }, opts...) +} + func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) { + t.enqueue(func() { - t.NotifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventEgressStarted, - EgressInfo: info, - }) + t.NotifyEgressEvent(ctx, webhook.EventEgressStarted, info) t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_STARTED, info)) }) @@ -441,20 +448,15 @@ func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.Egre func (t *telemetryService) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) { t.enqueue(func() { - t.NotifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventEgressUpdated, - EgressInfo: info, - }) + t.NotifyEgressEvent(ctx, webhook.EventEgressUpdated, info) + t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_UPDATED, info)) }) } func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) { t.enqueue(func() { - t.NotifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventEgressEnded, - EgressInfo: info, - }) + t.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info) t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_ENDED, info)) }) @@ -596,3 +598,44 @@ func newIngressEvent(event livekit.AnalyticsEventType, ingress *livekit.IngressI Ingress: ingress, } } + +func getEgressNotifyOptions(egressInfo *livekit.EgressInfo) []webhook.NotifyOption { + if egressInfo == nil { + return nil + } + + if egressInfo.Request == nil { + return nil + } + + var whs []*livekit.WebhookConfig + + switch req := egressInfo.Request.(type) { + case *livekit.EgressInfo_RoomComposite: + if req.RoomComposite != nil { + whs = req.RoomComposite.Webhooks + } + case *livekit.EgressInfo_Web: + if req.Web != nil { + whs = req.Web.Webhooks + } + case *livekit.EgressInfo_Participant: + if req.Participant != nil { + whs = req.Participant.Webhooks + } + case *livekit.EgressInfo_TrackComposite: + if req.TrackComposite != nil { + whs = req.TrackComposite.Webhooks + } + case *livekit.EgressInfo_Track: + if req.Track != nil { + whs = req.Track.Webhooks + } + } + + if len(whs) > 0 { + return []webhook.NotifyOption{webhook.WithExtraWebhooks(whs)} + } + + return nil +} diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 52ab44536..627fc6eb0 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -75,11 +75,12 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.AnalyticsNodeRooms } - NotifyEventStub func(context.Context, *livekit.WebhookEvent) - notifyEventMutex sync.RWMutex - notifyEventArgsForCall []struct { + NotifyEgressEventStub func(context.Context, string, *livekit.EgressInfo) + notifyEgressEventMutex sync.RWMutex + notifyEgressEventArgsForCall []struct { arg1 context.Context - arg2 *livekit.WebhookEvent + arg2 string + arg3 *livekit.EgressInfo } ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta, bool) participantActiveMutex sync.RWMutex @@ -630,37 +631,38 @@ func (fake *FakeTelemetryService) LocalRoomStateArgsForCall(i int) (context.Cont return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeTelemetryService) NotifyEvent(arg1 context.Context, arg2 *livekit.WebhookEvent) { - fake.notifyEventMutex.Lock() - fake.notifyEventArgsForCall = append(fake.notifyEventArgsForCall, struct { +func (fake *FakeTelemetryService) NotifyEgressEvent(arg1 context.Context, arg2 string, arg3 *livekit.EgressInfo) { + fake.notifyEgressEventMutex.Lock() + fake.notifyEgressEventArgsForCall = append(fake.notifyEgressEventArgsForCall, struct { arg1 context.Context - arg2 *livekit.WebhookEvent - }{arg1, arg2}) - stub := fake.NotifyEventStub - fake.recordInvocation("NotifyEvent", []interface{}{arg1, arg2}) - fake.notifyEventMutex.Unlock() + arg2 string + arg3 *livekit.EgressInfo + }{arg1, arg2, arg3}) + stub := fake.NotifyEgressEventStub + fake.recordInvocation("NotifyEgressEvent", []interface{}{arg1, arg2, arg3}) + fake.notifyEgressEventMutex.Unlock() if stub != nil { - fake.NotifyEventStub(arg1, arg2) + fake.NotifyEgressEventStub(arg1, arg2, arg3) } } -func (fake *FakeTelemetryService) NotifyEventCallCount() int { - fake.notifyEventMutex.RLock() - defer fake.notifyEventMutex.RUnlock() - return len(fake.notifyEventArgsForCall) +func (fake *FakeTelemetryService) NotifyEgressEventCallCount() int { + fake.notifyEgressEventMutex.RLock() + defer fake.notifyEgressEventMutex.RUnlock() + return len(fake.notifyEgressEventArgsForCall) } -func (fake *FakeTelemetryService) NotifyEventCalls(stub func(context.Context, *livekit.WebhookEvent)) { - fake.notifyEventMutex.Lock() - defer fake.notifyEventMutex.Unlock() - fake.NotifyEventStub = stub +func (fake *FakeTelemetryService) NotifyEgressEventCalls(stub func(context.Context, string, *livekit.EgressInfo)) { + fake.notifyEgressEventMutex.Lock() + defer fake.notifyEgressEventMutex.Unlock() + fake.NotifyEgressEventStub = stub } -func (fake *FakeTelemetryService) NotifyEventArgsForCall(i int) (context.Context, *livekit.WebhookEvent) { - fake.notifyEventMutex.RLock() - defer fake.notifyEventMutex.RUnlock() - argsForCall := fake.notifyEventArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 +func (fake *FakeTelemetryService) NotifyEgressEventArgsForCall(i int) (context.Context, string, *livekit.EgressInfo) { + fake.notifyEgressEventMutex.RLock() + defer fake.notifyEgressEventMutex.RUnlock() + argsForCall := fake.notifyEgressEventArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta, arg5 bool) { @@ -1559,8 +1561,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.ingressUpdatedMutex.RUnlock() fake.localRoomStateMutex.RLock() defer fake.localRoomStateMutex.RUnlock() - fake.notifyEventMutex.RLock() - defer fake.notifyEventMutex.RUnlock() + fake.notifyEgressEventMutex.RLock() + defer fake.notifyEgressEventMutex.RUnlock() fake.participantActiveMutex.RLock() defer fake.participantActiveMutex.RUnlock() fake.participantJoinedMutex.RLock() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 71e5d1860..431517e4e 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -83,7 +83,7 @@ type TelemetryService interface { // helpers AnalyticsService - NotifyEvent(ctx context.Context, event *livekit.WebhookEvent) + NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) FlushStats() }