mirror of
https://github.com/livekit/livekit.git
synced 2026-05-25 09:54:41 +00:00
Sfu/buffer stats for telemetry (#173)
* more buffer stats for analytics * update names * fix jitter and lost rate * don't return on participantLeft if they never published
This commit is contained in:
+1
-1
@@ -24,4 +24,4 @@ proto/
|
||||
.DS_Store
|
||||
|
||||
# IDE
|
||||
.idea/jsonSchemas.xml
|
||||
.idea
|
||||
|
||||
@@ -6,9 +6,6 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/twcc"
|
||||
"github.com/livekit/protocol/logger"
|
||||
livekit "github.com/livekit/protocol/proto"
|
||||
"github.com/livekit/protocol/utils"
|
||||
@@ -18,6 +15,9 @@ import (
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/twcc"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
)
|
||||
|
||||
@@ -127,7 +127,7 @@ func (t *MediaTrack) SetMuted(muted bool) {
|
||||
if t.receiver != nil {
|
||||
t.receiver.SetUpTrackPaused(muted)
|
||||
}
|
||||
// mute all of the subscribedtracks
|
||||
// mute all subscribed tracks
|
||||
for _, st := range t.subscribedTracks {
|
||||
st.SetPublisherMuted(muted)
|
||||
}
|
||||
@@ -247,7 +247,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
delete(t.subscribedTracks, sub.ID())
|
||||
t.lock.Unlock()
|
||||
|
||||
t.params.Telemetry.UnsubscribedTrack(sub.ID(), sub.Identity(), t.ToProto())
|
||||
t.params.Telemetry.TrackUnsubscribed(sub.ID(), sub.Identity(), t.ToProto())
|
||||
|
||||
// ignore if the subscribing sub is not connected
|
||||
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
|
||||
@@ -293,7 +293,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
sub.Negotiate()
|
||||
}()
|
||||
|
||||
t.params.Telemetry.SubscribedTrack(sub.ID(), sub.Identity(), t.ToProto())
|
||||
t.params.Telemetry.TrackSubscribed(sub.ID(), sub.Identity(), t.ToProto())
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -367,12 +367,12 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
|
||||
onclose := t.onClose
|
||||
t.lock.Unlock()
|
||||
t.RemoveAllSubscribers()
|
||||
t.params.Telemetry.UnpublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto())
|
||||
t.params.Telemetry.TrackUnpublished(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto(), uint32(track.SSRC()))
|
||||
if onclose != nil {
|
||||
onclose()
|
||||
}
|
||||
})
|
||||
t.params.Telemetry.PublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto())
|
||||
t.params.Telemetry.TrackPublished(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto(), buff)
|
||||
|
||||
if t.Kind() == livekit.TrackType_AUDIO {
|
||||
t.buffer = buff
|
||||
|
||||
@@ -3,6 +3,7 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/protocol/logger"
|
||||
livekit "github.com/livekit/protocol/proto"
|
||||
@@ -62,7 +63,9 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.Debugw("recording started", "recordingID", recordingId)
|
||||
s.telemetry.RecordingStarted(ctx, recordingId, req)
|
||||
|
||||
return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil
|
||||
}
|
||||
|
||||
@@ -142,6 +145,19 @@ func (s *RecordingService) resultsWorker() {
|
||||
logger.Errorw("failed to read results", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// log results
|
||||
values := []interface{}{"recordingID", res.Id}
|
||||
if res.Error != "" {
|
||||
values = append(values, "error", res.Error)
|
||||
} else {
|
||||
values = append(values, "duration", time.Duration(res.Duration*1e9))
|
||||
if res.DownloadUrl != "" {
|
||||
values = append(values, "url", res.DownloadUrl)
|
||||
}
|
||||
}
|
||||
logger.Debugw("recording ended", values...)
|
||||
|
||||
s.telemetry.RecordingEnded(res)
|
||||
case <-s.shutdown:
|
||||
_ = sub.Close()
|
||||
|
||||
+55
-52
@@ -8,13 +8,14 @@ import (
|
||||
livekit "github.com/livekit/protocol/proto"
|
||||
"github.com/livekit/protocol/webhook"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
|
||||
func (s *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
|
||||
s.pool.Submit(prometheus.RoomStarted)
|
||||
func (t *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
|
||||
prometheus.RoomStarted()
|
||||
|
||||
s.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
t.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
Event: webhook.EventRoomStarted,
|
||||
Room: room,
|
||||
})
|
||||
@@ -22,12 +23,10 @@ func (s *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room)
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
|
||||
s.pool.Submit(func() {
|
||||
prometheus.RoomEnded(time.Unix(room.CreationTime, 0))
|
||||
})
|
||||
func (t *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
|
||||
prometheus.RoomEnded(time.Unix(room.CreationTime, 0))
|
||||
|
||||
s.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
t.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
Event: webhook.EventRoomFinished,
|
||||
Room: room,
|
||||
})
|
||||
@@ -35,10 +34,10 @@ func (s *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
|
||||
s.pool.Submit(prometheus.AddParticipant)
|
||||
func (t *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
|
||||
prometheus.AddParticipant()
|
||||
|
||||
s.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
t.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
Event: webhook.EventParticipantJoined,
|
||||
Room: room,
|
||||
Participant: participant,
|
||||
@@ -47,10 +46,17 @@ func (s *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
|
||||
s.pool.Submit(prometheus.SubParticipant)
|
||||
func (t *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
|
||||
t.Lock()
|
||||
if w := t.workers[participant.Sid]; w != nil {
|
||||
w.Close()
|
||||
delete(t.workers, participant.Sid)
|
||||
}
|
||||
t.Unlock()
|
||||
|
||||
s.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
prometheus.SubParticipant()
|
||||
|
||||
t.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
Event: webhook.EventParticipantLeft,
|
||||
Room: room,
|
||||
Participant: participant,
|
||||
@@ -59,42 +65,51 @@ func (s *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Ro
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) PublishedTrack(SID, identity string, track *livekit.TrackInfo) {
|
||||
s.pool.Submit(func() {
|
||||
prometheus.AddPublishedTrack(track.Type.String())
|
||||
})
|
||||
func (t *TelemetryService) TrackPublished(participantID, identity string, track *livekit.TrackInfo, buff *buffer.Buffer) {
|
||||
t.Lock()
|
||||
if t.workers[participantID] == nil {
|
||||
t.workers[participantID] = NewStatsWorker(func(diff *buffer.Stats) {
|
||||
t.HandleIncomingRTP(participantID, identity, diff)
|
||||
})
|
||||
}
|
||||
t.workers[participantID].AddBuffer(buff)
|
||||
t.Unlock()
|
||||
|
||||
prometheus.AddPublishedTrack(track.Type.String())
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) UnpublishedTrack(SID, identity string, track *livekit.TrackInfo) {
|
||||
s.pool.Submit(func() {
|
||||
prometheus.SubPublishedTrack(track.Type.String())
|
||||
})
|
||||
func (t *TelemetryService) TrackUnpublished(participantID, identity string, track *livekit.TrackInfo, ssrc uint32) {
|
||||
t.RLock()
|
||||
if w := t.workers[participantID]; w != nil {
|
||||
w.RemoveBuffer(ssrc)
|
||||
t.RUnlock()
|
||||
} else {
|
||||
logger.Errorw("missing stats worker", nil, "participantID", participantID)
|
||||
t.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
prometheus.SubPublishedTrack(track.Type.String())
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) SubscribedTrack(SID, identity string, track *livekit.TrackInfo) {
|
||||
s.pool.Submit(func() {
|
||||
prometheus.AddSubscribedTrack(track.Type.String())
|
||||
})
|
||||
func (t *TelemetryService) TrackSubscribed(participantID, identity string, track *livekit.TrackInfo) {
|
||||
prometheus.AddSubscribedTrack(track.Type.String())
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) UnsubscribedTrack(SID, identity string, track *livekit.TrackInfo) {
|
||||
s.pool.Submit(func() {
|
||||
prometheus.SubSubscribedTrack(track.Type.String())
|
||||
})
|
||||
func (t *TelemetryService) TrackUnsubscribed(participantID, identity string, track *livekit.TrackInfo) {
|
||||
prometheus.SubSubscribedTrack(track.Type.String())
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) {
|
||||
logger.Debugw("recording started", "recordingID", recordingID)
|
||||
|
||||
s.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
func (t *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) {
|
||||
t.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
Event: webhook.EventRecordingStarted,
|
||||
RecordingInfo: &livekit.RecordingInfo{
|
||||
Id: recordingID,
|
||||
@@ -105,20 +120,8 @@ func (s *TelemetryService) RecordingStarted(ctx context.Context, recordingID str
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) RecordingEnded(res *livekit.RecordingResult) {
|
||||
// log results
|
||||
values := []interface{}{"recordingID", res.Id}
|
||||
if res.Error != "" {
|
||||
values = append(values, "error", res.Error)
|
||||
} else {
|
||||
values = append(values, "duration", time.Duration(res.Duration*1e9))
|
||||
if res.DownloadUrl != "" {
|
||||
values = append(values, "url", res.DownloadUrl)
|
||||
}
|
||||
}
|
||||
logger.Debugw("recording ended", values...)
|
||||
|
||||
s.notifyEvent(context.Background(), &livekit.WebhookEvent{
|
||||
func (t *TelemetryService) RecordingEnded(res *livekit.RecordingResult) {
|
||||
t.notifyEvent(context.Background(), &livekit.WebhookEvent{
|
||||
Event: webhook.EventRecordingFinished,
|
||||
RecordingResult: res,
|
||||
})
|
||||
@@ -126,13 +129,13 @@ func (s *TelemetryService) RecordingEnded(res *livekit.RecordingResult) {
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
|
||||
if s.notifier == nil {
|
||||
func (t *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
|
||||
if t.notifier == nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.pool.Submit(func() {
|
||||
if err := s.notifier.Notify(ctx, event); err != nil {
|
||||
t.webhookPool.Submit(func() {
|
||||
if err := t.notifier.Notify(ctx, event); err != nil {
|
||||
logger.Warnw("failed to notify webhook", err, "event", event.Event)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
type StatsInterceptorFactory struct {
|
||||
t *TelemetryService
|
||||
participantID string
|
||||
identity string
|
||||
}
|
||||
|
||||
func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
|
||||
return &StatsInterceptor{
|
||||
t: f.t,
|
||||
participantID: f.participantID,
|
||||
identity: f.identity,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type StatsInterceptor struct {
|
||||
interceptor.NoOp
|
||||
|
||||
t *TelemetryService
|
||||
participantID string
|
||||
identity string
|
||||
}
|
||||
|
||||
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
|
||||
// change in the future. The returned method will be called once per packet batch.
|
||||
func (s *StatsInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
return interceptor.RTCPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
s.t.HandleIncomingRTCP(s.participantID, s.identity, bytes)
|
||||
return reader.Read(bytes, attributes)
|
||||
})
|
||||
}
|
||||
|
||||
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
|
||||
// will be called once per packet batch.
|
||||
func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||
s.t.HandleOutgoingRTCP(s.participantID, s.identity, pkts)
|
||||
return writer.Write(pkts, attributes)
|
||||
})
|
||||
}
|
||||
|
||||
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
|
||||
// will be called once per rtp packet.
|
||||
func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
s.t.HandleOutgoingRTP(s.participantID, s.identity, uint64(len(payload)))
|
||||
return writer.Write(header, payload, attributes)
|
||||
})
|
||||
}
|
||||
@@ -57,15 +57,21 @@ func initPacketStats() {
|
||||
prometheus.MustRegister(promFirTotal)
|
||||
}
|
||||
|
||||
func IncrementPackets(direction Direction, pktLen uint64) {
|
||||
promPacketTotal.WithLabelValues(string(direction)).Add(1)
|
||||
promPacketBytes.WithLabelValues(string(direction)).Add(float64(pktLen))
|
||||
func IncrementPackets(direction Direction, count uint64) {
|
||||
promPacketTotal.WithLabelValues(string(direction)).Add(float64(count))
|
||||
if direction == Incoming {
|
||||
atomic.AddUint64(&atomicPacketsIn, 1)
|
||||
atomic.AddUint64(&atomicBytesIn, pktLen)
|
||||
atomic.AddUint64(&atomicPacketsIn, count)
|
||||
} else {
|
||||
atomic.AddUint64(&atomicPacketsOut, 1)
|
||||
atomic.AddUint64(&atomicBytesOut, pktLen)
|
||||
atomic.AddUint64(&atomicPacketsOut, count)
|
||||
}
|
||||
}
|
||||
|
||||
func IncrementBytes(direction Direction, count uint64) {
|
||||
promPacketBytes.WithLabelValues(string(direction)).Add(float64(count))
|
||||
if direction == Incoming {
|
||||
atomic.AddUint64(&atomicBytesIn, count)
|
||||
} else {
|
||||
atomic.AddUint64(&atomicBytesOut, count)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+57
-59
@@ -1,91 +1,89 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/gammazero/workerpool"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/webhook"
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
|
||||
type TelemetryService struct {
|
||||
notifier webhook.Notifier
|
||||
pool *workerpool.WorkerPool
|
||||
notifier webhook.Notifier
|
||||
webhookPool *workerpool.WorkerPool
|
||||
|
||||
sync.RWMutex
|
||||
// one worker per participant
|
||||
workers map[string]*StatsWorker
|
||||
}
|
||||
|
||||
func NewTelemetryService(notifier webhook.Notifier) *TelemetryService {
|
||||
return &TelemetryService{
|
||||
notifier: notifier,
|
||||
pool: workerpool.New(10),
|
||||
notifier: notifier,
|
||||
webhookPool: workerpool.New(1),
|
||||
workers: make(map[string]*StatsWorker),
|
||||
}
|
||||
}
|
||||
|
||||
type StatsInterceptorFactory struct {
|
||||
t *TelemetryService
|
||||
participantID string
|
||||
identity string
|
||||
}
|
||||
|
||||
func (s *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
|
||||
func (t *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
|
||||
return &StatsInterceptorFactory{
|
||||
t: s,
|
||||
t: t,
|
||||
participantID: participantID,
|
||||
identity: identity,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
|
||||
return &StatsInterceptor{
|
||||
t: f.t,
|
||||
participantID: f.participantID,
|
||||
identity: f.identity,
|
||||
}, nil
|
||||
func (t *TelemetryService) HandleIncomingRTP(participantID, identity string, diff *buffer.Stats) {
|
||||
prometheus.IncrementPackets(prometheus.Incoming, uint64(diff.PacketCount))
|
||||
prometheus.IncrementBytes(prometheus.Incoming, diff.TotalByte)
|
||||
|
||||
// TODO: analytics service
|
||||
// diff.LastExpected, diff.LastReceived, diff.Jitter, diff.LostRate
|
||||
}
|
||||
|
||||
type StatsInterceptor struct {
|
||||
interceptor.NoOp
|
||||
func (t *TelemetryService) HandleIncomingRTCP(participantID, identity string, bytes []byte) {
|
||||
pkts, err := rtcp.Unmarshal(bytes)
|
||||
if err != nil {
|
||||
logger.Errorw("Interceptor failed to unmarshal rtcp packets", err)
|
||||
return
|
||||
}
|
||||
|
||||
t *TelemetryService
|
||||
participantID string
|
||||
identity string
|
||||
for _, pkt := range pkts {
|
||||
switch pkt.(type) {
|
||||
case *rtcp.TransportLayerNack:
|
||||
prometheus.IncrementNack(prometheus.Incoming)
|
||||
case *rtcp.PictureLossIndication:
|
||||
prometheus.IncrementPLI(prometheus.Incoming)
|
||||
case *rtcp.FullIntraRequest:
|
||||
prometheus.IncrementFIR(prometheus.Incoming)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
// --- Incoming ---
|
||||
func (t *TelemetryService) HandleOutgoingRTP(participantID, identity string, pktLen uint64) {
|
||||
prometheus.IncrementPackets(prometheus.Outgoing, 1)
|
||||
prometheus.IncrementBytes(prometheus.Outgoing, pktLen)
|
||||
|
||||
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
|
||||
// change in the future. The returned method will be called once per packet batch.
|
||||
func (s *StatsInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
return interceptor.RTCPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
s.t.HandleIncomingRTCP(s.participantID, s.identity, bytes)
|
||||
return reader.Read(bytes, attributes)
|
||||
})
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
|
||||
// will be called once per rtp packet.
|
||||
func (s *StatsInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||
return interceptor.RTPReaderFunc(func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
s.t.HandleIncomingRTP(s.participantID, s.identity, uint64(len(payload)))
|
||||
return reader.Read(payload, attributes)
|
||||
})
|
||||
}
|
||||
func (t *TelemetryService) HandleOutgoingRTCP(participantID, identity string, pkts []rtcp.Packet) {
|
||||
for _, pkt := range pkts {
|
||||
switch pkt.(type) {
|
||||
case *rtcp.TransportLayerNack:
|
||||
prometheus.IncrementNack(prometheus.Outgoing)
|
||||
case *rtcp.PictureLossIndication:
|
||||
prometheus.IncrementPLI(prometheus.Outgoing)
|
||||
case *rtcp.FullIntraRequest:
|
||||
prometheus.IncrementFIR(prometheus.Outgoing)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Outgoing ---
|
||||
|
||||
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
|
||||
// will be called once per packet batch.
|
||||
func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||
s.t.HandleOutgoingRTCP(s.participantID, s.identity, pkts)
|
||||
return writer.Write(pkts, attributes)
|
||||
})
|
||||
}
|
||||
|
||||
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
|
||||
// will be called once per rtp packet.
|
||||
func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
s.t.HandleOutgoingRTP(s.participantID, s.identity, uint64(len(payload)))
|
||||
return writer.Write(header, payload, attributes)
|
||||
})
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/pion/rtcp"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
|
||||
func (s *TelemetryService) HandleIncomingRTCP(participantID, identity string, bytes []byte) {
|
||||
pkts, err := rtcp.Unmarshal(bytes)
|
||||
if err != nil {
|
||||
logger.Errorw("Interceptor failed to unmarshal rtcp packets", err)
|
||||
return
|
||||
}
|
||||
|
||||
s.pool.Submit(func() {
|
||||
for _, pkt := range pkts {
|
||||
switch pkt.(type) {
|
||||
case *rtcp.TransportLayerNack:
|
||||
prometheus.IncrementNack(prometheus.Incoming)
|
||||
case *rtcp.PictureLossIndication:
|
||||
prometheus.IncrementPLI(prometheus.Incoming)
|
||||
case *rtcp.FullIntraRequest:
|
||||
prometheus.IncrementFIR(prometheus.Incoming)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) HandleIncomingRTP(participantID, identity string, pktLen uint64) {
|
||||
s.pool.Submit(func() {
|
||||
prometheus.IncrementPackets(prometheus.Incoming, pktLen)
|
||||
})
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) HandleOutgoingRTCP(participantID, identity string, pkts []rtcp.Packet) {
|
||||
s.pool.Submit(func() {
|
||||
for _, pkt := range pkts {
|
||||
switch pkt.(type) {
|
||||
case *rtcp.TransportLayerNack:
|
||||
prometheus.IncrementNack(prometheus.Outgoing)
|
||||
case *rtcp.PictureLossIndication:
|
||||
prometheus.IncrementPLI(prometheus.Outgoing)
|
||||
case *rtcp.FullIntraRequest:
|
||||
prometheus.IncrementFIR(prometheus.Outgoing)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
|
||||
func (s *TelemetryService) HandleOutgoingRTP(participantID, identity string, pktLen uint64) {
|
||||
s.pool.Submit(func() {
|
||||
prometheus.IncrementPackets(prometheus.Outgoing, pktLen)
|
||||
})
|
||||
|
||||
// TODO: analytics service
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
)
|
||||
|
||||
const updateFrequency = time.Second * 10
|
||||
|
||||
// StatsWorker handles incoming RTP statistics instead of the stream interceptor
|
||||
type StatsWorker struct {
|
||||
sync.RWMutex
|
||||
buffers map[uint32]*buffer.Buffer
|
||||
lastStats *buffer.Stats
|
||||
onUpdate func(diff *buffer.Stats)
|
||||
close chan struct{}
|
||||
}
|
||||
|
||||
func NewStatsWorker(onUpdate func(*buffer.Stats)) *StatsWorker {
|
||||
s := &StatsWorker{
|
||||
buffers: make(map[uint32]*buffer.Buffer),
|
||||
onUpdate: onUpdate,
|
||||
close: make(chan struct{}, 1),
|
||||
}
|
||||
go s.run()
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *StatsWorker) run() {
|
||||
for {
|
||||
select {
|
||||
case <-s.close:
|
||||
return
|
||||
case <-time.After(updateFrequency):
|
||||
s.onUpdate(s.Calc())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatsWorker) AddBuffer(buffer *buffer.Buffer) {
|
||||
s.Lock()
|
||||
s.buffers[buffer.GetMediaSSRC()] = buffer
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *StatsWorker) RemoveBuffer(ssrc uint32) {
|
||||
s.Lock()
|
||||
delete(s.buffers, ssrc)
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *StatsWorker) Calc() *buffer.Stats {
|
||||
s.RLock()
|
||||
total := &buffer.Stats{}
|
||||
for _, buff := range s.buffers {
|
||||
stats := buff.GetStats()
|
||||
total.PacketCount += stats.PacketCount
|
||||
total.TotalByte += stats.TotalByte
|
||||
total.LastExpected += stats.LastExpected
|
||||
total.LastReceived += stats.LastReceived
|
||||
if stats.Jitter > total.Jitter {
|
||||
total.Jitter = stats.Jitter
|
||||
}
|
||||
}
|
||||
s.RUnlock()
|
||||
|
||||
var diff *buffer.Stats
|
||||
if s.lastStats != nil {
|
||||
diff = &buffer.Stats{
|
||||
LastExpected: total.LastExpected - s.lastStats.LastExpected,
|
||||
LastReceived: total.LastReceived - s.lastStats.LastReceived,
|
||||
PacketCount: total.PacketCount - s.lastStats.PacketCount,
|
||||
TotalByte: total.TotalByte - s.lastStats.TotalByte,
|
||||
Jitter: total.Jitter,
|
||||
}
|
||||
} else {
|
||||
diff = total
|
||||
}
|
||||
diff.LostRate = float32(diff.LastExpected-diff.LastReceived) / float32(diff.LastExpected)
|
||||
|
||||
s.lastStats = diff
|
||||
return diff
|
||||
}
|
||||
|
||||
func (s *StatsWorker) Close() {
|
||||
close(s.close)
|
||||
}
|
||||
Reference in New Issue
Block a user