publish stream stats to prometheus (#1313)

* add prometheus stats for rtt/jitter/packet loss

* add track source to metrics

* better packet loss bins

* add track type to metrics

* remove source from AnalyticsStat

* regenerate telemetry service fake

* compute loss from per stream packet count
This commit is contained in:
Paul Wells
2023-01-19 19:37:15 -08:00
committed by GitHub
parent 6a8e86c3a3
commit 1ef7c46fd7
8 changed files with 135 additions and 47 deletions

View File

@@ -251,7 +251,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
})
newWR.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) {
// LK-TODO: this needs to be receiver/mime aware
t.params.Telemetry.TrackStats(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), stat)
key := telemetry.StatsKeyForTrack(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), t.params.TrackInfo.Source, t.params.TrackInfo.Type)
t.params.Telemetry.TrackStats(key, stat)
})
if t.PrimaryReceiver() == nil {
// primary codec published, set potential codecs

View File

@@ -154,7 +154,8 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
})
downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) {
t.params.Telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, stat)
key := telemetry.StatsKeyForTrack(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, t.params.MediaTrack.Source(), t.params.MediaTrack.Kind())
t.params.Telemetry.TrackStats(key, stat)
})
downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) {

View File

@@ -31,9 +31,14 @@ var (
promPacketTotal *prometheus.CounterVec
promPacketBytes *prometheus.CounterVec
promRTCPLabels = []string{"direction"}
promStreamLabels = []string{"direction", "source", "type"}
promNackTotal *prometheus.CounterVec
promPliTotal *prometheus.CounterVec
promFirTotal *prometheus.CounterVec
promPacketLossTotal *prometheus.CounterVec
promPacketLoss *prometheus.HistogramVec
promJitter *prometheus.HistogramVec
promRTT *prometheus.HistogramVec
promParticipantJoin *prometheus.CounterVec
promConnections *prometheus.GaugeVec
)
@@ -69,6 +74,33 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) {
Name: "total",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
}, promRTCPLabels)
promPacketLossTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet_loss",
Name: "total",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
}, promStreamLabels)
promPacketLoss = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: livekitNamespace,
Subsystem: "packet_loss",
Name: "percent",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
Buckets: []float64{0.0, 0.1, 0.3, 0.5, 0.7, 1, 5, 10, 40, 100},
}, promStreamLabels)
promJitter = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: livekitNamespace,
Subsystem: "jitter",
Name: "us",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
Buckets: []float64{100, 500, 1500, 2000, 3000, 5000, 10000, 20000, 40000, 60000},
}, promStreamLabels)
promRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: livekitNamespace,
Subsystem: "rtt",
Name: "ms",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
Buckets: []float64{50, 100, 150, 200, 250, 500, 750, 1000, 5000, 10000},
}, promStreamLabels)
promParticipantJoin = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "participant_join",
@@ -87,6 +119,10 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) {
prometheus.MustRegister(promNackTotal)
prometheus.MustRegister(promPliTotal)
prometheus.MustRegister(promFirTotal)
prometheus.MustRegister(promPacketLossTotal)
prometheus.MustRegister(promPacketLoss)
prometheus.MustRegister(promJitter)
prometheus.MustRegister(promRTT)
prometheus.MustRegister(promParticipantJoin)
prometheus.MustRegister(promConnections)
}
@@ -134,6 +170,27 @@ func IncrementRTCP(direction Direction, nack, pli, fir uint32) {
}
}
// RecordPacketLoss publishes the packet loss of a single stream report,
// labelled by direction, track source and track type.
//
// When total is non-zero the loss ratio lost/total is observed as a
// percentage in the packet loss histogram. When lost is non-zero it is also
// added to the running packet loss counter. The two checks are independent.
func RecordPacketLoss(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, lost, total uint32) {
	labels := []string{string(direction), trackSource.String(), trackType.String()}

	// A percentage is only meaningful when some packets were counted.
	if total != 0 {
		lossPercent := float64(lost) / float64(total) * 100
		promPacketLoss.WithLabelValues(labels...).Observe(lossPercent)
	}

	if lost != 0 {
		promPacketLossTotal.WithLabelValues(labels...).Add(float64(lost))
	}
}
// RecordJitter observes a stream's jitter value in the jitter histogram,
// labelled by direction, track source and track type. A zero jitter is not
// recorded.
//
// NOTE(review): the histogram is named "us", so the value is presumably in
// microseconds — confirm against the producer of AnalyticsStream.Jitter.
func RecordJitter(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, jitter uint32) {
	if jitter == 0 {
		return
	}

	observer := promJitter.WithLabelValues(string(direction), trackSource.String(), trackType.String())
	observer.Observe(float64(jitter))
}
// RecordRTT observes a stream's round-trip time in the RTT histogram,
// labelled by direction, track source and track type. A zero RTT is not
// recorded.
//
// NOTE(review): the histogram is named "ms", so the value is presumably in
// milliseconds — confirm against the producer of AnalyticsStream.Rtt.
func RecordRTT(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, rtt uint32) {
	if rtt == 0 {
		return
	}

	observer := promRTT.WithLabelValues(string(direction), trackSource.String(), trackType.String())
	observer.Observe(float64(rtt))
}
func IncrementParticipantJoin(join uint32, rtcConnected ...bool) {
if join > 0 {
if len(rtcConnected) > 0 && rtcConnected[0] {

View File

@@ -69,7 +69,7 @@ func (p *BytesTrackStats) report(force bool) {
}
if recv := p.recv.Swap(0); recv > 0 {
p.telemetry.TrackStats(livekit.StreamType_UPSTREAM, p.pID, p.trackID, &livekit.AnalyticsStat{
p.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_UPSTREAM, p.pID, p.trackID), &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
{PrimaryBytes: recv},
},
@@ -77,7 +77,7 @@ func (p *BytesTrackStats) report(force bool) {
}
if send := p.send.Swap(0); send > 0 {
p.telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, p.pID, p.trackID, &livekit.AnalyticsStat{
p.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_DOWNSTREAM, p.pID, p.trackID), &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
{PrimaryBytes: send},
},

View File

@@ -5,10 +5,38 @@ import (
"github.com/livekit/protocol/livekit"
)
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
// StatsKey identifies the stream that an AnalyticsStat passed to TrackStats
// belongs to. Its fields are unexported; values are built with
// StatsKeyForTrack or StatsKeyForData.
type StatsKey struct {
// streamType selects the direction: DOWNSTREAM is reported as outgoing,
// anything else as incoming.
streamType livekit.StreamType
// participantID is used to look up the participant's stats worker.
participantID livekit.ParticipantID
// trackID is forwarded to the worker together with the stat.
trackID livekit.TrackID
// trackSource and trackType are used as labels on the per-stream
// prometheus metrics; they are only set by StatsKeyForTrack.
trackSource livekit.TrackSource
trackType livekit.TrackType
// track is true for keys built by StatsKeyForTrack and false for keys
// built by StatsKeyForData; TrackStats records per-stream packet loss,
// RTT and jitter only when it is true.
track bool
}
// StatsKeyForTrack returns the StatsKey for a media track. The key carries
// the track's source and type and has its track flag set to true.
func StatsKeyForTrack(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, trackSource livekit.TrackSource, trackType livekit.TrackType) StatsKey {
	// Start from the flag that marks this key as a media track.
	key := StatsKey{track: true}

	key.streamType = streamType
	key.participantID = participantID
	key.trackID = trackID
	key.trackSource = trackSource
	key.trackType = trackType

	return key
}
// StatsKeyForData returns a StatsKey that carries only the stream type,
// participant ID and track ID. The track source, track type and track flag
// keep their zero values.
func StatsKeyForData(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID) StatsKey {
	var key StatsKey

	key.streamType = streamType
	key.participantID = participantID
	key.trackID = trackID

	return key
}
func (t *telemetryService) TrackStats(key StatsKey, stat *livekit.AnalyticsStat) {
t.enqueue(func() {
direction := prometheus.Incoming
if streamType == livekit.StreamType_DOWNSTREAM {
if key.streamType == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
@@ -25,7 +53,7 @@ func (t *telemetryService) TrackStats(streamType livekit.StreamType, participant
firs += stream.Firs
packets += stream.PrimaryPackets + stream.PaddingPackets
bytes += stream.PrimaryBytes + stream.PaddingBytes
if streamType == livekit.StreamType_DOWNSTREAM {
if key.streamType == livekit.StreamType_DOWNSTREAM {
retransmitPackets += stream.RetransmitPackets
retransmitBytes += stream.RetransmitBytes
} else {
@@ -33,6 +61,11 @@ func (t *telemetryService) TrackStats(streamType livekit.StreamType, participant
packets += stream.RetransmitPackets
bytes += stream.RetransmitBytes
}
if key.track {
prometheus.RecordPacketLoss(direction, key.trackSource, key.trackType, stream.PacketsLost, stream.PrimaryPackets+stream.PaddingPackets)
prometheus.RecordRTT(direction, key.trackSource, key.trackType, stream.Rtt)
prometheus.RecordJitter(direction, key.trackSource, key.trackType, stream.Jitter)
}
}
prometheus.IncrementRTCP(direction, nacks, plis, firs)
prometheus.IncrementPackets(direction, uint64(packets), false)
@@ -44,8 +77,8 @@ func (t *telemetryService) TrackStats(streamType livekit.StreamType, participant
prometheus.IncrementBytes(direction, retransmitBytes, true)
}
if worker, ok := t.getWorker(participantID); ok {
worker.OnTrackStat(trackID, streamType, stat)
if worker, ok := t.getWorker(key.participantID); ok {
worker.OnTrackStat(key.trackID, key.streamType, stat)
}
})
}

View File

@@ -43,7 +43,7 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) {
// do
packet := 33
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet)}}}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "", stat)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, ""), stat)
// flush
fixture.flush()
@@ -75,7 +75,7 @@ func Test_OnDownstreamPackets(t *testing.T) {
trackID := livekit.TrackID("trackID")
for i := range packets {
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packets[i]), PrimaryPackets: uint32(1)}}}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat)
}
// flush
@@ -105,12 +105,12 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) {
packet1 := 33
trackID1 := livekit.TrackID("trackID1")
stat1 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet1), PrimaryPackets: 1}}}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1)
packet2 := 23
trackID2 := livekit.TrackID("trackID2")
stat2 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet2), PrimaryPackets: 1}}}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat2)
// flush
fixture.flush()
@@ -161,7 +161,7 @@ func Test_OnDownStreamStat(t *testing.T) {
},
}
trackID := livekit.TrackID("trackID1")
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1)
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -177,7 +177,7 @@ func Test_OnDownStreamStat(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
// flush
fixture.flush()
@@ -216,7 +216,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1) // there should be bytes reported so that stats are sent
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) // there should be bytes reported so that stats are sent
// flush
fixture.flush()
@@ -230,7 +230,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
// flush
fixture.flush()
@@ -268,7 +268,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat1) // there should be bytes reported so that stats are sent
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) // there should be bytes reported so that stats are sent
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -279,7 +279,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat2)
stat3 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -290,7 +290,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat3)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat3)
// flush
fixture.flush()
@@ -343,7 +343,7 @@ func Test_OnUpstreamStat(t *testing.T) {
}
trackID := livekit.TrackID("trackID")
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -359,7 +359,7 @@ func Test_OnUpstreamStat(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat2)
// flush
fixture.flush()
@@ -402,8 +402,8 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID1, stat1)
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat1) // using same buffer is not correct but for test it is fine
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID1), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID2), stat1) // using same buffer is not correct but for test it is fine
// do
totalBytes++
@@ -417,7 +417,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID1, stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID1), stat2)
stat3 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -428,7 +428,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat3)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID2), stat3)
// flush
fixture.flush()
@@ -504,7 +504,7 @@ func Test_AddUpTrack(t *testing.T) {
},
}
trackID := livekit.TrackID("trackID")
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat)
// flush
fixture.flush()
@@ -542,7 +542,7 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
// flush
fixture.flush()
@@ -577,7 +577,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, "trackID", stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, "trackID"), stat1)
// downstream bytes
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -587,7 +587,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
},
},
}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "trackID1", stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, "trackID1"), stat2)
// flush
fixture.flush()

