From bf46e998b236f2b04090292187b9bf041855c700 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 9 Nov 2021 00:06:07 -0800 Subject: [PATCH] Sfu/buffer stats for telemetry (#173) * more buffer stats for analytics * update names * fix jitter and lost rate * don't return on participantLeft if they never published --- .gitignore | 2 +- pkg/rtc/mediatrack.go | 16 ++-- pkg/service/recordingservice.go | 16 ++++ pkg/telemetry/events.go | 107 ++++++++++++------------- pkg/telemetry/interceptor.go | 56 ++++++++++++++ pkg/telemetry/prometheus/packets.go | 20 +++-- pkg/telemetry/service.go | 116 ++++++++++++++-------------- pkg/telemetry/stats.go | 64 --------------- pkg/telemetry/statsworker.go | 89 +++++++++++++++++++++ 9 files changed, 295 insertions(+), 191 deletions(-) create mode 100644 pkg/telemetry/interceptor.go delete mode 100644 pkg/telemetry/stats.go create mode 100644 pkg/telemetry/statsworker.go diff --git a/.gitignore b/.gitignore index 51271e375..e14ce8fc2 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,4 @@ proto/ .DS_Store # IDE -.idea/jsonSchemas.xml \ No newline at end of file +.idea diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index f08e2fb96..f7463f946 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -6,9 +6,6 @@ import ( "sync/atomic" "time" - "github.com/livekit/livekit-server/pkg/sfu" - "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" "github.com/livekit/protocol/utils" @@ -18,6 +15,9 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/livekit-server/pkg/telemetry" ) @@ -127,7 +127,7 @@ func (t *MediaTrack) SetMuted(muted bool) { if t.receiver != nil { t.receiver.SetUpTrackPaused(muted) } - // mute all of the subscribedtracks + // mute all subscribed tracks for _, st := range t.subscribedTracks { st.SetPublisherMuted(muted) } @@ -247,7 +247,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { delete(t.subscribedTracks, sub.ID()) t.lock.Unlock() - t.params.Telemetry.UnsubscribedTrack(sub.ID(), sub.Identity(), t.ToProto()) + t.params.Telemetry.TrackUnsubscribed(sub.ID(), sub.Identity(), t.ToProto()) // ignore if the subscribing sub is not connected if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { @@ -293,7 +293,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { sub.Negotiate() }() - t.params.Telemetry.SubscribedTrack(sub.ID(), sub.Identity(), t.ToProto()) + t.params.Telemetry.TrackSubscribed(sub.ID(), sub.Identity(), t.ToProto()) return nil } @@ -367,12 +367,12 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra onclose := t.onClose t.lock.Unlock() t.RemoveAllSubscribers() - t.params.Telemetry.UnpublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto()) + t.params.Telemetry.TrackUnpublished(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto(), uint32(track.SSRC())) if onclose != nil { onclose() } }) - t.params.Telemetry.PublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto()) + t.params.Telemetry.TrackPublished(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto(), buff) if t.Kind() == livekit.TrackType_AUDIO { t.buffer = buff diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go index 600662d8e..8028a4a74 100644 --- a/pkg/service/recordingservice.go +++ b/pkg/service/recordingservice.go @@ -3,6 +3,7 @@ package service import ( "context" "errors" + "time" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" @@ -62,7 +63,9 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star return nil, err } + logger.Debugw("recording started", "recordingID", recordingId) s.telemetry.RecordingStarted(ctx, recordingId, req) + return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil } @@ -142,6 +145,19 @@ func (s *RecordingService) resultsWorker() { logger.Errorw("failed to read results", err) continue } + + // log results + values := []interface{}{"recordingID", res.Id} + if res.Error != "" { + values = append(values, "error", res.Error) + } else { + values = append(values, "duration", time.Duration(res.Duration*1e9)) + if res.DownloadUrl != "" { + values = append(values, "url", res.DownloadUrl) + } + } + logger.Debugw("recording ended", values...) + s.telemetry.RecordingEnded(res) case <-s.shutdown: _ = sub.Close() diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index d5f39b572..4849e5fc0 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -8,13 +8,14 @@ import ( livekit "github.com/livekit/protocol/proto" "github.com/livekit/protocol/webhook" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -func (s *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { - s.pool.Submit(prometheus.RoomStarted) +func (t *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { + prometheus.RoomStarted() - s.notifyEvent(ctx, &livekit.WebhookEvent{ + t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventRoomStarted, Room: room, }) @@ -22,12 +23,10 @@ func (s *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) // TODO: analytics service } -func (s *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { - s.pool.Submit(func() { - prometheus.RoomEnded(time.Unix(room.CreationTime, 0)) - }) +func (t *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { + prometheus.RoomEnded(time.Unix(room.CreationTime, 0)) - s.notifyEvent(ctx, &livekit.WebhookEvent{ + t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventRoomFinished, Room: room, }) @@ -35,10 +34,10 @@ func (s *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { // TODO: analytics service } -func (s *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - s.pool.Submit(prometheus.AddParticipant) +func (t *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { + prometheus.AddParticipant() - s.notifyEvent(ctx, &livekit.WebhookEvent{ + t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventParticipantJoined, Room: room, Participant: participant, @@ -47,10 +46,17 @@ func (s *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit. // TODO: analytics service } -func (s *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - s.pool.Submit(prometheus.SubParticipant) +func (t *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { + t.Lock() + if w := t.workers[participant.Sid]; w != nil { + w.Close() + delete(t.workers, participant.Sid) + } + t.Unlock() - s.notifyEvent(ctx, &livekit.WebhookEvent{ + prometheus.SubParticipant() + + t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventParticipantLeft, Room: room, Participant: participant, @@ -59,42 +65,51 @@ func (s *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro // TODO: analytics service } -func (s *TelemetryService) PublishedTrack(SID, identity string, track *livekit.TrackInfo) { - s.pool.Submit(func() { - prometheus.AddPublishedTrack(track.Type.String()) - }) +func (t *TelemetryService) TrackPublished(participantID, identity string, track *livekit.TrackInfo, buff *buffer.Buffer) { + t.Lock() + if t.workers[participantID] == nil { + t.workers[participantID] = NewStatsWorker(func(diff *buffer.Stats) { + t.HandleIncomingRTP(participantID, identity, diff) + }) + } + t.workers[participantID].AddBuffer(buff) + t.Unlock() + + prometheus.AddPublishedTrack(track.Type.String()) // TODO: analytics service } -func (s *TelemetryService) UnpublishedTrack(SID, identity string, track *livekit.TrackInfo) { - s.pool.Submit(func() { - prometheus.SubPublishedTrack(track.Type.String()) - }) +func (t *TelemetryService) TrackUnpublished(participantID, identity string, track *livekit.TrackInfo, ssrc uint32) { + t.RLock() + if w := t.workers[participantID]; w != nil { + w.RemoveBuffer(ssrc) + t.RUnlock() + } else { + logger.Errorw("missing stats worker", nil, "participantID", participantID) + t.RUnlock() + return + } + + prometheus.SubPublishedTrack(track.Type.String()) // TODO: analytics service } -func (s *TelemetryService) SubscribedTrack(SID, identity string, track *livekit.TrackInfo) { - s.pool.Submit(func() { - prometheus.AddSubscribedTrack(track.Type.String()) - }) +func (t *TelemetryService) TrackSubscribed(participantID, identity string, track *livekit.TrackInfo) { + prometheus.AddSubscribedTrack(track.Type.String()) // TODO: analytics service } -func (s *TelemetryService) UnsubscribedTrack(SID, identity string, track *livekit.TrackInfo) { - s.pool.Submit(func() { - prometheus.SubSubscribedTrack(track.Type.String()) - }) +func (t *TelemetryService) TrackUnsubscribed(participantID, identity string, track *livekit.TrackInfo) { + prometheus.SubSubscribedTrack(track.Type.String()) // TODO: analytics service } -func (s *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) { - logger.Debugw("recording started", "recordingID", recordingID) - - s.notifyEvent(ctx, &livekit.WebhookEvent{ +func (t *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) { + t.notifyEvent(ctx, &livekit.WebhookEvent{ Event: webhook.EventRecordingStarted, RecordingInfo: &livekit.RecordingInfo{ Id: recordingID, @@ -105,20 +120,8 @@ func (s *TelemetryService) RecordingStarted(ctx context.Context, recordingID str // TODO: analytics service } -func (s *TelemetryService) RecordingEnded(res *livekit.RecordingResult) { - // log results - values := []interface{}{"recordingID", res.Id} - if res.Error != "" { - values = append(values, "error", res.Error) - } else { - values = append(values, "duration", time.Duration(res.Duration*1e9)) - if res.DownloadUrl != "" { - values = append(values, "url", res.DownloadUrl) - } - } - logger.Debugw("recording ended", values...) - - s.notifyEvent(context.Background(), &livekit.WebhookEvent{ +func (t *TelemetryService) RecordingEnded(res *livekit.RecordingResult) { + t.notifyEvent(context.Background(), &livekit.WebhookEvent{ Event: webhook.EventRecordingFinished, RecordingResult: res, }) @@ -126,13 +129,13 @@ func (s *TelemetryService) RecordingEnded(res *livekit.RecordingResult) { // TODO: analytics service } -func (s *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { - if s.notifier == nil { +func (t *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { + if t.notifier == nil { return } - s.pool.Submit(func() { - if err := s.notifier.Notify(ctx, event); err != nil { + t.webhookPool.Submit(func() { + if err := t.notifier.Notify(ctx, event); err != nil { logger.Warnw("failed to notify webhook", err, "event", event.Event) } }) diff --git a/pkg/telemetry/interceptor.go b/pkg/telemetry/interceptor.go new file mode 100644 index 000000000..0ae826968 --- /dev/null +++ b/pkg/telemetry/interceptor.go @@ -0,0 +1,56 @@ +package telemetry + +import ( + "github.com/pion/interceptor" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +type StatsInterceptorFactory struct { + t *TelemetryService + participantID string + identity string +} + +func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) { + return &StatsInterceptor{ + t: f.t, + participantID: f.participantID, + identity: f.identity, + }, nil +} + +type StatsInterceptor struct { + interceptor.NoOp + + t *TelemetryService + participantID string + identity string +} + +// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might +// change in the future. The returned method will be called once per packet batch. +func (s *StatsInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return interceptor.RTCPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + s.t.HandleIncomingRTCP(s.participantID, s.identity, bytes) + return reader.Read(bytes, attributes) + }) +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + s.t.HandleOutgoingRTCP(s.participantID, s.identity, pkts) + return writer.Write(pkts, attributes) + }) +} + +// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method +// will be called once per rtp packet. +func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + s.t.HandleOutgoingRTP(s.participantID, s.identity, uint64(len(payload))) + return writer.Write(header, payload, attributes) + }) +} diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index 78e66be70..40ac10b39 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -57,15 +57,21 @@ func initPacketStats() { prometheus.MustRegister(promFirTotal) } -func IncrementPackets(direction Direction, pktLen uint64) { - promPacketTotal.WithLabelValues(string(direction)).Add(1) - promPacketBytes.WithLabelValues(string(direction)).Add(float64(pktLen)) +func IncrementPackets(direction Direction, count uint64) { + promPacketTotal.WithLabelValues(string(direction)).Add(float64(count)) if direction == Incoming { - atomic.AddUint64(&atomicPacketsIn, 1) - atomic.AddUint64(&atomicBytesIn, pktLen) + atomic.AddUint64(&atomicPacketsIn, count) } else { - atomic.AddUint64(&atomicPacketsOut, 1) - atomic.AddUint64(&atomicBytesOut, pktLen) + atomic.AddUint64(&atomicPacketsOut, count) + } +} + +func IncrementBytes(direction Direction, count uint64) { + promPacketBytes.WithLabelValues(string(direction)).Add(float64(count)) + if direction == Incoming { + atomic.AddUint64(&atomicBytesIn, count) + } else { + atomic.AddUint64(&atomicBytesOut, count) } } diff --git a/pkg/telemetry/service.go b/pkg/telemetry/service.go index 3d57a2cea..0a80c6015 100644 --- a/pkg/telemetry/service.go +++ b/pkg/telemetry/service.go @@ -1,91 +1,89 @@ package telemetry import ( + "sync" + "github.com/gammazero/workerpool" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" - "github.com/pion/interceptor" "github.com/pion/rtcp" - "github.com/pion/rtp" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) type TelemetryService struct { - notifier webhook.Notifier - pool *workerpool.WorkerPool + notifier webhook.Notifier + webhookPool *workerpool.WorkerPool + + sync.RWMutex + // one worker per participant + workers map[string]*StatsWorker } func NewTelemetryService(notifier webhook.Notifier) *TelemetryService { return &TelemetryService{ - notifier: notifier, - pool: workerpool.New(10), + notifier: notifier, + webhookPool: workerpool.New(1), + workers: make(map[string]*StatsWorker), } } -type StatsInterceptorFactory struct { - t *TelemetryService - participantID string - identity string -} - -func (s *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { +func (t *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { return &StatsInterceptorFactory{ - t: s, + t: t, participantID: participantID, identity: identity, } } -func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) { - return &StatsInterceptor{ - t: f.t, - participantID: f.participantID, - identity: f.identity, - }, nil +func (t *TelemetryService) HandleIncomingRTP(participantID, identity string, diff *buffer.Stats) { + prometheus.IncrementPackets(prometheus.Incoming, uint64(diff.PacketCount)) + prometheus.IncrementBytes(prometheus.Incoming, diff.TotalByte) + + // TODO: analytics service + // diff.LastExpected, diff.LastReceived, diff.Jitter, diff.LostRate } -type StatsInterceptor struct { - interceptor.NoOp +func (t *TelemetryService) HandleIncomingRTCP(participantID, identity string, bytes []byte) { + pkts, err := rtcp.Unmarshal(bytes) + if err != nil { + logger.Errorw("Interceptor failed to unmarshal rtcp packets", err) + return + } - t *TelemetryService - participantID string - identity string + for _, pkt := range pkts { + switch pkt.(type) { + case *rtcp.TransportLayerNack: + prometheus.IncrementNack(prometheus.Incoming) + case *rtcp.PictureLossIndication: + prometheus.IncrementPLI(prometheus.Incoming) + case *rtcp.FullIntraRequest: + prometheus.IncrementFIR(prometheus.Incoming) + } + } + + // TODO: analytics service } -// --- Incoming --- +func (t *TelemetryService) HandleOutgoingRTP(participantID, identity string, pktLen uint64) { + prometheus.IncrementPackets(prometheus.Outgoing, 1) + prometheus.IncrementBytes(prometheus.Outgoing, pktLen) -// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might -// change in the future. The returned method will be called once per packet batch. -func (s *StatsInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { - return interceptor.RTCPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - s.t.HandleIncomingRTCP(s.participantID, s.identity, bytes) - return reader.Read(bytes, attributes) - }) + // TODO: analytics service } -// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method -// will be called once per rtp packet. -func (s *StatsInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { - return interceptor.RTPReaderFunc(func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - s.t.HandleIncomingRTP(s.participantID, s.identity, uint64(len(payload))) - return reader.Read(payload, attributes) - }) -} +func (t *TelemetryService) HandleOutgoingRTCP(participantID, identity string, pkts []rtcp.Packet) { + for _, pkt := range pkts { + switch pkt.(type) { + case *rtcp.TransportLayerNack: + prometheus.IncrementNack(prometheus.Outgoing) + case *rtcp.PictureLossIndication: + prometheus.IncrementPLI(prometheus.Outgoing) + case *rtcp.FullIntraRequest: + prometheus.IncrementFIR(prometheus.Outgoing) + } + } -// --- Outgoing --- - -// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method -// will be called once per packet batch. -func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { - return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { - s.t.HandleOutgoingRTCP(s.participantID, s.identity, pkts) - return writer.Write(pkts, attributes) - }) -} - -// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method -// will be called once per rtp packet. -func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { - return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - s.t.HandleOutgoingRTP(s.participantID, s.identity, uint64(len(payload))) - return writer.Write(header, payload, attributes) - }) + // TODO: analytics service } diff --git a/pkg/telemetry/stats.go b/pkg/telemetry/stats.go deleted file mode 100644 index 74f0ae495..000000000 --- a/pkg/telemetry/stats.go +++ /dev/null @@ -1,64 +0,0 @@ -package telemetry - -import ( - "github.com/livekit/protocol/logger" - "github.com/pion/rtcp" - - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" -) - -func (s *TelemetryService) HandleIncomingRTCP(participantID, identity string, bytes []byte) { - pkts, err := rtcp.Unmarshal(bytes) - if err != nil { - logger.Errorw("Interceptor failed to unmarshal rtcp packets", err) - return - } - - s.pool.Submit(func() { - for _, pkt := range pkts { - switch pkt.(type) { - case *rtcp.TransportLayerNack: - prometheus.IncrementNack(prometheus.Incoming) - case *rtcp.PictureLossIndication: - prometheus.IncrementPLI(prometheus.Incoming) - case *rtcp.FullIntraRequest: - prometheus.IncrementFIR(prometheus.Incoming) - } - } - }) - - // TODO: analytics service -} - -func (s *TelemetryService) HandleIncomingRTP(participantID, identity string, pktLen uint64) { - s.pool.Submit(func() { - prometheus.IncrementPackets(prometheus.Incoming, pktLen) - }) - - // TODO: analytics service -} - -func (s *TelemetryService) HandleOutgoingRTCP(participantID, identity string, pkts []rtcp.Packet) { - s.pool.Submit(func() { - for _, pkt := range pkts { - switch pkt.(type) { - case *rtcp.TransportLayerNack: - prometheus.IncrementNack(prometheus.Outgoing) - case *rtcp.PictureLossIndication: - prometheus.IncrementPLI(prometheus.Outgoing) - case *rtcp.FullIntraRequest: - prometheus.IncrementFIR(prometheus.Outgoing) - } - } - }) - - // TODO: analytics service -} - -func (s *TelemetryService) HandleOutgoingRTP(participantID, identity string, pktLen uint64) { - s.pool.Submit(func() { - prometheus.IncrementPackets(prometheus.Outgoing, pktLen) - }) - - // TODO: analytics service -} diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go new file mode 100644 index 000000000..2869fba2e --- /dev/null +++ b/pkg/telemetry/statsworker.go @@ -0,0 +1,89 @@ +package telemetry + +import ( + "sync" + "time" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" +) + +const updateFrequency = time.Second * 10 + +// StatsWorker handles incoming RTP statistics instead of the stream interceptor +type StatsWorker struct { + sync.RWMutex + buffers map[uint32]*buffer.Buffer + lastStats *buffer.Stats + onUpdate func(diff *buffer.Stats) + close chan struct{} +} + +func NewStatsWorker(onUpdate func(*buffer.Stats)) *StatsWorker { + s := &StatsWorker{ + buffers: make(map[uint32]*buffer.Buffer), + onUpdate: onUpdate, + close: make(chan struct{}, 1), + } + go s.run() + return s +} + +func (s *StatsWorker) run() { + for { + select { + case <-s.close: + return + case <-time.After(updateFrequency): + s.onUpdate(s.Calc()) + } + } +} + +func (s *StatsWorker) AddBuffer(buffer *buffer.Buffer) { + s.Lock() + s.buffers[buffer.GetMediaSSRC()] = buffer + s.Unlock() +} + +func (s *StatsWorker) RemoveBuffer(ssrc uint32) { + s.Lock() + delete(s.buffers, ssrc) + s.Unlock() +} + +func (s *StatsWorker) Calc() *buffer.Stats { + s.RLock() + total := &buffer.Stats{} + for _, buff := range s.buffers { + stats := buff.GetStats() + total.PacketCount += stats.PacketCount + total.TotalByte += stats.TotalByte + total.LastExpected += stats.LastExpected + total.LastReceived += stats.LastReceived + if stats.Jitter > total.Jitter { + total.Jitter = stats.Jitter + } + } + s.RUnlock() + + var diff *buffer.Stats + if s.lastStats != nil { + diff = &buffer.Stats{ + LastExpected: total.LastExpected - s.lastStats.LastExpected, + LastReceived: total.LastReceived - s.lastStats.LastReceived, + PacketCount: total.PacketCount - s.lastStats.PacketCount, + TotalByte: total.TotalByte - s.lastStats.TotalByte, + Jitter: total.Jitter, + } + } else { + diff = total + } + diff.LostRate = float32(diff.LastExpected-diff.LastReceived) / float32(diff.LastExpected) + + s.lastStats = diff + return diff +} + +func (s *StatsWorker) Close() { + close(s.close) +}