Allow skipping of sending ParticipantJoined analytics event (#1236)

In certain scenarios such as migration, we do not want a duplicate event
to be sent when the participant is reconnecting. The Prometheus metric
should still be updated though.
This commit is contained in:
David Zhao
2022-12-18 22:09:20 -08:00
committed by GitHub
parent 241a7120f5
commit 120335da00
6 changed files with 45 additions and 36 deletions
+1 -1
View File
@@ -345,7 +345,7 @@ func (r *RoomManager) StartSession(
persistRoomForParticipantCount(room.ToProto())
clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id}
r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta)
r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true)
participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil {
pLogger.Errorw("could not delete participant", err)
+18 -11
View File
@@ -65,6 +65,7 @@ func (t *telemetryService) ParticipantJoined(
participant *livekit.ParticipantInfo,
clientInfo *livekit.ClientInfo,
clientMeta *livekit.AnalyticsClientMeta,
shouldSendEvent bool,
) {
t.enqueue(func() {
prometheus.IncrementParticipantJoin(1)
@@ -78,16 +79,18 @@ func (t *telemetryService) ParticipantJoined(
livekit.ParticipantIdentity(participant.Identity),
)
t.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED,
Timestamp: timestamppb.Now(),
RoomId: room.Sid,
ParticipantId: participant.Sid,
Participant: participant,
Room: room,
ClientInfo: clientInfo,
ClientMeta: clientMeta,
})
if shouldSendEvent {
t.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED,
Timestamp: timestamppb.Now(),
RoomId: room.Sid,
ParticipantId: participant.Sid,
Participant: participant,
Room: room,
ClientInfo: clientInfo,
ClientMeta: clientMeta,
})
}
})
}
@@ -128,7 +131,11 @@ func (t *telemetryService) ParticipantActive(
})
}
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool) {
func (t *telemetryService) ParticipantLeft(ctx context.Context,
room *livekit.Room,
participant *livekit.ParticipantInfo,
shouldSendEvent bool,
) {
t.enqueue(func() {
isConnected := false
if worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); ok {
+3 -3
View File
@@ -34,7 +34,7 @@ func Test_OnParticipantJoin_EventIsSent(t *testing.T) {
participantInfo := &livekit.ParticipantInfo{Sid: partSID}
// do
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta, true)
time.Sleep(time.Millisecond * 500)
// test
@@ -146,7 +146,7 @@ func Test_OnParticipantActive_EventIsSent(t *testing.T) {
participantInfo := &livekit.ParticipantInfo{Sid: partSID}
// do
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta, true)
time.Sleep(time.Millisecond * 500)
// test
@@ -198,7 +198,7 @@ func Test_OnTrackSubscribed_EventIsSent(t *testing.T) {
participantInfo := &livekit.ParticipantInfo{Sid: partSID}
// do
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, clientMeta, true)
time.Sleep(time.Millisecond * 500)
// test
+12 -12
View File
@@ -38,7 +38,7 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) {
partSID := livekit.ParticipantID("part1")
clientInfo := &livekit.ClientInfo{Sdk: 2}
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true)
// do
packet := 33
@@ -66,7 +66,7 @@ func Test_OnDownstreamPackets(t *testing.T) {
partSID := livekit.ParticipantID("part1")
clientInfo := &livekit.ClientInfo{Sdk: 2}
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true)
// do
packets := []int{33, 23}
@@ -99,7 +99,7 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) {
partSID := livekit.ParticipantID("part1")
clientInfo := &livekit.ClientInfo{Sdk: 2}
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo, nil, true)
// do
packet1 := 33
@@ -144,7 +144,7 @@ func Test_OnDownStreamStat(t *testing.T) {
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
stat1 := &livekit.AnalyticsStat{
@@ -203,7 +203,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
trackID := livekit.TrackID("trackID1")
@@ -255,7 +255,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
trackID1 := livekit.TrackID("trackID1")
@@ -324,7 +324,7 @@ func Test_OnUpstreamStat(t *testing.T) {
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
stat1 := &livekit.AnalyticsStat{
@@ -386,7 +386,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
partSID := livekit.ParticipantID("part1")
identity := livekit.ParticipantIdentity("part1Identity")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID), Identity: string(identity)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// there should be bytes reported so that stats are sent
totalBytes := 1
@@ -472,7 +472,7 @@ func Test_AnalyticsSentWhenParticipantLeaves(t *testing.T) {
room := &livekit.Room{}
partSID := "part1"
participantInfo := &livekit.ParticipantInfo{Sid: partSID}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
fixture.sut.ParticipantLeft(context.Background(), room, participantInfo, true)
@@ -489,7 +489,7 @@ func Test_AddUpTrack(t *testing.T) {
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
var totalBytes uint64 = 3
@@ -526,7 +526,7 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) {
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
trackID := livekit.TrackID("trackID")
@@ -565,7 +565,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
room := &livekit.Room{}
partSID := livekit.ParticipantID("part1")
participantInfo := &livekit.ParticipantInfo{Sid: string(partSID)}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil)
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil, nil, true)
// do
// upstream bytes
@@ -40,7 +40,7 @@ type FakeTelemetryService struct {
arg3 *livekit.ParticipantInfo
arg4 *livekit.AnalyticsClientMeta
}
ParticipantJoinedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta)
ParticipantJoinedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta, bool)
participantJoinedMutex sync.RWMutex
participantJoinedArgsForCall []struct {
arg1 context.Context
@@ -48,6 +48,7 @@ type FakeTelemetryService struct {
arg3 *livekit.ParticipantInfo
arg4 *livekit.ClientInfo
arg5 *livekit.AnalyticsClientMeta
arg6 bool
}
ParticipantLeftStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, bool)
participantLeftMutex sync.RWMutex
@@ -299,7 +300,7 @@ func (fake *FakeTelemetryService) ParticipantActiveArgsForCall(i int) (context.C
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.ClientInfo, arg5 *livekit.AnalyticsClientMeta) {
func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 *livekit.ClientInfo, arg5 *livekit.AnalyticsClientMeta, arg6 bool) {
fake.participantJoinedMutex.Lock()
fake.participantJoinedArgsForCall = append(fake.participantJoinedArgsForCall, struct {
arg1 context.Context
@@ -307,12 +308,13 @@ func (fake *FakeTelemetryService) ParticipantJoined(arg1 context.Context, arg2 *
arg3 *livekit.ParticipantInfo
arg4 *livekit.ClientInfo
arg5 *livekit.AnalyticsClientMeta
}{arg1, arg2, arg3, arg4, arg5})
arg6 bool
}{arg1, arg2, arg3, arg4, arg5, arg6})
stub := fake.ParticipantJoinedStub
fake.recordInvocation("ParticipantJoined", []interface{}{arg1, arg2, arg3, arg4, arg5})
fake.recordInvocation("ParticipantJoined", []interface{}{arg1, arg2, arg3, arg4, arg5, arg6})
fake.participantJoinedMutex.Unlock()
if stub != nil {
fake.ParticipantJoinedStub(arg1, arg2, arg3, arg4, arg5)
fake.ParticipantJoinedStub(arg1, arg2, arg3, arg4, arg5, arg6)
}
}
@@ -322,17 +324,17 @@ func (fake *FakeTelemetryService) ParticipantJoinedCallCount() int {
return len(fake.participantJoinedArgsForCall)
}
func (fake *FakeTelemetryService) ParticipantJoinedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta)) {
func (fake *FakeTelemetryService) ParticipantJoinedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta, bool)) {
fake.participantJoinedMutex.Lock()
defer fake.participantJoinedMutex.Unlock()
fake.ParticipantJoinedStub = stub
}
func (fake *FakeTelemetryService) ParticipantJoinedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta) {
func (fake *FakeTelemetryService) ParticipantJoinedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, *livekit.ClientInfo, *livekit.AnalyticsClientMeta, bool) {
fake.participantJoinedMutex.RLock()
defer fake.participantJoinedMutex.RUnlock()
argsForCall := fake.participantJoinedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6
}
func (fake *FakeTelemetryService) ParticipantLeft(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 bool) {
+1 -1
View File
@@ -21,7 +21,7 @@ type TelemetryService interface {
// events
RoomStarted(ctx context.Context, room *livekit.Room)
RoomEnded(ctx context.Context, room *livekit.Room)
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta)
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool)
ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta)
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool)
TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)