Some more files with types (#302)

This commit is contained in:
Raja Subramanian
2021-12-30 16:43:20 +05:30
committed by GitHub
parent 1b66fe1e23
commit 07db1ba726
14 changed files with 109 additions and 105 deletions
+10 -10
View File
@@ -27,8 +27,8 @@ type MessageSource interface {
}
type ParticipantInit struct {
Identity string
Name string
Identity livekit.ParticipantIdentity
Name livekit.ParticipantName
Metadata string
Reconnect bool
Permission *livekit.ParticipantPermission
@@ -38,8 +38,8 @@ type ParticipantInit struct {
Client *livekit.ClientInfo
}
type NewParticipantCallback func(ctx context.Context, roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)
type RTCMessageCallback func(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage)
type NewParticipantCallback func(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)
type RTCMessageCallback func(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage)
// Router allows multiple nodes to coordinate the participant session
//counterfeiter:generate . Router
@@ -52,9 +52,9 @@ type Router interface {
ListNodes() ([]*livekit.Node, error)
GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error)
SetNodeForRoom(ctx context.Context, roomName, nodeId string) error
ClearRoomState(ctx context.Context, roomName string) error
GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error)
SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId string) error
ClearRoomState(ctx context.Context, roomName livekit.RoomName) error
Start() error
Drain()
@@ -69,11 +69,11 @@ type Router interface {
type MessageRouter interface {
// StartParticipantSignal participant signal connection is ready to start
StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
// Write a message to a participant or room
WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error
WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error
WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
}
func CreateRouter(rc *redis.Client, node LocalNode) Router {
+6 -6
View File
@@ -35,18 +35,18 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter {
}
}
func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ string) (*livekit.Node, error) {
func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error) {
r.lock.Lock()
defer r.lock.Unlock()
node := proto.Clone((*livekit.Node)(r.currentNode)).(*livekit.Node)
return node, nil
}
func (r *LocalRouter) SetNodeForRoom(_ context.Context, _, _ string) error {
func (r *LocalRouter) SetNodeForRoom(_ context.Context, _, _ livekit.RoomName) error {
return nil
}
func (r *LocalRouter) ClearRoomState(_ context.Context, _ string) error {
func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error {
// do nothing
return nil
}
@@ -76,7 +76,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) {
}, nil
}
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionID string, reqSink MessageSink, resSource MessageSource, err error) {
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID string, reqSink MessageSink, resSource MessageSource, err error) {
// treat it as a new participant connecting
if r.onNewParticipant == nil {
err = ErrHandlerNotDefined
@@ -110,7 +110,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName strin
return pi.Identity, reqChan, resChan, nil
}
func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel()
@@ -119,7 +119,7 @@ func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName, identity
return r.writeRTCMessage(r.rtcMessageChan, msg)
}
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
msg.ParticipantKey = participantKey(roomName, identity)
return r.WriteNodeRTC(ctx, r.currentNode.Id, msg)
}
+4 -2
View File
@@ -3,13 +3,15 @@ package routing
import (
"errors"
"strings"
"github.com/livekit/protocol/livekit"
)
func participantKey(roomName, identity string) string {
func participantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) string {
return roomName + "|" + identity
}
func parseParticipantKey(pkey string) (roomName string, identity string, err error) {
func parseParticipantKey(pkey string) (roomName livekit.RoomName, identity livekit.ParticipantIdentity, err error) {
parts := strings.Split(pkey, "|")
if len(parts) != 2 {
err = errors.New("invalid participant key")
+6 -6
View File
@@ -494,11 +494,11 @@ type FakeParticipant struct {
toProtoReturnsOnCall map[int]struct {
result1 *livekit.ParticipantInfo
}
UpdateSubscriptionPermissionsStub func(*livekit.UpdateSubscriptionPermissions, func(participantSid string) types.Participant) error
UpdateSubscriptionPermissionsStub func(*livekit.UpdateSubscriptionPermissions, func(participantID string) types.Participant) error
updateSubscriptionPermissionsMutex sync.RWMutex
updateSubscriptionPermissionsArgsForCall []struct {
arg1 *livekit.UpdateSubscriptionPermissions
arg2 func(participantSid string) types.Participant
arg2 func(participantID string) types.Participant
}
updateSubscriptionPermissionsReturns struct {
result1 error
@@ -3142,12 +3142,12 @@ func (fake *FakeParticipant) ToProtoReturnsOnCall(i int, result1 *livekit.Partic
}{result1}
}
func (fake *FakeParticipant) UpdateSubscriptionPermissions(arg1 *livekit.UpdateSubscriptionPermissions, arg2 func(participantSid string) types.Participant) error {
func (fake *FakeParticipant) UpdateSubscriptionPermissions(arg1 *livekit.UpdateSubscriptionPermissions, arg2 func(participantID string) types.Participant) error {
fake.updateSubscriptionPermissionsMutex.Lock()
ret, specificReturn := fake.updateSubscriptionPermissionsReturnsOnCall[len(fake.updateSubscriptionPermissionsArgsForCall)]
fake.updateSubscriptionPermissionsArgsForCall = append(fake.updateSubscriptionPermissionsArgsForCall, struct {
arg1 *livekit.UpdateSubscriptionPermissions
arg2 func(participantSid string) types.Participant
arg2 func(participantID string) types.Participant
}{arg1, arg2})
stub := fake.UpdateSubscriptionPermissionsStub
fakeReturns := fake.updateSubscriptionPermissionsReturns
@@ -3168,13 +3168,13 @@ func (fake *FakeParticipant) UpdateSubscriptionPermissionsCallCount() int {
return len(fake.updateSubscriptionPermissionsArgsForCall)
}
func (fake *FakeParticipant) UpdateSubscriptionPermissionsCalls(stub func(*livekit.UpdateSubscriptionPermissions, func(participantSid string) types.Participant) error) {
func (fake *FakeParticipant) UpdateSubscriptionPermissionsCalls(stub func(*livekit.UpdateSubscriptionPermissions, func(participantID string) types.Participant) error) {
fake.updateSubscriptionPermissionsMutex.Lock()
defer fake.updateSubscriptionPermissionsMutex.Unlock()
fake.UpdateSubscriptionPermissionsStub = stub
}
func (fake *FakeParticipant) UpdateSubscriptionPermissionsArgsForCall(i int) (*livekit.UpdateSubscriptionPermissions, func(participantSid string) types.Participant) {
func (fake *FakeParticipant) UpdateSubscriptionPermissionsArgsForCall(i int) (*livekit.UpdateSubscriptionPermissions, func(participantID string) types.Participant) {
fake.updateSubscriptionPermissionsMutex.RLock()
defer fake.updateSubscriptionPermissionsMutex.RUnlock()
argsForCall := fake.updateSubscriptionPermissionsArgsForCall[i]
+3 -2
View File
@@ -9,6 +9,7 @@ import (
"github.com/twitchtv/twirp"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
)
const (
@@ -92,7 +93,7 @@ func SetAuthorizationToken(r *http.Request, token string) {
r.Header.Set(authorizationHeader, bearerPrefix+token)
}
func EnsureJoinPermission(ctx context.Context) (name string, err error) {
func EnsureJoinPermission(ctx context.Context) (name livekit.RoomName, err error) {
claims := GetGrants(ctx)
if claims == nil || claims.Video == nil {
err = ErrPermissionDenied
@@ -107,7 +108,7 @@ func EnsureJoinPermission(ctx context.Context) (name string, err error) {
return
}
func EnsureAdminPermission(ctx context.Context, room string) error {
func EnsureAdminPermission(ctx context.Context, room livekit.RoomName) error {
claims := GetGrants(ctx)
if claims == nil || claims.Video == nil {
return ErrPermissionDenied
+9 -9
View File
@@ -16,25 +16,25 @@ type RoomStore interface {
// enable locking on a specific room to prevent race
// returns a (lock uuid, error)
LockRoom(ctx context.Context, name string, duration time.Duration) (string, error)
UnlockRoom(ctx context.Context, name string, uid string) error
LockRoom(ctx context.Context, name livekit.RoomName, duration time.Duration) (string, error)
UnlockRoom(ctx context.Context, name livekit.RoomName, uid string) error
StoreRoom(ctx context.Context, room *livekit.Room) error
DeleteRoom(ctx context.Context, name string) error
DeleteRoom(ctx context.Context, name livekit.RoomName) error
StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error
DeleteParticipant(ctx context.Context, roomName, identity string) error
StoreParticipant(ctx context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error
DeleteParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) error
}
//counterfeiter:generate . RORoomStore
type RORoomStore interface {
LoadRoom(ctx context.Context, name string) (*livekit.Room, error)
LoadRoom(ctx context.Context, name livekit.RoomName) (*livekit.Room, error)
// ListRooms returns currently active rooms. if names is not nil, it'll filter and return
// only rooms that match
ListRooms(ctx context.Context, names []string) ([]*livekit.Room, error)
ListRooms(ctx context.Context, names []livekit.RoomName) ([]*livekit.Room, error)
LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error)
ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error)
LoadParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error)
ListParticipants(ctx context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error)
}
//counterfeiter:generate . RoomAllocator
+14 -14
View File
@@ -12,17 +12,17 @@ import (
// encapsulates CRUD operations for room settings
type LocalRoomStore struct {
// map of roomName => room
rooms map[string]*livekit.Room
rooms map[livekit.RoomName]*livekit.Room
// map of roomName => { identity: participant }
participants map[string]map[string]*livekit.ParticipantInfo
participants map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo
lock sync.RWMutex
globalLock sync.Mutex
}
func NewLocalRoomStore() *LocalRoomStore {
return &LocalRoomStore{
rooms: make(map[string]*livekit.Room),
participants: make(map[string]map[string]*livekit.ParticipantInfo),
rooms: make(map[livekit.RoomName]*livekit.Room),
participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
lock: sync.RWMutex{},
}
}
@@ -37,7 +37,7 @@ func (p *LocalRoomStore) StoreRoom(_ context.Context, room *livekit.Room) error
return nil
}
func (p *LocalRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room, error) {
func (p *LocalRoomStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -48,7 +48,7 @@ func (p *LocalRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room
return room, nil
}
func (p *LocalRoomStore) ListRooms(_ context.Context, names []string) ([]*livekit.Room, error) {
func (p *LocalRoomStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) {
p.lock.RLock()
defer p.lock.RUnlock()
rooms := make([]*livekit.Room, 0, len(p.rooms))
@@ -60,7 +60,7 @@ func (p *LocalRoomStore) ListRooms(_ context.Context, names []string) ([]*liveki
return rooms, nil
}
func (p *LocalRoomStore) DeleteRoom(ctx context.Context, name string) error {
func (p *LocalRoomStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error {
room, err := p.LoadRoom(ctx, name)
if err == ErrRoomNotFound {
return nil
@@ -76,30 +76,30 @@ func (p *LocalRoomStore) DeleteRoom(ctx context.Context, name string) error {
return nil
}
func (p *LocalRoomStore) LockRoom(_ context.Context, _ string, _ time.Duration) (string, error) {
func (p *LocalRoomStore) LockRoom(_ context.Context, _ livekit.RoomName, _ time.Duration) (string, error) {
// local rooms lock & unlock globally
p.globalLock.Lock()
return "", nil
}
func (p *LocalRoomStore) UnlockRoom(_ context.Context, _ string, _ string) error {
func (p *LocalRoomStore) UnlockRoom(_ context.Context, _ livekit.RoomName, _ string) error {
p.globalLock.Unlock()
return nil
}
func (p *LocalRoomStore) StoreParticipant(_ context.Context, roomName string, participant *livekit.ParticipantInfo) error {
func (p *LocalRoomStore) StoreParticipant(_ context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error {
p.lock.Lock()
defer p.lock.Unlock()
roomParticipants := p.participants[roomName]
if roomParticipants == nil {
roomParticipants = make(map[string]*livekit.ParticipantInfo)
roomParticipants = make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo)
p.participants[roomName] = roomParticipants
}
roomParticipants[participant.Identity] = participant
return nil
}
func (p *LocalRoomStore) LoadParticipant(_ context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) {
func (p *LocalRoomStore) LoadParticipant(_ context.Context, roomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -114,7 +114,7 @@ func (p *LocalRoomStore) LoadParticipant(_ context.Context, roomName, identity s
return participant, nil
}
func (p *LocalRoomStore) ListParticipants(_ context.Context, roomName string) ([]*livekit.ParticipantInfo, error) {
func (p *LocalRoomStore) ListParticipants(_ context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -132,7 +132,7 @@ func (p *LocalRoomStore) ListParticipants(_ context.Context, roomName string) ([
return items, nil
}
func (p *LocalRoomStore) DeleteParticipant(_ context.Context, roomName, identity string) error {
func (p *LocalRoomStore) DeleteParticipant(_ context.Context, roomName, identity livekit.ParticipantIdentity) error {
p.lock.Lock()
defer p.lock.Unlock()
+9 -9
View File
@@ -54,7 +54,7 @@ func (p *RedisRoomStore) StoreRoom(_ context.Context, room *livekit.Room) error
return nil
}
func (p *RedisRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room, error) {
func (p *RedisRoomStore) LoadRoom(_ context.Context, name livekit.RoomName) (*livekit.Room, error) {
data, err := p.rc.HGet(p.ctx, RoomsKey, name).Result()
if err != nil {
if err == redis.Nil {
@@ -72,7 +72,7 @@ func (p *RedisRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room
return &room, nil
}
func (p *RedisRoomStore) ListRooms(_ context.Context, names []string) ([]*livekit.Room, error) {
func (p *RedisRoomStore) ListRooms(_ context.Context, names []livekit.RoomName) ([]*livekit.Room, error) {
var items []string
var err error
if names == nil {
@@ -106,7 +106,7 @@ func (p *RedisRoomStore) ListRooms(_ context.Context, names []string) ([]*liveki
return rooms, nil
}
func (p *RedisRoomStore) DeleteRoom(ctx context.Context, name string) error {
func (p *RedisRoomStore) DeleteRoom(ctx context.Context, name livekit.RoomName) error {
_, err := p.LoadRoom(ctx, name)
if err == ErrRoomNotFound {
return nil
@@ -120,7 +120,7 @@ func (p *RedisRoomStore) DeleteRoom(ctx context.Context, name string) error {
return err
}
func (p *RedisRoomStore) LockRoom(_ context.Context, name string, duration time.Duration) (string, error) {
func (p *RedisRoomStore) LockRoom(_ context.Context, name livekit.RoomName, duration time.Duration) (string, error) {
token := utils.NewGuid("LOCK")
key := RoomLockPrefix + name
@@ -145,7 +145,7 @@ func (p *RedisRoomStore) LockRoom(_ context.Context, name string, duration time.
return "", ErrRoomLockFailed
}
func (p *RedisRoomStore) UnlockRoom(_ context.Context, name string, uid string) error {
func (p *RedisRoomStore) UnlockRoom(_ context.Context, name livekit.RoomName, uid string) error {
key := RoomLockPrefix + name
val, err := p.rc.Get(p.ctx, key).Result()
@@ -162,7 +162,7 @@ func (p *RedisRoomStore) UnlockRoom(_ context.Context, name string, uid string)
return p.rc.Del(p.ctx, key).Err()
}
func (p *RedisRoomStore) StoreParticipant(_ context.Context, roomName string, participant *livekit.ParticipantInfo) error {
func (p *RedisRoomStore) StoreParticipant(_ context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error {
key := RoomParticipantsPrefix + roomName
data, err := proto.Marshal(participant)
@@ -173,7 +173,7 @@ func (p *RedisRoomStore) StoreParticipant(_ context.Context, roomName string, pa
return p.rc.HSet(p.ctx, key, participant.Identity, data).Err()
}
func (p *RedisRoomStore) LoadParticipant(_ context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) {
func (p *RedisRoomStore) LoadParticipant(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) {
key := RoomParticipantsPrefix + roomName
data, err := p.rc.HGet(p.ctx, key, identity).Result()
if err == redis.Nil {
@@ -189,7 +189,7 @@ func (p *RedisRoomStore) LoadParticipant(_ context.Context, roomName, identity s
return &pi, nil
}
func (p *RedisRoomStore) ListParticipants(_ context.Context, roomName string) ([]*livekit.ParticipantInfo, error) {
func (p *RedisRoomStore) ListParticipants(_ context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) {
key := RoomParticipantsPrefix + roomName
items, err := p.rc.HVals(p.ctx, key).Result()
if err == redis.Nil {
@@ -209,7 +209,7 @@ func (p *RedisRoomStore) ListParticipants(_ context.Context, roomName string) ([
return participants, nil
}
func (p *RedisRoomStore) DeleteParticipant(_ context.Context, roomName, identity string) error {
func (p *RedisRoomStore) DeleteParticipant(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) error {
key := RoomParticipantsPrefix + roomName
return p.rc.HDel(p.ctx, key, identity).Err()
+3 -3
View File
@@ -46,7 +46,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque
return nil, twirpAuthError(err)
}
var names []string
var names []livekit.RoomName
if len(req.Names) > 0 {
names = req.Names
}
@@ -225,7 +225,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return room, nil
}
func (s *RoomService) writeParticipantMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error {
func (s *RoomService) writeParticipantMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
if err := EnsureAdminPermission(ctx, room); err != nil {
return twirpAuthError(err)
}
@@ -238,7 +238,7 @@ func (s *RoomService) writeParticipantMessage(ctx context.Context, room, identit
return s.router.WriteParticipantRTC(ctx, room, identity, msg)
}
func (s *RoomService) writeRoomMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error {
func (s *RoomService) writeRoomMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
if err := EnsureAdminPermission(ctx, room); err != nil {
return twirpAuthError(err)
}
+2 -1
View File
@@ -1,7 +1,8 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//+build !wireinject
//go:build !wireinject
// +build !wireinject
package service
+19 -19
View File
@@ -13,15 +13,15 @@ import (
type StatsWorker struct {
ctx context.Context
t TelemetryReporter
roomID string
roomName string
participantID string
roomID livekit.RoomID
roomName livekit.RoomName
participantID livekit.ParticipantID
upstreamBuffers map[string][]*buffer.Buffer
drainUpstreamBuffers map[string]bool
upstreamBuffers map[livekit.TrackID][]*buffer.Buffer
drainUpstreamBuffers map[livekit.TrackID]bool
outgoingPerTrack map[string]*Stats
incomingPerTrack map[string]*Stats
outgoingPerTrack map[livekit.TrackID]*Stats
incomingPerTrack map[livekit.TrackID]*Stats
}
type Stats struct {
@@ -34,7 +34,7 @@ type Stats struct {
prevPacketsLost uint64
}
func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName, participantID string) *StatsWorker {
func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID) *StatsWorker {
s := &StatsWorker{
ctx: ctx,
t: t,
@@ -42,25 +42,25 @@ func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName,
roomName: roomName,
participantID: participantID,
upstreamBuffers: make(map[string][]*buffer.Buffer),
drainUpstreamBuffers: make(map[string]bool),
upstreamBuffers: make(map[livekit.TrackID][]*buffer.Buffer),
drainUpstreamBuffers: make(map[livekit.TrackID]bool),
outgoingPerTrack: make(map[string]*Stats),
incomingPerTrack: make(map[string]*Stats),
outgoingPerTrack: make(map[livekit.TrackID]*Stats),
incomingPerTrack: make(map[livekit.TrackID]*Stats),
}
return s
}
func (s *StatsWorker) AddBuffer(trackID string, buffer *buffer.Buffer) {
func (s *StatsWorker) AddBuffer(trackID livekit.TrackID, buffer *buffer.Buffer) {
s.upstreamBuffers[trackID] = append(s.upstreamBuffers[trackID], buffer)
}
func (s *StatsWorker) OnDownstreamPacket(trackID string, bytes int) {
func (s *StatsWorker) OnDownstreamPacket(trackID livekit.TrackID, bytes int) {
s.getOrCreateOutgoingStatsIfEmpty(trackID).totalBytes += uint64(bytes)
s.getOrCreateOutgoingStatsIfEmpty(trackID).totalPackets++
}
func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID string) *Stats {
func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID livekit.TrackID) *Stats {
if s.outgoingPerTrack[trackID] == nil {
s.outgoingPerTrack[trackID] = &Stats{next: &livekit.AnalyticsStat{
Kind: livekit.StreamType_DOWNSTREAM,
@@ -72,7 +72,7 @@ func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID string) *Stats {
return s.outgoingPerTrack[trackID]
}
func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID string) *Stats {
func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID livekit.TrackID) *Stats {
if s.incomingPerTrack[trackID] == nil {
s.incomingPerTrack[trackID] = &Stats{next: &livekit.AnalyticsStat{
Kind: livekit.StreamType_UPSTREAM,
@@ -84,7 +84,7 @@ func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID string) *Stats {
return s.incomingPerTrack[trackID]
}
func (s *StatsWorker) OnRTCP(trackID string, direction livekit.StreamType, stats *livekit.AnalyticsStat) {
func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamType, stats *livekit.AnalyticsStat) {
var ds *Stats
if direction == livekit.StreamType_DOWNSTREAM {
ds = s.getOrCreateOutgoingStatsIfEmpty(trackID)
@@ -155,7 +155,7 @@ func (s *StatsWorker) collectUpstreamStats(ts *timestamppb.Timestamp, stats []*l
delete(s.upstreamBuffers, trackID)
delete(s.incomingPerTrack, trackID)
}
s.drainUpstreamBuffers = make(map[string]bool)
s.drainUpstreamBuffers = make(map[livekit.TrackID]bool)
}
return stats
}
@@ -185,7 +185,7 @@ func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.A
return next
}
func (s *StatsWorker) RemoveBuffer(trackID string) {
func (s *StatsWorker) RemoveBuffer(trackID livekit.TrackID) {
s.drainUpstreamBuffers[trackID] = true
}
+14 -14
View File
@@ -15,19 +15,19 @@ const updateFrequency = time.Second * 10
type TelemetryService interface {
// stats
AddUpTrack(participantID string, trackID string, buff *buffer.Buffer)
OnDownstreamPacket(participantID string, trackID string, bytes int)
HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet)
AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer)
OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int)
HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet)
// events
RoomStarted(ctx context.Context, room *livekit.Room)
RoomEnded(ctx context.Context, room *livekit.Room)
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo)
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo)
TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo)
TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32)
TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo)
TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo)
TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32)
TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo)
RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo)
}
@@ -67,19 +67,19 @@ func (t *telemetryService) run() {
}
}
func (t *telemetryService) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) {
func (t *telemetryService) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) {
t.jobQueue <- func() {
t.internalService.AddUpTrack(participantID, trackID, buff)
}
}
func (t *telemetryService) OnDownstreamPacket(participantID string, trackID string, bytes int) {
func (t *telemetryService) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) {
t.jobQueue <- func() {
t.internalService.OnDownstreamPacket(participantID, trackID, bytes)
}
}
func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet) {
func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) {
t.jobQueue <- func() {
t.internalService.HandleRTCP(streamType, participantID, trackID, pkts)
}
@@ -109,25 +109,25 @@ func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro
}
}
func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.jobQueue <- func() {
t.internalService.TrackPublished(ctx, participantID, track)
}
}
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) {
t.jobQueue <- func() {
t.internalService.TrackUnpublished(ctx, participantID, track, ssrc)
}
}
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.jobQueue <- func() {
t.internalService.TrackSubscribed(ctx, participantID, track)
}
}
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.jobQueue <- func() {
t.internalService.TrackUnsubscribed(ctx, participantID, track)
}
+5 -5
View File
@@ -26,7 +26,7 @@ type telemetryServiceInternal struct {
webhookPool *workerpool.WorkerPool
// one worker per participant
workers map[string]*StatsWorker
workers map[livekit.ParticipantID]*StatsWorker
analytics AnalyticsService
}
@@ -35,26 +35,26 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS
return &telemetryServiceInternal{
notifier: notifier,
webhookPool: workerpool.New(1),
workers: make(map[string]*StatsWorker),
workers: make(map[livekit.ParticipantID]*StatsWorker),
analytics: analytics,
}
}
func (t *telemetryServiceInternal) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) {
func (t *telemetryServiceInternal) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) {
w := t.workers[participantID]
if w != nil {
w.AddBuffer(trackID, buff)
}
}
func (t *telemetryServiceInternal) OnDownstreamPacket(participantID string, trackID string, bytes int) {
func (t *telemetryServiceInternal) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) {
w := t.workers[participantID]
if w != nil {
w.OnDownstreamPacket(trackID, bytes)
}
}
func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet) {
func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) {
stats := &livekit.AnalyticsStat{}
for _, pkt := range pkts {
switch pkt := pkt.(type) {
@@ -90,7 +90,7 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li
})
}
func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
prometheus.AddPublishedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
@@ -104,7 +104,7 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa
})
}
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, ssrc uint32) {
roomID := ""
roomName := ""
w := t.workers[participantID]
@@ -126,7 +126,7 @@ func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, partici
})
}
func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
prometheus.AddSubscribedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
@@ -140,7 +140,7 @@ func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, particip
})
}
func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
prometheus.SubSubscribedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
@@ -182,7 +182,7 @@ func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livek
})
}
func (t *telemetryServiceInternal) getRoomDetails(participantID string) (string, string) {
func (t *telemetryServiceInternal) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) {
w := t.workers[participantID]
if w != nil {
return w.roomID, w.roomName