Allow specifying extra webhooks with egress requests (#3597)

This commit is contained in:
Benjamin Pracht
2025-04-09 16:20:21 -07:00
committed by GitHub
parent 7e16106a0e
commit e5cbb22777
9 changed files with 126 additions and 77 deletions

2
go.mod
View File

@@ -23,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20250310153736-45596af895b6
github.com/livekit/protocol v1.36.2-0.20250409063348-18e676b49301
github.com/livekit/protocol v1.36.2-0.20250409225025-9c8b99db90f5
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0

4
go.sum
View File

@@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20250310153736-45596af895b6 h1:6ZhtnY9I9knfm3ieIPpznQSEU2rDECO8yliW/ANLQ7U=
github.com/livekit/mediatransportutil v0.0.0-20250310153736-45596af895b6/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY=
github.com/livekit/protocol v1.36.2-0.20250409063348-18e676b49301 h1:XiI1VuokjisdmPhU2z7GJa4Q+cv6Z4fqNBl363L4bwA=
github.com/livekit/protocol v1.36.2-0.20250409063348-18e676b49301/go.mod h1:WrT/CYRxtMNOVUjnIPm5OjWtEkmreffTeE1PRZwlRg4=
github.com/livekit/protocol v1.36.2-0.20250409225025-9c8b99db90f5 h1:N0yFqrRS8obhf+/wyRW/OpdayMzGs5rbSmDVs/dB+mM=
github.com/livekit/protocol v1.36.2-0.20250409225025-9c8b99db90f5/go.mod h1:WrT/CYRxtMNOVUjnIPm5OjWtEkmreffTeE1PRZwlRg4=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=

View File

@@ -42,16 +42,17 @@ func StartParticipantEgress(
) error {
if req, err := startParticipantEgress(ctx, launcher, opts, identity, roomName, roomID); err != nil {
// send egress failed webhook
ts.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventEgressEnded,
EgressInfo: &livekit.EgressInfo{
RoomId: string(roomID),
RoomName: string(roomName),
Status: livekit.EgressStatus_EGRESS_FAILED,
Error: err.Error(),
Request: &livekit.EgressInfo_Participant{Participant: req},
},
})
info := &livekit.EgressInfo{
RoomId: string(roomID),
RoomName: string(roomName),
Status: livekit.EgressStatus_EGRESS_FAILED,
Error: err.Error(),
Request: &livekit.EgressInfo_Participant{Participant: req},
}
ts.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info)
return err
}
return nil
@@ -103,16 +104,16 @@ func StartTrackEgress(
) error {
if req, err := startTrackEgress(ctx, launcher, opts, track, roomName, roomID); err != nil {
// send egress failed webhook
ts.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventEgressEnded,
EgressInfo: &livekit.EgressInfo{
RoomId: string(roomID),
RoomName: string(roomName),
Status: livekit.EgressStatus_EGRESS_FAILED,
Error: err.Error(),
Request: &livekit.EgressInfo_Track{Track: req},
},
})
info := &livekit.EgressInfo{
RoomId: string(roomID),
RoomName: string(roomName),
Status: livekit.EgressStatus_EGRESS_FAILED,
Error: err.Error(),
Request: &livekit.EgressInfo_Track{Track: req},
}
ts.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info)
return err
}
return nil

View File

