Integrate QueuedNotifier, fixes out-of-order delivery (#1615)

This commit is contained in:
David Zhao
2023-04-15 01:20:23 -07:00
committed by GitHub
parent e75b73af52
commit 40ceddd18b
8 changed files with 24 additions and 29 deletions

3
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26
github.com/livekit/protocol v1.5.4-0.20230413111958-5fea69067bbc
github.com/livekit/protocol v1.5.4
github.com/livekit/psrpc v0.2.11-0.20230405191830-d76f71512630
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.14.0
@@ -98,7 +98,6 @@ require (
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect
google.golang.org/grpc v1.54.0 // indirect

7
go.sum
View File

@@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 h1:QlQFyMwCDgjyySsrgmrMcVbEBA6KZcyTzvK+z346tUA=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26/go.mod h1:eDA41kiySZoG+wy4Etsjb3w0jjLx69i/vAmSjG4bteA=
github.com/livekit/protocol v1.5.4-0.20230413111958-5fea69067bbc h1:15IrYsN4PRgrH2MldkYgnTqqNxgRgjVGLjEtwurphCQ=
github.com/livekit/protocol v1.5.4-0.20230413111958-5fea69067bbc/go.mod h1:YPmFvsD0cr7KlC7wsoLTLwCAAJun/ovCDBCvUnWvdwo=
github.com/livekit/protocol v1.5.4 h1:lfEUqsE9AV1ZI/w8oZUKSAoi708V8RYwraOjeY83KVo=
github.com/livekit/protocol v1.5.4/go.mod h1:KJJVGHiNR6abdJIpoxB1kqQH2s902wM3cMt+P4p6jao=
github.com/livekit/psrpc v0.2.11-0.20230405191830-d76f71512630 h1:Rm5KLZgQxWnTidY+H8MsAV6sk1iiFxeXqPFgSLkMing=
github.com/livekit/psrpc v0.2.11-0.20230405191830-d76f71512630/go.mod h1:K0j8f1PgLShR7Lx80KbmwFkDH2BvOnycXGV0OSRURKc=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -233,8 +233,8 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/redis/go-redis/v9 v9.0.3 h1:+7mmR26M0IvyLxGZUHxu4GiBkJkVDid0Un+j4ScYu4k=
github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE=
github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
@@ -382,7 +382,6 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=

View File

@@ -701,7 +701,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
NodeId: "testnode",
Region: "testregion",
},
telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}),
telemetry.NewTelemetryService(webhook.NewDefaultNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}),
nil,
)
for i := 0; i < opts.num+opts.numHidden; i++ {

View File

@@ -76,7 +76,10 @@ func TestSubscribe(t *testing.T) {
require.NotNil(t, s.getSubscribedTrack())
require.Len(t, sm.GetSubscribedTracks(), 1)
require.Len(t, sm.GetSubscribedParticipants(), 1)
require.Eventually(t, func() bool {
return len(sm.GetSubscribedParticipants()) == 1
}, subSettleTimeout, subCheckInterval, "GetSubscribedParticipants should have returned one item")
require.Equal(t, "pubID", string(sm.GetSubscribedParticipants()[0]))
// ensure telemetry events are sent

View File

@@ -114,7 +114,7 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) {
return auth.NewFileBasedKeyProviderFromMap(conf.Keys), nil
}
func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.Notifier, error) {
func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) {
wc := conf.WebHook
if len(wc.URLs) == 0 {
return nil, nil
@@ -124,7 +124,7 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh
return nil, ErrWebHookMissingAPIKey
}
return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil
return webhook.NewDefaultNotifier(wc.APIKey, secret, wc.URLs), nil
}
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {

View File

@@ -63,12 +63,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
notifier, err := createWebhookNotifier(conf, keyProvider)
queuedNotifier, err := createWebhookNotifier(conf, keyProvider)
if err != nil {
return nil, err
}
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(notifier, analyticsService)
telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService)
rtcEgressLauncher := NewEgressLauncher(egressClient, rpcClient, egressStore, telemetryService)
roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher)
if err != nil {
@@ -159,7 +159,7 @@ func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) {
return auth.NewFileBasedKeyProviderFromMap(conf.Keys), nil
}
func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.Notifier, error) {
func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webhook.QueuedNotifier, error) {
wc := conf.WebHook
if len(wc.URLs) == 0 {
return nil, nil
@@ -169,7 +169,7 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh
return nil, ErrWebHookMissingAPIKey
}
return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil
return webhook.NewDefaultNotifier(wc.APIKey, secret, wc.URLs), nil
}
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {

View File

@@ -21,11 +21,9 @@ func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.Webho
event.CreatedAt = time.Now().Unix()
event.Id = utils.NewGuid("EV_")
t.webhookPool.Submit(func() {
if err := t.notifier.Notify(ctx, event); err != nil {
logger.Warnw("failed to notify webhook", err, "event", event.Event)
}
})
if err := t.notifier.QueueNotify(ctx, event); err != nil {
logger.Warnw("failed to notify webhook", err, "event", event.Event)
}
}
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {

View File

@@ -5,8 +5,6 @@ import (
"sync"
"time"
"github.com/gammazero/workerpool"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -72,22 +70,20 @@ const (
type telemetryService struct {
AnalyticsService
notifier webhook.Notifier
webhookPool *workerpool.WorkerPool
jobsChan chan func()
notifier webhook.QueuedNotifier
jobsChan chan func()
lock sync.RWMutex
workers map[livekit.ParticipantID]*StatsWorker
}
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService {
t := &telemetryService{
AnalyticsService: analytics,
notifier: notifier,
webhookPool: workerpool.New(maxWebhookWorkers),
jobsChan: make(chan func(), jobQueueBufferSize),
workers: make(map[livekit.ParticipantID]*StatsWorker),
notifier: notifier,
jobsChan: make(chan func(), jobQueueBufferSize),
workers: make(map[livekit.ParticipantID]*StatsWorker),
}
go t.run()