Auto egress (#1011)

* auto egress

* fix room service test

* reuse StartTrackEgress

* add timestamp

* update prefixed filename explicitly

* update protocol

* clean up telemetry

* fix telemetry tests

* separate room internal storage

* auto participant egress

* remove custom template url

* fix internal key

* use map for stats workers

* remove sync.Map

* remove participant composite
This commit is contained in:
David Colburn
2022-09-21 12:04:19 -07:00
committed by GitHub
parent 48588d7c3d
commit 803046b882
26 changed files with 1230 additions and 677 deletions

2
go.mod
View File

@@ -16,7 +16,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-version v1.6.0
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v1.0.4-0.20220920233742-8645a138fb2e
github.com/livekit/protocol v1.1.2
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3
github.com/magefile/mage v1.14.0

4
go.sum
View File

@@ -240,8 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v1.0.4-0.20220920233742-8645a138fb2e h1:lGP8FJESvE4zvVufl4tc02aw9Ym1zA3rGQ48H40pjwE=
github.com/livekit/protocol v1.0.4-0.20220920233742-8645a138fb2e/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0=
github.com/livekit/protocol v1.1.2 h1:LDEFKK16T57pwDwxlJkkWMpbgvR0DJ3PozjOnvq29CI=
github.com/livekit/protocol v1.1.2/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw=

View File

@@ -39,12 +39,14 @@ type Room struct {
lock sync.RWMutex
protoRoom *livekit.Room
internal *livekit.RoomInternal
Logger logger.Logger
config WebRTCConfig
audioConfig *config.AudioConfig
serverInfo *livekit.ServerInfo
telemetry telemetry.TelemetryService
config WebRTCConfig
audioConfig *config.AudioConfig
serverInfo *livekit.ServerInfo
telemetry telemetry.TelemetryService
egressLauncher EgressLauncher
// map of identity -> Participant
participants map[livekit.ParticipantIdentity]types.LocalParticipant
@@ -73,17 +75,21 @@ type ParticipantOptions struct {
func NewRoom(
room *livekit.Room,
internal *livekit.RoomInternal,
config WebRTCConfig,
audioConfig *config.AudioConfig,
serverInfo *livekit.ServerInfo,
telemetry telemetry.TelemetryService,
egressLauncher EgressLauncher,
) *Room {
r := &Room{
protoRoom: proto.Clone(room).(*livekit.Room),
internal: internal,
Logger: LoggerWithRoom(logger.GetDefaultLogger(), livekit.RoomName(room.Name), livekit.RoomID(room.Sid)),
config: config,
audioConfig: audioConfig,
telemetry: telemetry,
egressLauncher: egressLauncher,
serverInfo: serverInfo,
participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant),
participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions),
@@ -693,8 +699,6 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
r.broadcastParticipantState(participant, broadcastOptions{skipSource: true})
r.lock.RLock()
defer r.lock.RUnlock()
// subscribe all existing participants to this MediaTrack
for _, existingParticipant := range r.participants {
if existingParticipant == participant {
@@ -724,6 +728,20 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
if r.onParticipantChanged != nil {
r.onParticipantChanged(participant)
}
r.lock.RUnlock()
// auto track egress
if r.internal != nil && r.internal.TrackEgress != nil {
StartTrackEgress(
context.Background(),
r.egressLauncher,
r.telemetry,
r.internal.TrackEgress,
track,
r.Name(),
r.ID(),
)
}
}
func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) {

93
pkg/rtc/room_egress.go Normal file
View File

@@ -0,0 +1,93 @@
package rtc
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/webhook"
)
// EgressLauncher is implemented by the service layer and lets rtc code start
// an egress (recording/export) without importing the service package directly.
type EgressLauncher interface {
	StartEgress(context.Context, *livekit.StartEgressRequest) (*livekit.EgressInfo, error)
}
// StartTrackEgress launches an automatic egress for a newly published track.
// On failure it emits an EgressEnded webhook with EGRESS_FAILED so listeners
// learn the recording never started; it does not return an error to the caller.
func StartTrackEgress(
	ctx context.Context,
	launcher EgressLauncher,
	ts telemetry.TelemetryService,
	opts *livekit.AutoTrackEgress,
	track types.MediaTrack,
	roomName livekit.RoomName,
	roomID livekit.RoomID,
) {
	req, err := startTrackEgress(ctx, launcher, opts, track, roomName, roomID)
	if err == nil {
		return
	}

	// launching failed: surface it to webhook consumers as a failed egress
	info := &livekit.EgressInfo{
		RoomId:   string(roomID),
		RoomName: string(roomName),
		Status:   livekit.EgressStatus_EGRESS_FAILED,
		Error:    err.Error(),
		Request:  &livekit.EgressInfo_Track{Track: req},
	}
	ts.NotifyEvent(ctx, &livekit.WebhookEvent{
		Event:      webhook.EventEgressEnded,
		EgressInfo: info,
	})
}
// startTrackEgress builds a TrackEgressRequest from the room's auto-egress
// settings and submits it via the launcher. The request is returned even on
// error so the caller can include it in failure notifications.
func startTrackEgress(
	ctx context.Context,
	launcher EgressLauncher,
	opts *livekit.AutoTrackEgress,
	track types.MediaTrack,
	roomName livekit.RoomName,
	roomID livekit.RoomID,
) (*livekit.TrackEgressRequest, error) {
	trackID := string(track.ID())
	file := &livekit.DirectFileOutput{
		Filepath: getFilePath(opts.FilePrefix, trackID),
	}

	// copy the configured upload destination, if any, into the file output
	switch storage := opts.Output.(type) {
	case *livekit.AutoTrackEgress_Azure:
		file.Output = &livekit.DirectFileOutput_Azure{Azure: storage.Azure}
	case *livekit.AutoTrackEgress_Gcp:
		file.Output = &livekit.DirectFileOutput_Gcp{Gcp: storage.Gcp}
	case *livekit.AutoTrackEgress_S3:
		file.Output = &livekit.DirectFileOutput_S3{S3: storage.S3}
	}

	req := &livekit.TrackEgressRequest{
		RoomName: string(roomName),
		TrackId:  trackID,
		Output:   &livekit.TrackEgressRequest_File{File: file},
	}

	if launcher == nil {
		return req, errors.New("egress launcher not found")
	}

	_, err := launcher.StartEgress(ctx, &livekit.StartEgressRequest{
		Request: &livekit.StartEgressRequest_Track{Track: req},
		RoomId:  string(roomID),
	})
	return req, err
}
func getFilePath(prefix, identifier string) string {
if prefix == "" || strings.HasSuffix(prefix, "/") {
return fmt.Sprintf("%s%s_%s", prefix, identifier, time.Now().Format("2006-01-02T150405"))
} else {
return fmt.Sprintf("%s_%s_%s", prefix, identifier, time.Now().Format("2006-01-02T150405"))
}
}

View File

@@ -683,6 +683,7 @@ type testRoomOpts struct {
func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
rm := NewRoom(
&livekit.Room{Name: "room"},
nil,
WebRTCConfig{},
&config.AudioConfig{
UpdateInterval: audioUpdateInterval,
@@ -696,6 +697,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
Region: "testregion",
},
telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}),
nil,
)
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i))

View File

