Make TelemetryService testable (#276)

* Make TelemetryService testable

Timer is extracted for better testability of telemetryservice.

Divided TelemetryService to internal part that:
- contains business logic
- does not contain timer
- and therefore testable

and external part that:
- does not contain business logic
- contains timer to send analytics every 10 seconds for all participants.
- does not need tests

* Add Test_AnalyticsSentWhenParticipantLeaves

* Fix test
This commit is contained in:
Artur Shellunts
2021-12-28 12:54:56 +01:00
committed by GitHub
parent 3108ef22ad
commit 2209edce20
6 changed files with 229 additions and 123 deletions
+1 -1
View File
@@ -6,7 +6,7 @@ import (
"github.com/pion/rtcp"
)
func (t *telemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
func (t *telemetryServiceInternal) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
return &StatsInterceptorFactory{
t: t,
participantID: participantID,
+3 -24
View File
@@ -3,7 +3,6 @@ package telemetry
import (
"context"
"sync"
"time"
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -11,12 +10,10 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
const updateFrequency = time.Second * 10
// StatsWorker handles participant stats
type StatsWorker struct {
ctx context.Context
t TelemetryService
t TelemetryReporter
roomID string
roomName string
participantID string
@@ -27,8 +24,6 @@ type StatsWorker struct {
incoming *Stats
outgoing *Stats
close chan struct{}
}
type Stats struct {
@@ -40,7 +35,7 @@ type Stats struct {
prevBytes uint64
}
func newStatsWorker(ctx context.Context, t TelemetryService, roomID, roomName, participantID string) *StatsWorker {
func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName, participantID string) *StatsWorker {
s := &StatsWorker{
ctx: ctx,
t: t,
@@ -63,26 +58,10 @@ func newStatsWorker(ctx context.Context, t TelemetryService, roomID, roomName, p
ParticipantId: participantID,
RoomName: roomName,
}},
close: make(chan struct{}, 1),
}
go s.run()
return s
}
func (s *StatsWorker) run() {
for {
select {
case <-s.close:
// drain
s.Update()
return
case <-time.After(updateFrequency):
s.Update()
}
}
}
func (s *StatsWorker) AddBuffer(buffer *buffer.Buffer) {
s.Lock()
defer s.Unlock()
@@ -190,5 +169,5 @@ func (s *StatsWorker) RemoveBuffer(ssrc uint32) {
}
func (s *StatsWorker) Close() {
close(s.close)
s.Update()
}
+64 -76
View File
@@ -2,24 +2,23 @@ package telemetry
import (
"context"
"sync"
"time"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/webhook"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
const updateFrequency = time.Second * 10
type TelemetryService interface {
// stats
NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory
AddUpTrack(participantID string, buff *buffer.Buffer)
OnDownstreamPacket(participantID string, bytes int)
HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet)
Report(ctx context.Context, stats []*livekit.AnalyticsStat)
// events
RoomStarted(ctx context.Context, room *livekit.Room)
@@ -35,91 +34,80 @@ type TelemetryService interface {
}
type telemetryService struct {
notifier webhook.Notifier
webhookPool *workerpool.WorkerPool
sync.RWMutex
// one worker per participant
workers map[string]*StatsWorker
analytics AnalyticsService
internalService TelemetryServiceInternal
}
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
return &telemetryService{
notifier: notifier,
webhookPool: workerpool.New(1),
workers: make(map[string]*StatsWorker),
analytics: analytics,
t := &telemetryService{
internalService: NewTelemetryServiceInternal(notifier, analytics),
}
go t.run()
return t
}
func (t *telemetryService) run() {
for {
select {
case <-time.After(updateFrequency):
t.internalService.SendAnalytics()
}
}
}
func (t *telemetryService) AddUpTrack(participantID string, buff *buffer.Buffer) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.AddBuffer(buff)
}
t.internalService.AddUpTrack(participantID, buff)
}
func (t *telemetryService) OnDownstreamPacket(participantID string, bytes int) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.OnDownstreamPacket(bytes)
}
t.internalService.OnDownstreamPacket(participantID, bytes)
}
func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) {
stats := &livekit.AnalyticsStat{}
for _, pkt := range pkts {
switch pkt := pkt.(type) {
case *rtcp.TransportLayerNack:
stats.NackCount++
case *rtcp.PictureLossIndication:
stats.PliCount++
case *rtcp.FullIntraRequest:
stats.FirCount++
case *rtcp.ReceiverReport:
for _, rr := range pkt.Reports {
if delay := uint64(rr.Delay); delay > stats.Delay {
stats.Delay = delay
}
if jitter := float64(rr.Jitter); jitter > stats.Jitter {
stats.Jitter = jitter
}
stats.PacketLost += uint64(rr.TotalLost)
}
}
}
direction := prometheus.Incoming
if streamType == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount)
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.OnRTCP(streamType, stats)
}
t.internalService.HandleRTCP(streamType, participantID, pkts)
}
func (t *telemetryService) Report(ctx context.Context, stats []*livekit.AnalyticsStat) {
for _, stat := range stats {
direction := prometheus.Incoming
if stat.Kind == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
prometheus.IncrementPackets(direction, stat.TotalPackets)
prometheus.IncrementBytes(direction, stat.TotalBytes)
}
t.analytics.SendStats(ctx, stats)
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
t.internalService.RoomStarted(ctx, room)
}
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
t.internalService.RoomEnded(ctx, room)
}
func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) {
t.internalService.ParticipantJoined(ctx, room, participant, clientInfo)
}
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
t.internalService.ParticipantLeft(ctx, room, participant)
}
func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) {
t.internalService.TrackPublished(ctx, participantID, track)
}
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
t.internalService.TrackUnpublished(ctx, participantID, track, ssrc)
}
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
t.internalService.TrackSubscribed(ctx, participantID, track)
}
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
t.internalService.TrackUnsubscribed(ctx, participantID, track)
}
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
t.internalService.RecordingStarted(ctx, ri)
}
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
t.internalService.RecordingEnded(ctx, ri)
}
func (t *telemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
return t.internalService.NewStatsInterceptorFactory(participantID, identity)
}
+119
View File
@@ -0,0 +1,119 @@
package telemetry
import (
"context"
"sync"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/webhook"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
type TelemetryServiceInternal interface {
TelemetryService
SendAnalytics()
}
type TelemetryReporter interface {
Report(ctx context.Context, stats []*livekit.AnalyticsStat)
}
type telemetryServiceInternal struct {
notifier webhook.Notifier
webhookPool *workerpool.WorkerPool
sync.RWMutex
// one worker per participant
workers map[string]*StatsWorker
analytics AnalyticsService
}
func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal {
return &telemetryServiceInternal{
notifier: notifier,
webhookPool: workerpool.New(1),
workers: make(map[string]*StatsWorker),
analytics: analytics,
}
}
func (t *telemetryServiceInternal) AddUpTrack(participantID string, buff *buffer.Buffer) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.AddBuffer(buff)
}
}
func (t *telemetryServiceInternal) OnDownstreamPacket(participantID string, bytes int) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.OnDownstreamPacket(bytes)
}
}
func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID string, pkts []rtcp.Packet) {
stats := &livekit.AnalyticsStat{}
for _, pkt := range pkts {
switch pkt := pkt.(type) {
case *rtcp.TransportLayerNack:
stats.NackCount++
case *rtcp.PictureLossIndication:
stats.PliCount++
case *rtcp.FullIntraRequest:
stats.FirCount++
case *rtcp.ReceiverReport:
for _, rr := range pkt.Reports {
if delay := uint64(rr.Delay); delay > stats.Delay {
stats.Delay = delay
}
if jitter := float64(rr.Jitter); jitter > stats.Jitter {
stats.Jitter = jitter
}
stats.PacketLost += uint64(rr.TotalLost)
}
}
}
direction := prometheus.Incoming
if streamType == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount)
t.RLock()
w := t.workers[participantID]
t.RUnlock()
if w != nil {
w.OnRTCP(streamType, stats)
}
}
func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.AnalyticsStat) {
for _, stat := range stats {
direction := prometheus.Incoming
if stat.Kind == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
prometheus.IncrementPackets(direction, stat.TotalPackets)
prometheus.IncrementBytes(direction, stat.TotalBytes)
}
t.analytics.SendStats(ctx, stats)
}
func (t *telemetryServiceInternal) SendAnalytics() {
for _, worker := range t.workers {
worker.Update()
}
}
@@ -13,7 +13,7 @@ import (
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
func (t *telemetryServiceInternal) RoomStarted(ctx context.Context, room *livekit.Room) {
prometheus.RoomStarted()
t.notifyEvent(ctx, &livekit.WebhookEvent{
@@ -28,7 +28,7 @@ func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room)
})
}
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit.Room) {
prometheus.RoomEnded(time.Unix(room.CreationTime, 0))
t.notifyEvent(ctx, &livekit.WebhookEvent{
@@ -44,7 +44,7 @@ func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
})
}
func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room,
func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *livekit.Room,
participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) {
t.Lock()
t.workers[participant.Sid] = newStatsWorker(ctx, t, room.Sid, room.Name, participant.Sid)
@@ -68,7 +68,7 @@ func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.
})
}
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
t.Lock()
if w := t.workers[participant.Sid]; w != nil {
w.Close()
@@ -93,7 +93,7 @@ func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro
})
}
func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) {
prometheus.AddPublishedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
@@ -107,7 +107,7 @@ func (t *telemetryService) TrackPublished(ctx context.Context, participantID str
})
}
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) {
roomID := ""
roomName := ""
t.RLock()
@@ -131,7 +131,7 @@ func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID s
})
}
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryServiceInternal) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
prometheus.AddSubscribedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
@@ -145,7 +145,7 @@ func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID st
})
}
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) {
prometheus.SubSubscribedTrack(track.Type.String())
roomID, roomName := t.getRoomDetails(participantID)
@@ -159,7 +159,7 @@ func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID
})
}
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
func (t *telemetryServiceInternal) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingStarted,
RecordingInfo: ri,
@@ -173,7 +173,7 @@ func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.Rec
})
}
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingFinished,
RecordingInfo: ri,
@@ -187,7 +187,7 @@ func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.Recor
})
}
func (t *telemetryService) getRoomDetails(participantID string) (string, string) {
func (t *telemetryServiceInternal) getRoomDetails(participantID string) (string, string) {
t.RLock()
w := t.workers[participantID]
t.RUnlock()
@@ -197,7 +197,7 @@ func (t *telemetryService) getRoomDetails(participantID string) (string, string)
return "", ""
}
func (t *telemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
func (t *telemetryServiceInternal) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
if t.notifier == nil {
return
}
+30 -10
View File
@@ -3,7 +3,6 @@ package telemetrytest
import (
"context"
"testing"
"time"
"github.com/livekit/protocol/livekit"
"github.com/stretchr/testify/require"
@@ -13,36 +12,57 @@ import (
)
type telemetryServiceFixture struct {
sut telemetry.TelemetryService
sut telemetry.TelemetryServiceInternal
analytics *telemetryfakes.FakeAnalyticsService
}
func createFixture() *telemetryServiceFixture {
fixture := &telemetryServiceFixture{}
fixture.analytics = &telemetryfakes.FakeAnalyticsService{}
fixture.sut = telemetry.NewTelemetryService(nil, fixture.analytics)
fixture.sut = telemetry.NewTelemetryServiceInternal(nil, fixture.analytics)
return fixture
}
func Test_TelemetryService_Downstream_Stats(t *testing.T) {
func Test_OnDownstreamPacket(t *testing.T) {
fixture := createFixture()
//prepare
room := &livekit.Room{}
partSID := "part1"
clientInfo := &livekit.ClientInfo{Sdk: 2}
participantInfo := &livekit.ParticipantInfo{Sid: partSID}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, clientInfo)
totalBytes := 33
fixture.sut.OnDownstreamPacket(partSID, totalBytes)
// call participant left to trigger sending of analytics
fixture.sut.ParticipantLeft(context.Background(), room, participantInfo)
time.Sleep(time.Millisecond * 100) // wait for Update function to be called in go routine
//do
packets := []int{33, 23}
totalBytes := packets[0] + packets[1]
totalPackets := len(packets)
for i := range packets {
fixture.sut.OnDownstreamPacket(partSID, packets[i])
}
fixture.sut.SendAnalytics()
//test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
_, stats := fixture.analytics.SendStatsArgsForCall(0)
require.Equal(t, 1, len(stats))
require.Equal(t, livekit.StreamType_DOWNSTREAM, stats[0].Kind)
require.Equal(t, totalBytes, int(stats[0].TotalBytes))
require.Equal(t, totalPackets, int(stats[0].TotalPackets))
}
func Test_AnalyticsSentWhenParticipantLeaves(t *testing.T) {
fixture := createFixture()
//prepare
room := &livekit.Room{}
partSID := "part1"
participantInfo := &livekit.ParticipantInfo{Sid: partSID}
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil)
//do
fixture.sut.ParticipantLeft(context.Background(), room, participantInfo)
//test
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
}