@@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/auth/authfakes"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
@@ -797,6 +798,12 @@ type testRoomOpts struct {
}
func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
kp := &authfakes.FakeKeyProvider{}
kp.GetSecretReturns("testkey")
n, err := webhook.NewDefaultNotifier(webhook.DefaultWebHookConfig, kp)
require.NoError(t, err)
rm := NewRoom(
&livekit.Room{Name: "room"},
nil,
@@ -818,7 +825,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
NodeId: "testnode",
Region: "testregion",
},
telemetry.NewTelemetryService(webhook.NewDefaultNotifier(webhook.DefaultWebHookConfig, ""), &telemetryfakes.FakeAnalyticsService{}),
telemetry.NewTelemetryService(n, &telemetryfakes.FakeAnalyticsService{}),
nil, nil, nil,
)
for i := 0; i < opts.num+opts.numHidden; i++ {

View File

@@ -159,15 +159,13 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) {
func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) {
wc := conf.WebHook
if len(wc.URLs) == 0 {
return nil, nil
}
secret := provider.GetSecret(wc.APIKey)
if secret == "" {
if secret == "" && len(wc.URLs) > 0 {
return nil, ErrWebHookMissingAPIKey
}
return webhook.NewDefaultNotifier(wc, secret), nil
return webhook.NewDefaultNotifier(wc, provider)
}
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {

View File

@@ -219,15 +219,13 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) {
func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) {
wc := conf.WebHook
if len(wc.URLs) == 0 {
return nil, nil
}
secret := provider.GetSecret(wc.APIKey)
if secret == "" {
if secret == "" && len(wc.URLs) > 0 {
return nil, ErrWebHookMissingAPIKey
}
return webhook.NewDefaultNotifier(wc, secret), nil
return webhook.NewDefaultNotifier(wc, provider)
}
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {

View File

@@ -28,7 +28,7 @@ import (
"github.com/livekit/protocol/webhook"
)
func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent, opts ...webhook.NotifyOption) {
if t.notifier == nil {
return
}
@@ -36,7 +36,7 @@ func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.Webho
event.CreatedAt = time.Now().Unix()
event.Id = guid.New("EV_")
if err := t.notifier.QueueNotify(ctx, event); err != nil {
if err := t.notifier.QueueNotify(ctx, event, opts...); err != nil {
logger.Warnw("failed to notify webhook", err, "event", event.Event)
}
}
@@ -428,12 +428,19 @@ func (t *telemetryService) TrackSubscribeRTPStats(
})
}
func (t *telemetryService) NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) {
opts := getEgressNotifyOptions(info)
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: event,
EgressInfo: info,
}, opts...)
}
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
t.enqueue(func() {
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventEgressStarted,
EgressInfo: info,
})
t.NotifyEgressEvent(ctx, webhook.EventEgressStarted, info)
t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_STARTED, info))
})
@@ -441,20 +448,15 @@ func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.Egre
func (t *telemetryService) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) {
t.enqueue(func() {
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventEgressUpdated,
EgressInfo: info,
})
t.NotifyEgressEvent(ctx, webhook.EventEgressUpdated, info)
t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_UPDATED, info))
})
}
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
t.enqueue(func() {
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventEgressEnded,
EgressInfo: info,
})
t.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info)
t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_ENDED, info))
})
@@ -596,3 +598,44 @@ func newIngressEvent(event livekit.AnalyticsEventType, ingress *livekit.IngressI
Ingress: ingress,
}
}
func getEgressNotifyOptions(egressInfo *livekit.EgressInfo) []webhook.NotifyOption {
if egressInfo == nil {
return nil
}
if egressInfo.Request == nil {
return nil
}
var whs []*livekit.WebhookConfig
switch req := egressInfo.Request.(type) {
case *livekit.EgressInfo_RoomComposite:
if req.RoomComposite != nil {
whs = req.RoomComposite.Webhooks
}
case *livekit.EgressInfo_Web:
if req.Web != nil {
whs = req.Web.Webhooks
}
case *livekit.EgressInfo_Participant:
if req.Participant != nil {
whs = req.Participant.Webhooks
}
case *livekit.EgressInfo_TrackComposite:
if req.TrackComposite != nil {
whs = req.TrackComposite.Webhooks
}
case *livekit.EgressInfo_Track:
if req.Track != nil {
whs = req.Track.Webhooks
}
}
if len(whs) > 0 {
return []webhook.NotifyOption{webhook.WithExtraWebhooks(whs)}
}
return nil
}

View File