@@ -8,6 +8,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -21,23 +22,43 @@ type EgressService struct {
es EgressStore
roomService livekit.RoomService
telemetry telemetry.TelemetryService
launcher rtc.EgressLauncher
shutdown chan struct{}
}
// egressLauncher implements rtc.EgressLauncher on top of the egress RPC
// client; it also has access to the egress store and telemetry for
// persisting and reporting started egresses.
type egressLauncher struct {
	rpcClient egress.RPCClient
	es        EgressStore
	telemetry telemetry.TelemetryService
}
// NewEgressLauncher returns an EgressLauncher backed by the egress RPC client,
// or nil when egress is not configured (no RPC client available).
func NewEgressLauncher(rpcClient egress.RPCClient, es EgressStore, ts telemetry.TelemetryService) rtc.EgressLauncher {
	if rpcClient == nil {
		// no egress workers to talk to; callers treat a nil launcher as disabled
		return nil
	}

	launcher := &egressLauncher{
		rpcClient: rpcClient,
		es:        es,
		telemetry: ts,
	}
	return launcher
}
// NewEgressService creates the twirp handler for egress management.
// launcher may be nil when egress is not configured.
func NewEgressService(
	rpcClient egress.RPCClient,
	store ServiceStore,
	es EgressStore,
	rs livekit.RoomService,
	ts telemetry.TelemetryService,
	launcher rtc.EgressLauncher,
) *EgressService {
	svc := &EgressService{
		rpcClient:   rpcClient,
		store:       store,
		es:          es,
		roomService: rs,
		telemetry:   ts,
		launcher:    launcher,
	}
	return svc
}
@@ -85,8 +106,7 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track
func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomName, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.rpcClient == nil {
} else if s.launcher == nil {
return nil, ErrEgressNotConnected
}
@@ -96,6 +116,10 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa
}
req.RoomId = room.Sid
return s.launcher.StartEgress(ctx, req)
}
func (s *egressLauncher) StartEgress(ctx context.Context, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) {
info, err := s.rpcClient.SendRequest(ctx, req)
if err != nil {
return nil, err

View File

@@ -10,17 +10,19 @@ import (
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
// encapsulates CRUD operations for room settings
//
//counterfeiter:generate . ObjectStore
type ObjectStore interface {
ServiceStore
// enable locking on a specific room to prevent race
// returns a (lock uuid, error)
LockRoom(ctx context.Context, name livekit.RoomName, duration time.Duration) (string, error)
UnlockRoom(ctx context.Context, name livekit.RoomName, uid string) error
LockRoom(ctx context.Context, roomName livekit.RoomName, duration time.Duration) (string, error)
UnlockRoom(ctx context.Context, roomName livekit.RoomName, uid string) error
StoreRoom(ctx context.Context, room *livekit.Room) error
DeleteRoom(ctx context.Context, name livekit.RoomName) error
StoreRoomInternal(ctx context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error
DeleteRoom(ctx context.Context, roomName livekit.RoomName) error
StoreParticipant(ctx context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error
DeleteParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) error
@@ -28,11 +30,12 @@ type ObjectStore interface {
//counterfeiter:generate . ServiceStore
// ServiceStore provides read access to rooms, room-internal settings, and
// participants for service handlers.
// NOTE: the diff interleave left duplicate old-signature declarations of
// LoadRoom and ListRooms in this interface; Go forbids duplicate method
// names, so the stale lines are removed here.
type ServiceStore interface {
	LoadRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Room, error)
	LoadRoomInternal(ctx context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error)

	// ListRooms returns currently active rooms. if roomNames is not nil, it'll
	// filter and return only rooms that match
	ListRooms(ctx context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error)

	LoadParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error)
	ListParticipants(ctx context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error)
}

View File

@@ -13,7 +13,8 @@ import (
// encapsulates CRUD operations for room settings
type LocalStore struct {
// map of roomName => room
rooms map[livekit.RoomName]*livekit.Room
rooms map[livekit.RoomName]*livekit.Room
roomInternal map[livekit.RoomName]*livekit.RoomInternal
// map of roomName => { identity: participant }
participants map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo
@@ -39,31 +40,45 @@ func (s *LocalStore) StoreRoom(_ context.Context, room *livekit.Room) error {
return nil
}
func (s *LocalStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) {
// StoreRoomInternal saves server-internal room settings in memory,
// keyed by room name.
func (s *LocalStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.roomInternal[roomName] = internal
	return nil
}
// LoadRoom returns the stored room proto, or ErrRoomNotFound when the room
// does not exist (or was stored as nil).
// NOTE: the diff interleave left a stale duplicate lookup line referencing
// the old parameter name `name`; it is removed here.
func (s *LocalStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	room := s.rooms[roomName]
	if room == nil {
		return nil, ErrRoomNotFound
	}
	return room, nil
}
func (s *LocalStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) {
// LoadRoomInternal returns stored internal settings for the room, or nil
// (with no error) when none have been stored.
func (s *LocalStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) {
	s.lock.RLock()
	internal := s.roomInternal[roomName]
	s.lock.RUnlock()
	return internal, nil
}
func (s *LocalStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) {
s.lock.RLock()
defer s.lock.RUnlock()
rooms := make([]*livekit.Room, 0, len(s.rooms))
for _, r := range s.rooms {
if names == nil || funk.Contains(names, livekit.RoomName(r.Name)) {
if roomNames == nil || funk.Contains(roomNames, livekit.RoomName(r.Name)) {
rooms = append(rooms, r)
}
}
return rooms, nil
}
func (s *LocalStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error {
room, err := s.LoadRoom(ctx, name)
func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error {
room, err := s.LoadRoom(ctx, roomName)
if err == ErrRoomNotFound {
return nil
} else if err != nil {

View File

@@ -23,7 +23,8 @@ const (
VersionKey = "livekit_version"
// RoomsKey is hash of room_name => Room proto
RoomsKey = "rooms"
RoomsKey = "rooms"
RoomInternalKey = "room_internal"
// EgressKey is a hash of egressID => egress info
EgressKey = "egress"
@@ -115,8 +116,23 @@ func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error {
return nil
}
func (s *RedisStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) {
data, err := s.rc.HGet(s.ctx, RoomsKey, string(name)).Result()
// StoreRoomInternal persists server-internal room settings (e.g. auto-egress
// config) in the RoomInternalKey hash, keyed by room name.
func (s *RedisStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error {
	data, err := proto.Marshal(internal)
	if err != nil {
		return err
	}

	// a single HSET suffices; the pipeline was unnecessary for one command.
	// cast roomName to string so the redis client isn't handed a defined
	// string type it may not know how to marshal.
	if err = s.rc.HSet(s.ctx, RoomInternalKey, string(roomName), data).Err(); err != nil {
		// previous message "could not create room" was copy-pasted from StoreRoom
		return errors.Wrap(err, "could not store room internal")
	}
	return nil
}
func (s *RedisStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) {
data, err := s.rc.HGet(s.ctx, RoomsKey, string(roomName)).Result()
if err != nil {
if err == redis.Nil {
err = ErrRoomNotFound
@@ -133,18 +149,36 @@ func (s *RedisStore) LoadRoom(_ context.Context, name livekit.RoomName) (*liveki
return &room, nil
}
func (s *RedisStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) {
// LoadRoomInternal fetches and unmarshals the room's internal settings.
// A missing entry (redis.Nil) is not an error: it returns (nil, nil).
func (s *RedisStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) {
	data, err := s.rc.HGet(s.ctx, RoomInternalKey, string(roomName)).Result()
	switch {
	case err == redis.Nil:
		return nil, nil
	case err != nil:
		return nil, err
	}

	internal := &livekit.RoomInternal{}
	if err = proto.Unmarshal([]byte(data), internal); err != nil {
		return nil, err
	}
	return internal, nil
}
func (s *RedisStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) {
var items []string
var err error
if names == nil {
if roomNames == nil {
items, err = s.rc.HVals(s.ctx, RoomsKey).Result()
if err != nil && err != redis.Nil {
return nil, errors.Wrap(err, "could not get rooms")
}
} else {
roomNames := livekit.RoomNamesAsStrings(names)
names := livekit.RoomNamesAsStrings(roomNames)
var results []interface{}
results, err = s.rc.HMGet(s.ctx, RoomsKey, roomNames...).Result()
results, err = s.rc.HMGet(s.ctx, RoomsKey, names...).Result()
if err != nil && err != redis.Nil {
return nil, errors.Wrap(err, "could not get rooms by names")
}
@@ -168,23 +202,24 @@ func (s *RedisStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*
return rooms, nil
}
func (s *RedisStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error {
_, err := s.LoadRoom(ctx, name)
// DeleteRoom removes the room proto, its internal settings, and its
// participant hash from redis. Deleting a room that does not exist is a
// no-op; other LoadRoom errors do not block the deletion attempt.
// NOTE: the diff interleave left stale HDel/Del lines referencing the old
// parameter name `name`; they are removed here.
func (s *RedisStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error {
	_, err := s.LoadRoom(ctx, roomName)
	if err == ErrRoomNotFound {
		return nil
	}

	key := string(roomName)
	pp := s.rc.Pipeline()
	pp.HDel(s.ctx, RoomsKey, key)
	pp.HDel(s.ctx, RoomInternalKey, key)
	pp.Del(s.ctx, RoomParticipantsPrefix+key)
	_, err = pp.Exec(s.ctx)
	return err
}
func (s *RedisStore) LockRoom(_ context.Context, name livekit.RoomName, duration time.Duration) (string, error) {
func (s *RedisStore) LockRoom(_ context.Context, roomName livekit.RoomName, duration time.Duration) (string, error) {
token := utils.NewGuid("LOCK")
key := RoomLockPrefix + string(name)
key := RoomLockPrefix + string(roomName)
startTime := time.Now()
for {
@@ -207,8 +242,8 @@ func (s *RedisStore) LockRoom(_ context.Context, name livekit.RoomName, duration
return "", ErrRoomLockFailed
}
func (s *RedisStore) UnlockRoom(_ context.Context, name livekit.RoomName, uid string) error {
key := RoomLockPrefix + string(name)
func (s *RedisStore) UnlockRoom(_ context.Context, roomName livekit.RoomName, uid string) error {
key := RoomLockPrefix + string(roomName)
val, err := s.rc.Get(s.ctx, key).Result()
if err == redis.Nil {

View File

@@ -68,10 +68,18 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
if req.Metadata != "" {
rm.Metadata = req.Metadata
}
if err := r.roomStore.StoreRoom(ctx, rm); err != nil {
if err = r.roomStore.StoreRoom(ctx, rm); err != nil {
return nil, err
}
if req.Egress != nil && req.Egress.Tracks != nil {
internal := &livekit.RoomInternal{TrackEgress: req.Egress.Tracks}
if err = r.roomStore.StoreRoomInternal(ctx, livekit.RoomName(req.Name), internal); err != nil {
return nil, err
}
}
// check if room already assigned
existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name))
if err != routing.ErrNotFound && err != nil {

View File

@@ -48,6 +48,7 @@ type RoomManager struct {
roomStore ObjectStore
telemetry telemetry.TelemetryService
clientConfManager clientconfiguration.ClientConfigurationManager
egressLauncher rtc.EgressLauncher
rooms map[livekit.RoomName]*rtc.Room
@@ -61,6 +62,7 @@ func NewLocalRoomManager(
router routing.Router,
telemetry telemetry.TelemetryService,
clientConfManager clientconfiguration.ClientConfigurationManager,
egressLauncher rtc.EgressLauncher,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
@@ -76,6 +78,7 @@ func NewLocalRoomManager(
roomStore: roomStore,
telemetry: telemetry,
clientConfManager: clientConfManager,
egressLauncher: egressLauncher,
rooms: make(map[livekit.RoomName]*rtc.Room),
@@ -373,6 +376,11 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
return nil, err
}
internal, err := r.roomStore.LoadRoomInternal(ctx, roomName)
if err != nil {
return nil, err
}
r.lock.Lock()
currentRoom := r.rooms[roomName]
@@ -388,7 +396,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
}
// construct ice servers
newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry)
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher)
newRoom.OnClose(func() {
roomInfo := newRoom.ToProto()

View File

@@ -12,6 +12,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/protocol/livekit"
)
@@ -22,18 +23,27 @@ const (
// A rooms service that supports a single node
type RoomService struct {
router routing.MessageRouter
roomAllocator RoomAllocator
roomStore ServiceStore
conf config.RoomConfig
conf config.RoomConfig
router routing.MessageRouter
roomAllocator RoomAllocator
roomStore ServiceStore
egressLauncher rtc.EgressLauncher
}
func NewRoomService(ra RoomAllocator, rs ServiceStore, router routing.MessageRouter, conf config.RoomConfig) (svc *RoomService, err error) {
// NewRoomService creates the twirp handler for room CRUD operations.
// egressLauncher may be nil; room-egress requests will then be rejected
// with ErrEgressNotConnected.
// NOTE: the diff interleave left stale field assignments referencing the old
// parameter names `ra`/`rs`; they are removed here. The naked named returns
// are replaced with an explicit return for clarity.
func NewRoomService(
	conf config.RoomConfig,
	router routing.MessageRouter,
	roomAllocator RoomAllocator,
	serviceStore ServiceStore,
	egressLauncher rtc.EgressLauncher,
) (*RoomService, error) {
	svc := &RoomService{
		conf:           conf,
		router:         router,
		roomAllocator:  roomAllocator,
		roomStore:      serviceStore,
		egressLauncher: egressLauncher,
	}
	return svc, nil
}
@@ -41,11 +51,28 @@ func NewRoomService(ra RoomAllocator, rs ServiceStore, router routing.MessageRou
func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (rm *livekit.Room, err error) {
if err = EnsureCreatePermission(ctx); err != nil {
return nil, twirpAuthError(err)
} else if req.Egress != nil {
if s.egressLauncher == nil {
return nil, ErrEgressNotConnected
} else if err = EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
}
rm, err = s.roomAllocator.CreateRoom(ctx, req)
if err != nil {
err = errors.Wrap(err, "could not create room")
return
}
if req.Egress != nil && req.Egress.Room != nil {
egress := &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_RoomComposite{
RoomComposite: req.Egress.Room,
},
RoomId: rm.Sid,
}
_, err = s.egressLauncher.StartEgress(ctx, egress)
}
return

View File

@@ -107,7 +107,7 @@ func newTestRoomService(conf config.RoomConfig) *TestRoomService {
router := &routingfakes.FakeRouter{}
allocator := &servicefakes.FakeRoomAllocator{}
store := &servicefakes.FakeServiceStore{}
svc, err := service.NewRoomService(allocator, store, router, conf)
svc, err := service.NewRoomService(conf, router, allocator, store, nil)
if err != nil {
panic(err)
}

View File

@@ -93,6 +93,20 @@ type FakeObjectStore struct {
result1 *livekit.Room
result2 error
}
LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)
loadRoomInternalMutex sync.RWMutex
loadRoomInternalArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
}
loadRoomInternalReturns struct {
result1 *livekit.RoomInternal
result2 error
}
loadRoomInternalReturnsOnCall map[int]struct {
result1 *livekit.RoomInternal
result2 error
}
LockRoomStub func(context.Context, livekit.RoomName, time.Duration) (string, error)
lockRoomMutex sync.RWMutex
lockRoomArgsForCall []struct {
@@ -133,6 +147,19 @@ type FakeObjectStore struct {
storeRoomReturnsOnCall map[int]struct {
result1 error
}
StoreRoomInternalStub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error
storeRoomInternalMutex sync.RWMutex
storeRoomInternalArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
arg3 *livekit.RoomInternal
}
storeRoomInternalReturns struct {
result1 error
}
storeRoomInternalReturnsOnCall map[int]struct {
result1 error
}
UnlockRoomStub func(context.Context, livekit.RoomName, string) error
unlockRoomMutex sync.RWMutex
unlockRoomArgsForCall []struct {
@@ -541,6 +568,71 @@ func (fake *FakeObjectStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room,
}{result1, result2}
}
// LoadRoomInternal is the counterfeiter-generated fake implementation: it
// records the invocation and arguments, then answers from the stub, a
// per-call override, or the default returns (in that order of precedence).
func (fake *FakeObjectStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) {
	fake.loadRoomInternalMutex.Lock()
	ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)]
	fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct {
		arg1 context.Context
		arg2 livekit.RoomName
	}{arg1, arg2})
	stub := fake.LoadRoomInternalStub
	fakeReturns := fake.loadRoomInternalReturns
	fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2})
	fake.loadRoomInternalMutex.Unlock()
	if stub != nil {
		return stub(arg1, arg2)
	}
	if specificReturn {
		return ret.result1, ret.result2
	}
	return fakeReturns.result1, fakeReturns.result2
}

// LoadRoomInternalCallCount reports how many times LoadRoomInternal was called.
func (fake *FakeObjectStore) LoadRoomInternalCallCount() int {
	fake.loadRoomInternalMutex.RLock()
	defer fake.loadRoomInternalMutex.RUnlock()
	return len(fake.loadRoomInternalArgsForCall)
}

// LoadRoomInternalCalls installs a stub invoked on every LoadRoomInternal call.
func (fake *FakeObjectStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) {
	fake.loadRoomInternalMutex.Lock()
	defer fake.loadRoomInternalMutex.Unlock()
	fake.LoadRoomInternalStub = stub
}

// LoadRoomInternalArgsForCall returns the arguments passed to the i-th call.
func (fake *FakeObjectStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) {
	fake.loadRoomInternalMutex.RLock()
	defer fake.loadRoomInternalMutex.RUnlock()
	argsForCall := fake.loadRoomInternalArgsForCall[i]
	return argsForCall.arg1, argsForCall.arg2
}

// LoadRoomInternalReturns sets the default return values (and clears any stub).
func (fake *FakeObjectStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) {
	fake.loadRoomInternalMutex.Lock()
	defer fake.loadRoomInternalMutex.Unlock()
	fake.LoadRoomInternalStub = nil
	fake.loadRoomInternalReturns = struct {
		result1 *livekit.RoomInternal
		result2 error
	}{result1, result2}
}

// LoadRoomInternalReturnsOnCall sets return values for the i-th call only
// (and clears any stub).
func (fake *FakeObjectStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) {
	fake.loadRoomInternalMutex.Lock()
	defer fake.loadRoomInternalMutex.Unlock()
	fake.LoadRoomInternalStub = nil
	if fake.loadRoomInternalReturnsOnCall == nil {
		fake.loadRoomInternalReturnsOnCall = make(map[int]struct {
			result1 *livekit.RoomInternal
			result2 error
		})
	}
	fake.loadRoomInternalReturnsOnCall[i] = struct {
		result1 *livekit.RoomInternal
		result2 error
	}{result1, result2}
}
func (fake *FakeObjectStore) LockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 time.Duration) (string, error) {
fake.lockRoomMutex.Lock()
ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)]
@@ -732,6 +824,69 @@ func (fake *FakeObjectStore) StoreRoomReturnsOnCall(i int, result1 error) {
}{result1}
}
// StoreRoomInternal is the counterfeiter-generated fake implementation: it
// records the invocation and arguments, then answers from the stub, a
// per-call override, or the default return (in that order of precedence).
func (fake *FakeObjectStore) StoreRoomInternal(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.RoomInternal) error {
	fake.storeRoomInternalMutex.Lock()
	ret, specificReturn := fake.storeRoomInternalReturnsOnCall[len(fake.storeRoomInternalArgsForCall)]
	fake.storeRoomInternalArgsForCall = append(fake.storeRoomInternalArgsForCall, struct {
		arg1 context.Context
		arg2 livekit.RoomName
		arg3 *livekit.RoomInternal
	}{arg1, arg2, arg3})
	stub := fake.StoreRoomInternalStub
	fakeReturns := fake.storeRoomInternalReturns
	fake.recordInvocation("StoreRoomInternal", []interface{}{arg1, arg2, arg3})
	fake.storeRoomInternalMutex.Unlock()
	if stub != nil {
		return stub(arg1, arg2, arg3)
	}
	if specificReturn {
		return ret.result1
	}
	return fakeReturns.result1
}

// StoreRoomInternalCallCount reports how many times StoreRoomInternal was called.
func (fake *FakeObjectStore) StoreRoomInternalCallCount() int {
	fake.storeRoomInternalMutex.RLock()
	defer fake.storeRoomInternalMutex.RUnlock()
	return len(fake.storeRoomInternalArgsForCall)
}

// StoreRoomInternalCalls installs a stub invoked on every StoreRoomInternal call.
func (fake *FakeObjectStore) StoreRoomInternalCalls(stub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error) {
	fake.storeRoomInternalMutex.Lock()
	defer fake.storeRoomInternalMutex.Unlock()
	fake.StoreRoomInternalStub = stub
}

// StoreRoomInternalArgsForCall returns the arguments passed to the i-th call.
func (fake *FakeObjectStore) StoreRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName, *livekit.RoomInternal) {
	fake.storeRoomInternalMutex.RLock()
	defer fake.storeRoomInternalMutex.RUnlock()
	argsForCall := fake.storeRoomInternalArgsForCall[i]
	return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}

// StoreRoomInternalReturns sets the default return value (and clears any stub).
func (fake *FakeObjectStore) StoreRoomInternalReturns(result1 error) {
	fake.storeRoomInternalMutex.Lock()
	defer fake.storeRoomInternalMutex.Unlock()
	fake.StoreRoomInternalStub = nil
	fake.storeRoomInternalReturns = struct {
		result1 error
	}{result1}
}

// StoreRoomInternalReturnsOnCall sets the return value for the i-th call only
// (and clears any stub).
func (fake *FakeObjectStore) StoreRoomInternalReturnsOnCall(i int, result1 error) {
	fake.storeRoomInternalMutex.Lock()
	defer fake.storeRoomInternalMutex.Unlock()
	fake.StoreRoomInternalStub = nil
	if fake.storeRoomInternalReturnsOnCall == nil {
		fake.storeRoomInternalReturnsOnCall = make(map[int]struct {
			result1 error
		})
	}
	fake.storeRoomInternalReturnsOnCall[i] = struct {
		result1 error
	}{result1}
}
func (fake *FakeObjectStore) UnlockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 string) error {
fake.unlockRoomMutex.Lock()
ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)]
@@ -810,12 +965,16 @@ func (fake *FakeObjectStore) Invocations() map[string][][]interface{} {
defer fake.loadParticipantMutex.RUnlock()
fake.loadRoomMutex.RLock()
defer fake.loadRoomMutex.RUnlock()
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
fake.lockRoomMutex.RLock()
defer fake.lockRoomMutex.RUnlock()
fake.storeParticipantMutex.RLock()
defer fake.storeParticipantMutex.RUnlock()
fake.storeRoomMutex.RLock()
defer fake.storeRoomMutex.RUnlock()
fake.storeRoomInternalMutex.RLock()
defer fake.storeRoomInternalMutex.RUnlock()
fake.unlockRoomMutex.RLock()
defer fake.unlockRoomMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}

View File

@@ -67,6 +67,20 @@ type FakeServiceStore struct {
result1 *livekit.Room
result2 error
}
LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)
loadRoomInternalMutex sync.RWMutex
loadRoomInternalArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
}
loadRoomInternalReturns struct {
result1 *livekit.RoomInternal
result2 error
}
loadRoomInternalReturnsOnCall map[int]struct {
result1 *livekit.RoomInternal
result2 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
@@ -337,6 +351,71 @@ func (fake *FakeServiceStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room
}{result1, result2}
}
// LoadRoomInternal is the counterfeiter-generated fake implementation: it
// records the invocation and arguments, then answers from the stub, a
// per-call override, or the default returns (in that order of precedence).
func (fake *FakeServiceStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) {
	fake.loadRoomInternalMutex.Lock()
	ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)]
	fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct {
		arg1 context.Context
		arg2 livekit.RoomName
	}{arg1, arg2})
	stub := fake.LoadRoomInternalStub
	fakeReturns := fake.loadRoomInternalReturns
	fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2})
	fake.loadRoomInternalMutex.Unlock()
	if stub != nil {
		return stub(arg1, arg2)
	}
	if specificReturn {
		return ret.result1, ret.result2
	}
	return fakeReturns.result1, fakeReturns.result2
}

// LoadRoomInternalCallCount reports how many times LoadRoomInternal was called.
func (fake *FakeServiceStore) LoadRoomInternalCallCount() int {
	fake.loadRoomInternalMutex.RLock()
	defer fake.loadRoomInternalMutex.RUnlock()
	return len(fake.loadRoomInternalArgsForCall)
}

// LoadRoomInternalCalls installs a stub invoked on every LoadRoomInternal call.
func (fake *FakeServiceStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) {
	fake.loadRoomInternalMutex.Lock()
	defer fake.loadRoomInternalMutex.Unlock()
	fake.LoadRoomInternalStub = stub
}

// LoadRoomInternalArgsForCall returns the arguments passed to the i-th call.
func (fake *FakeServiceStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) {
	fake.loadRoomInternalMutex.RLock()
	defer fake.loadRoomInternalMutex.RUnlock()
	argsForCall := fake.loadRoomInternalArgsForCall[i]
	return argsForCall.arg1, argsForCall.arg2
}

// LoadRoomInternalReturns sets the default return values (and clears any stub).
func (fake *FakeServiceStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) {
	fake.loadRoomInternalMutex.Lock()
	defer fake.loadRoomInternalMutex.Unlock()
	fake.LoadRoomInternalStub = nil
	fake.loadRoomInternalReturns = struct {
		result1 *livekit.RoomInternal
		result2 error
	}{result1, result2}
}

// LoadRoomInternalReturnsOnCall sets return values for the i-th call only
// (and clears any stub).
func (fake *FakeServiceStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) {
	fake.loadRoomInternalMutex.Lock()
	defer fake.loadRoomInternalMutex.Unlock()
	fake.LoadRoomInternalStub = nil
	if fake.loadRoomInternalReturnsOnCall == nil {
		fake.loadRoomInternalReturnsOnCall = make(map[int]struct {
			result1 *livekit.RoomInternal
			result2 error
		})
	}
	fake.loadRoomInternalReturnsOnCall[i] = struct {
		result1 *livekit.RoomInternal
		result2 error
	}{result1, result2}
}
func (fake *FakeServiceStore) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
@@ -348,6 +427,8 @@ func (fake *FakeServiceStore) Invocations() map[string][][]interface{} {
defer fake.loadParticipantMutex.RUnlock()
fake.loadRoomMutex.RLock()
defer fake.loadRoomMutex.RUnlock()
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value

View File

@@ -45,6 +45,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
telemetry.NewTelemetryService,
egress.NewRedisRPCClient,
getEgressStore,
NewEgressLauncher,
NewEgressService,
ingress.NewRedisRPC,
getIngressStore,

View File

@@ -34,6 +34,7 @@ import (
// Injectors from wire.go:
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
roomConfig := getRoomConf(conf)
client, err := createRedisClient(conf)
if err != nil {
return nil, err
@@ -44,11 +45,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
roomConfig := getRoomConf(conf)
roomService, err := NewRoomService(roomAllocator, objectStore, router, roomConfig)
if err != nil {
return nil, err
}
nodeID := getNodeID(currentNode)
rpcClient := egress.NewRedisRPCClient(nodeID, client)
egressStore := getEgressStore(objectStore)
@@ -62,7 +58,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(notifier, analyticsService)
egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService)
rtcEgressLauncher := NewEgressLauncher(rpcClient, egressStore, telemetryService)
roomService, err := NewRoomService(roomConfig, router, roomAllocator, objectStore, rtcEgressLauncher)
if err != nil {
return nil, err
}
egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher)
ingressConfig := getIngressConfig(conf)
rpc := ingress.NewRedisRPC(nodeID, client)
ingressRPCClient := getIngressRPCClient(rpc)
@@ -70,7 +71,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode)
clientConfigurationManager := createClientConfiguration()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher)
if err != nil {
return nil, err
}

339
pkg/telemetry/events.go Normal file
View File

@@ -0,0 +1,339 @@
package telemetry
import (
"context"
"errors"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
)
// NotifyEvent stamps event with the current time and a fresh "EV_"-prefixed
// ID, then delivers it asynchronously via the webhook worker pool.
// If no notifier is configured, the event is dropped with a warning.
func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
	if t.notifier == nil {
		logger.Warnw("failed to notify webhook", errors.New("no notifier"), "event", event.Event)
		return
	}

	event.CreatedAt = time.Now().Unix()
	event.Id = utils.NewGuid("EV_")

	// deliver on the pool so webhook latency never blocks the caller
	t.webhookPool.Submit(func() {
		if err := t.notifier.Notify(ctx, event); err != nil {
			logger.Warnw("failed to notify webhook", err, "event", event.Event)
		}
	})
}
// RoomStarted emits the room-started webhook and a ROOM_CREATED analytics
// event. The analytics timestamp is the room's CreationTime, not the
// processing time. Work is queued on the telemetry job channel.
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
	t.enqueue(func() {
		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event: webhook.EventRoomStarted,
			Room:  room,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_ROOM_CREATED,
			Timestamp: &timestamppb.Timestamp{Seconds: room.CreationTime},
			Room:      room,
		})
	})
}
// RoomEnded emits the room-finished webhook and a ROOM_ENDED analytics
// event, stamped with the current time. Work is queued on the telemetry
// job channel.
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
	t.enqueue(func() {
		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event: webhook.EventRoomFinished,
			Room:  room,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_ROOM_ENDED,
			Timestamp: timestamppb.Now(),
			RoomId:    room.Sid,
			Room:      room,
		})
	})
}
// ParticipantJoined updates prometheus participant counters, registers a
// per-participant stats worker keyed by the participant SID, and emits a
// PARTICIPANT_JOINED analytics event. The webhook for "joined" is emitted
// later, from ParticipantActive, once the participant becomes active.
//
// NOTE(review): a second join with the same SID overwrites the existing
// worker in the map without closing it — confirm that is intended.
func (t *telemetryService) ParticipantJoined(
	ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	clientInfo *livekit.ClientInfo,
	clientMeta *livekit.AnalyticsClientMeta,
) {
	t.enqueue(func() {
		prometheus.IncrementParticipantJoin(1)
		prometheus.AddParticipant()

		// worker accumulates this participant's track stats until they leave
		worker := newStatsWorker(
			ctx,
			t,
			livekit.RoomID(room.Sid),
			livekit.RoomName(room.Name),
			livekit.ParticipantID(participant.Sid),
			livekit.ParticipantIdentity(participant.Identity),
		)

		t.lock.Lock()
		t.workers[livekit.ParticipantID(participant.Sid)] = worker
		t.lock.Unlock()

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_PARTICIPANT_JOINED,
			Timestamp:     timestamppb.Now(),
			RoomId:        room.Sid,
			ParticipantId: participant.Sid,
			Participant:   participant,
			Room:          room,
			ClientInfo:    clientInfo,
			ClientMeta:    clientMeta,
		})
	})
}
// ParticipantActive emits the participant-joined webhook (deferred from
// ParticipantJoined until the participant is actually active) and a
// PARTICIPANT_ACTIVE analytics event.
func (t *telemetryService) ParticipantActive(
	ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	clientMeta *livekit.AnalyticsClientMeta,
) {
	t.enqueue(func() {
		// consider participant joined only when they became active
		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event:       webhook.EventParticipantJoined,
			Room:        room,
			Participant: participant,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_PARTICIPANT_ACTIVE,
			Timestamp:     timestamppb.Now(),
			RoomId:        room.Sid,
			ParticipantId: participant.Sid,
			Room:          room,
			ClientMeta:    clientMeta,
		})
	})
}
// ParticipantLeft closes the participant's stats worker (flushing any
// buffered stats), decrements the prometheus participant gauge, and emits
// both the participant-left webhook and a PARTICIPANT_LEFT analytics event.
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
	t.enqueue(func() {
		if worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); ok {
			worker.Close()
		}

		prometheus.SubParticipant()

		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event:       webhook.EventParticipantLeft,
			Room:        room,
			Participant: participant,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_PARTICIPANT_LEFT,
			Timestamp:     timestamppb.Now(),
			RoomId:        room.Sid,
			ParticipantId: participant.Sid,
			Participant:   participant,
			Room:          room,
		})
	})
}
// TrackPublished increments the published-track counter and emits the
// track-published webhook plus a TRACK_PUBLISHED analytics event. Room and
// participant payloads are minimal structs rebuilt from IDs resolved via
// the participant's stats worker (getRoomDetails).
func (t *telemetryService) TrackPublished(
	ctx context.Context,
	participantID livekit.ParticipantID,
	identity livekit.ParticipantIdentity,
	track *livekit.TrackInfo,
) {
	t.enqueue(func() {
		prometheus.AddPublishedTrack(track.Type.String())

		roomID, roomName := t.getRoomDetails(participantID)
		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event: webhook.EventTrackPublished,
			Room: &livekit.Room{
				Sid:  string(roomID),
				Name: string(roomName),
			},
			Participant: &livekit.ParticipantInfo{
				Sid:      string(participantID),
				Identity: string(identity),
			},
			Track: track,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_TRACK_PUBLISHED,
			Timestamp:     timestamppb.Now(),
			RoomId:        string(roomID),
			ParticipantId: string(participantID),
			Participant: &livekit.ParticipantInfo{
				Sid:      string(participantID),
				Identity: string(identity),
			},
			Track: track,
			Room:  &livekit.Room{Name: string(roomName)},
		})
	})
}
// TrackPublishedUpdate emits a TRACK_PUBLISHED_UPDATE analytics event with
// the latest track info; no webhook is sent for updates.
func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	t.enqueue(func() {
		roomID, roomName := t.getRoomDetails(participantID)
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE,
			Timestamp:     timestamppb.Now(),
			RoomId:        string(roomID),
			ParticipantId: string(participantID),
			Track:         track,
			Room:          &livekit.Room{Name: string(roomName)},
		})
	})
}
// TrackMaxSubscribedVideoQuality emits an analytics event recording the
// highest video quality currently subscribed for a published track, along
// with the track's mime type.
func (t *telemetryService) TrackMaxSubscribedVideoQuality(
	ctx context.Context,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
	mime string,
	maxQuality livekit.VideoQuality,
) {
	t.enqueue(func() {
		roomID, roomName := t.getRoomDetails(participantID)
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:                      livekit.AnalyticsEventType_TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY,
			Timestamp:                 timestamppb.Now(),
			RoomId:                    string(roomID),
			ParticipantId:             string(participantID),
			Track:                     track,
			Room:                      &livekit.Room{Name: string(roomName)},
			MaxSubscribedVideoQuality: maxQuality,
			Mime:                      mime,
		})
	})
}
// TrackSubscribed increments the subscribed-track counter and emits a
// TRACK_SUBSCRIBED analytics event; participantID identifies the
// subscriber, publisher the track's owner.
func (t *telemetryService) TrackSubscribed(
	ctx context.Context,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
	publisher *livekit.ParticipantInfo,
) {
	t.enqueue(func() {
		prometheus.AddSubscribedTrack(track.Type.String())

		roomID, roomName := t.getRoomDetails(participantID)
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_TRACK_SUBSCRIBED,
			Timestamp:     timestamppb.Now(),
			RoomId:        string(roomID),
			ParticipantId: string(participantID),
			Track:         track,
			Room:          &livekit.Room{Name: string(roomName)},
			Publisher:     publisher,
		})
	})
}
// TrackUnsubscribed decrements the subscribed-track counter and emits a
// TRACK_UNSUBSCRIBED analytics event carrying only the track's SID.
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	t.enqueue(func() {
		prometheus.SubSubscribedTrack(track.Type.String())

		roomID, roomName := t.getRoomDetails(participantID)
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED,
			Timestamp:     timestamppb.Now(),
			RoomId:        string(roomID),
			ParticipantId: string(participantID),
			TrackId:       track.Sid,
			Room:          &livekit.Room{Name: string(roomName)},
		})
	})
}
// TrackUnpublished decrements the published-track counter and emits both
// the track-unpublished webhook and a TRACK_UNPUBLISHED analytics event.
// The ssrc parameter is accepted for interface compatibility but is not
// used by this implementation.
func (t *telemetryService) TrackUnpublished(
	ctx context.Context,
	participantID livekit.ParticipantID,
	identity livekit.ParticipantIdentity,
	track *livekit.TrackInfo,
	ssrc uint32,
) {
	t.enqueue(func() {
		roomID, roomName := t.getRoomDetails(participantID)

		prometheus.SubPublishedTrack(track.Type.String())

		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event: webhook.EventTrackUnpublished,
			Room: &livekit.Room{
				Sid:  string(roomID),
				Name: string(roomName),
			},
			Participant: &livekit.ParticipantInfo{
				Sid:      string(participantID),
				Identity: string(identity),
			},
			Track: track,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:          livekit.AnalyticsEventType_TRACK_UNPUBLISHED,
			Timestamp:     timestamppb.Now(),
			RoomId:        string(roomID),
			ParticipantId: string(participantID),
			TrackId:       track.Sid,
			Room:          &livekit.Room{Name: string(roomName)},
		})
	})
}
// EgressStarted emits the egress-started webhook and an EGRESS_STARTED
// analytics event carrying the full EgressInfo.
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
	t.enqueue(func() {
		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event:      webhook.EventEgressStarted,
			EgressInfo: info,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_EGRESS_STARTED,
			Timestamp: timestamppb.Now(),
			EgressId:  info.EgressId,
			RoomId:    info.RoomId,
			Egress:    info,
		})
	})
}
// EgressEnded emits the egress-ended webhook and an EGRESS_ENDED analytics
// event carrying the full EgressInfo.
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
	t.enqueue(func() {
		t.NotifyEvent(ctx, &livekit.WebhookEvent{
			Event:      webhook.EventEgressEnded,
			EgressInfo: info,
		})

		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_EGRESS_ENDED,
			Timestamp: timestamppb.Now(),
			EgressId:  info.EgressId,
			RoomId:    info.RoomId,
			Egress:    info,
		})
	})
}
// getRoomDetails resolves the room ID and name associated with a
// participant by consulting its registered stats worker. Both results are
// empty strings when no worker exists for the given participant.
func (t *telemetryService) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) {
	worker, found := t.getWorker(participantID)
	if !found {
		return "", ""
	}
	return worker.roomID, worker.roomName
}

