From 803046b88255b0fb6dbf52bc8ff4e3e08e0bbb52 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Wed, 21 Sep 2022 12:04:19 -0700 Subject: [PATCH] Auto egress (#1011) * auto egress * fix room service test * reuse StartTrackEgress * add timestamp * update prefixed filename explicitly * update protocol * clean up telemetry * fix telemetry tests * separate room internal storage * auto participant egress * remove custom template url * fix internal key * use map for stats workers * remove sync.Map * remove participant composite --- go.mod | 2 +- go.sum | 4 +- pkg/rtc/room.go | 30 +- pkg/rtc/room_egress.go | 93 +++++ pkg/rtc/room_test.go | 2 + pkg/service/egress.go | 30 +- pkg/service/interfaces.go | 15 +- pkg/service/localstore.go | 29 +- pkg/service/redisstore.go | 65 +++- pkg/service/roomallocator.go | 10 +- pkg/service/roommanager.go | 10 +- pkg/service/roomservice.go | 45 ++- pkg/service/roomservice_test.go | 2 +- pkg/service/servicefakes/fake_object_store.go | 159 ++++++++ .../servicefakes/fake_service_store.go | 81 +++++ pkg/service/wire.go | 1 + pkg/service/wire_gen.go | 15 +- pkg/telemetry/events.go | 339 ++++++++++++++++++ ..._service_events_test.go => events_test.go} | 12 +- pkg/telemetry/stats.go | 51 +++ ...elemetry_service_test.go => stats_test.go} | 71 +++- pkg/telemetry/statsworker.go | 86 ++--- .../telemetryfakes/fake_telemetry_service.go | 158 ++++++++ pkg/telemetry/telemetryservice.go | 144 +++----- pkg/telemetry/telemetryserviceinternal.go | 145 -------- .../telemetryserviceinternalevents.go | 308 ---------------- 26 files changed, 1230 insertions(+), 677 deletions(-) create mode 100644 pkg/rtc/room_egress.go create mode 100644 pkg/telemetry/events.go rename pkg/telemetry/{test/telemetry_service_events_test.go => events_test.go} (96%) create mode 100644 pkg/telemetry/stats.go rename pkg/telemetry/{test/telemetry_service_test.go => stats_test.go} (96%) delete mode 100644 pkg/telemetry/telemetryserviceinternal.go delete mode 100644 pkg/telemetry/telemetryserviceinternalevents.go diff --git a/go.mod b/go.mod index 54a7aad57..cffba4d35 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.0.4-0.20220920233742-8645a138fb2e + github.com/livekit/protocol v1.1.2 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 github.com/magefile/mage v1.14.0 diff --git a/go.sum b/go.sum index fbe25cac2..b02efff1a 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v1.0.4-0.20220920233742-8645a138fb2e h1:lGP8FJESvE4zvVufl4tc02aw9Ym1zA3rGQ48H40pjwE= -github.com/livekit/protocol v1.0.4-0.20220920233742-8645a138fb2e/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= +github.com/livekit/protocol v1.1.2 h1:LDEFKK16T57pwDwxlJkkWMpbgvR0DJ3PozjOnvq29CI= +github.com/livekit/protocol v1.1.2/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index ea83f2ee8..186ab176d 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -39,12 +39,14 @@ type Room struct { lock sync.RWMutex protoRoom *livekit.Room + internal *livekit.RoomInternal Logger logger.Logger - config WebRTCConfig - audioConfig *config.AudioConfig - serverInfo *livekit.ServerInfo - telemetry telemetry.TelemetryService + config WebRTCConfig + audioConfig *config.AudioConfig + serverInfo *livekit.ServerInfo + telemetry telemetry.TelemetryService + egressLauncher EgressLauncher // map of identity -> Participant participants map[livekit.ParticipantIdentity]types.LocalParticipant @@ -73,17 +75,21 @@ type ParticipantOptions struct { func NewRoom( room *livekit.Room, + internal *livekit.RoomInternal, config WebRTCConfig, audioConfig *config.AudioConfig, serverInfo *livekit.ServerInfo, telemetry telemetry.TelemetryService, + egressLauncher EgressLauncher, ) *Room { r := &Room{ protoRoom: proto.Clone(room).(*livekit.Room), + internal: internal, Logger: LoggerWithRoom(logger.GetDefaultLogger(), livekit.RoomName(room.Name), livekit.RoomID(room.Sid)), config: config, audioConfig: audioConfig, telemetry: telemetry, + egressLauncher: egressLauncher, serverInfo: serverInfo, participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant), participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions), @@ -693,8 +699,6 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. r.broadcastParticipantState(participant, broadcastOptions{skipSource: true}) r.lock.RLock() - defer r.lock.RUnlock() - // subscribe all existing participants to this MediaTrack for _, existingParticipant := range r.participants { if existingParticipant == participant { @@ -724,6 +728,20 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. if r.onParticipantChanged != nil { r.onParticipantChanged(participant) } + r.lock.RUnlock() + + // auto track egress + if r.internal != nil && r.internal.TrackEgress != nil { + StartTrackEgress( + context.Background(), + r.egressLauncher, + r.telemetry, + r.internal.TrackEgress, + track, + r.Name(), + r.ID(), + ) + } } func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) { diff --git a/pkg/rtc/room_egress.go b/pkg/rtc/room_egress.go new file mode 100644 index 000000000..f5788020b --- /dev/null +++ b/pkg/rtc/room_egress.go @@ -0,0 +1,93 @@ +package rtc + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/webhook" +) + +type EgressLauncher interface { + StartEgress(context.Context, *livekit.StartEgressRequest) (*livekit.EgressInfo, error) +} + +func StartTrackEgress( + ctx context.Context, + launcher EgressLauncher, + ts telemetry.TelemetryService, + opts *livekit.AutoTrackEgress, + track types.MediaTrack, + roomName livekit.RoomName, + roomID livekit.RoomID, +) { + if req, err := startTrackEgress(ctx, launcher, opts, track, roomName, roomID); err != nil { + // send egress failed webhook + ts.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventEgressEnded, + EgressInfo: &livekit.EgressInfo{ + RoomId: string(roomID), + RoomName: string(roomName), + Status: livekit.EgressStatus_EGRESS_FAILED, + Error: err.Error(), + Request: &livekit.EgressInfo_Track{Track: req}, + }, + }) + } +} + +func startTrackEgress( + ctx context.Context, + launcher EgressLauncher, + opts *livekit.AutoTrackEgress, + track types.MediaTrack, + roomName livekit.RoomName, + roomID livekit.RoomID, +) (*livekit.TrackEgressRequest, error) { + + output := &livekit.DirectFileOutput{ + Filepath: getFilePath(opts.FilePrefix, string(track.ID())), + } + + switch out := opts.Output.(type) { + case *livekit.AutoTrackEgress_Azure: + output.Output = &livekit.DirectFileOutput_Azure{Azure: out.Azure} + case *livekit.AutoTrackEgress_Gcp: + output.Output = &livekit.DirectFileOutput_Gcp{Gcp: out.Gcp} + case *livekit.AutoTrackEgress_S3: + output.Output = &livekit.DirectFileOutput_S3{S3: out.S3} + } + + req := &livekit.TrackEgressRequest{ + RoomName: string(roomName), + TrackId: string(track.ID()), + Output: &livekit.TrackEgressRequest_File{ + File: output, + }, + } + + if launcher == nil { + return req, errors.New("egress launcher not found") + } + + _, err := launcher.StartEgress(ctx, &livekit.StartEgressRequest{ + Request: &livekit.StartEgressRequest_Track{ + Track: req, + }, + RoomId: string(roomID), + }) + return req, err +} + +func getFilePath(prefix, identifier string) string { + if prefix == "" || strings.HasSuffix(prefix, "/") { + return fmt.Sprintf("%s%s_%s", prefix, identifier, time.Now().Format("2006-01-02T150405")) + } else { + return fmt.Sprintf("%s_%s_%s", prefix, identifier, time.Now().Format("2006-01-02T150405")) + } +} diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index b81785c35..cf7528c88 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -683,6 +683,7 @@ type testRoomOpts struct { func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { rm := NewRoom( &livekit.Room{Name: "room"}, + nil, WebRTCConfig{}, &config.AudioConfig{ UpdateInterval: audioUpdateInterval, @@ -696,6 +697,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { Region: "testregion", }, telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}), + nil, ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i)) diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 4f1bd13c0..7249b425d 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -8,6 +8,7 @@ import ( "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -21,23 +22,43 @@ type EgressService struct { es EgressStore roomService livekit.RoomService telemetry telemetry.TelemetryService + launcher rtc.EgressLauncher shutdown chan struct{} } +type egressLauncher struct { + rpcClient egress.RPCClient + es EgressStore + telemetry telemetry.TelemetryService +} + +func NewEgressLauncher(rpcClient egress.RPCClient, es EgressStore, ts telemetry.TelemetryService) rtc.EgressLauncher { + if rpcClient == nil { + return nil + } + + return &egressLauncher{ + rpcClient: rpcClient, + es: es, + telemetry: ts, + } +} + func NewEgressService( rpcClient egress.RPCClient, store ServiceStore, es EgressStore, rs livekit.RoomService, ts telemetry.TelemetryService, + launcher rtc.EgressLauncher, ) *EgressService { - return &EgressService{ rpcClient: rpcClient, store: store, es: es, roomService: rs, telemetry: ts, + launcher: launcher, } } @@ -85,8 +106,7 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomName, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) - } - if s.rpcClient == nil { + } else if s.launcher == nil { return nil, ErrEgressNotConnected } @@ -96,6 +116,10 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa } req.RoomId = room.Sid + return s.launcher.StartEgress(ctx, req) +} + +func (s *egressLauncher) StartEgress(ctx context.Context, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) { info, err := s.rpcClient.SendRequest(ctx, req) if err != nil { return nil, err diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 80a7e0b6a..d31e2d492 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -10,17 +10,19 @@ import ( //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate // encapsulates CRUD operations for room settings +// //counterfeiter:generate . ObjectStore type ObjectStore interface { ServiceStore // enable locking on a specific room to prevent race // returns a (lock uuid, error) - LockRoom(ctx context.Context, name livekit.RoomName, duration time.Duration) (string, error) - UnlockRoom(ctx context.Context, name livekit.RoomName, uid string) error + LockRoom(ctx context.Context, roomName livekit.RoomName, duration time.Duration) (string, error) + UnlockRoom(ctx context.Context, roomName livekit.RoomName, uid string) error StoreRoom(ctx context.Context, room *livekit.Room) error - DeleteRoom(ctx context.Context, name livekit.RoomName) error + StoreRoomInternal(ctx context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error + DeleteRoom(ctx context.Context, roomName livekit.RoomName) error StoreParticipant(ctx context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error DeleteParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) error @@ -28,11 +30,12 @@ type ObjectStore interface { //counterfeiter:generate . ServiceStore type ServiceStore interface { - LoadRoom(ctx context.Context, name livekit.RoomName) (*livekit.Room, error) + LoadRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Room, error) + LoadRoomInternal(ctx context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) + // ListRooms returns currently active rooms. if names is not nil, it'll filter and return // only rooms that match - ListRooms(ctx context.Context, names []livekit.RoomName) ([]*livekit.Room, error) - + ListRooms(ctx context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) LoadParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) ListParticipants(ctx context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) } diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index 60037c584..b980ed3d9 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -13,7 +13,8 @@ import ( // encapsulates CRUD operations for room settings type LocalStore struct { // map of roomName => room - rooms map[livekit.RoomName]*livekit.Room + rooms map[livekit.RoomName]*livekit.Room + roomInternal map[livekit.RoomName]*livekit.RoomInternal // map of roomName => { identity: participant } participants map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo @@ -39,31 +40,45 @@ func (s *LocalStore) StoreRoom(_ context.Context, room *livekit.Room) error { return nil } -func (s *LocalStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) { +func (s *LocalStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error { + s.lock.Lock() + s.roomInternal[roomName] = internal + s.lock.Unlock() + return nil +} + +func (s *LocalStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) { s.lock.RLock() defer s.lock.RUnlock() - room := s.rooms[name] + room := s.rooms[roomName] if room == nil { return nil, ErrRoomNotFound } return room, nil } -func (s *LocalStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) { +func (s *LocalStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.roomInternal[roomName], nil +} + +func (s *LocalStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) { s.lock.RLock() defer s.lock.RUnlock() rooms := make([]*livekit.Room, 0, len(s.rooms)) for _, r := range s.rooms { - if names == nil || funk.Contains(names, livekit.RoomName(r.Name)) { + if roomNames == nil || funk.Contains(roomNames, livekit.RoomName(r.Name)) { rooms = append(rooms, r) } } return rooms, nil } -func (s *LocalStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error { - room, err := s.LoadRoom(ctx, name) +func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error { + room, err := s.LoadRoom(ctx, roomName) if err == ErrRoomNotFound { return nil } else if err != nil { diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 79b5a8896..121272966 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -23,7 +23,8 @@ const ( VersionKey = "livekit_version" // RoomsKey is hash of room_name => Room proto - RoomsKey = "rooms" + RoomsKey = "rooms" + RoomInternalKey = "room_internal" // EgressKey is a hash of egressID => egress info EgressKey = "egress" @@ -115,8 +116,23 @@ func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error { return nil } -func (s *RedisStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) { - data, err := s.rc.HGet(s.ctx, RoomsKey, string(name)).Result() +func (s *RedisStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error { + data, err := proto.Marshal(internal) + if err != nil { + return err + } + + pp := s.rc.Pipeline() + pp.HSet(s.ctx, RoomInternalKey, roomName, data) + + if _, err = pp.Exec(s.ctx); err != nil { + return errors.Wrap(err, "could not create room") + } + return nil +} + +func (s *RedisStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) { + data, err := s.rc.HGet(s.ctx, RoomsKey, string(roomName)).Result() if err != nil { if err == redis.Nil { err = ErrRoomNotFound @@ -133,18 +149,36 @@ func (s *RedisStore) LoadRoom(_ context.Context, name livekit.RoomName) (*liveki return &room, nil } -func (s *RedisStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) { +func (s *RedisStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) { + data, err := s.rc.HGet(s.ctx, RoomInternalKey, string(roomName)).Result() + if err != nil { + if err == redis.Nil { + return nil, nil + } + return nil, err + } + + internal := &livekit.RoomInternal{} + err = proto.Unmarshal([]byte(data), internal) + if err != nil { + return nil, err + } + + return internal, nil +} + +func (s *RedisStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) { var items []string var err error - if names == nil { + if roomNames == nil { items, err = s.rc.HVals(s.ctx, RoomsKey).Result() if err != nil && err != redis.Nil { return nil, errors.Wrap(err, "could not get rooms") } } else { - roomNames := livekit.RoomNamesAsStrings(names) + names := livekit.RoomNamesAsStrings(roomNames) var results []interface{} - results, err = s.rc.HMGet(s.ctx, RoomsKey, roomNames...).Result() + results, err = s.rc.HMGet(s.ctx, RoomsKey, names...).Result() if err != nil && err != redis.Nil { return nil, errors.Wrap(err, "could not get rooms by names") } @@ -168,23 +202,24 @@ func (s *RedisStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]* return rooms, nil } -func (s *RedisStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error { - _, err := s.LoadRoom(ctx, name) +func (s *RedisStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error { + _, err := s.LoadRoom(ctx, roomName) if err == ErrRoomNotFound { return nil } pp := s.rc.Pipeline() - pp.HDel(s.ctx, RoomsKey, string(name)) - pp.Del(s.ctx, RoomParticipantsPrefix+string(name)) + pp.HDel(s.ctx, RoomsKey, string(roomName)) + pp.HDel(s.ctx, RoomInternalKey, string(roomName)) + pp.Del(s.ctx, RoomParticipantsPrefix+string(roomName)) _, err = pp.Exec(s.ctx) return err } -func (s *RedisStore) LockRoom(_ context.Context, name livekit.RoomName, duration time.Duration) (string, error) { +func (s *RedisStore) LockRoom(_ context.Context, roomName livekit.RoomName, duration time.Duration) (string, error) { token := utils.NewGuid("LOCK") - key := RoomLockPrefix + string(name) + key := RoomLockPrefix + string(roomName) startTime := time.Now() for { @@ -207,8 +242,8 @@ func (s *RedisStore) LockRoom(_ context.Context, name livekit.RoomName, duration return "", ErrRoomLockFailed } -func (s *RedisStore) UnlockRoom(_ context.Context, name livekit.RoomName, uid string) error { - key := RoomLockPrefix + string(name) +func (s *RedisStore) UnlockRoom(_ context.Context, roomName livekit.RoomName, uid string) error { + key := RoomLockPrefix + string(roomName) val, err := s.rc.Get(s.ctx, key).Result() if err == redis.Nil { diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index ef2e04675..3f5c0207e 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -68,10 +68,18 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre if req.Metadata != "" { rm.Metadata = req.Metadata } - if err := r.roomStore.StoreRoom(ctx, rm); err != nil { + + if err = r.roomStore.StoreRoom(ctx, rm); err != nil { return nil, err } + if req.Egress != nil && req.Egress.Tracks != nil { + internal := &livekit.RoomInternal{TrackEgress: req.Egress.Tracks} + if err = r.roomStore.StoreRoomInternal(ctx, livekit.RoomName(req.Name), internal); err != nil { + return nil, err + } + } + // check if room already assigned existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name)) if err != routing.ErrNotFound && err != nil { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 26c1a22e3..9751ab998 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -48,6 +48,7 @@ type RoomManager struct { roomStore ObjectStore telemetry telemetry.TelemetryService clientConfManager clientconfiguration.ClientConfigurationManager + egressLauncher rtc.EgressLauncher rooms map[livekit.RoomName]*rtc.Room @@ -61,6 +62,7 @@ func NewLocalRoomManager( router routing.Router, telemetry telemetry.TelemetryService, clientConfManager clientconfiguration.ClientConfigurationManager, + egressLauncher rtc.EgressLauncher, ) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) @@ -76,6 +78,7 @@ func NewLocalRoomManager( roomStore: roomStore, telemetry: telemetry, clientConfManager: clientConfManager, + egressLauncher: egressLauncher, rooms: make(map[livekit.RoomName]*rtc.Room), @@ -373,6 +376,11 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room return nil, err } + internal, err := r.roomStore.LoadRoomInternal(ctx, roomName) + if err != nil { + return nil, err + } + r.lock.Lock() currentRoom := r.rooms[roomName] @@ -388,7 +396,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room } // construct ice servers - newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry) + newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher) newRoom.OnClose(func() { roomInfo := newRoom.ToProto() diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index d5bad1c24..462b4f81d 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/protocol/livekit" ) @@ -22,18 +23,27 @@ const ( // A rooms service that supports a single node type RoomService struct { - router routing.MessageRouter - roomAllocator RoomAllocator - roomStore ServiceStore - conf config.RoomConfig + conf config.RoomConfig + router routing.MessageRouter + roomAllocator RoomAllocator + roomStore ServiceStore + egressLauncher rtc.EgressLauncher } -func NewRoomService(ra RoomAllocator, rs ServiceStore, router routing.MessageRouter, conf config.RoomConfig) (svc *RoomService, err error) { +func NewRoomService( + conf config.RoomConfig, + router routing.MessageRouter, + roomAllocator RoomAllocator, + serviceStore ServiceStore, + egressLauncher rtc.EgressLauncher, +) (svc *RoomService, err error) { + svc = &RoomService{ - router: router, - roomAllocator: ra, - roomStore: rs, - conf: conf, + conf: conf, + router: router, + roomAllocator: roomAllocator, + roomStore: serviceStore, + egressLauncher: egressLauncher, } return } @@ -41,11 +51,28 @@ func NewRoomService(ra RoomAllocator, rs ServiceStore, router routing.MessageRou func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (rm *livekit.Room, err error) { if err = EnsureCreatePermission(ctx); err != nil { return nil, twirpAuthError(err) + } else if req.Egress != nil { + if s.egressLauncher == nil { + return nil, ErrEgressNotConnected + } else if err = EnsureRecordPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } } rm, err = s.roomAllocator.CreateRoom(ctx, req) if err != nil { err = errors.Wrap(err, "could not create room") + return + } + + if req.Egress != nil && req.Egress.Room != nil { + egress := &livekit.StartEgressRequest{ + Request: &livekit.StartEgressRequest_RoomComposite{ + RoomComposite: req.Egress.Room, + }, + RoomId: rm.Sid, + } + _, err = s.egressLauncher.StartEgress(ctx, egress) } return diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index 74ec8fb31..946a8cf29 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -107,7 +107,7 @@ func newTestRoomService(conf config.RoomConfig) *TestRoomService { router := &routingfakes.FakeRouter{} allocator := &servicefakes.FakeRoomAllocator{} store := &servicefakes.FakeServiceStore{} - svc, err := service.NewRoomService(allocator, store, router, conf) + svc, err := service.NewRoomService(conf, router, allocator, store, nil) if err != nil { panic(err) } diff --git a/pkg/service/servicefakes/fake_object_store.go b/pkg/service/servicefakes/fake_object_store.go index a027f8961..9574b2c3c 100644 --- a/pkg/service/servicefakes/fake_object_store.go +++ b/pkg/service/servicefakes/fake_object_store.go @@ -93,6 +93,20 @@ type FakeObjectStore struct { result1 *livekit.Room result2 error } + LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error) + loadRoomInternalMutex sync.RWMutex + loadRoomInternalArgsForCall []struct { + arg1 context.Context + arg2 livekit.RoomName + } + loadRoomInternalReturns struct { + result1 *livekit.RoomInternal + result2 error + } + loadRoomInternalReturnsOnCall map[int]struct { + result1 *livekit.RoomInternal + result2 error + } LockRoomStub func(context.Context, livekit.RoomName, time.Duration) (string, error) lockRoomMutex sync.RWMutex lockRoomArgsForCall []struct { @@ -133,6 +147,19 @@ type FakeObjectStore struct { storeRoomReturnsOnCall map[int]struct { result1 error } + StoreRoomInternalStub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error + storeRoomInternalMutex sync.RWMutex + storeRoomInternalArgsForCall []struct { + arg1 context.Context + arg2 livekit.RoomName + arg3 *livekit.RoomInternal + } + storeRoomInternalReturns struct { + result1 error + } + storeRoomInternalReturnsOnCall map[int]struct { + result1 error + } UnlockRoomStub func(context.Context, livekit.RoomName, string) error unlockRoomMutex sync.RWMutex unlockRoomArgsForCall []struct { @@ -541,6 +568,71 @@ func (fake *FakeObjectStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, }{result1, result2} } +func (fake *FakeObjectStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) { + fake.loadRoomInternalMutex.Lock() + ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)] + fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct { + arg1 context.Context + arg2 livekit.RoomName + }{arg1, arg2}) + stub := fake.LoadRoomInternalStub + fakeReturns := fake.loadRoomInternalReturns + fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2}) + fake.loadRoomInternalMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeObjectStore) LoadRoomInternalCallCount() int { + fake.loadRoomInternalMutex.RLock() + defer fake.loadRoomInternalMutex.RUnlock() + return len(fake.loadRoomInternalArgsForCall) +} + +func (fake *FakeObjectStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) { + fake.loadRoomInternalMutex.Lock() + defer fake.loadRoomInternalMutex.Unlock() + fake.LoadRoomInternalStub = stub +} + +func (fake *FakeObjectStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) { + fake.loadRoomInternalMutex.RLock() + defer fake.loadRoomInternalMutex.RUnlock() + argsForCall := fake.loadRoomInternalArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeObjectStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) { + fake.loadRoomInternalMutex.Lock() + defer fake.loadRoomInternalMutex.Unlock() + fake.LoadRoomInternalStub = nil + fake.loadRoomInternalReturns = struct { + result1 *livekit.RoomInternal + result2 error + }{result1, result2} +} + +func (fake *FakeObjectStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) { + fake.loadRoomInternalMutex.Lock() + defer fake.loadRoomInternalMutex.Unlock() + fake.LoadRoomInternalStub = nil + if fake.loadRoomInternalReturnsOnCall == nil { + fake.loadRoomInternalReturnsOnCall = make(map[int]struct { + result1 *livekit.RoomInternal + result2 error + }) + } + fake.loadRoomInternalReturnsOnCall[i] = struct { + result1 *livekit.RoomInternal + result2 error + }{result1, result2} +} + func (fake *FakeObjectStore) LockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 time.Duration) (string, error) { fake.lockRoomMutex.Lock() ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)] @@ -732,6 +824,69 @@ func (fake *FakeObjectStore) StoreRoomReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeObjectStore) StoreRoomInternal(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.RoomInternal) error { + fake.storeRoomInternalMutex.Lock() + ret, specificReturn := fake.storeRoomInternalReturnsOnCall[len(fake.storeRoomInternalArgsForCall)] + fake.storeRoomInternalArgsForCall = append(fake.storeRoomInternalArgsForCall, struct { + arg1 context.Context + arg2 livekit.RoomName + arg3 *livekit.RoomInternal + }{arg1, arg2, arg3}) + stub := fake.StoreRoomInternalStub + fakeReturns := fake.storeRoomInternalReturns + fake.recordInvocation("StoreRoomInternal", []interface{}{arg1, arg2, arg3}) + fake.storeRoomInternalMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeObjectStore) StoreRoomInternalCallCount() int { + fake.storeRoomInternalMutex.RLock() + defer fake.storeRoomInternalMutex.RUnlock() + return len(fake.storeRoomInternalArgsForCall) +} + +func (fake *FakeObjectStore) StoreRoomInternalCalls(stub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error) { + fake.storeRoomInternalMutex.Lock() + defer fake.storeRoomInternalMutex.Unlock() + fake.StoreRoomInternalStub = stub +} + +func (fake *FakeObjectStore) StoreRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName, *livekit.RoomInternal) { + fake.storeRoomInternalMutex.RLock() + defer fake.storeRoomInternalMutex.RUnlock() + argsForCall := fake.storeRoomInternalArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeObjectStore) StoreRoomInternalReturns(result1 error) { + fake.storeRoomInternalMutex.Lock() + defer fake.storeRoomInternalMutex.Unlock() + fake.StoreRoomInternalStub = nil + fake.storeRoomInternalReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeObjectStore) StoreRoomInternalReturnsOnCall(i int, result1 error) { + fake.storeRoomInternalMutex.Lock() + defer fake.storeRoomInternalMutex.Unlock() + fake.StoreRoomInternalStub = nil + if fake.storeRoomInternalReturnsOnCall == nil { + fake.storeRoomInternalReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeRoomInternalReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeObjectStore) UnlockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 string) error { fake.unlockRoomMutex.Lock() ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)] @@ -810,12 +965,16 @@ func (fake *FakeObjectStore) Invocations() map[string][][]interface{} { defer fake.loadParticipantMutex.RUnlock() fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() + fake.loadRoomInternalMutex.RLock() + defer fake.loadRoomInternalMutex.RUnlock() fake.lockRoomMutex.RLock() defer fake.lockRoomMutex.RUnlock() fake.storeParticipantMutex.RLock() defer fake.storeParticipantMutex.RUnlock() fake.storeRoomMutex.RLock() defer fake.storeRoomMutex.RUnlock() + fake.storeRoomInternalMutex.RLock() + defer fake.storeRoomInternalMutex.RUnlock() fake.unlockRoomMutex.RLock() defer fake.unlockRoomMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/service/servicefakes/fake_service_store.go b/pkg/service/servicefakes/fake_service_store.go index 2f8c4759f..b37b493d2 100644 --- a/pkg/service/servicefakes/fake_service_store.go +++ b/pkg/service/servicefakes/fake_service_store.go @@ -67,6 +67,20 @@ type FakeServiceStore struct { result1 *livekit.Room result2 error } + LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error) + loadRoomInternalMutex sync.RWMutex + loadRoomInternalArgsForCall []struct { + arg1 context.Context + arg2 livekit.RoomName + } + loadRoomInternalReturns struct { + result1 *livekit.RoomInternal + result2 error + } + loadRoomInternalReturnsOnCall map[int]struct { + result1 *livekit.RoomInternal + result2 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -337,6 +351,71 @@ func (fake *FakeServiceStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room }{result1, result2} } +func (fake *FakeServiceStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) { + fake.loadRoomInternalMutex.Lock() + ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)] + fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct { + arg1 context.Context + arg2 livekit.RoomName + }{arg1, arg2}) + stub := fake.LoadRoomInternalStub + fakeReturns := fake.loadRoomInternalReturns + fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2}) + fake.loadRoomInternalMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeServiceStore) LoadRoomInternalCallCount() int { + fake.loadRoomInternalMutex.RLock() + defer fake.loadRoomInternalMutex.RUnlock() + return len(fake.loadRoomInternalArgsForCall) +} + +func (fake *FakeServiceStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) { + fake.loadRoomInternalMutex.Lock() + defer fake.loadRoomInternalMutex.Unlock() + fake.LoadRoomInternalStub = stub +} + +func (fake *FakeServiceStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) { + fake.loadRoomInternalMutex.RLock() + defer fake.loadRoomInternalMutex.RUnlock() + argsForCall := fake.loadRoomInternalArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeServiceStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) { + fake.loadRoomInternalMutex.Lock() + defer fake.loadRoomInternalMutex.Unlock() + fake.LoadRoomInternalStub = nil + fake.loadRoomInternalReturns = struct { + result1 *livekit.RoomInternal + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) { + fake.loadRoomInternalMutex.Lock() + defer fake.loadRoomInternalMutex.Unlock() + fake.LoadRoomInternalStub = nil + if fake.loadRoomInternalReturnsOnCall == nil { + fake.loadRoomInternalReturnsOnCall = make(map[int]struct { + result1 *livekit.RoomInternal + result2 error + }) + } + fake.loadRoomInternalReturnsOnCall[i] = struct { + result1 *livekit.RoomInternal + result2 error + }{result1, result2} +} + func (fake *FakeServiceStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -348,6 +427,8 @@ func (fake *FakeServiceStore) Invocations() map[string][][]interface{} { defer fake.loadParticipantMutex.RUnlock() fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() + fake.loadRoomInternalMutex.RLock() + defer fake.loadRoomInternalMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 00a9939b0..be6f8cf04 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -45,6 +45,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live telemetry.NewTelemetryService, egress.NewRedisRPCClient, getEgressStore, + NewEgressLauncher, NewEgressService, ingress.NewRedisRPC, getIngressStore, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 4cb6b7cdd..b6da2453c 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -34,6 +34,7 @@ import ( // Injectors from wire.go: func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { + roomConfig := getRoomConf(conf) client, err := createRedisClient(conf) if err != nil { return nil, err @@ -44,11 +45,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - roomConfig := getRoomConf(conf) - roomService, err := NewRoomService(roomAllocator, objectStore, router, roomConfig) - if err != nil { - return nil, err - } nodeID := getNodeID(currentNode) rpcClient := egress.NewRedisRPCClient(nodeID, client) egressStore := getEgressStore(objectStore) @@ -62,7 +58,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(notifier, analyticsService) - egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService) + rtcEgressLauncher := NewEgressLauncher(rpcClient, egressStore, telemetryService) + roomService, err := NewRoomService(roomConfig, router, roomAllocator, objectStore, rtcEgressLauncher) + if err != nil { + return nil, err + } + egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) rpc := ingress.NewRedisRPC(nodeID, client) ingressRPCClient := getIngressRPCClient(rpc) @@ -70,7 +71,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode) clientConfigurationManager := createClientConfiguration() - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher) if err != nil { return nil, err } diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go new file mode 100644 index 000000000..e916cb973 --- /dev/null +++ b/pkg/telemetry/events.go @@ -0,0 +1,339 @@ +package telemetry + +import ( + "context" + "errors" + "time" + + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/webhook" +) + +func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent) { + if t.notifier == nil { + logger.Warnw("failed to notify webhook", errors.New("no notifier"), "event", event.Event) + return + } + + event.CreatedAt = time.Now().Unix() + event.Id = utils.NewGuid("EV_") + + t.webhookPool.Submit(func() { + if err := t.notifier.Notify(ctx, event); err != nil { + logger.Warnw("failed to notify webhook", err, "event", event.Event) + } + }) +} + +func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { + t.enqueue(func() { + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventRoomStarted, + Room: room, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_ROOM_CREATED, + Timestamp: ×tamppb.Timestamp{Seconds: room.CreationTime}, + Room: room, + }) + }) +} + +func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { + t.enqueue(func() { + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventRoomFinished, + Room: room, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_ROOM_ENDED, + Timestamp: timestamppb.Now(), + RoomId: room.Sid, + Room: room, + }) + }) +} + +func (t *telemetryService) ParticipantJoined( + ctx context.Context, + room *livekit.Room, + participant *livekit.ParticipantInfo, + clientInfo *livekit.ClientInfo, + clientMeta *livekit.AnalyticsClientMeta, +) { + t.enqueue(func() { + prometheus.IncrementParticipantJoin(1) + prometheus.AddParticipant() + + worker := newStatsWorker( + ctx, + t, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + ) + + t.lock.Lock() + t.workers[livekit.ParticipantID(participant.Sid)] = worker + t.lock.Unlock() + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, + Timestamp: timestamppb.Now(), + RoomId: room.Sid, + ParticipantId: participant.Sid, + Participant: participant, + Room: room, + ClientInfo: clientInfo, + ClientMeta: clientMeta, + }) + }) +} + +func (t *telemetryService) ParticipantActive( + ctx context.Context, + room *livekit.Room, + participant *livekit.ParticipantInfo, + clientMeta *livekit.AnalyticsClientMeta, +) { + t.enqueue(func() { + // consider participant joined only when they became active + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventParticipantJoined, + Room: room, + Participant: participant, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, + Timestamp: timestamppb.Now(), + RoomId: room.Sid, + ParticipantId: participant.Sid, + Room: room, + ClientMeta: clientMeta, + }) + }) +} + +func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { + t.enqueue(func() { + if worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); ok { + worker.Close() + } + + prometheus.SubParticipant() + + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventParticipantLeft, + Room: room, + Participant: participant, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_PARTICIPANT_LEFT, + Timestamp: timestamppb.Now(), + RoomId: room.Sid, + ParticipantId: participant.Sid, + Participant: participant, + Room: room, + }) + }) +} + +func (t *telemetryService) TrackPublished( + ctx context.Context, + participantID livekit.ParticipantID, + identity livekit.ParticipantIdentity, + track *livekit.TrackInfo, +) { + t.enqueue(func() { + prometheus.AddPublishedTrack(track.Type.String()) + + roomID, roomName := t.getRoomDetails(participantID) + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventTrackPublished, + Room: &livekit.Room{ + Sid: string(roomID), + Name: string(roomName), + }, + Participant: &livekit.ParticipantInfo{ + Sid: string(participantID), + Identity: string(identity), + }, + Track: track, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_PUBLISHED, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + Participant: &livekit.ParticipantInfo{ + Sid: string(participantID), + Identity: string(identity), + }, + Track: track, + Room: &livekit.Room{Name: string(roomName)}, + }) + }) +} + +func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { + t.enqueue(func() { + roomID, roomName := t.getRoomDetails(participantID) + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + Track: track, + Room: &livekit.Room{Name: string(roomName)}, + }) + }) +} + +func (t *telemetryService) TrackMaxSubscribedVideoQuality( + ctx context.Context, + participantID livekit.ParticipantID, + track *livekit.TrackInfo, + mime string, + maxQuality livekit.VideoQuality, +) { + t.enqueue(func() { + roomID, roomName := t.getRoomDetails(participantID) + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + Track: track, + Room: &livekit.Room{Name: string(roomName)}, + MaxSubscribedVideoQuality: maxQuality, + Mime: mime, + }) + }) +} + +func (t *telemetryService) TrackSubscribed( + ctx context.Context, + participantID livekit.ParticipantID, + track *livekit.TrackInfo, + publisher *livekit.ParticipantInfo, +) { + t.enqueue(func() { + prometheus.AddSubscribedTrack(track.Type.String()) + + roomID, roomName := t.getRoomDetails(participantID) + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_SUBSCRIBED, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + Track: track, + Room: &livekit.Room{Name: string(roomName)}, + Publisher: publisher, + }) + }) +} + +func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { + t.enqueue(func() { + prometheus.SubSubscribedTrack(track.Type.String()) + + roomID, roomName := t.getRoomDetails(participantID) + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + TrackId: track.Sid, + Room: &livekit.Room{Name: string(roomName)}, + }) + }) +} + +func (t *telemetryService) TrackUnpublished( + ctx context.Context, + participantID livekit.ParticipantID, + identity livekit.ParticipantIdentity, + track *livekit.TrackInfo, + ssrc uint32, +) { + t.enqueue(func() { + roomID, roomName := t.getRoomDetails(participantID) + + prometheus.SubPublishedTrack(track.Type.String()) + + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventTrackUnpublished, + Room: &livekit.Room{ + Sid: string(roomID), + Name: string(roomName), + }, + Participant: &livekit.ParticipantInfo{ + Sid: string(participantID), + Identity: string(identity), + }, + Track: track, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_TRACK_UNPUBLISHED, + Timestamp: timestamppb.Now(), + RoomId: string(roomID), + ParticipantId: string(participantID), + TrackId: track.Sid, + Room: &livekit.Room{Name: string(roomName)}, + }) + }) +} + +func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) { + t.enqueue(func() { + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventEgressStarted, + EgressInfo: info, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_EGRESS_STARTED, + Timestamp: timestamppb.Now(), + EgressId: info.EgressId, + RoomId: info.RoomId, + Egress: info, + }) + }) +} + +func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) { + t.enqueue(func() { + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventEgressEnded, + EgressInfo: info, + }) + + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_EGRESS_ENDED, + Timestamp: timestamppb.Now(), + EgressId: info.EgressId, + RoomId: info.RoomId, + Egress: info, + }) + }) +} + +func (t *telemetryService) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) { + if worker, ok := t.getWorker(participantID); ok { + return worker.roomID, worker.roomName + } + + return "", "" +} diff --git a/pkg/telemetry/test/telemetry_service_events_test.go b/pkg/telemetry/events_test.go similarity index 96% rename from pkg/telemetry/test/telemetry_service_events_test.go rename to pkg/telemetry/events_test.go index 0fafce5d8..bdb008dc3 100644 --- a/pkg/telemetry/test/telemetry_service_events_test.go +++ b/pkg/telemetry/events_test.go @@ -1,8 +1,9 @@ -package telemetrytest +package telemetry_test import ( "context" "testing" + "time" "github.com/stretchr/testify/require" @@ -34,6 +35,7 @@ func Test_OnParticipantJoin_EventIsSent(t *testing.T) { // do fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta) + time.Sleep(time.Millisecond * 500) // test require.Equal(t, 1, fixture.analytics.SendEventCallCount()) @@ -68,6 +70,7 @@ func Test_OnParticipantLeft_EventIsSent(t *testing.T) { // do fixture.sut.ParticipantLeft(context.Background(), room, participantInfo) + time.Sleep(time.Millisecond * 500) // test require.Equal(t, 1, fixture.analytics.SendEventCallCount()) @@ -102,6 +105,7 @@ func Test_OnTrackUpdate_EventIsSent(t *testing.T) { // do fixture.sut.TrackPublishedUpdate(context.Background(), livekit.ParticipantID(partID), trackInfo) + time.Sleep(time.Millisecond * 500) // test require.Equal(t, 1, fixture.analytics.SendEventCallCount()) @@ -142,6 +146,7 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) { // do fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta) + time.Sleep(time.Millisecond * 500) // test require.Equal(t, 1, fixture.analytics.SendEventCallCount()) @@ -154,6 +159,7 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) { } fixture.sut.ParticipantActive(context.Background(), room, participantInfo, clientMetaConnect) + time.Sleep(time.Millisecond * 500) require.Equal(t, 2, fixture.analytics.SendEventCallCount()) _, eventActive := fixture.analytics.SendEventArgsForCall(1) @@ -192,16 +198,16 @@ func Test_OnTrackSubscribed_EventIsSent(t *testing.T) { // do fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta) + time.Sleep(time.Millisecond * 500) // test require.Equal(t, 1, fixture.analytics.SendEventCallCount()) _, event := fixture.analytics.SendEventArgsForCall(0) require.Equal(t, room, event.Room) - // test // do - fixture.sut.TrackSubscribed(context.Background(), livekit.ParticipantID(partSID), trackInfo, publisherInfo) + time.Sleep(time.Millisecond * 500) require.Equal(t, 2, fixture.analytics.SendEventCallCount()) _, eventTrackSubscribed := fixture.analytics.SendEventArgsForCall(1) diff --git a/pkg/telemetry/stats.go b/pkg/telemetry/stats.go new file mode 100644 index 000000000..acc007f9a --- /dev/null +++ b/pkg/telemetry/stats.go @@ -0,0 +1,51 @@ +package telemetry + +import ( + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/protocol/livekit" +) + +func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) { + t.enqueue(func() { + direction := prometheus.Incoming + if streamType == livekit.StreamType_DOWNSTREAM { + direction = prometheus.Outgoing + } + + nacks := uint32(0) + plis := uint32(0) + firs := uint32(0) + packets := uint32(0) + bytes := uint64(0) + retransmitBytes := uint64(0) + retransmitPackets := uint32(0) + for _, stream := range stat.Streams { + nacks += stream.Nacks + plis += stream.Plis + firs += stream.Firs + packets += stream.PrimaryPackets + stream.PaddingPackets + bytes += stream.PrimaryBytes + stream.PaddingBytes + if streamType == livekit.StreamType_DOWNSTREAM { + retransmitPackets += stream.RetransmitPackets + retransmitBytes += stream.RetransmitBytes + } else { + // for upstream, we don't account for these separately for now + packets += stream.RetransmitPackets + bytes += stream.RetransmitBytes + } + } + prometheus.IncrementRTCP(direction, nacks, plis, firs) + prometheus.IncrementPackets(direction, uint64(packets), false) + prometheus.IncrementBytes(direction, bytes, false) + if retransmitPackets != 0 { + prometheus.IncrementPackets(direction, uint64(retransmitPackets), true) + } + if retransmitBytes != 0 { + prometheus.IncrementBytes(direction, retransmitBytes, true) + } + + if worker, ok := t.getWorker(participantID); ok { + worker.OnTrackStat(trackID, streamType, stat) + } + }) +} diff --git a/pkg/telemetry/test/telemetry_service_test.go b/pkg/telemetry/stats_test.go similarity index 96% rename from pkg/telemetry/test/telemetry_service_test.go rename to pkg/telemetry/stats_test.go index 0fed53dc4..65029c429 100644 --- a/pkg/telemetry/test/telemetry_service_test.go +++ b/pkg/telemetry/stats_test.go @@ -1,14 +1,15 @@ -package telemetrytest +package telemetry_test import ( "context" "testing" + "time" "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/livekit" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" ) @@ -18,14 +19,14 @@ func init() { } type telemetryServiceFixture struct { - sut telemetry.TelemetryServiceInternal + sut telemetry.TelemetryService analytics *telemetryfakes.FakeAnalyticsService } func createFixture() *telemetryServiceFixture { fixture := &telemetryServiceFixture{} fixture.analytics = &telemetryfakes.FakeAnalyticsService{} - fixture.sut = telemetry.NewTelemetryServiceInternal(nil, fixture.analytics) + fixture.sut = telemetry.NewTelemetryService(nil, fixture.analytics) return fixture } @@ -43,7 +44,9 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) { packet := 33 stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet)}}} fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "", stat) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -74,7 +77,9 @@ func Test_OnDownstreamPackets(t *testing.T) { stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packets[i]), PrimaryPackets: uint32(1)}}} fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat) } - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -106,7 +111,9 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) { trackID2 := livekit.TrackID("trackID2") stat2 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet2), PrimaryPackets: 1}}} fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat2) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -172,7 +179,8 @@ func Test_OnDownStreamStat(t *testing.T) { } fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2) - fixture.sut.SendAnalytics() + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -209,7 +217,9 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) { }, } fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1) // there should be bytes reported so that stats are sent - fixture.sut.SendAnalytics() + + // flush + fixture.flush() stat2 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -221,10 +231,12 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) { }, } fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test - require.Equal(t, 2, fixture.analytics.SendStatsCallCount()) // 2 calls to fixture.sut.SendAnalytics() + require.Equal(t, 2, fixture.analytics.SendStatsCallCount()) // 2 calls to fixture.sut.FlushStats() _, stats := fixture.analytics.SendStatsArgsForCall(0) require.Equal(t, 1, len(stats)) require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind) @@ -279,7 +291,9 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) { }, } fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat3) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -346,7 +360,9 @@ func Test_OnUpstreamStat(t *testing.T) { }, } fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat2) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -413,7 +429,9 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { }, } fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat3) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -440,7 +458,10 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { // remove 1 track - track stats were flushed above, so no more calls to SendStats fixture.sut.TrackUnpublished(context.Background(), partSID, identity, &livekit.TrackInfo{Sid: string(trackID2)}, 0) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() + require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) } @@ -457,6 +478,7 @@ func Test_AnalyticsSentWhenParticipantLeaves(t *testing.T) { fixture.sut.ParticipantLeft(context.Background(), room, participantInfo) // should not be called if there are no track stats + time.Sleep(time.Millisecond * 500) require.Equal(t, 0, fixture.analytics.SendStatsCallCount()) } @@ -483,7 +505,9 @@ func Test_AddUpTrack(t *testing.T) { } trackID := livekit.TrackID("trackID") fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -503,9 +527,9 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) { partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + // do trackID := livekit.TrackID("trackID") - stat1 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ { @@ -519,7 +543,9 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) { }, } fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -562,7 +588,9 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) { }, } fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "trackID1", stat2) - fixture.sut.SendAnalytics() + + // flush + fixture.flush() // test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) @@ -571,3 +599,8 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) { require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind) require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[1].Kind) } + +func (f *telemetryServiceFixture) flush() { + time.Sleep(time.Millisecond * 500) + f.sut.FlushStats() +} diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 59a179628..79f834cce 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -15,7 +15,7 @@ import ( // StatsWorker handles participant stats type StatsWorker struct { ctx context.Context - t TelemetryReporter + t TelemetryService roomID livekit.RoomID roomName livekit.RoomName participantID livekit.ParticipantID @@ -29,7 +29,7 @@ type StatsWorker struct { func newStatsWorker( ctx context.Context, - t TelemetryReporter, + t TelemetryService, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, @@ -58,7 +58,11 @@ func (s *StatsWorker) OnTrackStat(trackID livekit.TrackID, direction livekit.Str s.lock.Unlock() } -func (s *StatsWorker) Update() { +func (s *StatsWorker) ParticipantID() livekit.ParticipantID { + return s.participantID +} + +func (s *StatsWorker) Flush() { ts := timestamppb.Now() s.lock.Lock() @@ -79,57 +83,12 @@ func (s *StatsWorker) Update() { stats = s.collectStats(ts, livekit.StreamType_UPSTREAM, incomingPerTrack, stats) stats = s.collectStats(ts, livekit.StreamType_DOWNSTREAM, outgoingPerTrack, stats) if len(stats) > 0 { - s.t.Report(s.ctx, stats) + s.t.SendStats(s.ctx, stats) } } -func (s *StatsWorker) collectStats( - ts *timestamppb.Timestamp, - streamType livekit.StreamType, - perTrack map[livekit.TrackID][]*livekit.AnalyticsStat, - stats []*livekit.AnalyticsStat, -) []*livekit.AnalyticsStat { - for trackID, analyticsStats := range perTrack { - analyticsStat := s.getDeltaStats(analyticsStats, ts, trackID, streamType) - if analyticsStat != nil { - stats = append(stats, analyticsStat) - } - } - return stats -} - -func (s *StatsWorker) getDeltaStats( - stats []*livekit.AnalyticsStat, - ts *timestamppb.Timestamp, - trackID livekit.TrackID, - kind livekit.StreamType, -) *livekit.AnalyticsStat { - // merge all streams stats of track - analyticsStat := coalesce(stats) - if analyticsStat == nil { - return nil - } - - s.patch(analyticsStat, ts, trackID, kind) - return analyticsStat -} - -func (s *StatsWorker) patch( - analyticsStat *livekit.AnalyticsStat, - ts *timestamppb.Timestamp, - trackID livekit.TrackID, - kind livekit.StreamType, -) { - analyticsStat.TimeStamp = ts - analyticsStat.TrackId = string(trackID) - analyticsStat.Kind = kind - analyticsStat.RoomId = string(s.roomID) - analyticsStat.ParticipantId = string(s.participantID) - analyticsStat.RoomName = string(s.roomName) -} - func (s *StatsWorker) Close() { - s.Update() + s.Flush() s.lock.Lock() s.closedAt = time.Now() @@ -143,12 +102,31 @@ func (s *StatsWorker) ClosedAt() time.Time { return s.closedAt } -func (s *StatsWorker) ParticipantID() livekit.ParticipantID { - return s.participantID -} - // ------------------------------------------------------------------------- +func (s *StatsWorker) collectStats( + ts *timestamppb.Timestamp, + streamType livekit.StreamType, + perTrack map[livekit.TrackID][]*livekit.AnalyticsStat, + stats []*livekit.AnalyticsStat, +) []*livekit.AnalyticsStat { + for trackID, analyticsStats := range perTrack { + coalesced := coalesce(analyticsStats) + if coalesced == nil { + continue + } + + coalesced.TimeStamp = ts + coalesced.TrackId = string(trackID) + coalesced.Kind = streamType + coalesced.RoomId = string(s.roomID) + coalesced.ParticipantId = string(s.participantID) + coalesced.RoomName = string(s.roomName) + stats = append(stats, coalesced) + } + return stats +} + // create a single stream and single video layer post aggregation func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat { if len(stats) == 0 { diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 5f6785509..c8eba10f1 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -22,6 +22,16 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.EgressInfo } + FlushStatsStub func() + flushStatsMutex sync.RWMutex + flushStatsArgsForCall []struct { + } + NotifyEventStub func(context.Context, *livekit.WebhookEvent) + notifyEventMutex sync.RWMutex + notifyEventArgsForCall []struct { + arg1 context.Context + arg2 *livekit.WebhookEvent + } ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta) participantActiveMutex sync.RWMutex participantActiveArgsForCall []struct { @@ -58,6 +68,18 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.Room } + SendEventStub func(context.Context, *livekit.AnalyticsEvent) + sendEventMutex sync.RWMutex + sendEventArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AnalyticsEvent + } + SendStatsStub func(context.Context, []*livekit.AnalyticsStat) + sendStatsMutex sync.RWMutex + sendStatsArgsForCall []struct { + arg1 context.Context + arg2 []*livekit.AnalyticsStat + } TrackMaxSubscribedVideoQualityStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, string, livekit.VideoQuality) trackMaxSubscribedVideoQualityMutex sync.RWMutex trackMaxSubscribedVideoQualityArgsForCall []struct { @@ -184,6 +206,63 @@ func (fake *FakeTelemetryService) EgressStartedArgsForCall(i int) (context.Conte return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeTelemetryService) FlushStats() { + fake.flushStatsMutex.Lock() + fake.flushStatsArgsForCall = append(fake.flushStatsArgsForCall, struct { + }{}) + stub := fake.FlushStatsStub + fake.recordInvocation("FlushStats", []interface{}{}) + fake.flushStatsMutex.Unlock() + if stub != nil { + fake.FlushStatsStub() + } +} + +func (fake *FakeTelemetryService) FlushStatsCallCount() int { + fake.flushStatsMutex.RLock() + defer fake.flushStatsMutex.RUnlock() + return len(fake.flushStatsArgsForCall) +} + +func (fake *FakeTelemetryService) FlushStatsCalls(stub func()) { + fake.flushStatsMutex.Lock() + defer fake.flushStatsMutex.Unlock() + fake.FlushStatsStub = stub +} + +func (fake *FakeTelemetryService) NotifyEvent(arg1 context.Context, arg2 *livekit.WebhookEvent) { + fake.notifyEventMutex.Lock() + fake.notifyEventArgsForCall = append(fake.notifyEventArgsForCall, struct { + arg1 context.Context + arg2 *livekit.WebhookEvent + }{arg1, arg2}) + stub := fake.NotifyEventStub + fake.recordInvocation("NotifyEvent", []interface{}{arg1, arg2}) + fake.notifyEventMutex.Unlock() + if stub != nil { + fake.NotifyEventStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) NotifyEventCallCount() int { + fake.notifyEventMutex.RLock() + defer fake.notifyEventMutex.RUnlock() + return len(fake.notifyEventArgsForCall) +} + +func (fake *FakeTelemetryService) NotifyEventCalls(stub func(context.Context, *livekit.WebhookEvent)) { + fake.notifyEventMutex.Lock() + defer fake.notifyEventMutex.Unlock() + fake.NotifyEventStub = stub +} + +func (fake *FakeTelemetryService) NotifyEventArgsForCall(i int) (context.Context, *livekit.WebhookEvent) { + fake.notifyEventMutex.RLock() + defer fake.notifyEventMutex.RUnlock() + argsForCall := fake.notifyEventArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta) { fake.participantActiveMutex.Lock() fake.participantActiveArgsForCall = append(fake.participantActiveArgsForCall, struct { @@ -355,6 +434,77 @@ func (fake *FakeTelemetryService) RoomStartedArgsForCall(i int) (context.Context return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeTelemetryService) SendEvent(arg1 context.Context, arg2 *livekit.AnalyticsEvent) { + fake.sendEventMutex.Lock() + fake.sendEventArgsForCall = append(fake.sendEventArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AnalyticsEvent + }{arg1, arg2}) + stub := fake.SendEventStub + fake.recordInvocation("SendEvent", []interface{}{arg1, arg2}) + fake.sendEventMutex.Unlock() + if stub != nil { + fake.SendEventStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) SendEventCallCount() int { + fake.sendEventMutex.RLock() + defer fake.sendEventMutex.RUnlock() + return len(fake.sendEventArgsForCall) +} + +func (fake *FakeTelemetryService) SendEventCalls(stub func(context.Context, *livekit.AnalyticsEvent)) { + fake.sendEventMutex.Lock() + defer fake.sendEventMutex.Unlock() + fake.SendEventStub = stub +} + +func (fake *FakeTelemetryService) SendEventArgsForCall(i int) (context.Context, *livekit.AnalyticsEvent) { + fake.sendEventMutex.RLock() + defer fake.sendEventMutex.RUnlock() + argsForCall := fake.sendEventArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeTelemetryService) SendStats(arg1 context.Context, arg2 []*livekit.AnalyticsStat) { + var arg2Copy []*livekit.AnalyticsStat + if arg2 != nil { + arg2Copy = make([]*livekit.AnalyticsStat, len(arg2)) + copy(arg2Copy, arg2) + } + fake.sendStatsMutex.Lock() + fake.sendStatsArgsForCall = append(fake.sendStatsArgsForCall, struct { + arg1 context.Context + arg2 []*livekit.AnalyticsStat + }{arg1, arg2Copy}) + stub := fake.SendStatsStub + fake.recordInvocation("SendStats", []interface{}{arg1, arg2Copy}) + fake.sendStatsMutex.Unlock() + if stub != nil { + fake.SendStatsStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) SendStatsCallCount() int { + fake.sendStatsMutex.RLock() + defer fake.sendStatsMutex.RUnlock() + return len(fake.sendStatsArgsForCall) +} + +func (fake *FakeTelemetryService) SendStatsCalls(stub func(context.Context, []*livekit.AnalyticsStat)) { + fake.sendStatsMutex.Lock() + defer fake.sendStatsMutex.Unlock() + fake.SendStatsStub = stub +} + +func (fake *FakeTelemetryService) SendStatsArgsForCall(i int) (context.Context, []*livekit.AnalyticsStat) { + fake.sendStatsMutex.RLock() + defer fake.sendStatsMutex.RUnlock() + argsForCall := fake.sendStatsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQuality(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 string, arg5 livekit.VideoQuality) { fake.trackMaxSubscribedVideoQualityMutex.Lock() fake.trackMaxSubscribedVideoQualityArgsForCall = append(fake.trackMaxSubscribedVideoQualityArgsForCall, struct { @@ -607,6 +757,10 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.egressEndedMutex.RUnlock() fake.egressStartedMutex.RLock() defer fake.egressStartedMutex.RUnlock() + fake.flushStatsMutex.RLock() + defer fake.flushStatsMutex.RUnlock() + fake.notifyEventMutex.RLock() + defer fake.notifyEventMutex.RUnlock() fake.participantActiveMutex.RLock() defer fake.participantActiveMutex.RUnlock() fake.participantJoinedMutex.RLock() @@ -617,6 +771,10 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.roomEndedMutex.RUnlock() fake.roomStartedMutex.RLock() defer fake.roomStartedMutex.RUnlock() + fake.sendEventMutex.RLock() + defer fake.sendEventMutex.RUnlock() + fake.sendStatsMutex.RLock() + defer fake.sendStatsMutex.RUnlock() fake.trackMaxSubscribedVideoQualityMutex.RLock() defer fake.trackMaxSubscribedVideoQualityMutex.RUnlock() fake.trackPublishedMutex.RLock() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 47602370f..cf585dba2 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -2,8 +2,11 @@ package telemetry import ( "context" + "sync" "time" + "github.com/gammazero/workerpool" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -29,20 +32,38 @@ type TelemetryService interface { TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime string, maxQuality livekit.VideoQuality) EgressStarted(ctx context.Context, info *livekit.EgressInfo) EgressEnded(ctx context.Context, info *livekit.EgressInfo) + + // helpers + AnalyticsService + NotifyEvent(ctx context.Context, event *livekit.WebhookEvent) + FlushStats() } +const ( + maxWebhookWorkers = 50 + workerCleanupWait = 3 * time.Minute + jobQueueBufferSize = 10000 +) + type telemetryService struct { - internalService TelemetryServiceInternal - jobsChan chan func() -} + AnalyticsService -// queue should be sufficiently large to avoid blocking -const jobQueueBufferSize = 10000 + notifier webhook.Notifier + webhookPool *workerpool.WorkerPool + jobsChan chan func() + + lock sync.RWMutex + workers map[livekit.ParticipantID]*StatsWorker +} func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService { t := &telemetryService{ - internalService: NewTelemetryServiceInternal(notifier, analytics), - jobsChan: make(chan func(), jobQueueBufferSize), + AnalyticsService: analytics, + + notifier: notifier, + webhookPool: workerpool.New(maxWebhookWorkers), + jobsChan: make(chan func(), jobQueueBufferSize), + workers: make(map[livekit.ParticipantID]*StatsWorker), } go t.run() @@ -50,6 +71,15 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) return t } +func (t *telemetryService) FlushStats() { + t.lock.Lock() + defer t.lock.Unlock() + + for _, worker := range t.workers { + worker.Flush() + } +} + func (t *telemetryService) run() { ticker := time.NewTicker(config.StatsUpdateInterval) defer ticker.Stop() @@ -60,9 +90,9 @@ func (t *telemetryService) run() { for { select { case <-ticker.C: - t.internalService.SendAnalytics() + t.FlushStats() case <-cleanupTicker.C: - t.internalService.CleanupWorkers() + t.cleanupWorkers() case op := <-t.jobsChan: op() } @@ -78,87 +108,23 @@ func (t *telemetryService) enqueue(op func()) { } } -func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) { - t.enqueue(func() { - t.internalService.TrackStats(streamType, participantID, trackID, stats) - }) +func (t *telemetryService) getWorker(participantID livekit.ParticipantID) (worker *StatsWorker, ok bool) { + t.lock.RLock() + defer t.lock.RUnlock() + + worker, ok = t.workers[participantID] + return } -func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { - t.enqueue(func() { - t.internalService.RoomStarted(ctx, room) - }) -} +func (t *telemetryService) cleanupWorkers() { + t.lock.Lock() + defer t.lock.Unlock() -func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { - t.enqueue(func() { - t.internalService.RoomEnded(ctx, room) - }) -} - -func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, - clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) { - t.enqueue(func() { - t.internalService.ParticipantJoined(ctx, room, participant, clientInfo, clientMeta) - }) -} - -func (t *telemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) { - t.enqueue(func() { - t.internalService.ParticipantActive(ctx, room, participant, clientMeta) - }) -} - -func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - t.enqueue(func() { - t.internalService.ParticipantLeft(ctx, room, participant) - }) -} - -func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) { - t.enqueue(func() { - t.internalService.TrackPublished(ctx, participantID, identity, track) - }) -} - -func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) { - t.enqueue(func() { - t.internalService.TrackUnpublished(ctx, participantID, identity, track, ssrc) - }) -} - -func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) { - t.enqueue(func() { - t.internalService.TrackSubscribed(ctx, participantID, track, publisher) - }) -} - -func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - t.enqueue(func() { - t.internalService.TrackUnsubscribed(ctx, participantID, track) - }) -} - -func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - t.enqueue(func() { - t.internalService.TrackPublishedUpdate(ctx, participantID, track) - }) -} - -func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime string, maxQuality livekit.VideoQuality) { - t.enqueue(func() { - t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, mime, maxQuality) - }) -} - -func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) { - t.enqueue(func() { - t.internalService.EgressStarted(ctx, info) - }) -} - -func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) { - t.enqueue(func() { - t.internalService.EgressEnded(ctx, info) - }) + for participantID, worker := range t.workers { + closedAt := worker.ClosedAt() + if !closedAt.IsZero() && time.Since(closedAt) > workerCleanupWait { + logger.Debugw("reaping analytics worker for participant", "pID", participantID) + delete(t.workers, participantID) + } + } } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go deleted file mode 100644 index 02afc381e..000000000 --- a/pkg/telemetry/telemetryserviceinternal.go +++ /dev/null @@ -1,145 +0,0 @@ -package telemetry - -import ( - "context" - "sync" - "time" - - "github.com/gammazero/workerpool" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/webhook" - - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" -) - -const ( - maxWebhookWorkers = 50 - workerCleanupWait = 3 * time.Minute -) - -type TelemetryServiceInternal interface { - TelemetryService - SendAnalytics() - CleanupWorkers() -} - -type TelemetryReporter interface { - Report(ctx context.Context, stats []*livekit.AnalyticsStat) -} - -type telemetryServiceInternal struct { - notifier webhook.Notifier - webhookPool *workerpool.WorkerPool - - // one worker per participant - workersMu sync.RWMutex - workers []*StatsWorker - workersIdx map[livekit.ParticipantID]int - - analytics AnalyticsService -} - -func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal { - return &telemetryServiceInternal{ - notifier: notifier, - webhookPool: workerpool.New(maxWebhookWorkers), - workersIdx: make(map[livekit.ParticipantID]int), - analytics: analytics, - } -} - -func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) { - direction := prometheus.Incoming - if streamType == livekit.StreamType_DOWNSTREAM { - direction = prometheus.Outgoing - } - - nacks := uint32(0) - plis := uint32(0) - firs := uint32(0) - packets := uint32(0) - bytes := uint64(0) - retransmitBytes := uint64(0) - retransmitPackets := uint32(0) - for _, stream := range stat.Streams { - nacks += stream.Nacks - plis += stream.Plis - firs += stream.Firs - packets += stream.PrimaryPackets + stream.PaddingPackets - bytes += stream.PrimaryBytes + stream.PaddingBytes - if streamType == livekit.StreamType_DOWNSTREAM { - retransmitPackets += stream.RetransmitPackets - retransmitBytes += stream.RetransmitBytes - } else { - // for upstream, we don't account for these separately for now - packets += stream.RetransmitPackets - bytes += stream.RetransmitBytes - } - } - prometheus.IncrementRTCP(direction, nacks, plis, firs) - prometheus.IncrementPackets(direction, uint64(packets), false) - prometheus.IncrementBytes(direction, bytes, false) - if retransmitPackets != 0 { - prometheus.IncrementPackets(direction, uint64(retransmitPackets), true) - } - if retransmitBytes != 0 { - prometheus.IncrementBytes(direction, retransmitBytes, true) - } - - if w := t.getStatsWorker(participantID); w != nil { - w.OnTrackStat(trackID, streamType, stat) - } -} - -func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.AnalyticsStat) { - t.analytics.SendStats(ctx, stats) -} - -func (t *telemetryServiceInternal) SendAnalytics() { - t.workersMu.RLock() - workers := t.workers - t.workersMu.RUnlock() - - for _, worker := range workers { - if worker != nil { - worker.Update() - } - } -} - -func (t *telemetryServiceInternal) CleanupWorkers() { - t.workersMu.RLock() - workers := t.workers - t.workersMu.RUnlock() - - for _, worker := range workers { - if worker == nil { - continue - } - - closedAt := worker.ClosedAt() - if !closedAt.IsZero() && time.Since(closedAt) > workerCleanupWait { - pID := worker.ParticipantID() - logger.Debugw("reaping analytics worker for participant", "pID", pID) - t.workersMu.Lock() - if idx, ok := t.workersIdx[pID]; ok { - delete(t.workersIdx, pID) - t.workers[idx] = nil - } - t.workersMu.Unlock() - } - } -} - -func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.ParticipantID) *StatsWorker { - t.workersMu.RLock() - defer t.workersMu.RUnlock() - - if idx, ok := t.workersIdx[participantID]; ok { - return t.workers[idx] - } - - return nil -} diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go deleted file mode 100644 index d21bfb46c..000000000 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ /dev/null @@ -1,308 +0,0 @@ -package telemetry - -import ( - "context" - "time" - - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" - "github.com/livekit/protocol/webhook" - - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" -) - -func (t *telemetryServiceInternal) RoomStarted(ctx context.Context, room *livekit.Room) { - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventRoomStarted, - Room: room, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_ROOM_CREATED, - Timestamp: ×tamppb.Timestamp{Seconds: room.CreationTime}, - Room: room, - }) -} - -func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit.Room) { - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventRoomFinished, - Room: room, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_ROOM_ENDED, - Timestamp: timestamppb.Now(), - RoomId: room.Sid, - Room: room, - }) -} - -func (t *telemetryServiceInternal) ParticipantJoined( - ctx context.Context, - room *livekit.Room, - participant *livekit.ParticipantInfo, - clientInfo *livekit.ClientInfo, - clientMeta *livekit.AnalyticsClientMeta, -) { - prometheus.IncrementParticipantJoin(1) - prometheus.AddParticipant() - - newWorker := newStatsWorker( - ctx, - t, - livekit.RoomID(room.Sid), - livekit.RoomName(room.Name), - livekit.ParticipantID(participant.Sid), - livekit.ParticipantIdentity(participant.Identity), - ) - t.workersMu.Lock() - var free = false - for idx, worker := range t.workers { - if worker != nil { - continue - } - - free = true - t.workersIdx[livekit.ParticipantID(participant.Sid)] = idx - t.workers[idx] = newWorker - break - } - - if !free { - t.workersIdx[livekit.ParticipantID(participant.Sid)] = len(t.workers) - t.workers = append(t.workers, newWorker) - } - t.workersMu.Unlock() - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, - Timestamp: timestamppb.Now(), - RoomId: room.Sid, - ParticipantId: participant.Sid, - Participant: participant, - Room: room, - ClientInfo: clientInfo, - ClientMeta: clientMeta, - }) -} - -func (t *telemetryServiceInternal) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) { - // consider participant joined only when they became active - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventParticipantJoined, - Room: room, - Participant: participant, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, - Timestamp: timestamppb.Now(), - RoomId: room.Sid, - ParticipantId: participant.Sid, - Room: room, - ClientMeta: clientMeta, - }) -} - -func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - w := t.getStatsWorker(livekit.ParticipantID(participant.Sid)) - if w != nil { - w.Close() - } - - prometheus.SubParticipant() - - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventParticipantLeft, - Room: room, - Participant: participant, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_PARTICIPANT_LEFT, - Timestamp: timestamppb.Now(), - RoomId: room.Sid, - ParticipantId: participant.Sid, - Participant: participant, - Room: room, - }) -} - -func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) { - prometheus.AddPublishedTrack(track.Type.String()) - - roomID, roomName := t.getRoomDetails(participantID) - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventTrackPublished, - Room: &livekit.Room{ - Sid: string(roomID), - Name: string(roomName), - }, - Participant: &livekit.ParticipantInfo{ - Sid: string(participantID), - Identity: string(identity), - }, - Track: track, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_TRACK_PUBLISHED, - Timestamp: timestamppb.Now(), - RoomId: string(roomID), - ParticipantId: string(participantID), - Participant: &livekit.ParticipantInfo{ - Sid: string(participantID), - Identity: string(identity), - }, - Track: track, - Room: &livekit.Room{Name: string(roomName)}, - }) -} - -func (t *telemetryServiceInternal) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - roomID, roomName := t.getRoomDetails(participantID) - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE, - Timestamp: timestamppb.Now(), - RoomId: string(roomID), - ParticipantId: string(participantID), - Track: track, - Room: &livekit.Room{Name: string(roomName)}, - }) -} - -func (t *telemetryServiceInternal) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, - mime string, maxQuality livekit.VideoQuality) { - - roomID, roomName := t.getRoomDetails(participantID) - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY, - Timestamp: timestamppb.Now(), - RoomId: string(roomID), - ParticipantId: string(participantID), - Track: track, - Room: &livekit.Room{Name: string(roomName)}, - MaxSubscribedVideoQuality: maxQuality, - Mime: mime, - }) -} - -func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) { - roomID := livekit.RoomID("") - roomName := livekit.RoomName("") - w := t.getStatsWorker(participantID) - if w != nil { - roomID = w.roomID - roomName = w.roomName - } - - prometheus.SubPublishedTrack(track.Type.String()) - - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventTrackUnpublished, - Room: &livekit.Room{ - Sid: string(roomID), - Name: string(roomName), - }, - Participant: &livekit.ParticipantInfo{ - Sid: string(participantID), - Identity: string(identity), - }, - Track: track, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_TRACK_UNPUBLISHED, - Timestamp: timestamppb.Now(), - RoomId: string(roomID), - ParticipantId: string(participantID), - TrackId: track.Sid, - Room: &livekit.Room{Name: string(roomName)}, - }) -} - -func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, - publisher *livekit.ParticipantInfo) { - prometheus.AddSubscribedTrack(track.Type.String()) - - roomID, roomName := t.getRoomDetails(participantID) - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_TRACK_SUBSCRIBED, - Timestamp: timestamppb.Now(), - RoomId: string(roomID), - ParticipantId: string(participantID), - Track: track, - Room: &livekit.Room{Name: string(roomName)}, - Publisher: publisher, - }) -} - -func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - prometheus.SubSubscribedTrack(track.Type.String()) - - roomID, roomName := t.getRoomDetails(participantID) - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED, - Timestamp: timestamppb.Now(), - RoomId: string(roomID), - ParticipantId: string(participantID), - TrackId: track.Sid, - Room: &livekit.Room{Name: string(roomName)}, - }) -} - -func (t *telemetryServiceInternal) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) { - if w := t.getStatsWorker(participantID); w != nil { - return w.roomID, w.roomName - } - return "", "" -} - -func (t *telemetryServiceInternal) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { - if t.notifier == nil { - return - } - - event.CreatedAt = time.Now().Unix() - event.Id = utils.NewGuid("EV_") - - t.webhookPool.Submit(func() { - if err := t.notifier.Notify(ctx, event); err != nil { - logger.Warnw("failed to notify webhook", err, "event", event.Event) - } - }) -} - -func (t *telemetryServiceInternal) EgressStarted(ctx context.Context, info *livekit.EgressInfo) { - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventEgressStarted, - EgressInfo: info, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_EGRESS_STARTED, - Timestamp: timestamppb.Now(), - EgressId: info.EgressId, - RoomId: info.RoomId, - Egress: info, - }) -} - -func (t *telemetryServiceInternal) EgressEnded(ctx context.Context, info *livekit.EgressInfo) { - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventEgressEnded, - EgressInfo: info, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_EGRESS_ENDED, - Timestamp: timestamppb.Now(), - EgressId: info.EgressId, - RoomId: info.RoomId, - Egress: info, - }) -}