@@ -75,11 +75,12 @@ type FakeTelemetryService struct {
arg1 context.Context
arg2 *livekit.AnalyticsNodeRooms
}
NotifyEventStub func(context.Context, *livekit.WebhookEvent)
notifyEventMutex sync.RWMutex
notifyEventArgsForCall []struct {
NotifyEgressEventStub func(context.Context, string, *livekit.EgressInfo)
notifyEgressEventMutex sync.RWMutex
notifyEgressEventArgsForCall []struct {
arg1 context.Context
arg2 *livekit.WebhookEvent
arg2 string
arg3 *livekit.EgressInfo
}
ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta, bool)
participantActiveMutex sync.RWMutex
@@ -630,37 +631,38 @@ func (fake *FakeTelemetryService) LocalRoomStateArgsForCall(i int) (context.Cont
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) NotifyEvent(arg1 context.Context, arg2 *livekit.WebhookEvent) {
fake.notifyEventMutex.Lock()
fake.notifyEventArgsForCall = append(fake.notifyEventArgsForCall, struct {
func (fake *FakeTelemetryService) NotifyEgressEvent(arg1 context.Context, arg2 string, arg3 *livekit.EgressInfo) {
fake.notifyEgressEventMutex.Lock()
fake.notifyEgressEventArgsForCall = append(fake.notifyEgressEventArgsForCall, struct {
arg1 context.Context
arg2 *livekit.WebhookEvent
}{arg1, arg2})
stub := fake.NotifyEventStub
fake.recordInvocation("NotifyEvent", []interface{}{arg1, arg2})
fake.notifyEventMutex.Unlock()
arg2 string
arg3 *livekit.EgressInfo
}{arg1, arg2, arg3})
stub := fake.NotifyEgressEventStub
fake.recordInvocation("NotifyEgressEvent", []interface{}{arg1, arg2, arg3})
fake.notifyEgressEventMutex.Unlock()
if stub != nil {
fake.NotifyEventStub(arg1, arg2)
fake.NotifyEgressEventStub(arg1, arg2, arg3)
}
}
func (fake *FakeTelemetryService) NotifyEventCallCount() int {
fake.notifyEventMutex.RLock()
defer fake.notifyEventMutex.RUnlock()
return len(fake.notifyEventArgsForCall)
func (fake *FakeTelemetryService) NotifyEgressEventCallCount() int {
fake.notifyEgressEventMutex.RLock()
defer fake.notifyEgressEventMutex.RUnlock()
return len(fake.notifyEgressEventArgsForCall)
}
func (fake *FakeTelemetryService) NotifyEventCalls(stub func(context.Context, *livekit.WebhookEvent)) {
fake.notifyEventMutex.Lock()
defer fake.notifyEventMutex.Unlock()
fake.NotifyEventStub = stub
func (fake *FakeTelemetryService) NotifyEgressEventCalls(stub func(context.Context, string, *livekit.EgressInfo)) {
fake.notifyEgressEventMutex.Lock()
defer fake.notifyEgressEventMutex.Unlock()
fake.NotifyEgressEventStub = stub
}
func (fake *FakeTelemetryService) NotifyEventArgsForCall(i int) (context.Context, *livekit.WebhookEvent) {
fake.notifyEventMutex.RLock()
defer fake.notifyEventMutex.RUnlock()
argsForCall := fake.notifyEventArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
func (fake *FakeTelemetryService) NotifyEgressEventArgsForCall(i int) (context.Context, string, *livekit.EgressInfo) {
fake.notifyEgressEventMutex.RLock()
defer fake.notifyEgressEventMutex.RUnlock()
argsForCall := fake.notifyEgressEventArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta, arg5 bool) {
@@ -1559,8 +1561,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} {
defer fake.ingressUpdatedMutex.RUnlock()
fake.localRoomStateMutex.RLock()
defer fake.localRoomStateMutex.RUnlock()
fake.notifyEventMutex.RLock()
defer fake.notifyEventMutex.RUnlock()
fake.notifyEgressEventMutex.RLock()
defer fake.notifyEgressEventMutex.RUnlock()
fake.participantActiveMutex.RLock()
defer fake.participantActiveMutex.RUnlock()
fake.participantJoinedMutex.RLock()

View File

@@ -83,7 +83,7 @@ type TelemetryService interface {
// helpers
AnalyticsService
NotifyEvent(ctx context.Context, event *livekit.WebhookEvent)
NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo)
FlushStats()
}