View File

@@ -1,8 +1,9 @@
package telemetrytest
package telemetry_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
@@ -34,6 +35,7 @@ func Test_OnParticipantJoin_EventIsSent(t *testing.T) {
// do
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta)
time.Sleep(time.Millisecond * 500)
// test
require.Equal(t, 1, fixture.analytics.SendEventCallCount())
@@ -68,6 +70,7 @@ func Test_OnParticipantLeft_EventIsSent(t *testing.T) {
// do
fixture.sut.ParticipantLeft(context.Background(), room, participantInfo)
time.Sleep(time.Millisecond * 500)
// test
require.Equal(t, 1, fixture.analytics.SendEventCallCount())
@@ -102,6 +105,7 @@ func Test_OnTrackUpdate_EventIsSent(t *testing.T) {
// do
fixture.sut.TrackPublishedUpdate(context.Background(), livekit.ParticipantID(partID), trackInfo)
time.Sleep(time.Millisecond * 500)
// test
require.Equal(t, 1, fixture.analytics.SendEventCallCount())
@@ -142,6 +146,7 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) {
// do
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta)
time.Sleep(time.Millisecond * 500)
// test
require.Equal(t, 1, fixture.analytics.SendEventCallCount())
@@ -154,6 +159,7 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) {
}
fixture.sut.ParticipantActive(context.Background(), room, participantInfo, clientMetaConnect)
time.Sleep(time.Millisecond * 500)
require.Equal(t, 2, fixture.analytics.SendEventCallCount())
_, eventActive := fixture.analytics.SendEventArgsForCall(1)
@@ -192,16 +198,16 @@ func Test_OnTrackSubscribed_EventIsSent(t *testing.T) {
// do
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta)
time.Sleep(time.Millisecond * 500)
// test
require.Equal(t, 1, fixture.analytics.SendEventCallCount())
_, event := fixture.analytics.SendEventArgsForCall(0)
require.Equal(t, room, event.Room)
// test
// do
fixture.sut.TrackSubscribed(context.Background(), livekit.ParticipantID(partSID), trackInfo, publisherInfo)
time.Sleep(time.Millisecond * 500)
require.Equal(t, 2, fixture.analytics.SendEventCallCount())
_, eventTrackSubscribed := fixture.analytics.SendEventArgsForCall(1)