View File

@@ -129,13 +129,11 @@ type FakeTelemetryService struct {
arg2 livekit.ParticipantID
arg3 *livekit.TrackInfo
}
TrackStatsStub func(livekit.StreamType, livekit.ParticipantID, livekit.TrackID, *livekit.AnalyticsStat)
TrackStatsStub func(telemetry.StatsKey, *livekit.AnalyticsStat)
trackStatsMutex sync.RWMutex
trackStatsArgsForCall []struct {
arg1 livekit.StreamType
arg2 livekit.ParticipantID
arg3 livekit.TrackID
arg4 *livekit.AnalyticsStat
arg1 telemetry.StatsKey
arg2 *livekit.AnalyticsStat
}
TrackSubscribeRequestedStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, *livekit.ParticipantInfo)
trackSubscribeRequestedMutex sync.RWMutex
@@ -756,19 +754,17 @@ func (fake *FakeTelemetryService) TrackPublishedUpdateArgsForCall(i int) (contex
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeTelemetryService) TrackStats(arg1 livekit.StreamType, arg2 livekit.ParticipantID, arg3 livekit.TrackID, arg4 *livekit.AnalyticsStat) {
func (fake *FakeTelemetryService) TrackStats(arg1 telemetry.StatsKey, arg2 *livekit.AnalyticsStat) {
fake.trackStatsMutex.Lock()
fake.trackStatsArgsForCall = append(fake.trackStatsArgsForCall, struct {
arg1 livekit.StreamType
arg2 livekit.ParticipantID
arg3 livekit.TrackID
arg4 *livekit.AnalyticsStat
}{arg1, arg2, arg3, arg4})
arg1 telemetry.StatsKey
arg2 *livekit.AnalyticsStat
}{arg1, arg2})
stub := fake.TrackStatsStub
fake.recordInvocation("TrackStats", []interface{}{arg1, arg2, arg3, arg4})
fake.recordInvocation("TrackStats", []interface{}{arg1, arg2})
fake.trackStatsMutex.Unlock()
if stub != nil {
fake.TrackStatsStub(arg1, arg2, arg3, arg4)
fake.TrackStatsStub(arg1, arg2)
}
}
@@ -778,17 +774,17 @@ func (fake *FakeTelemetryService) TrackStatsCallCount() int {
return len(fake.trackStatsArgsForCall)
}
func (fake *FakeTelemetryService) TrackStatsCalls(stub func(livekit.StreamType, livekit.ParticipantID, livekit.TrackID, *livekit.AnalyticsStat)) {
func (fake *FakeTelemetryService) TrackStatsCalls(stub func(telemetry.StatsKey, *livekit.AnalyticsStat)) {
fake.trackStatsMutex.Lock()
defer fake.trackStatsMutex.Unlock()
fake.TrackStatsStub = stub
}
func (fake *FakeTelemetryService) TrackStatsArgsForCall(i int) (livekit.StreamType, livekit.ParticipantID, livekit.TrackID, *livekit.AnalyticsStat) {
func (fake *FakeTelemetryService) TrackStatsArgsForCall(i int) (telemetry.StatsKey, *livekit.AnalyticsStat) {
fake.trackStatsMutex.RLock()
defer fake.trackStatsMutex.RUnlock()
argsForCall := fake.trackStatsArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) TrackSubscribeRequested(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 *livekit.ParticipantInfo) {

View File

@@ -16,7 +16,7 @@ import (
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . TelemetryService
type TelemetryService interface {
// TrackStats is called periodically for each track in both directions (published/subscribed)
TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat)
TrackStats(key StatsKey, stat *livekit.AnalyticsStat)
// events
RoomStarted(ctx context.Context, room *livekit.Room)