mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 15:35:41 +00:00
Fixed concurrent modification to map (#702)
Synchronizes access to stats worker maps. Previously it was accessed from both OpsQueue goroutine and run() worker
This commit is contained in:
@@ -5,7 +5,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/utils"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/webhook"
|
||||
@@ -36,7 +35,7 @@ type TelemetryService interface {
|
||||
|
||||
type telemetryService struct {
|
||||
internalService TelemetryServiceInternal
|
||||
jobQueue *utils.OpsQueue
|
||||
jobsChan chan func()
|
||||
}
|
||||
|
||||
// queue should be sufficiently large to avoid blocking
|
||||
@@ -45,11 +44,9 @@ const jobQueueBufferSize = 10000
|
||||
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
|
||||
t := &telemetryService{
|
||||
internalService: NewTelemetryServiceInternal(notifier, analytics),
|
||||
jobQueue: utils.NewOpsQueue(logger.GetDefaultLogger(), "telemetry", jobQueueBufferSize),
|
||||
jobsChan: make(chan func(), jobQueueBufferSize),
|
||||
}
|
||||
|
||||
t.jobQueue.Start()
|
||||
|
||||
go t.run()
|
||||
|
||||
return t
|
||||
@@ -59,104 +56,117 @@ func (t *telemetryService) run() {
|
||||
ticker := time.NewTicker(config.StatsUpdateInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
<-ticker.C
|
||||
t.internalService.SendAnalytics()
|
||||
select {
|
||||
case <-ticker.C:
|
||||
t.internalService.SendAnalytics()
|
||||
case op := <-t.jobsChan:
|
||||
op()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *telemetryService) enqueue(op func()) {
|
||||
select {
|
||||
case t.jobsChan <- op:
|
||||
// success
|
||||
default:
|
||||
logger.Warnw("telemetry queue full", nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.TrackStats(streamType, participantID, trackID, stats)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.RoomStarted(ctx, room)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.RoomEnded(ctx, room)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo,
|
||||
clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.ParticipantJoined(ctx, room, participant, clientInfo, clientMeta)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.ParticipantActive(ctx, room, participant, clientMeta)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.ParticipantLeft(ctx, room, participant)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.TrackPublished(ctx, participantID, identity, track)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.TrackUnpublished(ctx, participantID, identity, track, ssrc)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.TrackSubscribed(ctx, participantID, track, publisher)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.TrackUnsubscribed(ctx, participantID, track)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.RecordingStarted(ctx, ri)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.RecordingEnded(ctx, ri)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.TrackPublishedUpdate(ctx, participantID, track)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, maxQuality)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.EgressStarted(ctx, info)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
|
||||
t.jobQueue.Enqueue(func() {
|
||||
t.enqueue(func() {
|
||||
t.internalService.EgressEnded(ctx, info)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user