rtc: report participant kind code and details (#4534)

* rtc: report participant kind code and details

Plumb ParticipantKind and KindDetails through MediaTrack and
BytesTrackStats so track-level reporting can record the numeric kind
code plus details codes on every participant_session aggregation,
alongside the existing Kind string. Also picks up the new kind fields
on resolved BytesSignalStats participants.

Adds deployment/agentID/version to the agent worker logger.
This commit is contained in:
Paul Wells
2026-05-18 23:20:52 -07:00
committed by GitHub
parent 77595d387a
commit 019a6640ae
10 changed files with 184 additions and 22 deletions
+1 -1
View File
@@ -21,7 +21,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59
github.com/livekit/protocol v1.45.9-0.20260514081508-d0e065ec5133
github.com/livekit/protocol v1.45.9-0.20260519061926-8381f2180c45
github.com/livekit/psrpc v0.7.1
github.com/mackerelio/go-osstat v0.2.7
github.com/magefile/mage v1.17.0
+2 -2
View File
@@ -181,8 +181,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59 h1:lWRMrb4ReRJu/e/BAp1kpT6fQOjS8WjCxdp0PGjgrBc=
github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59/go.mod h1:RCd46PT+6sEztld6XpkCrG1xskb0u3SqxIjy4G897Ss=
github.com/livekit/protocol v1.45.9-0.20260514081508-d0e065ec5133 h1:b6Eodjgt2IdhKMhenZmGlZxDbRYuR+QEzdkhy7DdGRw=
github.com/livekit/protocol v1.45.9-0.20260514081508-d0e065ec5133/go.mod h1:KEPIJ/ZdMFQ9tmmfv/uT9TjQEuEcZupCZBabuRGEC1k=
github.com/livekit/protocol v1.45.9-0.20260519061926-8381f2180c45 h1:gJQFJNjHuxeKroI6KTtVeXVcUMOaK8ksdiB6FoiDWmE=
github.com/livekit/protocol v1.45.9-0.20260519061926-8381f2180c45/go.mod h1:KEPIJ/ZdMFQ9tmmfv/uT9TjQEuEcZupCZBabuRGEC1k=
github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw=
github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk=
github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94=
+3
View File
@@ -273,6 +273,9 @@ func NewWorker(
logger: logger.WithValues(
"workerID", registration.ID,
"agentName", registration.AgentName,
"deployment", registration.Deployment,
"agentID", registration.AgentID,
"version", registration.Version,
"jobType", registration.JobType.String(),
),
+5
View File
@@ -79,6 +79,8 @@ type MediaTrackParams struct {
ParticipantIdentity livekit.ParticipantIdentity
ParticipantVersion uint32
ParticipantCountry string
ParticipantKind livekit.ParticipantInfo_Kind
ParticipantKindDetails []livekit.ParticipantInfo_KindDetail
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
SubscriberConfig DirectionConfig
@@ -462,6 +464,9 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe
if cs, ok := telemetry.CondenseStat(stat); ok {
t.params.Reporter.Tx(func(tx roomobs.TrackTx) {
tx.ParticipantSession().ReportKind(t.params.ParticipantKind.String())
tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(t.params.ParticipantKind))
tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(t.params.ParticipantKindDetails))
tx.ReportName(ti.Name)
tx.ReportKind(roomobs.TrackKindPub)
tx.ReportType(roomobs.TrackTypeFromProto(ti.Type))
+28 -17
View File
@@ -382,6 +382,8 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.params.Country,
BytesTrackIDForParticipantID(BytesTrackTypeData, p.ID()),
p.ID(),
params.Grants.GetParticipantKind(),
params.Grants.GetKindDetails(),
params.TelemetryListener,
params.Reporter,
)
@@ -412,6 +414,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
tx.ReportEndTime(ts)
}
tx.ReportKindCode(roomobs.ParticipantKindCode(p.Kind()))
tx.ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(p.KindDetails()))
millis, secs, mins := p.params.SessionTimer.Advance(ts)
tx.ReportDuration(uint16(millis))
tx.ReportDurationSeconds(uint16(secs))
@@ -516,6 +521,10 @@ func (p *ParticipantImpl) Kind() livekit.ParticipantInfo_Kind {
return p.grants.Load().GetParticipantKind()
}
func (p *ParticipantImpl) KindDetails() []livekit.ParticipantInfo_KindDetail {
return p.grants.Load().GetKindDetails()
}
func (p *ParticipantImpl) IsRecorder() bool {
grants := p.grants.Load()
return grants.GetParticipantKind() == livekit.ParticipantInfo_EGRESS || grants.Video.Recorder
@@ -3301,23 +3310,25 @@ func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *M
func (p *ParticipantImpl) addMediaTrack(signalCid string, ti *livekit.TrackInfo) *MediaTrack {
mt := NewMediaTrack(MediaTrackParams{
ParticipantID: p.ID,
ParticipantIdentity: p.params.Identity,
ParticipantVersion: p.version.Load(),
ParticipantCountry: p.params.Country,
BufferFactory: p.params.Config.BufferFactory,
ReceiverConfig: p.params.Config.Receiver,
AudioConfig: p.params.AudioConfig,
VideoConfig: p.params.VideoConfig,
TelemetryListener: p.params.TelemetryListener,
Logger: LoggerWithTrack(p.pubLogger, livekit.TrackID(ti.Sid), false),
Reporter: p.params.Reporter.WithTrack(ti.Sid),
SubscriberConfig: p.params.Config.Subscriber,
PLIThrottleConfig: p.params.PLIThrottleConfig,
SimTracks: p.params.SimTracks,
OnRTCP: p.postRtcp,
ForwardStats: p.params.ForwardStats,
OnTrackEverSubscribed: p.sendTrackHasBeenSubscribed,
ParticipantID: p.ID,
ParticipantIdentity: p.params.Identity,
ParticipantVersion: p.version.Load(),
ParticipantCountry: p.params.Country,
ParticipantKind: p.Kind(),
ParticipantKindDetails: p.KindDetails(),
BufferFactory: p.params.Config.BufferFactory,
ReceiverConfig: p.params.Config.Receiver,
AudioConfig: p.params.AudioConfig,
VideoConfig: p.params.VideoConfig,
TelemetryListener: p.params.TelemetryListener,
Logger: LoggerWithTrack(p.pubLogger, livekit.TrackID(ti.Sid), false),
Reporter: p.params.Reporter.WithTrack(ti.Sid),
SubscriberConfig: p.params.Config.Subscriber,
PLIThrottleConfig: p.params.PLIThrottleConfig,
SimTracks: p.params.SimTracks,
OnRTCP: p.postRtcp,
ForwardStats: p.params.ForwardStats,
OnTrackEverSubscribed: p.sendTrackHasBeenSubscribed,
ShouldRegressCodec: func() bool {
return p.helper().ShouldRegressCodec()
},
+16 -2
View File
@@ -53,6 +53,8 @@ type TrafficTotals struct {
type BytesTrackStats struct {
country string
pID livekit.ParticipantID
kind livekit.ParticipantInfo_Kind
kindDetails []livekit.ParticipantInfo_KindDetail
trackID livekit.TrackID
send, recv atomic.Uint64
sendMessages, recvMessages atomic.Uint32
@@ -67,12 +69,16 @@ func NewBytesTrackStats(
country string,
trackID livekit.TrackID,
pID livekit.ParticipantID,
kind livekit.ParticipantInfo_Kind,
kindDetails []livekit.ParticipantInfo_KindDetail,
telemetryListener types.ParticipantTelemetryListener,
participantReporter roomobs.ParticipantSessionReporter,
) *BytesTrackStats {
s := &BytesTrackStats{
country: country,
pID: pID,
kind: kind,
kindDetails: kindDetails,
trackID: trackID,
telemetryListener: telemetryListener,
reporter: participantReporter.WithTrack(trackID.String()),
@@ -89,6 +95,8 @@ func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) {
s.totalSendMessages.Inc()
s.reporter.Tx(func(tx roomobs.TrackTx) {
tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(s.kind))
tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(s.kindDetails))
tx.ReportType(roomobs.TrackTypeData)
tx.ReportSendBytes(uint32(bytes))
tx.ReportSendPackets(1)
@@ -100,6 +108,8 @@ func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) {
s.totalRecvMessages.Inc()
s.reporter.Tx(func(tx roomobs.TrackTx) {
tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(s.kind))
tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(s.kindDetails))
tx.ReportType(roomobs.TrackTypeData)
tx.ReportRecvBytes(uint32(bytes))
tx.ReportRecvPackets(1)
@@ -230,9 +240,13 @@ func (s *BytesSignalStats) ResolveParticipant(pi *livekit.ParticipantInfo) {
defer s.mu.Unlock()
if s.pi == nil && pi != nil {
s.pi = &livekit.ParticipantInfo{
Sid: pi.Sid,
Identity: pi.Identity,
Sid: pi.Sid,
Identity: pi.Identity,
Kind: pi.Kind,
KindDetails: pi.KindDetails,
}
s.kind = pi.Kind
s.kindDetails = pi.KindDetails
s.maybeStart()
}
}
+2
View File
@@ -413,6 +413,8 @@ func (t *SubscribedTrack) OnStatsUpdate(stat *livekit.AnalyticsStat) {
if cs, ok := telemetry.CondenseStat(stat); ok {
ti := t.params.WrappedReceiver.TrackInfo()
t.reporter.Tx(func(tx roomobs.TrackTx) {
tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(t.params.Subscriber.Kind()))
tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(t.params.Subscriber.KindDetails()))
tx.ReportName(ti.Name)
tx.ReportKind(roomobs.TrackKindSub)
tx.ReportType(roomobs.TrackTypeFromProto(ti.Type))
+1
View File
@@ -288,6 +288,7 @@ type Participant interface {
ConnectedAt() time.Time
CloseReason() ParticipantCloseReason
Kind() livekit.ParticipantInfo_Kind
KindDetails() []livekit.ParticipantInfo_KindDetail
IsRecorder() bool
IsDependent() bool
IsAgent() bool
@@ -913,6 +913,16 @@ type FakeLocalParticipant struct {
kindReturnsOnCall map[int]struct {
result1 livekit.ParticipantInfo_Kind
}
KindDetailsStub func() []livekit.ParticipantInfo_KindDetail
kindDetailsMutex sync.RWMutex
kindDetailsArgsForCall []struct {
}
kindDetailsReturns struct {
result1 []livekit.ParticipantInfo_KindDetail
}
kindDetailsReturnsOnCall map[int]struct {
result1 []livekit.ParticipantInfo_KindDetail
}
MaybeStartMigrationStub func(bool, func()) bool
maybeStartMigrationMutex sync.RWMutex
maybeStartMigrationArgsForCall []struct {
@@ -6243,6 +6253,59 @@ func (fake *FakeLocalParticipant) KindReturnsOnCall(i int, result1 livekit.Parti
}{result1}
}
func (fake *FakeLocalParticipant) KindDetails() []livekit.ParticipantInfo_KindDetail {
fake.kindDetailsMutex.Lock()
ret, specificReturn := fake.kindDetailsReturnsOnCall[len(fake.kindDetailsArgsForCall)]
fake.kindDetailsArgsForCall = append(fake.kindDetailsArgsForCall, struct {
}{})
stub := fake.KindDetailsStub
fakeReturns := fake.kindDetailsReturns
fake.recordInvocation("KindDetails", []interface{}{})
fake.kindDetailsMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) KindDetailsCallCount() int {
fake.kindDetailsMutex.RLock()
defer fake.kindDetailsMutex.RUnlock()
return len(fake.kindDetailsArgsForCall)
}
func (fake *FakeLocalParticipant) KindDetailsCalls(stub func() []livekit.ParticipantInfo_KindDetail) {
fake.kindDetailsMutex.Lock()
defer fake.kindDetailsMutex.Unlock()
fake.KindDetailsStub = stub
}
func (fake *FakeLocalParticipant) KindDetailsReturns(result1 []livekit.ParticipantInfo_KindDetail) {
fake.kindDetailsMutex.Lock()
defer fake.kindDetailsMutex.Unlock()
fake.KindDetailsStub = nil
fake.kindDetailsReturns = struct {
result1 []livekit.ParticipantInfo_KindDetail
}{result1}
}
func (fake *FakeLocalParticipant) KindDetailsReturnsOnCall(i int, result1 []livekit.ParticipantInfo_KindDetail) {
fake.kindDetailsMutex.Lock()
defer fake.kindDetailsMutex.Unlock()
fake.KindDetailsStub = nil
if fake.kindDetailsReturnsOnCall == nil {
fake.kindDetailsReturnsOnCall = make(map[int]struct {
result1 []livekit.ParticipantInfo_KindDetail
})
}
fake.kindDetailsReturnsOnCall[i] = struct {
result1 []livekit.ParticipantInfo_KindDetail
}{result1}
}
func (fake *FakeLocalParticipant) MaybeStartMigration(arg1 bool, arg2 func()) bool {
fake.maybeStartMigrationMutex.Lock()
ret, specificReturn := fake.maybeStartMigrationReturnsOnCall[len(fake.maybeStartMigrationArgsForCall)]
@@ -259,6 +259,16 @@ type FakeParticipant struct {
kindReturnsOnCall map[int]struct {
result1 livekit.ParticipantInfo_Kind
}
KindDetailsStub func() []livekit.ParticipantInfo_KindDetail
kindDetailsMutex sync.RWMutex
kindDetailsArgsForCall []struct {
}
kindDetailsReturns struct {
result1 []livekit.ParticipantInfo_KindDetail
}
kindDetailsReturnsOnCall map[int]struct {
result1 []livekit.ParticipantInfo_KindDetail
}
MigrateStateStub func() types.MigrateState
migrateStateMutex sync.RWMutex
migrateStateArgsForCall []struct {
@@ -1647,6 +1657,59 @@ func (fake *FakeParticipant) KindReturnsOnCall(i int, result1 livekit.Participan
}{result1}
}
func (fake *FakeParticipant) KindDetails() []livekit.ParticipantInfo_KindDetail {
fake.kindDetailsMutex.Lock()
ret, specificReturn := fake.kindDetailsReturnsOnCall[len(fake.kindDetailsArgsForCall)]
fake.kindDetailsArgsForCall = append(fake.kindDetailsArgsForCall, struct {
}{})
stub := fake.KindDetailsStub
fakeReturns := fake.kindDetailsReturns
fake.recordInvocation("KindDetails", []interface{}{})
fake.kindDetailsMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) KindDetailsCallCount() int {
fake.kindDetailsMutex.RLock()
defer fake.kindDetailsMutex.RUnlock()
return len(fake.kindDetailsArgsForCall)
}
func (fake *FakeParticipant) KindDetailsCalls(stub func() []livekit.ParticipantInfo_KindDetail) {
fake.kindDetailsMutex.Lock()
defer fake.kindDetailsMutex.Unlock()
fake.KindDetailsStub = stub
}
func (fake *FakeParticipant) KindDetailsReturns(result1 []livekit.ParticipantInfo_KindDetail) {
fake.kindDetailsMutex.Lock()
defer fake.kindDetailsMutex.Unlock()
fake.KindDetailsStub = nil
fake.kindDetailsReturns = struct {
result1 []livekit.ParticipantInfo_KindDetail
}{result1}
}
func (fake *FakeParticipant) KindDetailsReturnsOnCall(i int, result1 []livekit.ParticipantInfo_KindDetail) {
fake.kindDetailsMutex.Lock()
defer fake.kindDetailsMutex.Unlock()
fake.KindDetailsStub = nil
if fake.kindDetailsReturnsOnCall == nil {
fake.kindDetailsReturnsOnCall = make(map[int]struct {
result1 []livekit.ParticipantInfo_KindDetail
})
}
fake.kindDetailsReturnsOnCall[i] = struct {
result1 []livekit.ParticipantInfo_KindDetail
}{result1}
}
func (fake *FakeParticipant) MigrateState() types.MigrateState {
fake.migrateStateMutex.Lock()
ret, specificReturn := fake.migrateStateReturnsOnCall[len(fake.migrateStateArgsForCall)]