From cbb2c61787d2e58172297b832fcada2d2d798cbc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 18 Dec 2025 10:16:27 +0530 Subject: [PATCH] Publish/Unpublish counter match. (#4173) Published counter was bumped up only when not migrating in, but it was decremented when a migrating participant leaves without expectation to resume. That could have resulted in negative counts. Always change counters irrespective of migration or expected to resume on leave. Control events send based on migration/resume. --- pkg/rtc/participant.go | 31 +++++++++---------- pkg/telemetry/events.go | 4 +++ .../telemetryfakes/fake_telemetry_service.go | 18 ++++++----- pkg/telemetry/telemetryservice.go | 2 +- 4 files changed, 29 insertions(+), 26 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index fcfb22854..7a067ccc3 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -3299,15 +3299,13 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, ti *livekit.TrackInfo) p.supervisor.ClearPublishedTrack(trackID, mt) } - if !isExpectedToResume { - p.params.Telemetry.TrackUnpublished( - context.Background(), - p.ID(), - p.Identity(), - mt.ToProto(), - true, - ) - } + p.params.Telemetry.TrackUnpublished( + context.Background(), + p.ID(), + p.Identity(), + mt.ToProto(), + !isExpectedToResume, + ) p.pendingTracksLock.Lock() if pti := p.pendingTracks[signalCid]; pti != nil { @@ -3336,14 +3334,13 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack, isMigrate // send webhook after callbacks are complete, persistence and state handling happens // in `onTrackPublished` cb - if !isMigrated { - p.params.Telemetry.TrackPublished( - context.Background(), - p.ID(), - p.Identity(), - track.ToProto(), - ) - } + p.params.Telemetry.TrackPublished( + context.Background(), + p.ID(), + p.Identity(), + track.ToProto(), + !isMigrated, + ) p.pendingTracksLock.Lock() delete(p.pendingPublishingTracks, track.ID()) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 0000c1286..a6c006450 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -236,10 +236,14 @@ func (t *telemetryService) TrackPublished( participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, + shouldSendEvent bool, ) { t.enqueue(func() { prometheus.AddPublishedTrack(track.Type.String()) prometheus.RecordTrackPublishSuccess(track.Type.String()) + if !shouldSendEvent { + return + } room := t.getRoomDetails(participantID) participant := &livekit.ParticipantInfo{ diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 5f1267fa9..bc9109db5 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -203,13 +203,14 @@ type FakeTelemetryService struct { arg3 livekit.ParticipantIdentity arg4 *livekit.TrackInfo } - TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) + TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool) trackPublishedMutex sync.RWMutex trackPublishedArgsForCall []struct { arg1 context.Context arg2 livekit.ParticipantID arg3 livekit.ParticipantIdentity arg4 *livekit.TrackInfo + arg5 bool } TrackPublishedUpdateStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo) trackPublishedUpdateMutex sync.RWMutex @@ -1233,19 +1234,20 @@ func (fake *FakeTelemetryService) TrackPublishRequestedArgsForCall(i int) (conte return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } -func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo) { +func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo, arg5 bool) { fake.trackPublishedMutex.Lock() fake.trackPublishedArgsForCall = append(fake.trackPublishedArgsForCall, struct { arg1 context.Context arg2 livekit.ParticipantID arg3 livekit.ParticipantIdentity arg4 *livekit.TrackInfo - }{arg1, arg2, arg3, arg4}) + arg5 bool + }{arg1, arg2, arg3, arg4, arg5}) stub := fake.TrackPublishedStub - fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4, arg5}) fake.trackPublishedMutex.Unlock() if stub != nil { - fake.TrackPublishedStub(arg1, arg2, arg3, arg4) + fake.TrackPublishedStub(arg1, arg2, arg3, arg4, arg5) } } @@ -1255,17 +1257,17 @@ func (fake *FakeTelemetryService) TrackPublishedCallCount() int { return len(fake.trackPublishedArgsForCall) } -func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo)) { +func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool)) { fake.trackPublishedMutex.Lock() defer fake.trackPublishedMutex.Unlock() fake.TrackPublishedStub = stub } -func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) { +func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool) { fake.trackPublishedMutex.RLock() defer fake.trackPublishedMutex.RUnlock() argsForCall := fake.trackPublishedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeTelemetryService) TrackPublishedUpdate(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index be27643ac..12cfe6ab4 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -47,7 +47,7 @@ type TelemetryService interface { // TrackPublishRequested - a publication attempt has been received TrackPublishRequested(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) // TrackPublished - a publication attempt has been successful - TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) + TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) // TrackUnpublished - a participant unpublished a track TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) // TrackSubscribeRequested - a participant requested to subscribe to a track