From f71544e27a2edceadd5a0f0e76a7eba50b8d2421 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 15 Jun 2023 15:39:04 -0700 Subject: [PATCH] Do not send ParticipantJoined webhook if connection was resumed (#1795) * Do not send ParticipantJoined webhook if connection was resumed * isResume -> isMigration --- pkg/rtc/room.go | 13 +++++++++---- pkg/telemetry/events.go | 15 +++++++++------ pkg/telemetry/events_test.go | 4 ++-- .../telemetryfakes/fake_telemetry_service.go | 18 ++++++++++-------- pkg/telemetry/telemetryservice.go | 4 ++-- 5 files changed, 32 insertions(+), 22 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 4344c6787..8d343a98f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -280,10 +280,15 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me // start the workers once connectivity is established p.Start() - r.telemetry.ParticipantActive(context.Background(), r.ToProto(), p.ToProto(), &livekit.AnalyticsClientMeta{ - ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds()), - ConnectionType: string(p.GetICEConnectionType()), - }) + r.telemetry.ParticipantActive(context.Background(), + r.ToProto(), + p.ToProto(), + &livekit.AnalyticsClientMeta{ + ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds()), + ConnectionType: string(p.GetICEConnectionType()), + }, + false, + ) } else if state == livekit.ParticipantInfo_DISCONNECTED { // remove participant from room go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 26a215ba6..71d8c05b7 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -91,14 +91,17 @@ func (t *telemetryService) ParticipantActive( room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, + isMigration bool, ) { t.enqueue(func() { - // consider participant joined only when they became active - t.NotifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventParticipantJoined, - Room: room, - Participant: participant, - }) + if !isMigration { + // consider participant joined only when they became active + t.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventParticipantJoined, + Room: room, + Participant: participant, + }) + } worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)) if !ok { diff --git a/pkg/telemetry/events_test.go b/pkg/telemetry/events_test.go index 852701180..77529b211 100644 --- a/pkg/telemetry/events_test.go +++ b/pkg/telemetry/events_test.go @@ -69,7 +69,7 @@ func Test_OnParticipantLeft_EventIsSent(t *testing.T) { participantInfo := &livekit.ParticipantInfo{Sid: partSID} // do - fixture.sut.ParticipantActive(context.Background(), room, participantInfo, &livekit.AnalyticsClientMeta{}) + fixture.sut.ParticipantActive(context.Background(), room, participantInfo, &livekit.AnalyticsClientMeta{}, false) fixture.sut.ParticipantLeft(context.Background(), room, participantInfo, true) time.Sleep(time.Millisecond * 500) @@ -159,7 +159,7 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) { ClientConnectTime: 420, } - fixture.sut.ParticipantActive(context.Background(), room, participantInfo, clientMetaConnect) + fixture.sut.ParticipantActive(context.Background(), room, participantInfo, clientMetaConnect, false) time.Sleep(time.Millisecond * 500) require.Equal(t, 2, fixture.analytics.SendEventCallCount()) diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 800160fb7..f85e03527 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -62,13 +62,14 @@ type FakeTelemetryService struct { arg1 context.Context arg2 *livekit.WebhookEvent } - ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta) + ParticipantActiveStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta, bool) participantActiveMutex sync.RWMutex participantActiveArgsForCall []struct { arg1 context.Context arg2 *livekit.Room arg3 *livekit.ParticipantInfo arg4 *livekit.AnalyticsClientMeta + arg5 bool } ParticipantJoinedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta, bool) participantJoinedMutex sync.RWMutex @@ -526,19 +527,20 @@ func (fake *FakeTelemetryService) NotifyEventArgsForCall(i int) (context.Context return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta) { +func (fake *FakeTelemetryService) ParticipantActive(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.AnalyticsClientMeta, arg5 bool) { fake.participantActiveMutex.Lock() fake.participantActiveArgsForCall = append(fake.participantActiveArgsForCall, struct { arg1 context.Context arg2 *livekit.Room arg3 *livekit.ParticipantInfo arg4 *livekit.AnalyticsClientMeta - }{arg1, arg2, arg3, arg4}) + arg5 bool + }{arg1, arg2, arg3, arg4, arg5}) stub := fake.ParticipantActiveStub - fake.recordInvocation("ParticipantActive", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("ParticipantActive", []interface{}{arg1, arg2, arg3, arg4, arg5}) fake.participantActiveMutex.Unlock() if stub != nil { - fake.ParticipantActiveStub(arg1, arg2, arg3, arg4) + fake.ParticipantActiveStub(arg1, arg2, arg3, arg4, arg5) } } @@ -548,17 +550,17 @@ func (fake *FakeTelemetryService) ParticipantActiveCallCount() int { return len(fake.participantActiveArgsForCall) } -func (fake *FakeTelemetryService) ParticipantActiveCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta)) { +func (fake *FakeTelemetryService) ParticipantActiveCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta, bool)) { fake.participantActiveMutex.Lock() defer fake.participantActiveMutex.Unlock() fake.ParticipantActiveStub = stub } -func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta) { +func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.AnalyticsClientMeta, bool) { fake.participantActiveMutex.RLock() defer fake.participantActiveMutex.RUnlock() argsForCall := fake.participantActiveArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.ClientInfo, arg5 *livekit.AnalyticsClientMeta, arg6 bool) { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index d3aaf1715..e304980aa 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -22,8 +22,8 @@ type TelemetryService interface { // ParticipantJoined - a participant establishes signal connection to a room ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool) // ParticipantActive - a participant establishes media connection - ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) - // ParticipantResumed - there has been an ICE restart or connection resume attempt + ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, isMigration bool) + // ParticipantResumed - there has been an ICE restart or connection resume attempt, and we've received their signal connection ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason) // ParticipantLeft - the participant leaves the room, only sent if ParticipantActive has been called before ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool)