Implement event loop for telemetry service (#297)

It allows all actions/events to run in the same go routine.
Therefore no synchronization primitives are needed inside
telemetry service implementation.
This commit is contained in:
Artur Shellunts
2021-12-29 19:51:12 +01:00
committed by GitHub
parent 828d490755
commit b744a9c2ba
4 changed files with 52 additions and 46 deletions
-16
View File
@@ -2,7 +2,6 @@ package telemetry
import (
"context"
"sync"
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -18,7 +17,6 @@ type StatsWorker struct {
roomName string
participantID string
sync.RWMutex
upstreamBuffers map[string][]*buffer.Buffer
drainUpstreamBuffers map[string]bool
@@ -52,16 +50,10 @@ func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName,
}
func (s *StatsWorker) AddBuffer(trackID string, buffer *buffer.Buffer) {
s.Lock()
defer s.Unlock()
s.upstreamBuffers[trackID] = append(s.upstreamBuffers[trackID], buffer)
}
func (s *StatsWorker) OnDownstreamPacket(trackID string, bytes int) {
s.Lock()
defer s.Unlock()
s.getOrCreateOutgoingStatsIfEmpty(trackID).totalBytes += uint64(bytes)
s.getOrCreateOutgoingStatsIfEmpty(trackID).totalPackets++
}
@@ -91,9 +83,6 @@ func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID string) *Stats {
}
func (s *StatsWorker) OnRTCP(trackID string, direction livekit.StreamType, stats *livekit.AnalyticsStat) {
s.Lock()
defer s.Unlock()
var ds *Stats
if direction == livekit.StreamType_DOWNSTREAM {
ds = s.getOrCreateOutgoingStatsIfEmpty(trackID)
@@ -125,9 +114,6 @@ func (s *StatsWorker) calculateTotalBytesPackets(allBuffers []*buffer.Buffer) (t
}
func (s *StatsWorker) Update() {
s.Lock()
defer s.Unlock()
ts := timestamppb.Now()
stats := make([]*livekit.AnalyticsStat, 0)
@@ -196,9 +182,7 @@ func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.A
}
func (s *StatsWorker) RemoveBuffer(trackID string) {
s.Lock()
s.drainUpstreamBuffers[trackID] = true
s.Unlock()
}
func (s *StatsWorker) Close() {
+52 -14
View File
@@ -32,13 +32,19 @@ type TelemetryService interface {
RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo)
}
type doWorkFunc func()
type telemetryService struct {
internalService TelemetryServiceInternal
jobQueue chan doWorkFunc
}
const jobQueueBufferSize = 100
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
t := &telemetryService{
internalService: NewTelemetryServiceInternal(notifier, analytics),
jobQueue: make(chan doWorkFunc, jobQueueBufferSize),
}
go t.run()
@@ -47,62 +53,94 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService)
}
func (t *telemetryService) run() {
ticker := time.NewTicker(updateFrequency)
for {
select {
case <-time.After(updateFrequency):
case <-ticker.C:
t.internalService.SendAnalytics()
case job, ok := <-t.jobQueue:
if ok {
job()
}
}
}
}
func (t *telemetryService) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) {
t.internalService.AddUpTrack(participantID, trackID, buff)
t.jobQueue <- func() {
t.internalService.AddUpTrack(participantID, trackID, buff)
}
}
func (t *telemetryService) OnDownstreamPacket(participantID string, trackID string, bytes int) {
t.internalService.OnDownstreamPacket(participantID, trackID, bytes)
t.jobQueue <- func() {
t.internalService.OnDownstreamPacket(participantID, trackID, bytes)
}
}
func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet) {
t.internalService.HandleRTCP(streamType, participantID, trackID, pkts)
t.jobQueue <- func() {
t.internalService.HandleRTCP(streamType, participantID, trackID, pkts)
}
}
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
t.internalService.RoomStarted(ctx, room)
t.jobQueue <- func() {
t.internalService.RoomStarted(ctx, room)
}
}
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
t.internalService.RoomEnded(ctx, room)
t.jobQueue <- func() {
t.internalService.RoomEnded(ctx, room)
}
}
func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) {
t.internalService.ParticipantJoined(ctx, room, participant, clientInfo)
t.jobQueue <- func() {
t.internalService.ParticipantJoined(ctx, room, participant, clientInfo)
}
}
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
t.internalService.ParticipantLeft(ctx, room, participant)
t.jobQueue <- func() {
t.internalService.ParticipantLeft(ctx, room, participant)
}
}
func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) {
t.internalService.TrackPublished(ctx, participantID, track)
t.jobQueue <- func() {
t.internalService.TrackPublished(ctx, participantID, track)
}
}
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
t.internalService.TrackUnpublished(ctx, participantID, track, ssrc)
t.jobQueue <- func() {
t.internalService.TrackUnpublished(ctx, participantID, track, ssrc)
}
}
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
t.internalService.TrackSubscribed(ctx, participantID, track)
t.jobQueue <- func() {
t.internalService.TrackSubscribed(ctx, participantID, track)
}
}
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
t.internalService.TrackUnsubscribed(ctx, participantID, track)
t.jobQueue <- func() {
t.internalService.TrackUnsubscribed(ctx, participantID, track)
}
}
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
t.internalService.RecordingStarted(ctx, ri)
t.jobQueue <- func() {
t.internalService.RecordingStarted(ctx, ri)
}
}
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
t.internalService.RecordingEnded(ctx, ri)
t.jobQueue <- func() {
t.internalService.RecordingEnded(ctx, ri)
}
}
@@ -2,7 +2,6 @@ package telemetry
import (
"context"
"sync"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/livekit"
@@ -26,7 +25,6 @@ type telemetryServiceInternal struct {
notifier webhook.Notifier
webhookPool *workerpool.WorkerPool
sync.RWMutex
// one worker per participant
workers map[string]*StatsWorker
@@ -43,18 +41,14 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS
}
func (t *telemetryServiceInternal) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.AddBuffer(trackID, buff)
}
}
func (t *telemetryServiceInternal) OnDownstreamPacket(participantID string, trackID string, bytes int) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.OnDownstreamPacket(trackID, bytes)
}
@@ -90,9 +84,7 @@ func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, par
prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount)
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.OnRTCP(trackID, streamType, stats)
}
@@ -46,9 +46,7 @@ func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit.
func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *livekit.Room,
participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) {
t.Lock()
t.workers[participant.Sid] = newStatsWorker(ctx, t, room.Sid, room.Name, participant.Sid)
t.Unlock()
prometheus.AddParticipant()
@@ -70,12 +68,10 @@ func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *
}
func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
t.Lock()
if w := t.workers[participant.Sid]; w != nil {
w.Close()
delete(t.workers, participant.Sid)
}
t.Unlock()
prometheus.SubParticipant()
@@ -111,9 +107,7 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
roomID := ""
roomName := ""
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
roomID = w.roomID
w.RemoveBuffer(track.GetSid())
@@ -189,9 +183,7 @@ func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livek
}
func (t *telemetryServiceInternal) getRoomDetails(participantID string) (string, string) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
return w.roomID, w.roomName
}