Publish/Unpublish counter match. (#4173)

Published counter was bumped up only when not migrating in, but it was
decremented when a migrating participant leaves without expectation to
resume. That could have resulted in negative counts.

Always change counters irrespective of migration or expected to resume
on leave. Control events send based on migration/resume.
This commit is contained in:
Raja Subramanian
2025-12-18 10:16:27 +05:30
committed by GitHub
parent fb849edc6a
commit cbb2c61787
4 changed files with 29 additions and 26 deletions
+14 -17
View File
@@ -3299,15 +3299,13 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, ti *livekit.TrackInfo)
p.supervisor.ClearPublishedTrack(trackID, mt)
}
if !isExpectedToResume {
p.params.Telemetry.TrackUnpublished(
context.Background(),
p.ID(),
p.Identity(),
mt.ToProto(),
true,
)
}
p.params.Telemetry.TrackUnpublished(
context.Background(),
p.ID(),
p.Identity(),
mt.ToProto(),
!isExpectedToResume,
)
p.pendingTracksLock.Lock()
if pti := p.pendingTracks[signalCid]; pti != nil {
@@ -3336,14 +3334,13 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack, isMigrate
// send webhook after callbacks are complete, persistence and state handling happens
// in `onTrackPublished` cb
if !isMigrated {
p.params.Telemetry.TrackPublished(
context.Background(),
p.ID(),
p.Identity(),
track.ToProto(),
)
}
p.params.Telemetry.TrackPublished(
context.Background(),
p.ID(),
p.Identity(),
track.ToProto(),
!isMigrated,
)
p.pendingTracksLock.Lock()
delete(p.pendingPublishingTracks, track.ID())
+4
View File
@@ -236,10 +236,14 @@ func (t *telemetryService) TrackPublished(
participantID livekit.ParticipantID,
identity livekit.ParticipantIdentity,
track *livekit.TrackInfo,
shouldSendEvent bool,
) {
t.enqueue(func() {
prometheus.AddPublishedTrack(track.Type.String())
prometheus.RecordTrackPublishSuccess(track.Type.String())
if !shouldSendEvent {
return
}
room := t.getRoomDetails(participantID)
participant := &livekit.ParticipantInfo{
@@ -203,13 +203,14 @@ type FakeTelemetryService struct {
arg3 livekit.ParticipantIdentity
arg4 *livekit.TrackInfo
}
TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo)
TrackPublishedStub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool)
trackPublishedMutex sync.RWMutex
trackPublishedArgsForCall []struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 livekit.ParticipantIdentity
arg4 *livekit.TrackInfo
arg5 bool
}
TrackPublishedUpdateStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo)
trackPublishedUpdateMutex sync.RWMutex
@@ -1233,19 +1234,20 @@ func (fake *FakeTelemetryService) TrackPublishRequestedArgsForCall(i int) (conte
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo) {
func (fake *FakeTelemetryService) TrackPublished(arg1 context.Context, arg2 livekit.ParticipantID, arg3 livekit.ParticipantIdentity, arg4 *livekit.TrackInfo, arg5 bool) {
fake.trackPublishedMutex.Lock()
fake.trackPublishedArgsForCall = append(fake.trackPublishedArgsForCall, struct {
arg1 context.Context
arg2 livekit.ParticipantID
arg3 livekit.ParticipantIdentity
arg4 *livekit.TrackInfo
}{arg1, arg2, arg3, arg4})
arg5 bool
}{arg1, arg2, arg3, arg4, arg5})
stub := fake.TrackPublishedStub
fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4})
fake.recordInvocation("TrackPublished", []interface{}{arg1, arg2, arg3, arg4, arg5})
fake.trackPublishedMutex.Unlock()
if stub != nil {
fake.TrackPublishedStub(arg1, arg2, arg3, arg4)
fake.TrackPublishedStub(arg1, arg2, arg3, arg4, arg5)
}
}
@@ -1255,17 +1257,17 @@ func (fake *FakeTelemetryService) TrackPublishedCallCount() int {
return len(fake.trackPublishedArgsForCall)
}
func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo)) {
func (fake *FakeTelemetryService) TrackPublishedCalls(stub func(context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool)) {
fake.trackPublishedMutex.Lock()
defer fake.trackPublishedMutex.Unlock()
fake.TrackPublishedStub = stub
}
func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo) {
func (fake *FakeTelemetryService) TrackPublishedArgsForCall(i int) (context.Context, livekit.ParticipantID, livekit.ParticipantIdentity, *livekit.TrackInfo, bool) {
fake.trackPublishedMutex.RLock()
defer fake.trackPublishedMutex.RUnlock()
argsForCall := fake.trackPublishedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5
}
func (fake *FakeTelemetryService) TrackPublishedUpdate(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo) {
+1 -1
View File
@@ -47,7 +47,7 @@ type TelemetryService interface {
// TrackPublishRequested - a publication attempt has been received
TrackPublishRequested(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)
// TrackPublished - a publication attempt has been successful
TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)
TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
// TrackUnpublished - a participant unpublished a track
TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
// TrackSubscribeRequested - a participant requested to subscribe to a track