51
pkg/telemetry/stats.go Normal file
View File

@@ -0,0 +1,51 @@
package telemetry
import (
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
)
// TrackStats aggregates the per-stream counters of a single stat report,
// updates prometheus RTC metrics for the given direction, and forwards the
// raw stat to the participant's stats worker (if one is registered) for
// later flushing to analytics. Processing runs asynchronously on the
// telemetry job queue.
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
	t.enqueue(func() {
		// map stream direction onto prometheus' incoming/outgoing labels
		direction := prometheus.Incoming
		if streamType == livekit.StreamType_DOWNSTREAM {
			direction = prometheus.Outgoing
		}

		nacks := uint32(0)
		plis := uint32(0)
		firs := uint32(0)
		packets := uint32(0)
		bytes := uint64(0)
		retransmitBytes := uint64(0)
		retransmitPackets := uint32(0)

		// sum counters across all streams (e.g. simulcast layers) of the track
		for _, stream := range stat.Streams {
			nacks += stream.Nacks
			plis += stream.Plis
			firs += stream.Firs
			packets += stream.PrimaryPackets + stream.PaddingPackets
			bytes += stream.PrimaryBytes + stream.PaddingBytes
			if streamType == livekit.StreamType_DOWNSTREAM {
				// downstream retransmissions are tracked separately
				retransmitPackets += stream.RetransmitPackets
				retransmitBytes += stream.RetransmitBytes
			} else {
				// for upstream, we don't account for these separately for now
				packets += stream.RetransmitPackets
				bytes += stream.RetransmitBytes
			}
		}
		prometheus.IncrementRTCP(direction, nacks, plis, firs)
		prometheus.IncrementPackets(direction, uint64(packets), false)
		prometheus.IncrementBytes(direction, bytes, false)
		if retransmitPackets != 0 {
			prometheus.IncrementPackets(direction, uint64(retransmitPackets), true)
		}
		if retransmitBytes != 0 {
			prometheus.IncrementBytes(direction, retransmitBytes, true)
		}

		// hand the stat to the participant's worker for buffering/coalescing
		if worker, ok := t.getWorker(participantID); ok {
			worker.OnTrackStat(trackID, streamType, stat)
		}
	})
}

