From 07db1ba726cdc4d50a0a4736eed8ff79c2d82251 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 30 Dec 2021 16:43:20 +0530 Subject: [PATCH] Some more files with types (#302) --- pkg/routing/interfaces.go | 20 +++++----- pkg/routing/localrouter.go | 12 +++--- pkg/routing/utils.go | 6 ++- pkg/rtc/types/typesfakes/fake_participant.go | 12 +++--- pkg/service/auth.go | 5 ++- pkg/service/interfaces.go | 18 ++++----- pkg/service/localroomstore.go | 28 +++++++------- pkg/service/redisroomstore.go | 18 ++++----- pkg/service/roomservice.go | 6 +-- pkg/service/wire_gen.go | 3 +- pkg/telemetry/statsworker.go | 38 +++++++++---------- pkg/telemetry/telemetryservice.go | 28 +++++++------- pkg/telemetry/telemetryserviceinternal.go | 10 ++--- .../telemetryserviceinternalevents.go | 10 ++--- 14 files changed, 109 insertions(+), 105 deletions(-) diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 4fde6b0fd..e55a24368 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -27,8 +27,8 @@ type MessageSource interface { } type ParticipantInit struct { - Identity string - Name string + Identity livekit.ParticipantIdentity + Name livekit.ParticipantName Metadata string Reconnect bool Permission *livekit.ParticipantPermission @@ -38,8 +38,8 @@ type ParticipantInit struct { Client *livekit.ClientInfo } -type NewParticipantCallback func(ctx context.Context, roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink) -type RTCMessageCallback func(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) +type NewParticipantCallback func(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink) +type RTCMessageCallback func(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) // Router allows multiple nodes to coordinate the participant session //counterfeiter:generate . Router @@ -52,9 +52,9 @@ type Router interface { ListNodes() ([]*livekit.Node, error) - GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) - SetNodeForRoom(ctx context.Context, roomName, nodeId string) error - ClearRoomState(ctx context.Context, roomName string) error + GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error) + SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId string) error + ClearRoomState(ctx context.Context, roomName livekit.RoomName) error Start() error Drain() @@ -69,11 +69,11 @@ type Router interface { type MessageRouter interface { // StartParticipantSignal participant signal connection is ready to start - StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) + StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) // Write a message to a participant or room - WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error - WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error + WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error + WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error } func CreateRouter(rc *redis.Client, node LocalNode) Router { diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 1c8452c92..a7231be94 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -35,18 +35,18 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter { } } -func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ string) (*livekit.Node, error) { +func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error) { r.lock.Lock() defer r.lock.Unlock() node := proto.Clone((*livekit.Node)(r.currentNode)).(*livekit.Node) return node, nil } -func (r *LocalRouter) SetNodeForRoom(_ context.Context, _, _ string) error { +func (r *LocalRouter) SetNodeForRoom(_ context.Context, _, _ livekit.RoomName) error { return nil } -func (r *LocalRouter) ClearRoomState(_ context.Context, _ string) error { +func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error { // do nothing return nil } @@ -76,7 +76,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) { }, nil } -func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionID string, reqSink MessageSink, resSource MessageSource, err error) { +func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID string, reqSink MessageSink, resSource MessageSource, err error) { // treat it as a new participant connecting if r.onNewParticipant == nil { err = ErrHandlerNotDefined @@ -110,7 +110,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName strin return pi.Identity, reqChan, resChan, nil } -func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { +func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { if r.rtcMessageChan.IsClosed() { // create a new one r.rtcMessageChan = NewMessageChannel() @@ -119,7 +119,7 @@ func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName, identity return r.writeRTCMessage(r.rtcMessageChan, msg) } -func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { +func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { msg.ParticipantKey = participantKey(roomName, identity) return r.WriteNodeRTC(ctx, r.currentNode.Id, msg) } diff --git a/pkg/routing/utils.go b/pkg/routing/utils.go index 2c9b5d2ee..d65f7c916 100644 --- a/pkg/routing/utils.go +++ b/pkg/routing/utils.go @@ -3,13 +3,15 @@ package routing import ( "errors" "strings" + + "github.com/livekit/protocol/livekit" ) -func participantKey(roomName, identity string) string { +func participantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) string { return roomName + "|" + identity } -func parseParticipantKey(pkey string) (roomName string, identity string, err error) { +func parseParticipantKey(pkey string) (roomName livekit.RoomName, identity livekit.ParticipantIdentity, err error) { parts := strings.Split(pkey, "|") if len(parts) != 2 { err = errors.New("invalid participant key") diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 8b693f3b5..d8a016606 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -494,11 +494,11 @@ type FakeParticipant struct { toProtoReturnsOnCall map[int]struct { result1 *livekit.ParticipantInfo } - UpdateSubscriptionPermissionsStub func(*livekit.UpdateSubscriptionPermissions, func(participantSid string) types.Participant) error + UpdateSubscriptionPermissionsStub func(*livekit.UpdateSubscriptionPermissions, func(participantID string) types.Participant) error updateSubscriptionPermissionsMutex sync.RWMutex updateSubscriptionPermissionsArgsForCall []struct { arg1 *livekit.UpdateSubscriptionPermissions - arg2 func(participantSid string) types.Participant + arg2 func(participantID string) types.Participant } updateSubscriptionPermissionsReturns struct { result1 error @@ -3142,12 +3142,12 @@ func (fake *FakeParticipant) ToProtoReturnsOnCall(i int, result1 *livekit.Partic }{result1} } -func (fake *FakeParticipant) UpdateSubscriptionPermissions(arg1 *livekit.UpdateSubscriptionPermissions, arg2 func(participantSid string) types.Participant) error { +func (fake *FakeParticipant) UpdateSubscriptionPermissions(arg1 *livekit.UpdateSubscriptionPermissions, arg2 func(participantID string) types.Participant) error { fake.updateSubscriptionPermissionsMutex.Lock() ret, specificReturn := fake.updateSubscriptionPermissionsReturnsOnCall[len(fake.updateSubscriptionPermissionsArgsForCall)] fake.updateSubscriptionPermissionsArgsForCall = append(fake.updateSubscriptionPermissionsArgsForCall, struct { arg1 *livekit.UpdateSubscriptionPermissions - arg2 func(participantSid string) types.Participant + arg2 func(participantID string) types.Participant }{arg1, arg2}) stub := fake.UpdateSubscriptionPermissionsStub fakeReturns := fake.updateSubscriptionPermissionsReturns @@ -3168,13 +3168,13 @@ func (fake *FakeParticipant) UpdateSubscriptionPermissionsCallCount() int { return len(fake.updateSubscriptionPermissionsArgsForCall) } -func (fake *FakeParticipant) UpdateSubscriptionPermissionsCalls(stub func(*livekit.UpdateSubscriptionPermissions, func(participantSid string) types.Participant) error) { +func (fake *FakeParticipant) UpdateSubscriptionPermissionsCalls(stub func(*livekit.UpdateSubscriptionPermissions, func(participantID string) types.Participant) error) { fake.updateSubscriptionPermissionsMutex.Lock() defer fake.updateSubscriptionPermissionsMutex.Unlock() fake.UpdateSubscriptionPermissionsStub = stub } -func (fake *FakeParticipant) UpdateSubscriptionPermissionsArgsForCall(i int) (*livekit.UpdateSubscriptionPermissions, func(participantSid string) types.Participant) { +func (fake *FakeParticipant) UpdateSubscriptionPermissionsArgsForCall(i int) (*livekit.UpdateSubscriptionPermissions, func(participantID string) types.Participant) { fake.updateSubscriptionPermissionsMutex.RLock() defer fake.updateSubscriptionPermissionsMutex.RUnlock() argsForCall := fake.updateSubscriptionPermissionsArgsForCall[i] diff --git a/pkg/service/auth.go b/pkg/service/auth.go index 255dc609d..64de57f74 100644 --- a/pkg/service/auth.go +++ b/pkg/service/auth.go @@ -9,6 +9,7 @@ import ( "github.com/twitchtv/twirp" "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" ) const ( @@ -92,7 +93,7 @@ func SetAuthorizationToken(r *http.Request, token string) { r.Header.Set(authorizationHeader, bearerPrefix+token) } -func EnsureJoinPermission(ctx context.Context) (name string, err error) { +func EnsureJoinPermission(ctx context.Context) (name livekit.RoomName, err error) { claims := GetGrants(ctx) if claims == nil || claims.Video == nil { err = ErrPermissionDenied @@ -107,7 +108,7 @@ func EnsureJoinPermission(ctx context.Context) (name string, err error) { return } -func EnsureAdminPermission(ctx context.Context, room string) error { +func EnsureAdminPermission(ctx context.Context, room livekit.RoomName) error { claims := GetGrants(ctx) if claims == nil || claims.Video == nil { return ErrPermissionDenied diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 04cefe022..62335fe40 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -16,25 +16,25 @@ type RoomStore interface { // enable locking on a specific room to prevent race // returns a (lock uuid, error) - LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) - UnlockRoom(ctx context.Context, name string, uid string) error + LockRoom(ctx context.Context, name livekit.RoomName, duration time.Duration) (string, error) + UnlockRoom(ctx context.Context, name livekit.RoomName, uid string) error StoreRoom(ctx context.Context, room *livekit.Room) error - DeleteRoom(ctx context.Context, name string) error + DeleteRoom(ctx context.Context, name livekit.RoomName) error - StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error - DeleteParticipant(ctx context.Context, roomName, identity string) error + StoreParticipant(ctx context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error + DeleteParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) error } //counterfeiter:generate . RORoomStore type RORoomStore interface { - LoadRoom(ctx context.Context, name string) (*livekit.Room, error) + LoadRoom(ctx context.Context, name livekit.RoomName) (*livekit.Room, error) // ListRooms returns currently active rooms. if names is not nil, it'll filter and return // only rooms that match - ListRooms(ctx context.Context, names []string) ([]*livekit.Room, error) + ListRooms(ctx context.Context, names []livekit.RoomName) ([]*livekit.Room, error) - LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) - ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) + LoadParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) + ListParticipants(ctx context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) } //counterfeiter:generate . RoomAllocator diff --git a/pkg/service/localroomstore.go b/pkg/service/localroomstore.go index eaf51bcee..73580e4e1 100644 --- a/pkg/service/localroomstore.go +++ b/pkg/service/localroomstore.go @@ -12,17 +12,17 @@ import ( // encapsulates CRUD operations for room settings type LocalRoomStore struct { // map of roomName => room - rooms map[string]*livekit.Room + rooms map[livekit.RoomName]*livekit.Room // map of roomName => { identity: participant } - participants map[string]map[string]*livekit.ParticipantInfo + participants map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo lock sync.RWMutex globalLock sync.Mutex } func NewLocalRoomStore() *LocalRoomStore { return &LocalRoomStore{ - rooms: make(map[string]*livekit.Room), - participants: make(map[string]map[string]*livekit.ParticipantInfo), + rooms: make(map[livekit.RoomName]*livekit.Room), + participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), lock: sync.RWMutex{}, } } @@ -37,7 +37,7 @@ func (p *LocalRoomStore) StoreRoom(_ context.Context, room *livekit.Room) error return nil } -func (p *LocalRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room, error) { +func (p *LocalRoomStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -48,7 +48,7 @@ func (p *LocalRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room return room, nil } -func (p *LocalRoomStore) ListRooms(_ context.Context, names []string) ([]*livekit.Room, error) { +func (p *LocalRoomStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) { p.lock.RLock() defer p.lock.RUnlock() rooms := make([]*livekit.Room, 0, len(p.rooms)) @@ -60,7 +60,7 @@ func (p *LocalRoomStore) ListRooms(_ context.Context, names []string) ([]*liveki return rooms, nil } -func (p *LocalRoomStore) DeleteRoom(ctx context.Context, name string) error { +func (p *LocalRoomStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error { room, err := p.LoadRoom(ctx, name) if err == ErrRoomNotFound { return nil @@ -76,30 +76,30 @@ func (p *LocalRoomStore) DeleteRoom(ctx context.Context, name string) error { return nil } -func (p *LocalRoomStore) LockRoom(_ context.Context, _ string, _ time.Duration) (string, error) { +func (p *LocalRoomStore) LockRoom(_ context.Context, _ livekit.RoomName, _ time.Duration) (string, error) { // local rooms lock & unlock globally p.globalLock.Lock() return "", nil } -func (p *LocalRoomStore) UnlockRoom(_ context.Context, _ string, _ string) error { +func (p *LocalRoomStore) UnlockRoom(_ context.Context, _ livekit.RoomName, _ string) error { p.globalLock.Unlock() return nil } -func (p *LocalRoomStore) StoreParticipant(_ context.Context, roomName string, participant *livekit.ParticipantInfo) error { +func (p *LocalRoomStore) StoreParticipant(_ context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error { p.lock.Lock() defer p.lock.Unlock() roomParticipants := p.participants[roomName] if roomParticipants == nil { - roomParticipants = make(map[string]*livekit.ParticipantInfo) + roomParticipants = make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo) p.participants[roomName] = roomParticipants } roomParticipants[participant.Identity] = participant return nil } -func (p *LocalRoomStore) LoadParticipant(_ context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *LocalRoomStore) LoadParticipant(_ context.Context, roomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -114,7 +114,7 @@ func (p *LocalRoomStore) LoadParticipant(_ context.Context, roomName, identity s return participant, nil } -func (p *LocalRoomStore) ListParticipants(_ context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { +func (p *LocalRoomStore) ListParticipants(_ context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -132,7 +132,7 @@ func (p *LocalRoomStore) ListParticipants(_ context.Context, roomName string) ([ return items, nil } -func (p *LocalRoomStore) DeleteParticipant(_ context.Context, roomName, identity string) error { +func (p *LocalRoomStore) DeleteParticipant(_ context.Context, roomName, identity livekit.ParticipantIdentity) error { p.lock.Lock() defer p.lock.Unlock() diff --git a/pkg/service/redisroomstore.go b/pkg/service/redisroomstore.go index 9ee9a52f3..458f0bdf6 100644 --- a/pkg/service/redisroomstore.go +++ b/pkg/service/redisroomstore.go @@ -54,7 +54,7 @@ func (p *RedisRoomStore) StoreRoom(_ context.Context, room *livekit.Room) error return nil } -func (p *RedisRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room, error) { +func (p *RedisRoomStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) { data, err := p.rc.HGet(p.ctx, RoomsKey, name).Result() if err != nil { if err == redis.Nil { @@ -72,7 +72,7 @@ func (p *RedisRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room return &room, nil } -func (p *RedisRoomStore) ListRooms(_ context.Context, names []string) ([]*livekit.Room, error) { +func (p *RedisRoomStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) { var items []string var err error if names == nil { @@ -106,7 +106,7 @@ func (p *RedisRoomStore) ListRooms(_ context.Context, names []string) ([]*liveki return rooms, nil } -func (p *RedisRoomStore) DeleteRoom(ctx context.Context, name string) error { +func (p *RedisRoomStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error { _, err := p.LoadRoom(ctx, name) if err == ErrRoomNotFound { return nil @@ -120,7 +120,7 @@ func (p *RedisRoomStore) DeleteRoom(ctx context.Context, name string) error { return err } -func (p *RedisRoomStore) LockRoom(_ context.Context, name string, duration time.Duration) (string, error) { +func (p *RedisRoomStore) LockRoom(_ context.Context, name livekit.RoomName, duration time.Duration) (string, error) { token := utils.NewGuid("LOCK") key := RoomLockPrefix + name @@ -145,7 +145,7 @@ func (p *RedisRoomStore) LockRoom(_ context.Context, name string, duration time. return "", ErrRoomLockFailed } -func (p *RedisRoomStore) UnlockRoom(_ context.Context, name string, uid string) error { +func (p *RedisRoomStore) UnlockRoom(_ context.Context, name livekit.RoomName, uid string) error { key := RoomLockPrefix + name val, err := p.rc.Get(p.ctx, key).Result() @@ -162,7 +162,7 @@ func (p *RedisRoomStore) UnlockRoom(_ context.Context, name string, uid string) return p.rc.Del(p.ctx, key).Err() } -func (p *RedisRoomStore) StoreParticipant(_ context.Context, roomName string, participant *livekit.ParticipantInfo) error { +func (p *RedisRoomStore) StoreParticipant(_ context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error { key := RoomParticipantsPrefix + roomName data, err := proto.Marshal(participant) @@ -173,7 +173,7 @@ func (p *RedisRoomStore) StoreParticipant(_ context.Context, roomName string, pa return p.rc.HSet(p.ctx, key, participant.Identity, data).Err() } -func (p *RedisRoomStore) LoadParticipant(_ context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *RedisRoomStore) LoadParticipant(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) { key := RoomParticipantsPrefix + roomName data, err := p.rc.HGet(p.ctx, key, identity).Result() if err == redis.Nil { @@ -189,7 +189,7 @@ func (p *RedisRoomStore) LoadParticipant(_ context.Context, roomName, identity s return &pi, nil } -func (p *RedisRoomStore) ListParticipants(_ context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { +func (p *RedisRoomStore) ListParticipants(_ context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) { key := RoomParticipantsPrefix + roomName items, err := p.rc.HVals(p.ctx, key).Result() if err == redis.Nil { @@ -209,7 +209,7 @@ func (p *RedisRoomStore) ListParticipants(_ context.Context, roomName string) ([ return participants, nil } -func (p *RedisRoomStore) DeleteParticipant(_ context.Context, roomName, identity string) error { +func (p *RedisRoomStore) DeleteParticipant(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) error { key := RoomParticipantsPrefix + roomName return p.rc.HDel(p.ctx, key, identity).Err() diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index dbf3908f7..7c622cacb 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -46,7 +46,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque return nil, twirpAuthError(err) } - var names []string + var names []livekit.RoomName if len(req.Names) > 0 { names = req.Names } @@ -225,7 +225,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return room, nil } -func (s *RoomService) writeParticipantMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error { +func (s *RoomService) writeParticipantMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { if err := EnsureAdminPermission(ctx, room); err != nil { return twirpAuthError(err) } @@ -238,7 +238,7 @@ func (s *RoomService) writeParticipantMessage(ctx context.Context, room, identit return s.router.WriteParticipantRTC(ctx, room, identity, msg) } -func (s *RoomService) writeRoomMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error { +func (s *RoomService) writeRoomMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { if err := EnsureAdminPermission(ctx, room); err != nil { return twirpAuthError(err) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 0570b3e6f..7c2853612 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,7 +1,8 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//+build !wireinject +//go:build !wireinject +// +build !wireinject package service diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index c5e6879ae..3b32a3adf 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -13,15 +13,15 @@ import ( type StatsWorker struct { ctx context.Context t TelemetryReporter - roomID string - roomName string - participantID string + roomID livekit.RoomID + roomName livekit.RoomName + participantID livekit.ParticipantID - upstreamBuffers map[string][]*buffer.Buffer - drainUpstreamBuffers map[string]bool + upstreamBuffers map[livekit.TrackID][]*buffer.Buffer + drainUpstreamBuffers map[livekit.TrackID]bool - outgoingPerTrack map[string]*Stats - incomingPerTrack map[string]*Stats + outgoingPerTrack map[livekit.TrackID]*Stats + incomingPerTrack map[livekit.TrackID]*Stats } type Stats struct { @@ -34,7 +34,7 @@ type Stats struct { prevPacketsLost uint64 } -func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName, participantID string) *StatsWorker { +func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID) *StatsWorker { s := &StatsWorker{ ctx: ctx, t: t, @@ -42,25 +42,25 @@ func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName, roomName: roomName, participantID: participantID, - upstreamBuffers: make(map[string][]*buffer.Buffer), - drainUpstreamBuffers: make(map[string]bool), + upstreamBuffers: make(map[livekit.TrackID][]*buffer.Buffer), + drainUpstreamBuffers: make(map[livekit.TrackID]bool), - outgoingPerTrack: make(map[string]*Stats), - incomingPerTrack: make(map[string]*Stats), + outgoingPerTrack: make(map[livekit.TrackID]*Stats), + incomingPerTrack: make(map[livekit.TrackID]*Stats), } return s } -func (s *StatsWorker) AddBuffer(trackID string, buffer *buffer.Buffer) { +func (s *StatsWorker) AddBuffer(trackID livekit.TrackID, buffer *buffer.Buffer) { s.upstreamBuffers[trackID] = append(s.upstreamBuffers[trackID], buffer) } -func (s *StatsWorker) OnDownstreamPacket(trackID string, bytes int) { +func (s *StatsWorker) OnDownstreamPacket(trackID livekit.TrackID, bytes int) { s.getOrCreateOutgoingStatsIfEmpty(trackID).totalBytes += uint64(bytes) s.getOrCreateOutgoingStatsIfEmpty(trackID).totalPackets++ } -func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID string) *Stats { +func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID livekit.TrackID) *Stats { if s.outgoingPerTrack[trackID] == nil { s.outgoingPerTrack[trackID] = &Stats{next: &livekit.AnalyticsStat{ Kind: livekit.StreamType_DOWNSTREAM, @@ -72,7 +72,7 @@ func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID string) *Stats { return s.outgoingPerTrack[trackID] } -func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID string) *Stats { +func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID livekit.TrackID) *Stats { if s.incomingPerTrack[trackID] == nil { s.incomingPerTrack[trackID] = &Stats{next: &livekit.AnalyticsStat{ Kind: livekit.StreamType_UPSTREAM, @@ -84,7 +84,7 @@ func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID string) *Stats { return s.incomingPerTrack[trackID] } -func (s *StatsWorker) OnRTCP(trackID string, direction livekit.StreamType, stats *livekit.AnalyticsStat) { +func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamType, stats *livekit.AnalyticsStat) { var ds *Stats if direction == livekit.StreamType_DOWNSTREAM { ds = s.getOrCreateOutgoingStatsIfEmpty(trackID) @@ -155,7 +155,7 @@ func (s *StatsWorker) collectUpstreamStats(ts *timestamppb.Timestamp, stats []*l delete(s.upstreamBuffers, trackID) delete(s.incomingPerTrack, trackID) } - s.drainUpstreamBuffers = make(map[string]bool) + s.drainUpstreamBuffers = make(map[livekit.TrackID]bool) } return stats } @@ -185,7 +185,7 @@ func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.A return next } -func (s *StatsWorker) RemoveBuffer(trackID string) { +func (s *StatsWorker) RemoveBuffer(trackID livekit.TrackID) { s.drainUpstreamBuffers[trackID] = true } diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 9e420d629..5208fd70e 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -15,19 +15,19 @@ const updateFrequency = time.Second * 10 type TelemetryService interface { // stats - AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) - OnDownstreamPacket(participantID string, trackID string, bytes int) - HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet) + AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) + OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) + HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) // events RoomStarted(ctx context.Context, room *livekit.Room) RoomEnded(ctx context.Context, room *livekit.Room) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) - TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) - TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) - TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) - TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) + TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) + TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) + TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) + TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) } @@ -67,19 +67,19 @@ func (t *telemetryService) run() { } } -func (t *telemetryService) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) { +func (t *telemetryService) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) { t.jobQueue <- func() { t.internalService.AddUpTrack(participantID, trackID, buff) } } -func (t *telemetryService) OnDownstreamPacket(participantID string, trackID string, bytes int) { +func (t *telemetryService) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) { t.jobQueue <- func() { t.internalService.OnDownstreamPacket(participantID, trackID, bytes) } } -func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet) { +func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) { t.jobQueue <- func() { t.internalService.HandleRTCP(streamType, participantID, trackID, pkts) } @@ -109,25 +109,25 @@ func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro } } -func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { t.jobQueue <- func() { t.internalService.TrackPublished(ctx, participantID, track) } } -func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { +func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) { t.jobQueue <- func() { t.internalService.TrackUnpublished(ctx, participantID, track, ssrc) } } -func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { t.jobQueue <- func() { t.internalService.TrackSubscribed(ctx, participantID, track) } } -func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { t.jobQueue <- func() { t.internalService.TrackUnsubscribed(ctx, participantID, track) } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index 192b70150..683dd08d6 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -26,7 +26,7 @@ type telemetryServiceInternal struct { webhookPool *workerpool.WorkerPool // one worker per participant - workers map[string]*StatsWorker + workers map[livekit.ParticipantID]*StatsWorker analytics AnalyticsService } @@ -35,26 +35,26 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS return &telemetryServiceInternal{ notifier: notifier, webhookPool: workerpool.New(1), - workers: make(map[string]*StatsWorker), + workers: make(map[livekit.ParticipantID]*StatsWorker), analytics: analytics, } } -func (t *telemetryServiceInternal) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) { +func (t *telemetryServiceInternal) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) { w := t.workers[participantID] if w != nil { w.AddBuffer(trackID, buff) } } -func (t *telemetryServiceInternal) OnDownstreamPacket(participantID string, trackID string, bytes int) { +func (t *telemetryServiceInternal) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) { w := t.workers[participantID] if w != nil { w.OnDownstreamPacket(trackID, bytes) } } -func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet) { +func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) { stats := &livekit.AnalyticsStat{} for _, pkt := range pkts { switch pkt := pkt.(type) { diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index ff4e65226..aa2ca5102 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -90,7 +90,7 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li }) } -func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { prometheus.AddPublishedTrack(track.Type.String()) roomID, roomName := t.getRoomDetails(participantID) @@ -104,7 +104,7 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa }) } -func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { +func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) { roomID := "" roomName := "" w := t.workers[participantID] @@ -126,7 +126,7 @@ func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, partici }) } -func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { prometheus.AddSubscribedTrack(track.Type.String()) roomID, roomName := t.getRoomDetails(participantID) @@ -140,7 +140,7 @@ func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, particip }) } -func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { prometheus.SubSubscribedTrack(track.Type.String()) roomID, roomName := t.getRoomDetails(participantID) @@ -182,7 +182,7 @@ func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livek }) } -func (t *telemetryServiceInternal) getRoomDetails(participantID string) (string, string) { +func (t *telemetryServiceInternal) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) { w := t.workers[participantID] if w != nil { return w.roomID, w.roomName