webhooks support (#81)

This commit is contained in:
David Zhao
2021-08-09 09:57:37 -07:00
committed by GitHub
parent 1f4288744c
commit 9495646f1a
12 changed files with 286 additions and 19 deletions
+10
View File
@@ -75,6 +75,16 @@ keys:
# - mime: audio/opus
# - mime: video/vp8
# Webhooks
# when configured, LiveKit notifies your URL handler with room events
#webhook:
# # the API key to use in order to sign the message
# # this must match one of the keys LiveKit is configured with
# api_key: <api_key>
# # list of URLs to be notified of room events
# urls:
# - https://your-host.com/handler
# customize audio level sensitivity
#audio:
# # minimum level to be considered active, 0-127, where 0 is loudest
+2 -1
View File
@@ -5,13 +5,14 @@ go 1.15
require (
github.com/bep/debounce v1.2.0
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/gammazero/workerpool v1.1.2
github.com/go-logr/logr v1.0.0
github.com/go-logr/zapr v1.0.0
github.com/go-redis/redis/v8 v8.7.1
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.6.6
github.com/livekit/protocol v0.7.0
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
+2 -2
View File
@@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.6 h1:vA98RfuW3sSidV1rfK+/szGWgHFgki4Q4pomxsJS0i0=
github.com/livekit/ion-sfu v1.20.6/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA=
github.com/livekit/protocol v0.6.6 h1:ZeJlYBX/0ZUq9BSkTE4tiBAHDeYEfV/8sv1PWaWAv9g=
github.com/livekit/protocol v0.6.6/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0=
github.com/livekit/protocol v0.7.0 h1:p1HjTwlFdWalJAHs0zDFXy3Nyou0z+WqD9bernAoGT0=
github.com/livekit/protocol v0.7.0/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
+5 -3
View File
@@ -67,11 +67,12 @@ func Proto() error {
}
protoDir := info.Dir
updated, err := target.Path("proto/livekit_models.pb.go",
protoDir+"/livekit_internal.proto",
protoDir+"/livekit_models.proto",
protoDir+"/livekit_recording.proto",
protoDir+"/livekit_room.proto",
protoDir+"/livekit_rtc.proto",
protoDir+"/livekit_internal.proto",
protoDir+"/livekit_webhook.proto",
)
if err != nil {
return err
@@ -99,7 +100,7 @@ func Proto() error {
return err
}
// generate model and room
// generate twirp-related protos
cmd = exec.Command(protoc,
"--go_out", target,
"--twirp_out", target,
@@ -116,7 +117,7 @@ func Proto() error {
return err
}
// generate rtc
// generate basic protobuf
cmd = exec.Command(protoc,
"--go_out", target,
"--go_opt=paths=source_relative",
@@ -126,6 +127,7 @@ func Proto() error {
protoDir+"/livekit_rtc.proto",
protoDir+"/livekit_internal.proto",
protoDir+"/livekit_models.proto",
protoDir+"/livekit_webhook.proto",
)
connectStd(cmd)
if err := cmd.Run(); err != nil {
+8 -1
View File
@@ -12,7 +12,7 @@ import (
"gopkg.in/yaml.v3"
)
var DEFAULT_STUN_SERVERS = []string{
var DefaultStunServers = []string{
"stun.l.google.com:19302",
"stun1.l.google.com:19302",
}
@@ -25,6 +25,7 @@ type Config struct {
Audio AudioConfig `yaml:"audio"`
Room RoomConfig `yaml:"room"`
TURN TURNConfig `yaml:"turn"`
WebHook WebHookConfig `yaml:"webhook"`
KeyFile string `yaml:"key_file"`
Keys map[string]string `yaml:"keys"`
LogLevel string `yaml:"log_level"`
@@ -99,6 +100,12 @@ type TURNConfig struct {
UDPPort int `yaml:"udp_port"`
}
type WebHookConfig struct {
URLs []string `yaml:"urls"`
// key to use for webhook
APIKey string `yaml:"api_key"`
}
func NewConfig(confString string, c *cli.Context) (*Config, error) {
// start with defaults
conf := &Config{
+1 -1
View File
@@ -15,7 +15,7 @@ func (conf *Config) determineIP() (string, error) {
if conf.RTC.UseExternalIP {
stunServers := conf.RTC.StunServers
if len(stunServers) == 0 {
stunServers = DEFAULT_STUN_SERVERS
stunServers = DefaultStunServers
}
ip, err := GetExternalIP(stunServers)
if err == nil {
+6 -5
View File
@@ -3,9 +3,10 @@ package service
import "errors"
var (
ErrRoomNotFound = errors.New("requested room does not exist")
ErrRoomLockFailed = errors.New("could not lock room")
ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match")
ErrParticipantNotFound = errors.New("participant does not exist")
ErrTrackNotFound = errors.New("track is not found")
ErrRoomNotFound = errors.New("requested room does not exist")
ErrRoomLockFailed = errors.New("could not lock room")
ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match")
ErrParticipantNotFound = errors.New("participant does not exist")
ErrTrackNotFound = errors.New("track is not found")
ErrWebHookMissingAPIKey = errors.New("api_key is required to use webhooks")
)
+44 -4
View File
@@ -5,7 +5,9 @@ import (
"sync"
"time"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/config"
@@ -28,12 +30,15 @@ type RoomManager struct {
selector routing.NodeSelector
router routing.Router
currentNode routing.LocalNode
notifier *webhook.Notifier
rtcConfig *rtc.WebRTCConfig
config *config.Config
webhookPool *workerpool.WorkerPool
rooms map[string]*rtc.Room
}
func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector, conf *config.Config) (*RoomManager, error) {
func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector,
notifier *webhook.Notifier, conf *config.Config) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
if err != nil {
return nil, err
@@ -46,7 +51,9 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc
config: conf,
router: router,
selector: selector,
notifier: notifier,
currentNode: currentNode,
webhookPool: workerpool.New(1),
rooms: make(map[string]*rtc.Room),
}, nil
}
@@ -317,7 +324,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
go r.rtcSessionWorker(room, participant, requestSource)
}
// create the actual room object
// create the actual room object, to be used on RTC node
func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
r.lock.RLock()
room := r.rooms[roomName]
@@ -340,6 +347,11 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
logger.Errorw("could not delete room", err)
}
r.notifyEvent(&livekit.WebhookEvent{
Type: webhook.EventRoomFinished,
Room: room.Room,
})
// print stats
logger.Infow("room closed",
"incomingStats", room.GetIncomingStats().Copy(),
@@ -361,10 +373,15 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
r.rooms[roomName] = room
r.lock.Unlock()
r.notifyEvent(&livekit.WebhookEvent{
Type: webhook.EventRoomStarted,
Room: room.Room,
})
return room, nil
}
// manages a RTC session for a participant, runs on the RTC node
// manages an RTC session for a participant, runs on the RTC node
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) {
defer func() {
logger.Debugw("RTC session finishing",
@@ -374,9 +391,20 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
"roomID", room.Room.Sid,
)
_ = participant.Close()
r.notifyEvent(&livekit.WebhookEvent{
Type: webhook.EventParticipantLeft,
Room: room.Room,
Participant: participant.ToProto(),
})
}()
defer rtc.Recover()
r.notifyEvent(&livekit.WebhookEvent{
Type: webhook.EventParticipantJoined,
Room: room.Room,
Participant: participant.ToProto(),
})
for {
select {
case <-time.After(time.Millisecond * 50):
@@ -540,11 +568,23 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer {
}
if !hasSTUN {
iceServers = append(iceServers, iceServerForStunServers(config.DEFAULT_STUN_SERVERS))
iceServers = append(iceServers, iceServerForStunServers(config.DefaultStunServers))
}
return iceServers
}
func (r *RoomManager) notifyEvent(event *livekit.WebhookEvent) {
if r.notifier == nil {
return
}
r.webhookPool.Submit(func() {
if err := r.notifier.Notify(event); err != nil {
logger.Warnw("could not notify webhook", err, "event", event.Type)
}
})
}
func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) {
room.EmptyTimeout = conf.EmptyTimeout
room.MaxParticipants = conf.MaxParticipants
+1 -1
View File
@@ -35,7 +35,7 @@ func newTestRoomManager(t *testing.T) (*service.RoomManager, *config.Config) {
router.GetNodeForRoomReturns(node, nil)
rm, err := service.NewRoomManager(store, router, node, selector, conf)
rm, err := service.NewRoomManager(store, router, node, selector, nil, conf)
require.NoError(t, err)
return rm, conf
+16
View File
@@ -6,6 +6,8 @@ import (
"github.com/go-redis/redis/v8"
"github.com/google/wire"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/webhook"
"github.com/pkg/errors"
"github.com/livekit/livekit-server/pkg/config"
@@ -18,6 +20,7 @@ var ServiceSet = wire.NewSet(
createRedisClient,
createRouter,
createStore,
createWebhookNotifier,
NewRecordingService,
NewRoomService,
NewRTCService,
@@ -66,6 +69,19 @@ func createStore(rc *redis.Client) RoomStore {
return NewLocalRoomStore()
}
func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (*webhook.Notifier, error) {
wc := conf.WebHook
if len(wc.URLs) == 0 {
return nil, nil
}
secret := provider.GetSecret(wc.APIKey)
if secret == "" {
return nil, ErrWebHookMissingAPIKey
}
return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil
}
func handleError(w http.ResponseWriter, status int, msg string) {
// GetLogger already with extra depth 1
logger.GetLogger().V(1).Info("error handling request", "error", msg, "status", status)
+5 -1
View File
@@ -20,7 +20,11 @@ func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, current
}
roomStore := createStore(client)
router := createRouter(client, currentNode)
roomManager, err := NewRoomManager(roomStore, router, currentNode, selector, conf)
notifier, err := createWebhookNotifier(conf, keyProvider)
if err != nil {
return nil, err
}
roomManager, err := NewRoomManager(roomStore, router, currentNode, selector, notifier, conf)
if err != nil {
return nil, err
}
+186
View File
@@ -0,0 +1,186 @@
package test
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"testing"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/pkg/testutils"
livekit "github.com/livekit/livekit-server/proto"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
)
func TestWebhooks(t *testing.T) {
server, ts, finish, err := setupServerWithWebhook()
require.NoError(t, err)
defer finish()
c1 := createRTCClient("c1", defaultServerPort, nil)
waitUntilConnected(t, c1)
testutils.WithTimeout(t, "webhook events room_started and participant_joined", func() bool {
if ts.GetEvent(webhook.EventRoomStarted) == nil {
return false
}
if ts.GetEvent(webhook.EventParticipantJoined) == nil {
return false
}
return true
})
// first participant join should have started the room
started := ts.GetEvent(webhook.EventRoomStarted)
require.Equal(t, testRoom, started.Room.Name)
joined := ts.GetEvent(webhook.EventParticipantJoined)
require.Equal(t, "c1", joined.Participant.Identity)
ts.ClearEvents()
// another participant joins
c2 := createRTCClient("c2", defaultServerPort, nil)
waitUntilConnected(t, c2)
defer c2.Stop()
testutils.WithTimeout(t, "webhook events participant_joined", func() bool {
if ts.GetEvent(webhook.EventParticipantJoined) == nil {
return false
}
return true
})
joined = ts.GetEvent(webhook.EventParticipantJoined)
require.Equal(t, "c2", joined.Participant.Identity)
ts.ClearEvents()
// first participant leaves
c1.Stop()
testutils.WithTimeout(t, "webhook events participant_left", func() bool {
if ts.GetEvent(webhook.EventParticipantLeft) == nil {
return false
}
return true
})
left := ts.GetEvent(webhook.EventParticipantLeft)
require.Equal(t, "c1", left.Participant.Identity)
ts.ClearEvents()
// room closed
rm := server.RoomManager().GetRoom(testRoom)
rm.Close()
testutils.WithTimeout(t, "webhook events room_finished", func() bool {
if ts.GetEvent(webhook.EventRoomFinished) == nil {
return false
}
return true
})
require.Equal(t, testRoom, ts.GetEvent(webhook.EventRoomFinished).Room.Name)
}
func setupServerWithWebhook() (server *service.LivekitServer, testServer *webookTestServer, finishFunc func(), err error) {
conf, err := config.NewConfig("", nil)
if err != nil {
panic(fmt.Sprintf("could not create config: %v", err))
}
conf.WebHook.URLs = []string{"http://localhost:7890"}
conf.WebHook.APIKey = testApiKey
conf.Development = true
testServer = newTestServer(":7890")
if err = testServer.Start(); err != nil {
return
}
currentNode, err := routing.NewLocalNode(conf)
if err != nil {
return
}
currentNode.Id = utils.NewGuid(nodeId1)
server, err = service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
if err != nil {
return
}
go func() {
if err := server.Start(); err != nil {
logger.Errorw("server returned error", err)
}
}()
waitForServerToStart(server)
finishFunc = func() {
server.Stop()
testServer.Stop()
}
return
}
type webookTestServer struct {
server *http.Server
events map[string]*livekit.WebhookEvent
lock sync.Mutex
provider auth.KeyProvider
}
func newTestServer(addr string) *webookTestServer {
s := &webookTestServer{
events: make(map[string]*livekit.WebhookEvent),
provider: &StaticKeyProvider{},
}
s.server = &http.Server{
Addr: addr,
Handler: s,
}
return s
}
func (s *webookTestServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
data, err := webhook.Receive(r, s.provider)
if err != nil {
logger.Errorw("could not receive webhook", err)
return
}
event := livekit.WebhookEvent{}
if err = protojson.Unmarshal(data, &event); err != nil {
logger.Errorw("could not unmarshal event", err)
return
}
s.lock.Lock()
s.events[event.Type] = &event
s.lock.Unlock()
}
func (s *webookTestServer) GetEvent(name string) *livekit.WebhookEvent {
s.lock.Lock()
defer s.lock.Unlock()
return s.events[name]
}
func (s *webookTestServer) ClearEvents() {
s.lock.Lock()
s.events = make(map[string]*livekit.WebhookEvent)
s.lock.Unlock()
}
func (s *webookTestServer) Start() error {
l, err := net.Listen("tcp", s.server.Addr)
if err != nil {
return err
}
go s.server.Serve(l)
return nil
}
func (s *webookTestServer) Stop() {
_ = s.server.Shutdown(context.Background())
}