View File

@@ -1,14 +1,15 @@
package telemetrytest
package telemetry_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes"
)
@@ -18,14 +19,14 @@ func init() {
}
type telemetryServiceFixture struct {
sut telemetry.TelemetryServiceInternal
sut telemetry.TelemetryService
analytics *telemetryfakes.FakeAnalyticsService
}
func createFixture() *telemetryServiceFixture {
fixture := &telemetryServiceFixture{}
fixture.analytics = &telemetryfakes.FakeAnalyticsService{}
fixture.sut = telemetry.NewTelemetryServiceInternal(nil, fixture.analytics)
fixture.sut = telemetry.NewTelemetryService(nil, fixture.analytics)
return fixture
}
@@ -43,7 +44,9 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) {
packet := 33
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet)}}}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "", stat)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -74,7 +77,9 @@ func Test_OnDownstreamPackets(t *testing.T) {
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packets[i]), PrimaryPackets: uint32(1)}}}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat)
}
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -106,7 +111,9 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) {
trackID2 := livekit.TrackID("trackID2")
stat2 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet2), PrimaryPackets: 1}}}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat2)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -172,7 +179,8 @@ func Test_OnDownStreamStat(t *testing.T) {
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -209,7 +217,9 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1) // there should be bytes reported so that stats are sent
fixture.sut.SendAnalytics()
// flush
fixture.flush()
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -221,10 +231,12 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 2, fixture.analytics.SendStatsCallCount()) // 2 calls to fixture.sut.SendAnalytics()
require.Equal(t, 2, fixture.analytics.SendStatsCallCount()) // 2 calls to fixture.sut.FlushStats()
_, stats := fixture.analytics.SendStatsArgsForCall(0)
require.Equal(t, 1, len(stats))
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind)
@@ -279,7 +291,9 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat3)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -346,7 +360,9 @@ func Test_OnUpstreamStat(t *testing.T) {
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat2)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -413,7 +429,9 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat3)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -440,7 +458,10 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
// remove 1 track - track stats were flushed above, so no more calls to SendStats
fixture.sut.TrackUnpublished(context.Background(), partSID, identity, &livekit.TrackInfo{Sid: string(trackID2)}, 0)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
}
@@ -457,6 +478,7 @@ func Test_AnalyticsSentWhenParticipantLeaves(t *testing.T) {
fixture.sut.ParticipantLeft(context.Background(), room, participantInfo)
// should not be called if there are no track stats
time.Sleep(time.Millisecond * 500)
require.Equal(t, 0, fixture.analytics.SendStatsCallCount())
}
@@ -483,7 +505,9 @@ func Test_AddUpTrack(t *testing.T) {
}
trackID := livekit.TrackID("trackID")
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -503,9 +527,9 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) {
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
// do
trackID := livekit.TrackID("trackID")
stat1 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
{
@@ -519,7 +543,9 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) {
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -562,7 +588,9 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "trackID1", stat2)
fixture.sut.SendAnalytics()
// flush
fixture.flush()
// test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
@@ -571,3 +599,8 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind)
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[1].Kind)
}
// flush waits for queued telemetry jobs to drain, then forces the service
// under test to flush buffered stats to the analytics fake.
func (f *telemetryServiceFixture) flush() {
	time.Sleep(time.Millisecond * 500)
	f.sut.FlushStats()
}

View File

