diff --git a/go.mod b/go.mod index d5c5e9da4..2474cd9d8 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59 - github.com/livekit/protocol v1.45.9-0.20260514081508-d0e065ec5133 + github.com/livekit/protocol v1.45.9-0.20260519061926-8381f2180c45 github.com/livekit/psrpc v0.7.1 github.com/mackerelio/go-osstat v0.2.7 github.com/magefile/mage v1.17.0 diff --git a/go.sum b/go.sum index 4b081d9ef..637e7e97e 100644 --- a/go.sum +++ b/go.sum @@ -181,8 +181,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59 h1:lWRMrb4ReRJu/e/BAp1kpT6fQOjS8WjCxdp0PGjgrBc= github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59/go.mod h1:RCd46PT+6sEztld6XpkCrG1xskb0u3SqxIjy4G897Ss= -github.com/livekit/protocol v1.45.9-0.20260514081508-d0e065ec5133 h1:b6Eodjgt2IdhKMhenZmGlZxDbRYuR+QEzdkhy7DdGRw= -github.com/livekit/protocol v1.45.9-0.20260514081508-d0e065ec5133/go.mod h1:KEPIJ/ZdMFQ9tmmfv/uT9TjQEuEcZupCZBabuRGEC1k= +github.com/livekit/protocol v1.45.9-0.20260519061926-8381f2180c45 h1:gJQFJNjHuxeKroI6KTtVeXVcUMOaK8ksdiB6FoiDWmE= +github.com/livekit/protocol v1.45.9-0.20260519061926-8381f2180c45/go.mod h1:KEPIJ/ZdMFQ9tmmfv/uT9TjQEuEcZupCZBabuRGEC1k= github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk= github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94= diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index a603b16b1..b93c33437 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -273,6 +273,9 @@ func NewWorker( logger: logger.WithValues( "workerID", registration.ID, "agentName", registration.AgentName, + "deployment", registration.Deployment, + "agentID", registration.AgentID, + "version", registration.Version, "jobType", registration.JobType.String(), ), diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index f938f364c..fc59d7edd 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -79,6 +79,8 @@ type MediaTrackParams struct { ParticipantIdentity livekit.ParticipantIdentity ParticipantVersion uint32 ParticipantCountry string + ParticipantKind livekit.ParticipantInfo_Kind + ParticipantKindDetails []livekit.ParticipantInfo_KindDetail BufferFactory *buffer.Factory ReceiverConfig ReceiverConfig SubscriberConfig DirectionConfig @@ -462,6 +464,9 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe if cs, ok := telemetry.CondenseStat(stat); ok { t.params.Reporter.Tx(func(tx roomobs.TrackTx) { + tx.ParticipantSession().ReportKind(t.params.ParticipantKind.String()) + tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(t.params.ParticipantKind)) + tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(t.params.ParticipantKindDetails)) tx.ReportName(ti.Name) tx.ReportKind(roomobs.TrackKindPub) tx.ReportType(roomobs.TrackTypeFromProto(ti.Type)) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 3af2b8edc..a6918a365 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -382,6 +382,8 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.params.Country, BytesTrackIDForParticipantID(BytesTrackTypeData, p.ID()), p.ID(), + params.Grants.GetParticipantKind(), + params.Grants.GetKindDetails(), params.TelemetryListener, params.Reporter, ) @@ -412,6 +414,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { tx.ReportEndTime(ts) } + tx.ReportKindCode(roomobs.ParticipantKindCode(p.Kind())) + tx.ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(p.KindDetails())) + millis, secs, mins := p.params.SessionTimer.Advance(ts) tx.ReportDuration(uint16(millis)) tx.ReportDurationSeconds(uint16(secs)) @@ -516,6 +521,10 @@ func (p *ParticipantImpl) Kind() livekit.ParticipantInfo_Kind { return p.grants.Load().GetParticipantKind() } +func (p *ParticipantImpl) KindDetails() []livekit.ParticipantInfo_KindDetail { + return p.grants.Load().GetKindDetails() +} + func (p *ParticipantImpl) IsRecorder() bool { grants := p.grants.Load() return grants.GetParticipantKind() == livekit.ParticipantInfo_EGRESS || grants.Video.Recorder @@ -3301,23 +3310,25 @@ func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *M func (p *ParticipantImpl) addMediaTrack(signalCid string, ti *livekit.TrackInfo) *MediaTrack { mt := NewMediaTrack(MediaTrackParams{ - ParticipantID: p.ID, - ParticipantIdentity: p.params.Identity, - ParticipantVersion: p.version.Load(), - ParticipantCountry: p.params.Country, - BufferFactory: p.params.Config.BufferFactory, - ReceiverConfig: p.params.Config.Receiver, - AudioConfig: p.params.AudioConfig, - VideoConfig: p.params.VideoConfig, - TelemetryListener: p.params.TelemetryListener, - Logger: LoggerWithTrack(p.pubLogger, livekit.TrackID(ti.Sid), false), - Reporter: p.params.Reporter.WithTrack(ti.Sid), - SubscriberConfig: p.params.Config.Subscriber, - PLIThrottleConfig: p.params.PLIThrottleConfig, - SimTracks: p.params.SimTracks, - OnRTCP: p.postRtcp, - ForwardStats: p.params.ForwardStats, - OnTrackEverSubscribed: p.sendTrackHasBeenSubscribed, + ParticipantID: p.ID, + ParticipantIdentity: p.params.Identity, + ParticipantVersion: p.version.Load(), + ParticipantCountry: p.params.Country, + ParticipantKind: p.Kind(), + ParticipantKindDetails: p.KindDetails(), + BufferFactory: p.params.Config.BufferFactory, + ReceiverConfig: p.params.Config.Receiver, + AudioConfig: p.params.AudioConfig, + VideoConfig: p.params.VideoConfig, + TelemetryListener: p.params.TelemetryListener, + Logger: LoggerWithTrack(p.pubLogger, livekit.TrackID(ti.Sid), false), + Reporter: p.params.Reporter.WithTrack(ti.Sid), + SubscriberConfig: p.params.Config.Subscriber, + PLIThrottleConfig: p.params.PLIThrottleConfig, + SimTracks: p.params.SimTracks, + OnRTCP: p.postRtcp, + ForwardStats: p.params.ForwardStats, + OnTrackEverSubscribed: p.sendTrackHasBeenSubscribed, ShouldRegressCodec: func() bool { return p.helper().ShouldRegressCodec() }, diff --git a/pkg/rtc/signalanddatastats.go b/pkg/rtc/signalanddatastats.go index 17de369eb..fe83bd9f4 100644 --- a/pkg/rtc/signalanddatastats.go +++ b/pkg/rtc/signalanddatastats.go @@ -53,6 +53,8 @@ type TrafficTotals struct { type BytesTrackStats struct { country string pID livekit.ParticipantID + kind livekit.ParticipantInfo_Kind + kindDetails []livekit.ParticipantInfo_KindDetail trackID livekit.TrackID send, recv atomic.Uint64 sendMessages, recvMessages atomic.Uint32 @@ -67,12 +69,16 @@ func NewBytesTrackStats( country string, trackID livekit.TrackID, pID livekit.ParticipantID, + kind livekit.ParticipantInfo_Kind, + kindDetails []livekit.ParticipantInfo_KindDetail, telemetryListener types.ParticipantTelemetryListener, participantReporter roomobs.ParticipantSessionReporter, ) *BytesTrackStats { s := &BytesTrackStats{ country: country, pID: pID, + kind: kind, + kindDetails: kindDetails, trackID: trackID, telemetryListener: telemetryListener, reporter: participantReporter.WithTrack(trackID.String()), @@ -89,6 +95,8 @@ func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) { s.totalSendMessages.Inc() s.reporter.Tx(func(tx roomobs.TrackTx) { + tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(s.kind)) + tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(s.kindDetails)) tx.ReportType(roomobs.TrackTypeData) tx.ReportSendBytes(uint32(bytes)) tx.ReportSendPackets(1) @@ -100,6 +108,8 @@ func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) { s.totalRecvMessages.Inc() s.reporter.Tx(func(tx roomobs.TrackTx) { + tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(s.kind)) + tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(s.kindDetails)) tx.ReportType(roomobs.TrackTypeData) tx.ReportRecvBytes(uint32(bytes)) tx.ReportRecvPackets(1) @@ -230,9 +240,13 @@ func (s *BytesSignalStats) ResolveParticipant(pi *livekit.ParticipantInfo) { defer s.mu.Unlock() if s.pi == nil && pi != nil { s.pi = &livekit.ParticipantInfo{ - Sid: pi.Sid, - Identity: pi.Identity, + Sid: pi.Sid, + Identity: pi.Identity, + Kind: pi.Kind, + KindDetails: pi.KindDetails, } + s.kind = pi.Kind + s.kindDetails = pi.KindDetails s.maybeStart() } } diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index cb215cfcc..3a8a06148 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -413,6 +413,8 @@ func (t *SubscribedTrack) OnStatsUpdate(stat *livekit.AnalyticsStat) { if cs, ok := telemetry.CondenseStat(stat); ok { ti := t.params.WrappedReceiver.TrackInfo() t.reporter.Tx(func(tx roomobs.TrackTx) { + tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(t.params.Subscriber.Kind())) + tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(t.params.Subscriber.KindDetails())) tx.ReportName(ti.Name) tx.ReportKind(roomobs.TrackKindSub) tx.ReportType(roomobs.TrackTypeFromProto(ti.Type)) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 7da652eef..4eba9b63a 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -288,6 +288,7 @@ type Participant interface { ConnectedAt() time.Time CloseReason() ParticipantCloseReason Kind() livekit.ParticipantInfo_Kind + KindDetails() []livekit.ParticipantInfo_KindDetail IsRecorder() bool IsDependent() bool IsAgent() bool diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 35e473f04..96686a0d1 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -913,6 +913,16 @@ type FakeLocalParticipant struct { kindReturnsOnCall map[int]struct { result1 livekit.ParticipantInfo_Kind } + KindDetailsStub func() []livekit.ParticipantInfo_KindDetail + kindDetailsMutex sync.RWMutex + kindDetailsArgsForCall []struct { + } + kindDetailsReturns struct { + result1 []livekit.ParticipantInfo_KindDetail + } + kindDetailsReturnsOnCall map[int]struct { + result1 []livekit.ParticipantInfo_KindDetail + } MaybeStartMigrationStub func(bool, func()) bool maybeStartMigrationMutex sync.RWMutex maybeStartMigrationArgsForCall []struct { @@ -6243,6 +6253,59 @@ func (fake *FakeLocalParticipant) KindReturnsOnCall(i int, result1 livekit.Parti }{result1} } +func (fake *FakeLocalParticipant) KindDetails() []livekit.ParticipantInfo_KindDetail { + fake.kindDetailsMutex.Lock() + ret, specificReturn := fake.kindDetailsReturnsOnCall[len(fake.kindDetailsArgsForCall)] + fake.kindDetailsArgsForCall = append(fake.kindDetailsArgsForCall, struct { + }{}) + stub := fake.KindDetailsStub + fakeReturns := fake.kindDetailsReturns + fake.recordInvocation("KindDetails", []interface{}{}) + fake.kindDetailsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) KindDetailsCallCount() int { + fake.kindDetailsMutex.RLock() + defer fake.kindDetailsMutex.RUnlock() + return len(fake.kindDetailsArgsForCall) +} + +func (fake *FakeLocalParticipant) KindDetailsCalls(stub func() []livekit.ParticipantInfo_KindDetail) { + fake.kindDetailsMutex.Lock() + defer fake.kindDetailsMutex.Unlock() + fake.KindDetailsStub = stub +} + +func (fake *FakeLocalParticipant) KindDetailsReturns(result1 []livekit.ParticipantInfo_KindDetail) { + fake.kindDetailsMutex.Lock() + defer fake.kindDetailsMutex.Unlock() + fake.KindDetailsStub = nil + fake.kindDetailsReturns = struct { + result1 []livekit.ParticipantInfo_KindDetail + }{result1} +} + +func (fake *FakeLocalParticipant) KindDetailsReturnsOnCall(i int, result1 []livekit.ParticipantInfo_KindDetail) { + fake.kindDetailsMutex.Lock() + defer fake.kindDetailsMutex.Unlock() + fake.KindDetailsStub = nil + if fake.kindDetailsReturnsOnCall == nil { + fake.kindDetailsReturnsOnCall = make(map[int]struct { + result1 []livekit.ParticipantInfo_KindDetail + }) + } + fake.kindDetailsReturnsOnCall[i] = struct { + result1 []livekit.ParticipantInfo_KindDetail + }{result1} +} + func (fake *FakeLocalParticipant) MaybeStartMigration(arg1 bool, arg2 func()) bool { fake.maybeStartMigrationMutex.Lock() ret, specificReturn := fake.maybeStartMigrationReturnsOnCall[len(fake.maybeStartMigrationArgsForCall)] diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index ed38762aa..19498c9c3 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -259,6 +259,16 @@ type FakeParticipant struct { kindReturnsOnCall map[int]struct { result1 livekit.ParticipantInfo_Kind } + KindDetailsStub func() []livekit.ParticipantInfo_KindDetail + kindDetailsMutex sync.RWMutex + kindDetailsArgsForCall []struct { + } + kindDetailsReturns struct { + result1 []livekit.ParticipantInfo_KindDetail + } + kindDetailsReturnsOnCall map[int]struct { + result1 []livekit.ParticipantInfo_KindDetail + } MigrateStateStub func() types.MigrateState migrateStateMutex sync.RWMutex migrateStateArgsForCall []struct { @@ -1647,6 +1657,59 @@ func (fake *FakeParticipant) KindReturnsOnCall(i int, result1 livekit.Participan }{result1} } +func (fake *FakeParticipant) KindDetails() []livekit.ParticipantInfo_KindDetail { + fake.kindDetailsMutex.Lock() + ret, specificReturn := fake.kindDetailsReturnsOnCall[len(fake.kindDetailsArgsForCall)] + fake.kindDetailsArgsForCall = append(fake.kindDetailsArgsForCall, struct { + }{}) + stub := fake.KindDetailsStub + fakeReturns := fake.kindDetailsReturns + fake.recordInvocation("KindDetails", []interface{}{}) + fake.kindDetailsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) KindDetailsCallCount() int { + fake.kindDetailsMutex.RLock() + defer fake.kindDetailsMutex.RUnlock() + return len(fake.kindDetailsArgsForCall) +} + +func (fake *FakeParticipant) KindDetailsCalls(stub func() []livekit.ParticipantInfo_KindDetail) { + fake.kindDetailsMutex.Lock() + defer fake.kindDetailsMutex.Unlock() + fake.KindDetailsStub = stub +} + +func (fake *FakeParticipant) KindDetailsReturns(result1 []livekit.ParticipantInfo_KindDetail) { + fake.kindDetailsMutex.Lock() + defer fake.kindDetailsMutex.Unlock() + fake.KindDetailsStub = nil + fake.kindDetailsReturns = struct { + result1 []livekit.ParticipantInfo_KindDetail + }{result1} +} + +func (fake *FakeParticipant) KindDetailsReturnsOnCall(i int, result1 []livekit.ParticipantInfo_KindDetail) { + fake.kindDetailsMutex.Lock() + defer fake.kindDetailsMutex.Unlock() + fake.KindDetailsStub = nil + if fake.kindDetailsReturnsOnCall == nil { + fake.kindDetailsReturnsOnCall = make(map[int]struct { + result1 []livekit.ParticipantInfo_KindDetail + }) + } + fake.kindDetailsReturnsOnCall[i] = struct { + result1 []livekit.ParticipantInfo_KindDetail + }{result1} +} + func (fake *FakeParticipant) MigrateState() types.MigrateState { fake.migrateStateMutex.Lock() ret, specificReturn := fake.migrateStateReturnsOnCall[len(fake.migrateStateArgsForCall)]