From 9495646f1a99fbade8b33eee1de71a9dd8ddca3c Mon Sep 17 00:00:00 2001 From: David Zhao Date: Mon, 9 Aug 2021 09:57:37 -0700 Subject: [PATCH] webhooks support (#81) --- config-sample.yaml | 10 ++ go.mod | 3 +- go.sum | 4 +- magefile.go | 8 +- pkg/config/config.go | 9 +- pkg/config/ip.go | 2 +- pkg/service/errors.go | 11 +- pkg/service/roommanager.go | 48 ++++++++- pkg/service/roommanager_test.go | 2 +- pkg/service/utils.go | 16 +++ pkg/service/wire_gen.go | 6 +- test/webhook_test.go | 186 ++++++++++++++++++++++++++++++++ 12 files changed, 286 insertions(+), 19 deletions(-) create mode 100644 test/webhook_test.go diff --git a/config-sample.yaml b/config-sample.yaml index 5884b9d1d..c03b01f22 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -75,6 +75,16 @@ keys: # - mime: audio/opus # - mime: video/vp8 +# Webhooks +# when configured, LiveKit notifies your URL handler with room events +#webhook: +# # the API key to use in order to sign the message +# # this must match one of the keys LiveKit is configured with +# api_key: +# # list of URLs to be notified of room events +# urls: +# - https://your-host.com/handler + # customize audio level sensitivity #audio: # # minimum level to be considered active, 0-127, where 0 is loudest diff --git a/go.mod b/go.mod index 270ce4693..7057e0bec 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,14 @@ go 1.15 require ( github.com/bep/debounce v1.2.0 github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect + github.com/gammazero/workerpool v1.1.2 github.com/go-logr/logr v1.0.0 github.com/go-logr/zapr v1.0.0 github.com/go-redis/redis/v8 v8.7.1 github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.6.6 + github.com/livekit/protocol v0.7.0 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 953f69091..8f62128dd 100644 --- a/go.sum +++ b/go.sum @@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.6 h1:vA98RfuW3sSidV1rfK+/szGWgHFgki4Q4pomxsJS0i0= github.com/livekit/ion-sfu v1.20.6/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA= -github.com/livekit/protocol v0.6.6 h1:ZeJlYBX/0ZUq9BSkTE4tiBAHDeYEfV/8sv1PWaWAv9g= -github.com/livekit/protocol v0.6.6/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= +github.com/livekit/protocol v0.7.0 h1:p1HjTwlFdWalJAHs0zDFXy3Nyou0z+WqD9bernAoGT0= +github.com/livekit/protocol v0.7.0/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/magefile.go b/magefile.go index 4ec376d83..7a20ea406 100644 --- a/magefile.go +++ b/magefile.go @@ -67,11 +67,12 @@ func Proto() error { } protoDir := info.Dir updated, err := target.Path("proto/livekit_models.pb.go", + protoDir+"/livekit_internal.proto", protoDir+"/livekit_models.proto", protoDir+"/livekit_recording.proto", protoDir+"/livekit_room.proto", protoDir+"/livekit_rtc.proto", - protoDir+"/livekit_internal.proto", + protoDir+"/livekit_webhook.proto", ) if err != nil { return err @@ -99,7 +100,7 @@ func Proto() error { return err } - // generate model and room + // generate twirp-related protos cmd = exec.Command(protoc, "--go_out", target, "--twirp_out", target, @@ -116,7 +117,7 @@ func Proto() error { return err } - // generate rtc + // generate basic protobuf cmd = exec.Command(protoc, "--go_out", target, "--go_opt=paths=source_relative", @@ -126,6 +127,7 @@ func Proto() error { protoDir+"/livekit_rtc.proto", protoDir+"/livekit_internal.proto", protoDir+"/livekit_models.proto", + protoDir+"/livekit_webhook.proto", ) connectStd(cmd) if err := cmd.Run(); err != nil { diff --git a/pkg/config/config.go b/pkg/config/config.go index 4f4fbdd53..838a2092a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -12,7 +12,7 @@ import ( "gopkg.in/yaml.v3" ) -var DEFAULT_STUN_SERVERS = []string{ +var DefaultStunServers = []string{ "stun.l.google.com:19302", "stun1.l.google.com:19302", } @@ -25,6 +25,7 @@ type Config struct { Audio AudioConfig `yaml:"audio"` Room RoomConfig `yaml:"room"` TURN TURNConfig `yaml:"turn"` + WebHook WebHookConfig `yaml:"webhook"` KeyFile string `yaml:"key_file"` Keys map[string]string `yaml:"keys"` LogLevel string `yaml:"log_level"` @@ -99,6 +100,12 @@ type TURNConfig struct { UDPPort int `yaml:"udp_port"` } +type WebHookConfig struct { + URLs []string `yaml:"urls"` + // key to use for webhook + APIKey string `yaml:"api_key"` +} + func NewConfig(confString string, c *cli.Context) (*Config, error) { // start with defaults conf := &Config{ diff --git a/pkg/config/ip.go b/pkg/config/ip.go index 0cbb41a56..a76b7e337 100644 --- a/pkg/config/ip.go +++ b/pkg/config/ip.go @@ -15,7 +15,7 @@ func (conf *Config) determineIP() (string, error) { if conf.RTC.UseExternalIP { stunServers := conf.RTC.StunServers if len(stunServers) == 0 { - stunServers = DEFAULT_STUN_SERVERS + stunServers = DefaultStunServers } ip, err := GetExternalIP(stunServers) if err == nil { diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 1b157cae4..992e45549 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -3,9 +3,10 @@ package service import "errors" var ( - ErrRoomNotFound = errors.New("requested room does not exist") - ErrRoomLockFailed = errors.New("could not lock room") - ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match") - ErrParticipantNotFound = errors.New("participant does not exist") - ErrTrackNotFound = errors.New("track is not found") + ErrRoomNotFound = errors.New("requested room does not exist") + ErrRoomLockFailed = errors.New("could not lock room") + ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match") + ErrParticipantNotFound = errors.New("participant does not exist") + ErrTrackNotFound = errors.New("track is not found") + ErrWebHookMissingAPIKey = errors.New("api_key is required to use webhooks") ) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index f0590b84e..736e2eb8a 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -5,7 +5,9 @@ import ( "sync" "time" + "github.com/gammazero/workerpool" "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/webhook" "github.com/pion/webrtc/v3" "github.com/livekit/livekit-server/pkg/config" @@ -28,12 +30,15 @@ type RoomManager struct { selector routing.NodeSelector router routing.Router currentNode routing.LocalNode + notifier *webhook.Notifier rtcConfig *rtc.WebRTCConfig config *config.Config + webhookPool *workerpool.WorkerPool rooms map[string]*rtc.Room } -func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector, conf *config.Config) (*RoomManager, error) { +func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector, + notifier *webhook.Notifier, conf *config.Config) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) if err != nil { return nil, err @@ -46,7 +51,9 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc config: conf, router: router, selector: selector, + notifier: notifier, currentNode: currentNode, + webhookPool: workerpool.New(1), rooms: make(map[string]*rtc.Room), }, nil } @@ -317,7 +324,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, go r.rtcSessionWorker(room, participant, requestSource) } -// create the actual room object +// create the actual room object, to be used on RTC node func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { r.lock.RLock() room := r.rooms[roomName] @@ -340,6 +347,11 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { logger.Errorw("could not delete room", err) } + r.notifyEvent(&livekit.WebhookEvent{ + Type: webhook.EventRoomFinished, + Room: room.Room, + }) + // print stats logger.Infow("room closed", "incomingStats", room.GetIncomingStats().Copy(), @@ -361,10 +373,15 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { r.rooms[roomName] = room r.lock.Unlock() + r.notifyEvent(&livekit.WebhookEvent{ + Type: webhook.EventRoomStarted, + Room: room.Room, + }) + return room, nil } -// manages a RTC session for a participant, runs on the RTC node +// manages an RTC session for a participant, runs on the RTC node func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) { defer func() { logger.Debugw("RTC session finishing", @@ -374,9 +391,20 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici "roomID", room.Room.Sid, ) _ = participant.Close() + + r.notifyEvent(&livekit.WebhookEvent{ + Type: webhook.EventParticipantLeft, + Room: room.Room, + Participant: participant.ToProto(), + }) }() defer rtc.Recover() + r.notifyEvent(&livekit.WebhookEvent{ + Type: webhook.EventParticipantJoined, + Room: room.Room, + Participant: participant.ToProto(), + }) for { select { case <-time.After(time.Millisecond * 50): @@ -540,11 +568,23 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer { } if !hasSTUN { - iceServers = append(iceServers, iceServerForStunServers(config.DEFAULT_STUN_SERVERS)) + iceServers = append(iceServers, iceServerForStunServers(config.DefaultStunServers)) } return iceServers } +func (r *RoomManager) notifyEvent(event *livekit.WebhookEvent) { + if r.notifier == nil { + return + } + + r.webhookPool.Submit(func() { + if err := r.notifier.Notify(event); err != nil { + logger.Warnw("could not notify webhook", err, "event", event.Type) + } + }) +} + func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) { room.EmptyTimeout = conf.EmptyTimeout room.MaxParticipants = conf.MaxParticipants diff --git a/pkg/service/roommanager_test.go b/pkg/service/roommanager_test.go index 03787d6eb..6b70d25fd 100644 --- a/pkg/service/roommanager_test.go +++ b/pkg/service/roommanager_test.go @@ -35,7 +35,7 @@ func newTestRoomManager(t *testing.T) (*service.RoomManager, *config.Config) { router.GetNodeForRoomReturns(node, nil) - rm, err := service.NewRoomManager(store, router, node, selector, conf) + rm, err := service.NewRoomManager(store, router, node, selector, nil, conf) require.NoError(t, err) return rm, conf diff --git a/pkg/service/utils.go b/pkg/service/utils.go index dbfc8fb81..58c72f2d3 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -6,6 +6,8 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/wire" + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/webhook" "github.com/pkg/errors" "github.com/livekit/livekit-server/pkg/config" @@ -18,6 +20,7 @@ var ServiceSet = wire.NewSet( createRedisClient, createRouter, createStore, + createWebhookNotifier, NewRecordingService, NewRoomService, NewRTCService, @@ -66,6 +69,19 @@ func createStore(rc *redis.Client) RoomStore { return NewLocalRoomStore() } +func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (*webhook.Notifier, error) { + wc := conf.WebHook + if len(wc.URLs) == 0 { + return nil, nil + } + secret := provider.GetSecret(wc.APIKey) + if secret == "" { + return nil, ErrWebHookMissingAPIKey + } + + return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil +} + func handleError(w http.ResponseWriter, status int, msg string) { // GetLogger already with extra depth 1 logger.GetLogger().V(1).Info("error handling request", "error", msg, "status", status) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index d1d2e8060..73b4d70a2 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -20,7 +20,11 @@ func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, current } roomStore := createStore(client) router := createRouter(client, currentNode) - roomManager, err := NewRoomManager(roomStore, router, currentNode, selector, conf) + notifier, err := createWebhookNotifier(conf, keyProvider) + if err != nil { + return nil, err + } + roomManager, err := NewRoomManager(roomStore, router, currentNode, selector, notifier, conf) if err != nil { return nil, err } diff --git a/test/webhook_test.go b/test/webhook_test.go new file mode 100644 index 000000000..4d75b0b9a --- /dev/null +++ b/test/webhook_test.go @@ -0,0 +1,186 @@ +package test + +import ( + "context" + "fmt" + "net" + "net/http" + "sync" + "testing" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/logger" + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/livekit-server/pkg/testutils" + livekit "github.com/livekit/livekit-server/proto" + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/webhook" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" +) + +func TestWebhooks(t *testing.T) { + server, ts, finish, err := setupServerWithWebhook() + require.NoError(t, err) + defer finish() + + c1 := createRTCClient("c1", defaultServerPort, nil) + waitUntilConnected(t, c1) + testutils.WithTimeout(t, "webhook events room_started and participant_joined", func() bool { + if ts.GetEvent(webhook.EventRoomStarted) == nil { + return false + } + if ts.GetEvent(webhook.EventParticipantJoined) == nil { + return false + } + return true + }) + + // first participant join should have started the room + started := ts.GetEvent(webhook.EventRoomStarted) + require.Equal(t, testRoom, started.Room.Name) + joined := ts.GetEvent(webhook.EventParticipantJoined) + require.Equal(t, "c1", joined.Participant.Identity) + ts.ClearEvents() + + // another participant joins + c2 := createRTCClient("c2", defaultServerPort, nil) + waitUntilConnected(t, c2) + defer c2.Stop() + testutils.WithTimeout(t, "webhook events participant_joined", func() bool { + if ts.GetEvent(webhook.EventParticipantJoined) == nil { + return false + } + return true + }) + joined = ts.GetEvent(webhook.EventParticipantJoined) + require.Equal(t, "c2", joined.Participant.Identity) + ts.ClearEvents() + + // first participant leaves + c1.Stop() + testutils.WithTimeout(t, "webhook events participant_left", func() bool { + if ts.GetEvent(webhook.EventParticipantLeft) == nil { + return false + } + return true + }) + left := ts.GetEvent(webhook.EventParticipantLeft) + require.Equal(t, "c1", left.Participant.Identity) + ts.ClearEvents() + + // room closed + rm := server.RoomManager().GetRoom(testRoom) + rm.Close() + testutils.WithTimeout(t, "webhook events room_finished", func() bool { + if ts.GetEvent(webhook.EventRoomFinished) == nil { + return false + } + return true + }) + require.Equal(t, testRoom, ts.GetEvent(webhook.EventRoomFinished).Room.Name) +} + +func setupServerWithWebhook() (server *service.LivekitServer, testServer *webookTestServer, finishFunc func(), err error) { + conf, err := config.NewConfig("", nil) + if err != nil { + panic(fmt.Sprintf("could not create config: %v", err)) + } + conf.WebHook.URLs = []string{"http://localhost:7890"} + conf.WebHook.APIKey = testApiKey + conf.Development = true + + testServer = newTestServer(":7890") + if err = testServer.Start(); err != nil { + return + } + + currentNode, err := routing.NewLocalNode(conf) + if err != nil { + return + } + currentNode.Id = utils.NewGuid(nodeId1) + + server, err = service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{}) + if err != nil { + return + } + + go func() { + if err := server.Start(); err != nil { + logger.Errorw("server returned error", err) + } + }() + + waitForServerToStart(server) + + finishFunc = func() { + server.Stop() + testServer.Stop() + } + return +} + +type webookTestServer struct { + server *http.Server + events map[string]*livekit.WebhookEvent + lock sync.Mutex + provider auth.KeyProvider +} + +func newTestServer(addr string) *webookTestServer { + s := &webookTestServer{ + events: make(map[string]*livekit.WebhookEvent), + provider: &StaticKeyProvider{}, + } + s.server = &http.Server{ + Addr: addr, + Handler: s, + } + return s +} + +func (s *webookTestServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + data, err := webhook.Receive(r, s.provider) + if err != nil { + logger.Errorw("could not receive webhook", err) + return + } + + event := livekit.WebhookEvent{} + if err = protojson.Unmarshal(data, &event); err != nil { + logger.Errorw("could not unmarshal event", err) + return + } + + s.lock.Lock() + s.events[event.Type] = &event + s.lock.Unlock() +} + +func (s *webookTestServer) GetEvent(name string) *livekit.WebhookEvent { + s.lock.Lock() + defer s.lock.Unlock() + return s.events[name] +} + +func (s *webookTestServer) ClearEvents() { + s.lock.Lock() + s.events = make(map[string]*livekit.WebhookEvent) + s.lock.Unlock() +} + +func (s *webookTestServer) Start() error { + l, err := net.Listen("tcp", s.server.Addr) + if err != nil { + return err + } + go s.server.Serve(l) + return nil +} + +func (s *webookTestServer) Stop() { + _ = s.server.Shutdown(context.Background()) +}