diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 1655e1d0a..3286b6db7 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -345,7 +345,7 @@ func (r *RoomManager) StartSession( persistRoomForParticipantCount(room.ToProto()) clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id} - r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta) + r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true) participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) { if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil { pLogger.Errorw("could not delete participant", err) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index cfd9d2f80..e68dfb3c1 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -65,6 +65,7 @@ func (t *telemetryService) ParticipantJoined( participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, + shouldSendEvent bool, ) { t.enqueue(func() { prometheus.IncrementParticipantJoin(1) @@ -78,16 +79,18 @@ func (t *telemetryService) ParticipantJoined( livekit.ParticipantIdentity(participant.Identity), ) - t.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, - Timestamp: timestamppb.Now(), - RoomId: room.Sid, - ParticipantId: participant.Sid, - Participant: participant, - Room: room, - ClientInfo: clientInfo, - ClientMeta: clientMeta, - }) + if shouldSendEvent { + t.SendEvent(ctx, &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, + Timestamp: timestamppb.Now(), + RoomId: room.Sid, + ParticipantId: participant.Sid, + Participant: participant, + Room: room, + ClientInfo: clientInfo, + ClientMeta: clientMeta, + }) + } }) } @@ -128,7 +131,11 @@ func (t *telemetryService) ParticipantActive( }) } -func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool) { +func (t *telemetryService) ParticipantLeft(ctx context.Context, + room *livekit.Room, + participant *livekit.ParticipantInfo, + shouldSendEvent bool, +) { t.enqueue(func() { isConnected := false if worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); ok { diff --git a/pkg/telemetry/events_test.go b/pkg/telemetry/events_test.go index 4db264c3d..55701ba1e 100644 --- a/pkg/telemetry/events_test.go +++ b/pkg/telemetry/events_test.go @@ -34,7 +34,7 @@ func Test_OnParticipantJoin_EventIsSent(t *testing.T) { participantInfo := &livekit.ParticipantInfo{Sid: partSID} // do - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta, true) time.Sleep(time.Millisecond * 500) // test @@ -146,7 +146,7 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) { participantInfo := &livekit.ParticipantInfo{Sid: partSID} // do - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta, true) time.Sleep(time.Millisecond * 500) // test @@ -198,7 +198,7 @@ func Test_OnTrackSubscribed_EventIsSent(t *testing.T) { participantInfo := &livekit.ParticipantInfo{Sid: partSID} // do - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta, true) time.Sleep(time.Millisecond * 500) // test diff --git a/pkg/telemetry/stats_test.go b/pkg/telemetry/stats_test.go index 0d5098aac..fc224a8d4 100644 --- a/pkg/telemetry/stats_test.go +++ b/pkg/telemetry/stats_test.go @@ -38,7 +38,7 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) { partSID := livekit.ParticipantID("part1") clientInfo := &livekit.ClientInfo{Sdk: 2} participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true) // do packet := 33 @@ -66,7 +66,7 @@ func Test_OnDownstreamPackets(t *testing.T) { partSID := livekit.ParticipantID("part1") clientInfo := &livekit.ClientInfo{Sdk: 2} participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true) // do packets := []int{33, 23} @@ -99,7 +99,7 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) { partSID := livekit.ParticipantID("part1") clientInfo := &livekit.ClientInfo{Sdk: 2} participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true) // do packet1 := 33 @@ -144,7 +144,7 @@ func Test_OnDownStreamStat(t *testing.T) { room := &livekit.Room{} partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do stat1 := &livekit.AnalyticsStat{ @@ -203,7 +203,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) { room := &livekit.Room{} partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do trackID := livekit.TrackID("trackID1") @@ -255,7 +255,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) { room := &livekit.Room{} partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do trackID1 := livekit.TrackID("trackID1") @@ -324,7 +324,7 @@ func Test_OnUpstreamStat(t *testing.T) { room := &livekit.Room{} partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do stat1 := &livekit.AnalyticsStat{ @@ -386,7 +386,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { partSID := livekit.ParticipantID("part1") identity := livekit.ParticipantIdentity("part1Identity") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID), Identity: string(identity)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // there should be bytes reported so that stats are sent totalBytes := 1 @@ -472,7 +472,7 @@ func Test_AnalyticsSentWhenParticipantLeaves(t *testing.T) { room := &livekit.Room{} partSID := "part1" participantInfo := &livekit.ParticipantInfo{Sid: partSID} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do fixture.sut.ParticipantLeft(context.Background(), room, participantInfo, true) @@ -489,7 +489,7 @@ func Test_AddUpTrack(t *testing.T) { room := &livekit.Room{} partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do var totalBytes uint64 = 3 @@ -526,7 +526,7 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) { room := &livekit.Room{} partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do trackID := livekit.TrackID("trackID") @@ -565,7 +565,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) { room := &livekit.Room{} partSID := livekit.ParticipantID("part1") participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)} - fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil) + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true) // do // upstream bytes diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 069e3b64f..07caf73f3 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -40,7 +40,7 @@ type FakeTelemetryService struct { arg3 *livekit.ParticipantInfo arg4 *livekit.AnalyticsClientMeta } - ParticipantJoinedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta) + ParticipantJoinedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta, bool) participantJoinedMutex sync.RWMutex participantJoinedArgsForCall []struct { arg1 context.Context @@ -48,6 +48,7 @@ type FakeTelemetryService struct { arg3 *livekit.ParticipantInfo arg4 *livekit.ClientInfo arg5 *livekit.AnalyticsClientMeta + arg6 bool } ParticipantLeftStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, bool) participantLeftMutex sync.RWMutex @@ -299,7 +300,7 @@ func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.C return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } -func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.ClientInfo, arg5 *livekit.AnalyticsClientMeta) { +func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.ClientInfo, arg5 *livekit.AnalyticsClientMeta, arg6 bool) { fake.participantJoinedMutex.Lock() fake.participantJoinedArgsForCall = append(fake.participantJoinedArgsForCall, struct { arg1 context.Context @@ -307,12 +308,13 @@ func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 * arg3 *livekit.ParticipantInfo arg4 *livekit.ClientInfo arg5 *livekit.AnalyticsClientMeta - }{arg1, arg2, arg3, arg4, arg5}) + arg6 bool + }{arg1, arg2, arg3, arg4, arg5, arg6}) stub := fake.ParticipantJoinedStub - fake.recordInvocation("ParticipantJoined", []interface{}{arg1, arg2, arg3, arg4, arg5}) + fake.recordInvocation("ParticipantJoined", []interface{}{arg1, arg2, arg3, arg4, arg5, arg6}) fake.participantJoinedMutex.Unlock() if stub != nil { - fake.ParticipantJoinedStub(arg1, arg2, arg3, arg4, arg5) + fake.ParticipantJoinedStub(arg1, arg2, arg3, arg4, arg5, arg6) } } @@ -322,17 +324,17 @@ func (fake *FakeTelemetryService) ParticipantJoinedCallCount() int { return len(fake.participantJoinedArgsForCall) } -func (fake *FakeTelemetryService) ParticipantJoinedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta)) { +func (fake *FakeTelemetryService) ParticipantJoinedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta, bool)) { fake.participantJoinedMutex.Lock() defer fake.participantJoinedMutex.Unlock() fake.ParticipantJoinedStub = stub } -func (fake *FakeTelemetryService) ParticipantJoinedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta) { +func (fake *FakeTelemetryService) ParticipantJoinedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta, bool) { fake.participantJoinedMutex.RLock() defer fake.participantJoinedMutex.RUnlock() argsForCall := fake.participantJoinedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6 } func (fake *FakeTelemetryService) ParticipantLeft(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 bool) { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index a0a2254ec..0c29cebbc 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -21,7 +21,7 @@ type TelemetryService interface { // events RoomStarted(ctx context.Context, room *livekit.Room) RoomEnded(ctx context.Context, room *livekit.Room) - ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) + ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)