diff --git a/pkg/telemetry/interceptor.go b/pkg/telemetry/interceptor.go index c3ad054a1..9cd5d93e0 100644 --- a/pkg/telemetry/interceptor.go +++ b/pkg/telemetry/interceptor.go @@ -6,7 +6,7 @@ import ( "github.com/pion/rtcp" ) -func (t *telemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { +func (t *telemetryServiceInternal) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { return &StatsInterceptorFactory{ t: t, participantID: participantID, diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 368dedfa6..facf86399 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -3,7 +3,6 @@ package telemetry import ( "context" "sync" - "time" "github.com/livekit/protocol/livekit" "google.golang.org/protobuf/types/known/timestamppb" @@ -11,12 +10,10 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" ) -const updateFrequency = time.Second * 10 - // StatsWorker handles participant stats type StatsWorker struct { ctx context.Context - t TelemetryService + t TelemetryReporter roomID string roomName string participantID string @@ -27,8 +24,6 @@ type StatsWorker struct { incoming *Stats outgoing *Stats - - close chan struct{} } type Stats struct { @@ -40,7 +35,7 @@ type Stats struct { prevBytes uint64 } -func newStatsWorker(ctx context.Context, t TelemetryService, roomID, roomName, participantID string) *StatsWorker { +func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName, participantID string) *StatsWorker { s := &StatsWorker{ ctx: ctx, t: t, @@ -63,26 +58,10 @@ func newStatsWorker(ctx context.Context, t TelemetryService, roomID, roomName, p ParticipantId: participantID, RoomName: roomName, }}, - - close: make(chan struct{}, 1), } - go s.run() return s } -func (s *StatsWorker) run() { - for { - select { - case <-s.close: - // drain - s.Update() - return - case <-time.After(updateFrequency): - s.Update() - } - } -} - func (s *StatsWorker) AddBuffer(buffer *buffer.Buffer) { s.Lock() defer s.Unlock() @@ -190,5 +169,5 @@ func (s *StatsWorker) RemoveBuffer(ssrc uint32) { } func (s *StatsWorker) Close() { - close(s.close) + s.Update() } diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 00333ebdd..16dad1050 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -2,24 +2,23 @@ package telemetry import ( "context" - "sync" + "time" - "github.com/gammazero/workerpool" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/webhook" "github.com/pion/rtcp" "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) +const updateFrequency = time.Second * 10 + type TelemetryService interface { // stats NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory AddUpTrack(participantID string, buff *buffer.Buffer) OnDownstreamPacket(participantID string, bytes int) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) - Report(ctx context.Context, stats []*livekit.AnalyticsStat) // events RoomStarted(ctx context.Context, room *livekit.Room) @@ -35,91 +34,80 @@ type TelemetryService interface { } type telemetryService struct { - notifier webhook.Notifier - webhookPool *workerpool.WorkerPool - - sync.RWMutex - // one worker per participant - workers map[string]*StatsWorker - - analytics AnalyticsService + internalService TelemetryServiceInternal } func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService { - return &telemetryService{ - notifier: notifier, - webhookPool: workerpool.New(1), - workers: make(map[string]*StatsWorker), - analytics: analytics, + t := &telemetryService{ + internalService: NewTelemetryServiceInternal(notifier, analytics), + } + + go t.run() + + return t +} + +func (t *telemetryService) run() { + for { + select { + case <-time.After(updateFrequency): + t.internalService.SendAnalytics() + } } } func (t *telemetryService) AddUpTrack(participantID string, buff *buffer.Buffer) { - t.RLock() - w := t.workers[participantID] - t.RUnlock() - if w != nil { - w.AddBuffer(buff) - } + t.internalService.AddUpTrack(participantID, buff) } func (t *telemetryService) OnDownstreamPacket(participantID string, bytes int) { - t.RLock() - w := t.workers[participantID] - t.RUnlock() - if w != nil { - w.OnDownstreamPacket(bytes) - } + t.internalService.OnDownstreamPacket(participantID, bytes) } func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) { - stats := &livekit.AnalyticsStat{} - for _, pkt := range pkts { - switch pkt := pkt.(type) { - case *rtcp.TransportLayerNack: - stats.NackCount++ - case *rtcp.PictureLossIndication: - stats.PliCount++ - case *rtcp.FullIntraRequest: - stats.FirCount++ - case *rtcp.ReceiverReport: - for _, rr := range pkt.Reports { - if delay := uint64(rr.Delay); delay > stats.Delay { - stats.Delay = delay - } - if jitter := float64(rr.Jitter); jitter > stats.Jitter { - stats.Jitter = jitter - } - stats.PacketLost += uint64(rr.TotalLost) - } - } - } - - direction := prometheus.Incoming - if streamType == livekit.StreamType_DOWNSTREAM { - direction = prometheus.Outgoing - } - - prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount) - - t.RLock() - w := t.workers[participantID] - t.RUnlock() - if w != nil { - w.OnRTCP(streamType, stats) - } + t.internalService.HandleRTCP(streamType, participantID, pkts) } -func (t *telemetryService) Report(ctx context.Context, stats []*livekit.AnalyticsStat) { - for _, stat := range stats { - direction := prometheus.Incoming - if stat.Kind == livekit.StreamType_DOWNSTREAM { - direction = prometheus.Outgoing - } - - prometheus.IncrementPackets(direction, stat.TotalPackets) - prometheus.IncrementBytes(direction, stat.TotalBytes) - } - - t.analytics.SendStats(ctx, stats) +func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { + t.internalService.RoomStarted(ctx, room) +} + +func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { + t.internalService.RoomEnded(ctx, room) +} + +func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) { + t.internalService.ParticipantJoined(ctx, room, participant, clientInfo) +} + +func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { + t.internalService.ParticipantLeft(ctx, room, participant) +} + +func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) { + t.internalService.TrackPublished(ctx, participantID, track) +} + +func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { + t.internalService.TrackUnpublished(ctx, participantID, track, ssrc) +} + +func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { + t.internalService.TrackSubscribed(ctx, participantID, track) +} + +func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { + t.internalService.TrackUnsubscribed(ctx, participantID, track) +} + +func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { + t.internalService.RecordingStarted(ctx, ri) +} + +func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { + t.internalService.RecordingEnded(ctx, ri) +} + +func (t *telemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { + return t.internalService.NewStatsInterceptorFactory(participantID, identity) } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go new file mode 100644 index 000000000..964ad7709 --- /dev/null +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -0,0 +1,119 @@ +package telemetry + +import ( + "context" + "sync" + + "github.com/gammazero/workerpool" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/webhook" + "github.com/pion/rtcp" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" +) + +type TelemetryServiceInternal interface { + TelemetryService + SendAnalytics() +} + +type TelemetryReporter interface { + Report(ctx context.Context, stats []*livekit.AnalyticsStat) +} + +type telemetryServiceInternal struct { + notifier webhook.Notifier + webhookPool *workerpool.WorkerPool + + sync.RWMutex + // one worker per participant + workers map[string]*StatsWorker + + analytics AnalyticsService +} + +func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal { + return &telemetryServiceInternal{ + notifier: notifier, + webhookPool: workerpool.New(1), + workers: make(map[string]*StatsWorker), + analytics: analytics, + } +} + +func (t *telemetryServiceInternal) AddUpTrack(participantID string, buff *buffer.Buffer) { + t.RLock() + w := t.workers[participantID] + t.RUnlock() + if w != nil { + w.AddBuffer(buff) + } +} + +func (t *telemetryServiceInternal) OnDownstreamPacket(participantID string, bytes int) { + t.RLock() + w := t.workers[participantID] + t.RUnlock() + if w != nil { + w.OnDownstreamPacket(bytes) + } +} + +func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) { + stats := &livekit.AnalyticsStat{} + for _, pkt := range pkts { + switch pkt := pkt.(type) { + case *rtcp.TransportLayerNack: + stats.NackCount++ + case *rtcp.PictureLossIndication: + stats.PliCount++ + case *rtcp.FullIntraRequest: + stats.FirCount++ + case *rtcp.ReceiverReport: + for _, rr := range pkt.Reports { + if delay := uint64(rr.Delay); delay > stats.Delay { + stats.Delay = delay + } + if jitter := float64(rr.Jitter); jitter > stats.Jitter { + stats.Jitter = jitter + } + stats.PacketLost += uint64(rr.TotalLost) + } + } + } + + direction := prometheus.Incoming + if streamType == livekit.StreamType_DOWNSTREAM { + direction = prometheus.Outgoing + } + + prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount) + + t.RLock() + w := t.workers[participantID] + t.RUnlock() + if w != nil { + w.OnRTCP(streamType, stats) + } +} + +func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.AnalyticsStat) { + for _, stat := range stats { + direction := prometheus.Incoming + if stat.Kind == livekit.StreamType_DOWNSTREAM { + direction = prometheus.Outgoing + } + + prometheus.IncrementPackets(direction, stat.TotalPackets) + prometheus.IncrementBytes(direction, stat.TotalBytes) + } + + t.analytics.SendStats(ctx, stats) +} + +func (t *telemetryServiceInternal) SendAnalytics() { + for _, worker := range t.workers { + worker.Update() + } +} diff --git a/pkg/telemetry/telemetryserviceevents.go b/pkg/telemetry/telemetryserviceinternalevents.go similarity index 79% rename from pkg/telemetry/telemetryserviceevents.go rename to pkg/telemetry/telemetryserviceinternalevents.go index 94418b826..d212f76de 100644 --- a/pkg/telemetry/telemetryserviceevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -13,7 +13,7 @@ import ( "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { +func (t *telemetryServiceInternal) RoomStarted(ctx context.Context, room *livekit.Room) { prometheus.RoomStarted() t.notifyEvent(ctx, &livekit.WebhookEvent{ @@ -28,7 +28,7 @@ func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) }) } -func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { +func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit.Room) { prometheus.RoomEnded(time.Unix(room.CreationTime, 0)) t.notifyEvent(ctx, &livekit.WebhookEvent{ @@ -44,7 +44,7 @@ func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { }) } -func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, +func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) { t.Lock() t.workers[participant.Sid] = newStatsWorker(ctx, t, room.Sid, room.Name, participant.Sid) @@ -68,7 +68,7 @@ func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit. }) } -func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { +func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { t.Lock() if w := t.workers[participant.Sid]; w != nil { w.Close() @@ -93,7 +93,7 @@ func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro }) } -func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) { prometheus.AddPublishedTrack(track.Type.String()) roomID, roomName := t.getRoomDetails(participantID) @@ -107,7 +107,7 @@ func (t *telemetryService) TrackPublished(ctx context.Context, participantID str }) } -func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { +func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { roomID := "" roomName := "" t.RLock() @@ -131,7 +131,7 @@ func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID s }) } -func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { prometheus.AddSubscribedTrack(track.Type.String()) roomID, roomName := t.getRoomDetails(participantID) @@ -145,7 +145,7 @@ func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID st }) } -func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { +func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { prometheus.SubSubscribedTrack(track.Type.String()) roomID, roomName := t.getRoomDetails(participantID) @@ -159,7 +159,7 @@ func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID }) } -func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { +func (t *telemetryServiceInternal) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventRecordingStarted, RecordingInfo: ri, @@ -173,7 +173,7 @@ func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.Rec }) } -func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { +func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventRecordingFinished, RecordingInfo: ri, @@ -187,7 +187,7 @@ func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.Recor }) } -func (t *telemetryService) getRoomDetails(participantID string) (string, string) { +func (t *telemetryServiceInternal) getRoomDetails(participantID string) (string, string) { t.RLock() w := t.workers[participantID] t.RUnlock() @@ -197,7 +197,7 @@ func (t *telemetryService) getRoomDetails(participantID string) (string, string) return "", "" } -func (t *telemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { +func (t *telemetryServiceInternal) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { if t.notifier == nil { return } diff --git a/pkg/telemetry/test/telemetry_service_test.go b/pkg/telemetry/test/telemetry_service_test.go index 1ba0bd4c6..9c4980b89 100644 --- a/pkg/telemetry/test/telemetry_service_test.go +++ b/pkg/telemetry/test/telemetry_service_test.go @@ -3,7 +3,6 @@ package telemetrytest import ( "context" "testing" - "time" "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" @@ -13,36 +12,57 @@ import ( ) type telemetryServiceFixture struct { - sut telemetry.TelemetryService + sut telemetry.TelemetryServiceInternal analytics *telemetryfakes.FakeAnalyticsService } func createFixture() *telemetryServiceFixture { fixture := &telemetryServiceFixture{} fixture.analytics = &telemetryfakes.FakeAnalyticsService{} - fixture.sut = telemetry.NewTelemetryService(nil, fixture.analytics) + fixture.sut = telemetry.NewTelemetryServiceInternal(nil, fixture.analytics) return fixture } -func Test_TelemetryService_Downstream_Stats(t *testing.T) { +func Test_OnDownstreamPacket(t *testing.T) { fixture := createFixture() + //prepare room := &livekit.Room{} partSID := "part1" clientInfo := &livekit.ClientInfo{Sdk: 2} participantInfo := &livekit.ParticipantInfo{Sid: partSID} fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo) - totalBytes := 33 - fixture.sut.OnDownstreamPacket(partSID, totalBytes) - // call participant left to trigger sending of analytics - fixture.sut.ParticipantLeft(context.Background(), room, participantInfo) - - time.Sleep(time.Millisecond * 100) // wait for Update function to be called in go routine + //do + packets := []int{33, 23} + totalBytes := packets[0] + packets[1] + totalPackets := len(packets) + for i := range packets { + fixture.sut.OnDownstreamPacket(partSID, packets[i]) + } + fixture.sut.SendAnalytics() + //test require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) _, stats := fixture.analytics.SendStatsArgsForCall(0) require.Equal(t, 1, len(stats)) require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind) require.Equal(t, totalBytes, int(stats[0].TotalBytes)) + require.Equal(t, totalPackets, int(stats[0].TotalPackets)) +} + +func Test_AnalyticsSentWhenParticipantLeaves(t *testing.T) { + fixture := createFixture() + + //prepare + room := &livekit.Room{} + partSID := "part1" + participantInfo := &livekit.ParticipantInfo{Sid: partSID} + fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil) + + //do + fixture.sut.ParticipantLeft(context.Background(), room, participantInfo) + + //test + require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) }