diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index fcfb22854..7a067ccc3 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -3299,15 +3299,13 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, ti *livekit.TrackInfo) p.supervisor.ClearPublishedTrack(trackID, mt) } - if !isExpectedToResume { - p.params.Telemetry.TrackUnpublished( - context.Background(), - p.ID(), - p.Identity(), - mt.ToProto(), - true, - ) - } + p.params.Telemetry.TrackUnpublished( + context.Background(), + p.ID(), + p.Identity(), + mt.ToProto(), + !isExpectedToResume, + ) p.pendingTracksLock.Lock() if pti := p.pendingTracks[signalCid]; pti != nil { @@ -3336,14 +3334,13 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack, isMigrate // send webhook after callbacks are complete, persistence and state handling happens // in `onTrackPublished` cb - if !isMigrated { - p.params.Telemetry.TrackPublished( - context.Background(), - p.ID(), - p.Identity(), - track.ToProto(), - ) - } + p.params.Telemetry.TrackPublished( + context.Background(), + p.ID(), + p.Identity(), + track.ToProto(), + !isMigrated, + ) p.pendingTracksLock.Lock() delete(p.pendingPublishingTracks, track.ID()) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 0000c1286..a6c006450 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -236,10 +236,14 @@ func (t *telemetryService) TrackPublished( participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, + shouldSendEvent bool, ) { t.enqueue(func() { prometheus.AddPublishedTrack(track.Type.String()) prometheus.RecordTrackPublishSuccess(track.Type.String()) + if !shouldSendEvent { + return + } room := t.getRoomDetails(participantID) participant := &livekit.ParticipantInfo{ diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 5f1267fa9..bc9109db5 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -203,13 +203,14 @@ type FakeTelemetryService struct { arg3 livekit.ParticipantIdentity arg4 *livekit.TrackInfo } - TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) + TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool) trackPublishedMutex sync.RWMutex trackPublishedArgsForCall []struct { arg1 context.Context arg2 livekit.ParticipantID arg3 livekit.ParticipantIdentity arg4 *livekit.TrackInfo + arg5 bool } TrackPublishedUpdateStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo) trackPublishedUpdateMutex sync.RWMutex @@ -1233,19 +1234,20 @@ func (fake *FakeTelemetryService) TrackPublishRequestedArgsForCall(i int) (conte return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } -func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo) { +func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo, arg5 bool) { fake.trackPublishedMutex.Lock() fake.trackPublishedArgsForCall = append(fake.trackPublishedArgsForCall, struct { arg1 context.Context arg2 livekit.ParticipantID arg3 livekit.ParticipantIdentity arg4 *livekit.TrackInfo - }{arg1, arg2, arg3, arg4}) + arg5 bool + }{arg1, arg2, arg3, arg4, arg5}) stub := fake.TrackPublishedStub - fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4, arg5}) fake.trackPublishedMutex.Unlock() if stub != nil { - fake.TrackPublishedStub(arg1, arg2, arg3, arg4) + fake.TrackPublishedStub(arg1, arg2, arg3, arg4, arg5) } } @@ -1255,17 +1257,17 @@ func (fake *FakeTelemetryService) TrackPublishedCallCount() int { return len(fake.trackPublishedArgsForCall) } -func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo)) { +func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool)) { fake.trackPublishedMutex.Lock() defer fake.trackPublishedMutex.Unlock() fake.TrackPublishedStub = stub } -func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) { +func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool) { fake.trackPublishedMutex.RLock() defer fake.trackPublishedMutex.RUnlock() argsForCall := fake.trackPublishedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeTelemetryService) TrackPublishedUpdate(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index be27643ac..12cfe6ab4 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -47,7 +47,7 @@ type TelemetryService interface { // TrackPublishRequested - a publication attempt has been received TrackPublishRequested(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) // TrackPublished - a publication attempt has been successful - TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) + TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) // TrackUnpublished - a participant unpublished a track TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) // TrackSubscribeRequested - a participant requested to subscribe to a track