From 40ceddd18b54c35ea1466271ed30f3383adeee79 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 15 Apr 2023 01:20:23 -0700 Subject: [PATCH] Integrate QueuedNotifier, fixes out-of-order delivery (#1615) --- go.mod | 3 +-- go.sum | 7 +++---- pkg/rtc/room_test.go | 2 +- pkg/rtc/subscriptionmanager_test.go | 5 ++++- pkg/service/wire.go | 4 ++-- pkg/service/wire_gen.go | 8 ++++---- pkg/telemetry/events.go | 8 +++----- pkg/telemetry/telemetryservice.go | 16 ++++++---------- 8 files changed, 24 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index e6549ab34..4717dd5a7 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 - github.com/livekit/protocol v1.5.4-0.20230413111958-5fea69067bbc + github.com/livekit/protocol v1.5.4 github.com/livekit/psrpc v0.2.11-0.20230405191830-d76f71512630 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.14.0 @@ -98,7 +98,6 @@ require ( golang.org/x/net v0.9.0 // indirect golang.org/x/sys v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect - golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.6.0 // indirect google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect google.golang.org/grpc v1.54.0 // indirect diff --git a/go.sum b/go.sum index f6e8e47c1..3b56e50e4 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 h1:QlQFyMwCDgjyySsrgmrMcVbEBA6KZcyTzvK+z346tUA= github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26/go.mod h1:eDA41kiySZoG+wy4Etsjb3w0jjLx69i/vAmSjG4bteA= -github.com/livekit/protocol v1.5.4-0.20230413111958-5fea69067bbc h1:15IrYsN4PRgrH2MldkYgnTqqNxgRgjVGLjEtwurphCQ= -github.com/livekit/protocol v1.5.4-0.20230413111958-5fea69067bbc/go.mod h1:YPmFvsD0cr7KlC7wsoLTLwCAAJun/ovCDBCvUnWvdwo= +github.com/livekit/protocol v1.5.4 h1:lfEUqsE9AV1ZI/w8oZUKSAoi708V8RYwraOjeY83KVo= +github.com/livekit/protocol v1.5.4/go.mod h1:KJJVGHiNR6abdJIpoxB1kqQH2s902wM3cMt+P4p6jao= github.com/livekit/psrpc v0.2.11-0.20230405191830-d76f71512630 h1:Rm5KLZgQxWnTidY+H8MsAV6sk1iiFxeXqPFgSLkMing= github.com/livekit/psrpc v0.2.11-0.20230405191830-d76f71512630/go.mod h1:K0j8f1PgLShR7Lx80KbmwFkDH2BvOnycXGV0OSRURKc= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -233,8 +233,8 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/redis/go-redis/v9 v9.0.3 h1:+7mmR26M0IvyLxGZUHxu4GiBkJkVDid0Un+j4ScYu4k= github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -382,7 +382,6 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 9f4a4fd10..8d93367bf 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -701,7 +701,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { NodeId: "testnode", Region: "testregion", }, - telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}), + telemetry.NewTelemetryService(webhook.NewDefaultNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}), nil, ) for i := 0; i < opts.num+opts.numHidden; i++ { diff --git a/pkg/rtc/subscriptionmanager_test.go b/pkg/rtc/subscriptionmanager_test.go index 6a0edfdb5..05fea3d8c 100644 --- a/pkg/rtc/subscriptionmanager_test.go +++ b/pkg/rtc/subscriptionmanager_test.go @@ -76,7 +76,10 @@ func TestSubscribe(t *testing.T) { require.NotNil(t, s.getSubscribedTrack()) require.Len(t, sm.GetSubscribedTracks(), 1) - require.Len(t, sm.GetSubscribedParticipants(), 1) + + require.Eventually(t, func() bool { + return len(sm.GetSubscribedParticipants()) == 1 + }, subSettleTimeout, subCheckInterval, "GetSubscribedParticipants should have returned one item") require.Equal(t, "pubID", string(sm.GetSubscribedParticipants()[0])) // ensure telemetry events are sent diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 67310da29..5da0e2c79 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -114,7 +114,7 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { return auth.NewFileBasedKeyProviderFromMap(conf.Keys), nil } -func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.Notifier, error) { +func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) { wc := conf.WebHook if len(wc.URLs) == 0 { return nil, nil @@ -124,7 +124,7 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh return nil, ErrWebHookMissingAPIKey } - return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil + return webhook.NewDefaultNotifier(wc.APIKey, secret, wc.URLs), nil } func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 080858485..bce220b63 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -63,12 +63,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - notifier, err := createWebhookNotifier(conf, keyProvider) + queuedNotifier, err := createWebhookNotifier(conf, keyProvider) if err != nil { return nil, err } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) - telemetryService := telemetry.NewTelemetryService(notifier, analyticsService) + telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService) rtcEgressLauncher := NewEgressLauncher(egressClient, rpcClient, egressStore, telemetryService) roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher) if err != nil { @@ -159,7 +159,7 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { return auth.NewFileBasedKeyProviderFromMap(conf.Keys), nil } -func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.Notifier, error) { +func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) { wc := conf.WebHook if len(wc.URLs) == 0 { return nil, nil @@ -169,7 +169,7 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh return nil, ErrWebHookMissingAPIKey } - return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil + return webhook.NewDefaultNotifier(wc.APIKey, secret, wc.URLs), nil } func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 75f7c8ce2..c75e4d828 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -21,11 +21,9 @@ func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.Webho event.CreatedAt = time.Now().Unix() event.Id = utils.NewGuid("EV_") - t.webhookPool.Submit(func() { - if err := t.notifier.Notify(ctx, event); err != nil { - logger.Warnw("failed to notify webhook", err, "event", event.Event) - } - }) + if err := t.notifier.QueueNotify(ctx, event); err != nil { + logger.Warnw("failed to notify webhook", err, "event", event.Event) + } } func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index d78664bf6..9e6ef65ae 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -5,8 +5,6 @@ import ( "sync" "time" - "github.com/gammazero/workerpool" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -72,22 +70,20 @@ const ( type telemetryService struct { AnalyticsService - notifier webhook.Notifier - webhookPool *workerpool.WorkerPool - jobsChan chan func() + notifier webhook.QueuedNotifier + jobsChan chan func() lock sync.RWMutex workers map[livekit.ParticipantID]*StatsWorker } -func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService { +func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService { t := &telemetryService{ AnalyticsService: analytics, - notifier: notifier, - webhookPool: workerpool.New(maxWebhookWorkers), - jobsChan: make(chan func(), jobQueueBufferSize), - workers: make(map[livekit.ParticipantID]*StatsWorker), + notifier: notifier, + jobsChan: make(chan func(), jobQueueBufferSize), + workers: make(map[livekit.ParticipantID]*StatsWorker), } go t.run()