@@ -15,7 +15,7 @@ import (
// StatsWorker handles participant stats
type StatsWorker struct {
ctx context.Context
t TelemetryReporter
t TelemetryService
roomID livekit.RoomID
roomName livekit.RoomName
participantID livekit.ParticipantID
@@ -29,7 +29,7 @@ type StatsWorker struct {
func newStatsWorker(
ctx context.Context,
t TelemetryReporter,
t TelemetryService,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
@@ -58,7 +58,11 @@ func (s *StatsWorker) OnTrackStat(trackID livekit.TrackID, direction livekit.Str
s.lock.Unlock()
}
func (s *StatsWorker) Update() {
// ParticipantID returns the participant this stats worker belongs to.
func (s *StatsWorker) ParticipantID() livekit.ParticipantID {
	return s.participantID
}
func (s *StatsWorker) Flush() {
ts := timestamppb.Now()
s.lock.Lock()
@@ -79,57 +83,12 @@ func (s *StatsWorker) Update() {
stats = s.collectStats(ts, livekit.StreamType_UPSTREAM, incomingPerTrack, stats)
stats = s.collectStats(ts, livekit.StreamType_DOWNSTREAM, outgoingPerTrack, stats)
if len(stats) > 0 {
s.t.Report(s.ctx, stats)
s.t.SendStats(s.ctx, stats)
}
}
func (s *StatsWorker) collectStats(
ts *timestamppb.Timestamp,
streamType livekit.StreamType,
perTrack map[livekit.TrackID][]*livekit.AnalyticsStat,
stats []*livekit.AnalyticsStat,
) []*livekit.AnalyticsStat {
for trackID, analyticsStats := range perTrack {
analyticsStat := s.getDeltaStats(analyticsStats, ts, trackID, streamType)
if analyticsStat != nil {
stats = append(stats, analyticsStat)
}
}
return stats
}
func (s *StatsWorker) getDeltaStats(
stats []*livekit.AnalyticsStat,
ts *timestamppb.Timestamp,
trackID livekit.TrackID,
kind livekit.StreamType,
) *livekit.AnalyticsStat {
// merge all streams stats of track
analyticsStat := coalesce(stats)
if analyticsStat == nil {
return nil
}
s.patch(analyticsStat, ts, trackID, kind)
return analyticsStat
}
func (s *StatsWorker) patch(
analyticsStat *livekit.AnalyticsStat,
ts *timestamppb.Timestamp,
trackID livekit.TrackID,
kind livekit.StreamType,
) {
analyticsStat.TimeStamp = ts
analyticsStat.TrackId = string(trackID)
analyticsStat.Kind = kind
analyticsStat.RoomId = string(s.roomID)
analyticsStat.ParticipantId = string(s.participantID)
analyticsStat.RoomName = string(s.roomName)
}
func (s *StatsWorker) Close() {
s.Update()
s.Flush()
s.lock.Lock()
s.closedAt = time.Now()
@@ -143,12 +102,31 @@ func (s *StatsWorker) ClosedAt() time.Time {
return s.closedAt
}
func (s *StatsWorker) ParticipantID() livekit.ParticipantID {
return s.participantID
}
// -------------------------------------------------------------------------
// collectStats coalesces the buffered per-track stats for one stream
// direction into a single AnalyticsStat per track, stamps each with the
// given timestamp plus track/kind/room/participant identifiers, and
// appends the results to stats. Tracks whose stats coalesce to nil are
// skipped.
func (s *StatsWorker) collectStats(
	ts *timestamppb.Timestamp,
	streamType livekit.StreamType,
	perTrack map[livekit.TrackID][]*livekit.AnalyticsStat,
	stats []*livekit.AnalyticsStat,
) []*livekit.AnalyticsStat {
	for trackID, analyticsStats := range perTrack {
		coalesced := coalesce(analyticsStats)
		if coalesced == nil {
			continue
		}

		coalesced.TimeStamp = ts
		coalesced.TrackId = string(trackID)
		coalesced.Kind = streamType
		coalesced.RoomId = string(s.roomID)
		coalesced.ParticipantId = string(s.participantID)
		coalesced.RoomName = string(s.roomName)

		stats = append(stats, coalesced)
	}
	return stats
}
// create a single stream and single video layer post aggregation
func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat {
if len(stats) == 0 {

View File

@@ -22,6 +22,16 @@ type FakeTelemetryService struct {
arg1 context.Context
arg2 *livekit.EgressInfo
}
FlushStatsStub func()
flushStatsMutex sync.RWMutex
flushStatsArgsForCall []struct {
}
NotifyEventStub func(context.Context, *livekit.WebhookEvent)
notifyEventMutex sync.RWMutex
notifyEventArgsForCall []struct {
arg1 context.Context
arg2 *livekit.WebhookEvent
}
ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta)
participantActiveMutex sync.RWMutex
participantActiveArgsForCall []struct {
@@ -58,6 +68,18 @@ type FakeTelemetryService struct {
arg1 context.Context
arg2 *livekit.Room
}
SendEventStub func(context.Context, *livekit.AnalyticsEvent)
sendEventMutex sync.RWMutex
sendEventArgsForCall []struct {
arg1 context.Context
arg2 *livekit.AnalyticsEvent
}
SendStatsStub func(context.Context, []*livekit.AnalyticsStat)
sendStatsMutex sync.RWMutex
sendStatsArgsForCall []struct {
arg1 context.Context
arg2 []*livekit.AnalyticsStat
}
TrackMaxSubscribedVideoQualityStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, string, livekit.VideoQuality)
trackMaxSubscribedVideoQualityMutex sync.RWMutex
trackMaxSubscribedVideoQualityArgsForCall []struct {
@@ -184,6 +206,63 @@ func (fake *FakeTelemetryService) EgressStartedArgsForCall(i int) (context.Conte
return argsForCall.arg1, argsForCall.arg2
}
// FlushStats records the invocation and delegates to FlushStatsStub, if set.
func (fake *FakeTelemetryService) FlushStats() {
	fake.flushStatsMutex.Lock()
	fake.flushStatsArgsForCall = append(fake.flushStatsArgsForCall, struct {
	}{})
	stub := fake.FlushStatsStub
	fake.recordInvocation("FlushStats", []interface{}{})
	fake.flushStatsMutex.Unlock()
	if stub != nil {
		fake.FlushStatsStub()
	}
}
// FlushStatsCallCount returns the number of recorded FlushStats invocations.
func (fake *FakeTelemetryService) FlushStatsCallCount() int {
	fake.flushStatsMutex.RLock()
	defer fake.flushStatsMutex.RUnlock()
	return len(fake.flushStatsArgsForCall)
}
// FlushStatsCalls installs stub as the implementation invoked by FlushStats.
func (fake *FakeTelemetryService) FlushStatsCalls(stub func()) {
	fake.flushStatsMutex.Lock()
	defer fake.flushStatsMutex.Unlock()
	fake.FlushStatsStub = stub
}
// NotifyEvent records the invocation and its arguments, then delegates to
// NotifyEventStub, if set.
func (fake *FakeTelemetryService) NotifyEvent(arg1 context.Context, arg2 *livekit.WebhookEvent) {
	fake.notifyEventMutex.Lock()
	fake.notifyEventArgsForCall = append(fake.notifyEventArgsForCall, struct {
		arg1 context.Context
		arg2 *livekit.WebhookEvent
	}{arg1, arg2})
	stub := fake.NotifyEventStub
	fake.recordInvocation("NotifyEvent", []interface{}{arg1, arg2})
	fake.notifyEventMutex.Unlock()
	if stub != nil {
		fake.NotifyEventStub(arg1, arg2)
	}
}
// NotifyEventCallCount returns the number of recorded NotifyEvent invocations.
func (fake *FakeTelemetryService) NotifyEventCallCount() int {
	fake.notifyEventMutex.RLock()
	defer fake.notifyEventMutex.RUnlock()
	return len(fake.notifyEventArgsForCall)
}
// NotifyEventCalls installs stub as the implementation invoked by NotifyEvent.
func (fake *FakeTelemetryService) NotifyEventCalls(stub func(context.Context, *livekit.WebhookEvent)) {
	fake.notifyEventMutex.Lock()
	defer fake.notifyEventMutex.Unlock()
	fake.NotifyEventStub = stub
}
// NotifyEventArgsForCall returns the arguments of the i-th recorded
// NotifyEvent invocation (0-indexed).
func (fake *FakeTelemetryService) NotifyEventArgsForCall(i int) (context.Context, *livekit.WebhookEvent) {
	fake.notifyEventMutex.RLock()
	defer fake.notifyEventMutex.RUnlock()
	argsForCall := fake.notifyEventArgsForCall[i]
	return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta) {
fake.participantActiveMutex.Lock()
fake.participantActiveArgsForCall = append(fake.participantActiveArgsForCall, struct {
@@ -355,6 +434,77 @@ func (fake *FakeTelemetryService) RoomStartedArgsForCall(i int) (context.Context
return argsForCall.arg1, argsForCall.arg2
}
// SendEvent records the invocation and its arguments, then delegates to
// SendEventStub, if set.
func (fake *FakeTelemetryService) SendEvent(arg1 context.Context, arg2 *livekit.AnalyticsEvent) {
	fake.sendEventMutex.Lock()
	fake.sendEventArgsForCall = append(fake.sendEventArgsForCall, struct {
		arg1 context.Context
		arg2 *livekit.AnalyticsEvent
	}{arg1, arg2})
	stub := fake.SendEventStub
	fake.recordInvocation("SendEvent", []interface{}{arg1, arg2})
	fake.sendEventMutex.Unlock()
	if stub != nil {
		fake.SendEventStub(arg1, arg2)
	}
}
// SendEventCallCount returns the number of recorded SendEvent invocations.
func (fake *FakeTelemetryService) SendEventCallCount() int {
	fake.sendEventMutex.RLock()
	defer fake.sendEventMutex.RUnlock()
	return len(fake.sendEventArgsForCall)
}
// SendEventCalls installs stub as the implementation invoked by SendEvent.
func (fake *FakeTelemetryService) SendEventCalls(stub func(context.Context, *livekit.AnalyticsEvent)) {
	fake.sendEventMutex.Lock()
	defer fake.sendEventMutex.Unlock()
	fake.SendEventStub = stub
}
// SendEventArgsForCall returns the arguments of the i-th recorded SendEvent
// invocation (0-indexed).
func (fake *FakeTelemetryService) SendEventArgsForCall(i int) (context.Context, *livekit.AnalyticsEvent) {
	fake.sendEventMutex.RLock()
	defer fake.sendEventMutex.RUnlock()
	argsForCall := fake.sendEventArgsForCall[i]
	return argsForCall.arg1, argsForCall.arg2
}
// SendStats records the invocation with a defensive copy of the stats slice
// (so later caller mutations don't affect the recording), then delegates to
// SendStatsStub, if set. The stub receives the original slice.
func (fake *FakeTelemetryService) SendStats(arg1 context.Context, arg2 []*livekit.AnalyticsStat) {
	var arg2Copy []*livekit.AnalyticsStat
	if arg2 != nil {
		arg2Copy = make([]*livekit.AnalyticsStat, len(arg2))
		copy(arg2Copy, arg2)
	}
	fake.sendStatsMutex.Lock()
	fake.sendStatsArgsForCall = append(fake.sendStatsArgsForCall, struct {
		arg1 context.Context
		arg2 []*livekit.AnalyticsStat
	}{arg1, arg2Copy})
	stub := fake.SendStatsStub
	fake.recordInvocation("SendStats", []interface{}{arg1, arg2Copy})
	fake.sendStatsMutex.Unlock()
	if stub != nil {
		fake.SendStatsStub(arg1, arg2)
	}
}
// SendStatsCallCount returns the number of recorded SendStats invocations.
func (fake *FakeTelemetryService) SendStatsCallCount() int {
	fake.sendStatsMutex.RLock()
	defer fake.sendStatsMutex.RUnlock()
	return len(fake.sendStatsArgsForCall)
}
// SendStatsCalls installs stub as the implementation invoked by SendStats.
func (fake *FakeTelemetryService) SendStatsCalls(stub func(context.Context, []*livekit.AnalyticsStat)) {
	fake.sendStatsMutex.Lock()
	defer fake.sendStatsMutex.Unlock()
	fake.SendStatsStub = stub
}
// SendStatsArgsForCall returns the arguments of the i-th recorded SendStats
// invocation (0-indexed); the slice is the copy captured at call time.
func (fake *FakeTelemetryService) SendStatsArgsForCall(i int) (context.Context, []*livekit.AnalyticsStat) {
	fake.sendStatsMutex.RLock()
	defer fake.sendStatsMutex.RUnlock()
	argsForCall := fake.sendStatsArgsForCall[i]
	return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQuality(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 string, arg5 livekit.VideoQuality) {
fake.trackMaxSubscribedVideoQualityMutex.Lock()
fake.trackMaxSubscribedVideoQualityArgsForCall = append(fake.trackMaxSubscribedVideoQualityArgsForCall, struct {
@@ -607,6 +757,10 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} {
defer fake.egressEndedMutex.RUnlock()
fake.egressStartedMutex.RLock()
defer fake.egressStartedMutex.RUnlock()
fake.flushStatsMutex.RLock()
defer fake.flushStatsMutex.RUnlock()
fake.notifyEventMutex.RLock()
defer fake.notifyEventMutex.RUnlock()
fake.participantActiveMutex.RLock()
defer fake.participantActiveMutex.RUnlock()
fake.participantJoinedMutex.RLock()
@@ -617,6 +771,10 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} {
defer fake.roomEndedMutex.RUnlock()
fake.roomStartedMutex.RLock()
defer fake.roomStartedMutex.RUnlock()
fake.sendEventMutex.RLock()
defer fake.sendEventMutex.RUnlock()
fake.sendStatsMutex.RLock()
defer fake.sendStatsMutex.RUnlock()
fake.trackMaxSubscribedVideoQualityMutex.RLock()
defer fake.trackMaxSubscribedVideoQualityMutex.RUnlock()
fake.trackPublishedMutex.RLock()

View File

@@ -2,8 +2,11 @@ package telemetry
import (
"context"
"sync"
"time"
"github.com/gammazero/workerpool"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -29,20 +32,38 @@ type TelemetryService interface {
TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime string, maxQuality livekit.VideoQuality)
EgressStarted(ctx context.Context, info *livekit.EgressInfo)
EgressEnded(ctx context.Context, info *livekit.EgressInfo)
// helpers
AnalyticsService
NotifyEvent(ctx context.Context, event *livekit.WebhookEvent)
FlushStats()
}
const (
maxWebhookWorkers = 50
workerCleanupWait = 3 * time.Minute
jobQueueBufferSize = 10000
)
type telemetryService struct {
internalService TelemetryServiceInternal
jobsChan chan func()
}
AnalyticsService
// queue should be sufficiently large to avoid blocking
const jobQueueBufferSize = 10000
notifier webhook.Notifier
webhookPool *workerpool.WorkerPool
jobsChan chan func()
lock sync.RWMutex
workers map[livekit.ParticipantID]*StatsWorker
}
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
t := &telemetryService{
internalService: NewTelemetryServiceInternal(notifier, analytics),
jobsChan: make(chan func(), jobQueueBufferSize),
AnalyticsService: analytics,
notifier: notifier,
webhookPool: workerpool.New(maxWebhookWorkers),
jobsChan: make(chan func(), jobQueueBufferSize),
workers: make(map[livekit.ParticipantID]*StatsWorker),
}
go t.run()
@@ -50,6 +71,15 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService)
return t
}
// FlushStats calls Flush on every registered per-participant StatsWorker.
// The write lock is held for the entire sweep, so worker registration and
// cleanup are blocked while flushing — assumes Flush is quick; TODO confirm.
func (t *telemetryService) FlushStats() {
	t.lock.Lock()
	defer t.lock.Unlock()

	for _, worker := range t.workers {
		worker.Flush()
	}
}
func (t *telemetryService) run() {
ticker := time.NewTicker(config.StatsUpdateInterval)
defer ticker.Stop()
@@ -60,9 +90,9 @@ func (t *telemetryService) run() {
for {
select {
case <-ticker.C:
t.internalService.SendAnalytics()
t.FlushStats()
case <-cleanupTicker.C:
t.internalService.CleanupWorkers()
t.cleanupWorkers()
case op := <-t.jobsChan:
op()
}
@@ -78,87 +108,23 @@ func (t *telemetryService) enqueue(op func()) {
}
}
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) {
t.enqueue(func() {
t.internalService.TrackStats(streamType, participantID, trackID, stats)
})
// getWorker looks up the StatsWorker registered for the given participant,
// reporting whether one exists.
func (t *telemetryService) getWorker(participantID livekit.ParticipantID) (*StatsWorker, bool) {
	t.lock.RLock()
	defer t.lock.RUnlock()

	w, found := t.workers[participantID]
	return w, found
}
// RoomStarted forwards the event to the internal service via the job queue,
// so telemetry work runs serially off the caller's goroutine.
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
	t.enqueue(func() {
		t.internalService.RoomStarted(ctx, room)
	})
}
func (t *telemetryService) cleanupWorkers() {
t.lock.Lock()
defer t.lock.Unlock()
// RoomEnded enqueues the room-ended notification for serial processing by the
// telemetry run loop.
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
	t.enqueue(func() {
		t.internalService.RoomEnded(ctx, room)
	})
}
// ParticipantJoined enqueues the participant-joined notification; the actual
// event emission happens on the telemetry job queue.
func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo,
	clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) {
	t.enqueue(func() {
		t.internalService.ParticipantJoined(ctx, room, participant, clientInfo, clientMeta)
	})
}
// ParticipantActive enqueues the participant-active notification for serial
// processing on the telemetry job queue.
func (t *telemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) {
	t.enqueue(func() {
		t.internalService.ParticipantActive(ctx, room, participant, clientMeta)
	})
}
// ParticipantLeft enqueues the participant-left notification for serial
// processing on the telemetry job queue.
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
	t.enqueue(func() {
		t.internalService.ParticipantLeft(ctx, room, participant)
	})
}
// TrackPublished enqueues the track-published notification for serial
// processing on the telemetry job queue.
func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
	t.enqueue(func() {
		t.internalService.TrackPublished(ctx, participantID, identity, track)
	})
}
// TrackUnpublished enqueues the track-unpublished notification for serial
// processing on the telemetry job queue.
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) {
	t.enqueue(func() {
		t.internalService.TrackUnpublished(ctx, participantID, identity, track, ssrc)
	})
}
// TrackSubscribed enqueues the track-subscribed notification for serial
// processing on the telemetry job queue.
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) {
	t.enqueue(func() {
		t.internalService.TrackSubscribed(ctx, participantID, track, publisher)
	})
}
// TrackUnsubscribed enqueues the track-unsubscribed notification for serial
// processing on the telemetry job queue.
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	t.enqueue(func() {
		t.internalService.TrackUnsubscribed(ctx, participantID, track)
	})
}
// TrackPublishedUpdate enqueues the track-published-update notification for
// serial processing on the telemetry job queue.
func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	t.enqueue(func() {
		t.internalService.TrackPublishedUpdate(ctx, participantID, track)
	})
}
// TrackMaxSubscribedVideoQuality enqueues the max-subscribed-quality
// notification for serial processing on the telemetry job queue.
func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime string, maxQuality livekit.VideoQuality) {
	t.enqueue(func() {
		t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, mime, maxQuality)
	})
}
// EgressStarted enqueues the egress-started notification for serial
// processing on the telemetry job queue.
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
	t.enqueue(func() {
		t.internalService.EgressStarted(ctx, info)
	})
}
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
t.enqueue(func() {
t.internalService.EgressEnded(ctx, info)
})
for participantID, worker := range t.workers {
closedAt := worker.ClosedAt()
if !closedAt.IsZero() && time.Since(closedAt) > workerCleanupWait {
logger.Debugw("reaping analytics worker for participant", "pID", participantID)
delete(t.workers, participantID)
}
}
}

View File

@@ -1,145 +0,0 @@
package telemetry
import (
"context"
"sync"
"time"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/webhook"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
const (
	// maxWebhookWorkers caps the size of the webhook delivery worker pool.
	maxWebhookWorkers = 50
	// workerCleanupWait is how long a closed stats worker is retained before
	// CleanupWorkers reaps it.
	workerCleanupWait = 3 * time.Minute
)
// TelemetryServiceInternal extends TelemetryService with the periodic
// maintenance hooks driven by the telemetry run loop.
type TelemetryServiceInternal interface {
	TelemetryService
	// SendAnalytics triggers an Update on every registered stats worker.
	SendAnalytics()
	// CleanupWorkers reaps workers whose participants have left.
	CleanupWorkers()
}
// TelemetryReporter receives batches of analytics stats for delivery.
type TelemetryReporter interface {
	Report(ctx context.Context, stats []*livekit.AnalyticsStat)
}
// telemetryServiceInternal implements TelemetryServiceInternal. Webhook
// deliveries fan out to a bounded worker pool; stats workers live in a slice
// with free-slot (nil) reuse, indexed by participant ID via workersIdx.
type telemetryServiceInternal struct {
	notifier    webhook.Notifier
	webhookPool *workerpool.WorkerPool

	// one worker per participant; a nil slot is free for reuse
	workersMu  sync.RWMutex
	workers    []*StatsWorker
	workersIdx map[livekit.ParticipantID]int

	analytics AnalyticsService
}
// NewTelemetryServiceInternal builds the internal telemetry service backed by
// the given webhook notifier and analytics sink.
func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal {
	s := &telemetryServiceInternal{
		notifier:    notifier,
		analytics:   analytics,
		webhookPool: workerpool.New(maxWebhookWorkers),
		workersIdx:  make(map[livekit.ParticipantID]int),
	}
	return s
}
// TrackStats aggregates the per-stream counters of a stat update into
// prometheus metrics and forwards the raw stat to the participant's stats
// worker (if one is registered).
func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
	direction := prometheus.Incoming
	if streamType == livekit.StreamType_DOWNSTREAM {
		direction = prometheus.Outgoing
	}

	var (
		nacks, plis, firs uint32
		packets           uint32
		bytes             uint64
		rtxPackets        uint32
		rtxBytes          uint64
	)
	for _, s := range stat.Streams {
		nacks += s.Nacks
		plis += s.Plis
		firs += s.Firs
		packets += s.PrimaryPackets + s.PaddingPackets
		bytes += s.PrimaryBytes + s.PaddingBytes
		if streamType == livekit.StreamType_DOWNSTREAM {
			rtxPackets += s.RetransmitPackets
			rtxBytes += s.RetransmitBytes
		} else {
			// for upstream, retransmissions are folded into the primary
			// totals rather than accounted separately
			packets += s.RetransmitPackets
			bytes += s.RetransmitBytes
		}
	}

	prometheus.IncrementRTCP(direction, nacks, plis, firs)
	prometheus.IncrementPackets(direction, uint64(packets), false)
	prometheus.IncrementBytes(direction, bytes, false)
	if rtxPackets != 0 {
		prometheus.IncrementPackets(direction, uint64(rtxPackets), true)
	}
	if rtxBytes != 0 {
		prometheus.IncrementBytes(direction, rtxBytes, true)
	}

	if w := t.getStatsWorker(participantID); w != nil {
		w.OnTrackStat(trackID, streamType, stat)
	}
}
// Report implements TelemetryReporter by forwarding aggregated stats to the
// analytics service.
func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.AnalyticsStat) {
	t.analytics.SendStats(ctx, stats)
}
// SendAnalytics asks every registered stats worker to push an update. The
// slice header is snapshotted under the read lock and iterated lock-free;
// nil (reaped) slots are skipped.
func (t *telemetryServiceInternal) SendAnalytics() {
	t.workersMu.RLock()
	snapshot := t.workers
	t.workersMu.RUnlock()

	for _, w := range snapshot {
		if w == nil {
			continue
		}
		w.Update()
	}
}
// CleanupWorkers reaps stats workers whose participant left more than
// workerCleanupWait ago. It iterates a snapshot taken under the read lock,
// re-acquiring the write lock per deletion; the slot is nil-ed rather than
// removed so other entries in workersIdx remain valid.
func (t *telemetryServiceInternal) CleanupWorkers() {
	t.workersMu.RLock()
	workers := t.workers
	t.workersMu.RUnlock()

	for _, worker := range workers {
		if worker == nil {
			continue
		}
		// a zero ClosedAt means the worker is still active
		closedAt := worker.ClosedAt()
		if !closedAt.IsZero() && time.Since(closedAt) > workerCleanupWait {
			pID := worker.ParticipantID()
			logger.Debugw("reaping analytics worker for participant", "pID", pID)
			t.workersMu.Lock()
			if idx, ok := t.workersIdx[pID]; ok {
				delete(t.workersIdx, pID)
				t.workers[idx] = nil
			}
			t.workersMu.Unlock()
		}
	}
}
// getStatsWorker returns the stats worker registered for participantID, or
// nil when none exists.
func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.ParticipantID) *StatsWorker {
	t.workersMu.RLock()
	defer t.workersMu.RUnlock()

	idx, ok := t.workersIdx[participantID]
	if !ok {
		return nil
	}
	return t.workers[idx]
}

View File

@@ -1,308 +0,0 @@
package telemetry
import (
"context"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
// RoomStarted emits the room_started webhook and a ROOM_CREATED analytics
// event. The analytics timestamp is the room's creation time rather than
// "now", so the event reflects when the room actually came into existence.
func (t *telemetryServiceInternal) RoomStarted(ctx context.Context, room *livekit.Room) {
	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event: webhook.EventRoomStarted,
		Room:  room,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:      livekit.AnalyticsEventType_ROOM_CREATED,
		Timestamp: &timestamppb.Timestamp{Seconds: room.CreationTime},
		// set RoomId explicitly for consistency with every other analytics
		// event this service emits (RoomEnded, ParticipantJoined, ...)
		RoomId: room.Sid,
		Room:   room,
	})
}
// RoomEnded emits the room_finished webhook and a ROOM_ENDED analytics event.
func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit.Room) {
	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event: webhook.EventRoomFinished,
		Room:  room,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:      livekit.AnalyticsEventType_ROOM_ENDED,
		Timestamp: timestamppb.Now(),
		RoomId:    room.Sid,
		Room:      room,
	})
}
// ParticipantJoined records a participant joining: bumps prometheus counters,
// registers a per-participant stats worker (reusing a nil slot in the worker
// slice when one is free), and emits a PARTICIPANT_JOINED analytics event.
func (t *telemetryServiceInternal) ParticipantJoined(
	ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	clientInfo *livekit.ClientInfo,
	clientMeta *livekit.AnalyticsClientMeta,
) {
	prometheus.IncrementParticipantJoin(1)
	prometheus.AddParticipant()

	pID := livekit.ParticipantID(participant.Sid)
	worker := newStatsWorker(
		ctx,
		t,
		livekit.RoomID(room.Sid),
		livekit.RoomName(room.Name),
		pID,
		livekit.ParticipantIdentity(participant.Identity),
	)

	t.workersMu.Lock()
	placed := false
	for idx := range t.workers {
		if t.workers[idx] == nil {
			// reuse a slot previously freed by CleanupWorkers
			t.workers[idx] = worker
			t.workersIdx[pID] = idx
			placed = true
			break
		}
	}
	if !placed {
		t.workersIdx[pID] = len(t.workers)
		t.workers = append(t.workers, worker)
	}
	t.workersMu.Unlock()

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_PARTICIPANT_JOINED,
		Timestamp:     timestamppb.Now(),
		RoomId:        room.Sid,
		ParticipantId: participant.Sid,
		Participant:   participant,
		Room:          room,
		ClientInfo:    clientInfo,
		ClientMeta:    clientMeta,
	})
}
// ParticipantActive emits the participant_joined webhook and a
// PARTICIPANT_ACTIVE analytics event.
func (t *telemetryServiceInternal) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) {
	// consider participant joined only when they became active
	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event:       webhook.EventParticipantJoined,
		Room:        room,
		Participant: participant,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_PARTICIPANT_ACTIVE,
		Timestamp:     timestamppb.Now(),
		RoomId:        room.Sid,
		ParticipantId: participant.Sid,
		Room:          room,
		ClientMeta:    clientMeta,
	})
}
// ParticipantLeft closes the participant's stats worker (CleanupWorkers later
// reaps closed workers), decrements the participant gauge, and emits the
// participant_left webhook plus a PARTICIPANT_LEFT analytics event.
func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
	w := t.getStatsWorker(livekit.ParticipantID(participant.Sid))
	if w != nil {
		w.Close()
	}

	prometheus.SubParticipant()

	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event:       webhook.EventParticipantLeft,
		Room:        room,
		Participant: participant,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_PARTICIPANT_LEFT,
		Timestamp:     timestamppb.Now(),
		RoomId:        room.Sid,
		ParticipantId: participant.Sid,
		Participant:   participant,
		Room:          room,
	})
}
// TrackPublished bumps the published-track gauge and emits the
// track_published webhook plus a TRACK_PUBLISHED analytics event. Room
// details come from the participant's stats worker; both are empty strings
// when no worker is registered.
func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
	prometheus.AddPublishedTrack(track.Type.String())

	roomID, roomName := t.getRoomDetails(participantID)
	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event: webhook.EventTrackPublished,
		Room: &livekit.Room{
			Sid:  string(roomID),
			Name: string(roomName),
		},
		Participant: &livekit.ParticipantInfo{
			Sid:      string(participantID),
			Identity: string(identity),
		},
		Track: track,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_TRACK_PUBLISHED,
		Timestamp:     timestamppb.Now(),
		RoomId:        string(roomID),
		ParticipantId: string(participantID),
		Participant: &livekit.ParticipantInfo{
			Sid:      string(participantID),
			Identity: string(identity),
		},
		Track: track,
		Room:  &livekit.Room{Name: string(roomName)},
	})
}
// TrackPublishedUpdate emits a TRACK_PUBLISHED_UPDATE analytics event
// (analytics only — no webhook is sent for track updates).
func (t *telemetryServiceInternal) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	roomID, roomName := t.getRoomDetails(participantID)
	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE,
		Timestamp:     timestamppb.Now(),
		RoomId:        string(roomID),
		ParticipantId: string(participantID),
		Track:         track,
		Room:          &livekit.Room{Name: string(roomName)},
	})
}
// TrackMaxSubscribedVideoQuality emits an analytics event recording the
// reported maximum subscribed video quality (and codec mime) for a track.
func (t *telemetryServiceInternal) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo,
	mime string, maxQuality livekit.VideoQuality) {
	roomID, roomName := t.getRoomDetails(participantID)
	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:                      livekit.AnalyticsEventType_TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY,
		Timestamp:                 timestamppb.Now(),
		RoomId:                    string(roomID),
		ParticipantId:             string(participantID),
		Track:                     track,
		Room:                      &livekit.Room{Name: string(roomName)},
		MaxSubscribedVideoQuality: maxQuality,
		Mime:                      mime,
	})
}
// TrackUnpublished decrements the published-track gauge and emits the
// track_unpublished webhook plus a TRACK_UNPUBLISHED analytics event. Room
// details come from the participant's stats worker; both are empty strings
// when no worker is registered. The ssrc parameter is not used in this body;
// it is kept for interface compatibility.
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) {
	// use the shared helper instead of duplicating the stats-worker lookup,
	// for consistency with the other event emitters
	roomID, roomName := t.getRoomDetails(participantID)

	prometheus.SubPublishedTrack(track.Type.String())

	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event: webhook.EventTrackUnpublished,
		Room: &livekit.Room{
			Sid:  string(roomID),
			Name: string(roomName),
		},
		Participant: &livekit.ParticipantInfo{
			Sid:      string(participantID),
			Identity: string(identity),
		},
		Track: track,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_TRACK_UNPUBLISHED,
		Timestamp:     timestamppb.Now(),
		RoomId:        string(roomID),
		ParticipantId: string(participantID),
		TrackId:       track.Sid,
		Room:          &livekit.Room{Name: string(roomName)},
	})
}
// TrackSubscribed bumps the subscribed-track gauge and emits a
// TRACK_SUBSCRIBED analytics event that includes the publisher's info.
func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo,
	publisher *livekit.ParticipantInfo) {
	prometheus.AddSubscribedTrack(track.Type.String())

	roomID, roomName := t.getRoomDetails(participantID)
	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_TRACK_SUBSCRIBED,
		Timestamp:     timestamppb.Now(),
		RoomId:        string(roomID),
		ParticipantId: string(participantID),
		Track:         track,
		Room:          &livekit.Room{Name: string(roomName)},
		Publisher:     publisher,
	})
}
// TrackUnsubscribed decrements the subscribed-track gauge and emits a
// TRACK_UNSUBSCRIBED analytics event.
func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	prometheus.SubSubscribedTrack(track.Type.String())

	roomID, roomName := t.getRoomDetails(participantID)
	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:          livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED,
		Timestamp:     timestamppb.Now(),
		RoomId:        string(roomID),
		ParticipantId: string(participantID),
		TrackId:       track.Sid,
		Room:          &livekit.Room{Name: string(roomName)},
	})
}
// getRoomDetails returns the room ID and name tracked by the participant's
// stats worker, or empty strings when no worker is registered.
func (t *telemetryServiceInternal) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) {
	w := t.getStatsWorker(participantID)
	if w == nil {
		return "", ""
	}
	return w.roomID, w.roomName
}
// notifyEvent delivers a webhook event asynchronously via the worker pool.
// No-op when no notifier is configured. Mutates event in place, stamping
// CreatedAt and a fresh "EV_" id before submission; delivery failures are
// logged, not retried.
func (t *telemetryServiceInternal) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
	if t.notifier == nil {
		return
	}

	event.CreatedAt = time.Now().Unix()
	event.Id = utils.NewGuid("EV_")

	t.webhookPool.Submit(func() {
		if err := t.notifier.Notify(ctx, event); err != nil {
			logger.Warnw("failed to notify webhook", err, "event", event.Event)
		}
	})
}
// EgressStarted emits the egress_started webhook and an EGRESS_STARTED
// analytics event.
func (t *telemetryServiceInternal) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event:      webhook.EventEgressStarted,
		EgressInfo: info,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:      livekit.AnalyticsEventType_EGRESS_STARTED,
		Timestamp: timestamppb.Now(),
		EgressId:  info.EgressId,
		RoomId:    info.RoomId,
		Egress:    info,
	})
}
// EgressEnded emits the egress_ended webhook and an EGRESS_ENDED analytics
// event.
func (t *telemetryServiceInternal) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
	t.notifyEvent(ctx, &livekit.WebhookEvent{
		Event:      webhook.EventEgressEnded,
		EgressInfo: info,
	})

	t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
		Type:      livekit.AnalyticsEventType_EGRESS_ENDED,
		Timestamp: timestamppb.Now(),
		EgressId:  info.EgressId,
		RoomId:    info.RoomId,
		Egress:    info,
	})
}