Add country label to edge prom stats. (#3816)

* Add country label to edge prom stats.

* data channel country stats

* test

* pub/sub time country
This commit is contained in:
Raja Subramanian
2025-07-24 13:23:05 +05:30
committed by GitHub
parent 68387b41fe
commit 10103449c5
12 changed files with 268 additions and 116 deletions
+10 -2
View File
@@ -67,6 +67,7 @@ type MediaTrackParams struct {
ParticipantID func() livekit.ParticipantID
ParticipantIdentity livekit.ParticipantIdentity
ParticipantVersion uint32
ParticipantCountry string
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
SubscriberConfig DirectionConfig
@@ -335,14 +336,21 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe
})
// SIMULCAST-CODEC-TODO: these need to be receiver/mime aware, setting it up only for primary now
statsKey := telemetry.StatsKeyForTrack(
t.params.ParticipantCountry,
livekit.StreamType_UPSTREAM,
t.PublisherID(),
t.ID(),
ti.Source,
ti.Type,
)
newWR.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) {
// send for only one codec, either primary (priority == 0) OR regressed codec
t.lock.RLock()
regressionTargetCodecReceived := t.regressionTargetCodecReceived
t.lock.RUnlock()
if priority == 0 || regressionTargetCodecReceived {
key := telemetry.StatsKeyForTrack(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), ti.Source, ti.Type)
t.params.Telemetry.TrackStats(key, stat)
t.params.Telemetry.TrackStats(statsKey, stat)
if cs, ok := telemetry.CondenseStat(stat); ok {
t.params.Reporter.Tx(func(tx roomobs.TrackTx) {
+9 -2
View File
@@ -210,10 +210,17 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted())
})
statsKey := telemetry.StatsKeyForTrack(
sub.GetCountry(),
livekit.StreamType_DOWNSTREAM,
subscriberID,
trackID,
t.params.MediaTrack.Source(),
t.params.MediaTrack.Kind(),
)
reporter := sub.GetReporter().WithTrack(trackID.String())
downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) {
key := telemetry.StatsKeyForTrack(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, t.params.MediaTrack.Source(), t.params.MediaTrack.Kind())
t.params.Telemetry.TrackStats(key, stat)
t.params.Telemetry.TrackStats(statsKey, stat)
if cs, ok := telemetry.CondenseStat(stat); ok {
ti := wr.TrackInfo()
+15 -1
View File
@@ -205,6 +205,7 @@ type ParticipantParams struct {
FireOnTrackBySdp bool
DisableCodecRegression bool
LastPubReliableSeq uint32
Country string
}
type ParticipantImpl struct {
@@ -379,6 +380,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.id.Store(params.SID)
p.dataChannelStats = telemetry.NewBytesTrackStats(
p.params.Country,
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, p.ID()),
p.ID(),
params.Telemetry,
@@ -436,6 +438,10 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return p, nil
}
func (p *ParticipantImpl) GetCountry() string {
return p.params.Country
}
func (p *ParticipantImpl) GetTrailer() []byte {
trailer := make([]byte, len(p.params.Trailer))
copy(trailer, p.params.Trailer)
@@ -2947,7 +2953,14 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver
)
}
prometheus.RecordPublishTime(mt.Source(), mt.Kind(), pubTime, p.GetClientInfo().GetSdk(), p.Kind())
prometheus.RecordPublishTime(
p.params.Country,
mt.Source(),
mt.Kind(),
pubTime,
p.GetClientInfo().GetSdk(),
p.Kind(),
)
p.handleTrackPublished(mt, isMigrated)
}()
}
@@ -3015,6 +3028,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
ParticipantID: p.ID,
ParticipantIdentity: p.params.Identity,
ParticipantVersion: p.version.Load(),
ParticipantCountry: p.params.Country,
BufferFactory: p.params.Config.BufferFactory,
ReceiverConfig: p.params.Config.Receiver,
AudioConfig: p.params.AudioConfig,
+9 -1
View File
@@ -1140,7 +1140,15 @@ func (s *trackSubscription) maybeRecordSuccess(ts telemetry.TelemetryService, pI
d := time.Since(*s.subscribeAt.Load())
s.logger.Debugw("track subscribed", "cost", d.Milliseconds())
subscriber := subTrack.Subscriber()
prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind(), int(s.succRecordCounter.Inc()))
prometheus.RecordSubscribeTime(
subscriber.GetCountry(),
mediaTrack.Source(),
mediaTrack.Kind(),
d,
subscriber.GetClientInfo().GetSdk(),
subscriber.Kind(),
int(s.succRecordCounter.Inc()),
)
eventSent := s.eventSent.Swap(true)
+1
View File
@@ -350,6 +350,7 @@ type LocalParticipant interface {
ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion)
// getters
GetCountry() string
GetTrailer() []byte
GetLogger() logger.Logger
GetLoggerResolver() logger.DeferredFieldResolver
@@ -292,6 +292,16 @@ type FakeLocalParticipant struct {
getConnectionQualityReturnsOnCall map[int]struct {
result1 *livekit.ConnectionQualityInfo
}
GetCountryStub func() string
getCountryMutex sync.RWMutex
getCountryArgsForCall []struct {
}
getCountryReturns struct {
result1 string
}
getCountryReturnsOnCall map[int]struct {
result1 string
}
GetDisableSenderReportPassThroughStub func() bool
getDisableSenderReportPassThroughMutex sync.RWMutex
getDisableSenderReportPassThroughArgsForCall []struct {
@@ -2832,6 +2842,59 @@ func (fake *FakeLocalParticipant) GetConnectionQualityReturnsOnCall(i int, resul
}{result1}
}
func (fake *FakeLocalParticipant) GetCountry() string {
fake.getCountryMutex.Lock()
ret, specificReturn := fake.getCountryReturnsOnCall[len(fake.getCountryArgsForCall)]
fake.getCountryArgsForCall = append(fake.getCountryArgsForCall, struct {
}{})
stub := fake.GetCountryStub
fakeReturns := fake.getCountryReturns
fake.recordInvocation("GetCountry", []interface{}{})
fake.getCountryMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetCountryCallCount() int {
fake.getCountryMutex.RLock()
defer fake.getCountryMutex.RUnlock()
return len(fake.getCountryArgsForCall)
}
func (fake *FakeLocalParticipant) GetCountryCalls(stub func() string) {
fake.getCountryMutex.Lock()
defer fake.getCountryMutex.Unlock()
fake.GetCountryStub = stub
}
func (fake *FakeLocalParticipant) GetCountryReturns(result1 string) {
fake.getCountryMutex.Lock()
defer fake.getCountryMutex.Unlock()
fake.GetCountryStub = nil
fake.getCountryReturns = struct {
result1 string
}{result1}
}
func (fake *FakeLocalParticipant) GetCountryReturnsOnCall(i int, result1 string) {
fake.getCountryMutex.Lock()
defer fake.getCountryMutex.Unlock()
fake.GetCountryStub = nil
if fake.getCountryReturnsOnCall == nil {
fake.getCountryReturnsOnCall = make(map[int]struct {
result1 string
})
}
fake.getCountryReturnsOnCall[i] = struct {
result1 string
}{result1}
}
func (fake *FakeLocalParticipant) GetDisableSenderReportPassThrough() bool {
fake.getDisableSenderReportPassThroughMutex.Lock()
ret, specificReturn := fake.getDisableSenderReportPassThroughReturnsOnCall[len(fake.getDisableSenderReportPassThroughArgsForCall)]
@@ -9262,6 +9325,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getClientInfoMutex.RUnlock()
fake.getConnectionQualityMutex.RLock()
defer fake.getConnectionQualityMutex.RUnlock()
fake.getCountryMutex.RLock()
defer fake.getCountryMutex.RUnlock()
fake.getDisableSenderReportPassThroughMutex.RLock()
defer fake.getDisableSenderReportPassThroughMutex.RUnlock()
fake.getEnabledPublishCodecsMutex.RLock()
+45 -54
View File
@@ -24,10 +24,15 @@ import (
type Direction string
const (
Incoming Direction = "incoming"
Outgoing Direction = "outgoing"
transmissionInitial = "initial"
transmissionRetransmit = "retransmit"
Incoming Direction = "incoming"
Outgoing Direction = "outgoing"
)
type transmissionType string
const (
transmissionInitial transmissionType = "initial"
transmissionRetransmit transmissionType = "retransmit"
)
var (
@@ -44,11 +49,11 @@ var (
forwardLatency atomic.Uint32
forwardJitter atomic.Uint32
promPacketLabels = []string{"direction", "transmission"}
promPacketLabels = []string{"direction", "transmission", "country"}
promPacketTotal *prometheus.CounterVec
promPacketBytes *prometheus.CounterVec
promRTCPLabels = []string{"direction"}
promStreamLabels = []string{"direction", "source", "type"}
promRTCPLabels = []string{"direction", "country"}
promStreamLabels = []string{"direction", "source", "type", "country"}
promNackTotal *prometheus.CounterVec
promPliTotal *prometheus.CounterVec
promFirTotal *prometheus.CounterVec
@@ -186,31 +191,16 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) {
prometheus.MustRegister(promConnections)
prometheus.MustRegister(promForwardLatency)
prometheus.MustRegister(promForwardJitter)
promPacketTotalIncomingInitial = promPacketTotal.WithLabelValues(string(Incoming), transmissionInitial)
promPacketTotalIncomingRetransmit = promPacketTotal.WithLabelValues(string(Incoming), transmissionRetransmit)
promPacketTotalOutgoingInitial = promPacketTotal.WithLabelValues(string(Outgoing), transmissionInitial)
promPacketTotalOutgoingRetransmit = promPacketTotal.WithLabelValues(string(Outgoing), transmissionRetransmit)
promPacketBytesIncomingInitial = promPacketBytes.WithLabelValues(string(Incoming), transmissionInitial)
promPacketBytesIncomingRetransmit = promPacketBytes.WithLabelValues(string(Incoming), transmissionRetransmit)
promPacketBytesOutgoingInitial = promPacketBytes.WithLabelValues(string(Outgoing), transmissionInitial)
promPacketBytesOutgoingRetransmit = promPacketBytes.WithLabelValues(string(Outgoing), transmissionRetransmit)
}
func IncrementPackets(direction Direction, count uint64, retransmit bool) {
if direction == Incoming {
if retransmit {
promPacketTotalIncomingRetransmit.Add(float64(count))
} else {
promPacketTotalIncomingInitial.Add(float64(count))
}
func IncrementPackets(country string, direction Direction, count uint64, retransmit bool) {
var transmission transmissionType
if retransmit {
transmission = transmissionRetransmit
} else {
if retransmit {
promPacketTotalOutgoingRetransmit.Add(float64(count))
} else {
promPacketTotalOutgoingInitial.Add(float64(count))
}
transmission = transmissionInitial
}
promPacketTotal.WithLabelValues(string(direction), string(transmission), country).Add(float64(count))
if direction == Incoming {
packetsIn.Add(count)
@@ -222,20 +212,14 @@ func IncrementPackets(direction Direction, count uint64, retransmit bool) {
}
}
func IncrementBytes(direction Direction, count uint64, retransmit bool) {
if direction == Incoming {
if retransmit {
promPacketBytesIncomingRetransmit.Add(float64(count))
} else {
promPacketBytesIncomingInitial.Add(float64(count))
}
func IncrementBytes(country string, direction Direction, count uint64, retransmit bool) {
var transmission transmissionType
if retransmit {
transmission = transmissionRetransmit
} else {
if retransmit {
promPacketBytesOutgoingRetransmit.Add(float64(count))
} else {
promPacketBytesOutgoingInitial.Add(float64(count))
}
transmission = transmissionInitial
}
promPacketBytes.WithLabelValues(string(direction), string(transmission), country).Add(float64(count))
if direction == Incoming {
bytesIn.Add(count)
@@ -247,46 +231,53 @@ func IncrementBytes(direction Direction, count uint64, retransmit bool) {
}
}
func IncrementRTCP(direction Direction, nack, pli, fir uint32) {
func IncrementRTCP(country string, direction Direction, nack, pli, fir uint32) {
if nack > 0 {
promNackTotal.WithLabelValues(string(direction)).Add(float64(nack))
promNackTotal.WithLabelValues(string(direction), country).Add(float64(nack))
nackTotal.Add(uint64(nack))
}
if pli > 0 {
promPliTotal.WithLabelValues(string(direction)).Add(float64(pli))
promPliTotal.WithLabelValues(string(direction), country).Add(float64(pli))
}
if fir > 0 {
promFirTotal.WithLabelValues(string(direction)).Add(float64(fir))
promFirTotal.WithLabelValues(string(direction), country).Add(float64(fir))
}
}
func RecordPacketLoss(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, lost, total uint32) {
func RecordPacketLoss(
country string,
direction Direction,
trackSource livekit.TrackSource,
trackType livekit.TrackType,
lost uint32,
total uint32,
) {
if total > 0 {
promPacketLoss.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(lost) / float64(total) * 100)
promPacketLoss.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(lost) / float64(total) * 100)
}
if lost > 0 {
promPacketLossTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Add(float64(lost))
promPacketLossTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Add(float64(lost))
}
}
func RecordPacketOutOfOrder(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, ooo, total uint32) {
func RecordPacketOutOfOrder(country string, direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, ooo, total uint32) {
if total > 0 {
promPacketOutOfOrder.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(ooo) / float64(total) * 100)
promPacketOutOfOrder.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(ooo) / float64(total) * 100)
}
if ooo > 0 {
promPacketOutOfOrderTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Add(float64(ooo))
promPacketOutOfOrderTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Add(float64(ooo))
}
}
func RecordJitter(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, jitter uint32) {
func RecordJitter(country string, direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, jitter uint32) {
if jitter > 0 {
promJitter.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(jitter))
promJitter.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(jitter))
}
}
func RecordRTT(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, rtt uint32) {
func RecordRTT(country string, direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, rtt uint32) {
if rtt > 0 {
promRTT.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(rtt))
promRTT.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(rtt))
}
}
+38 -6
View File
@@ -172,20 +172,52 @@ func AddPublishSuccess(kind string) {
promTrackPublishCounter.WithLabelValues(kind, "success").Inc()
}
func RecordPublishTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) {
recordPubSubTime(true, source, trackType, d, sdk, kind, 1)
func RecordPublishTime(
country string,
source livekit.TrackSource,
trackType livekit.TrackType,
d time.Duration,
sdk livekit.ClientInfo_SDK,
kind livekit.ParticipantInfo_Kind,
) {
recordPubSubTime(true, country, source, trackType, d, sdk, kind, 1)
}
func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) {
recordPubSubTime(false, source, trackType, d, sdk, kind, count)
func RecordSubscribeTime(
country string,
source livekit.TrackSource,
trackType livekit.TrackType,
d time.Duration,
sdk livekit.ClientInfo_SDK,
kind livekit.ParticipantInfo_Kind,
count int,
) {
recordPubSubTime(false, country, source, trackType, d, sdk, kind, count)
}
func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) {
func recordPubSubTime(
isPublish bool,
country string,
source livekit.TrackSource,
trackType livekit.TrackType,
d time.Duration,
sdk livekit.ClientInfo_SDK,
kind livekit.ParticipantInfo_Kind,
count int,
) {
direction := "subscribe"
if isPublish {
direction = "publish"
}
promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String(), strconv.Itoa(count)).Observe(float64(d.Milliseconds()))
promPubSubTime.WithLabelValues(
direction,
source.String(),
trackType.String(),
country,
sdk.String(),
kind.String(),
strconv.Itoa(count),
).Observe(float64(d.Milliseconds()))
}
func RecordTrackSubscribeSuccess(kind string) {
+21 -12
View File
@@ -49,6 +49,7 @@ type TrafficTotals struct {
// stats for signal and data channel
type BytesTrackStats struct {
country string
trackID livekit.TrackID
pID livekit.ParticipantID
send, recv atomic.Uint64
@@ -61,12 +62,14 @@ type BytesTrackStats struct {
}
func NewBytesTrackStats(
country string,
trackID livekit.TrackID,
pID livekit.ParticipantID,
telemetry TelemetryService,
participantReporter roomobs.ParticipantSessionReporter,
) *BytesTrackStats {
s := &BytesTrackStats{
country: country,
trackID: trackID,
pID: pID,
telemetry: telemetry,
@@ -119,26 +122,32 @@ func (s *BytesTrackStats) Stop() {
func (s *BytesTrackStats) report() {
if recv := s.recv.Swap(0); recv > 0 {
packets := s.recvMessages.Swap(0)
s.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_UPSTREAM, s.pID, s.trackID), &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
{
PrimaryBytes: recv,
PrimaryPackets: packets,
s.telemetry.TrackStats(
StatsKeyForData(s.country, livekit.StreamType_UPSTREAM, s.pID, s.trackID),
&livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
{
PrimaryBytes: recv,
PrimaryPackets: packets,
},
},
},
})
)
}
if send := s.send.Swap(0); send > 0 {
packets := s.sendMessages.Swap(0)
s.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_DOWNSTREAM, s.pID, s.trackID), &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
{
PrimaryBytes: send,
PrimaryPackets: packets,
s.telemetry.TrackStats(
StatsKeyForData(s.country, livekit.StreamType_DOWNSTREAM, s.pID, s.trackID),
&livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
{
PrimaryBytes: send,
PrimaryPackets: packets,
},
},
},
})
)
}
}
+26 -11
View File
@@ -20,6 +20,7 @@ import (
)
type StatsKey struct {
country string
streamType livekit.StreamType
participantID livekit.ParticipantID
trackID livekit.TrackID
@@ -28,8 +29,16 @@ type StatsKey struct {
track bool
}
func StatsKeyForTrack(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, trackSource livekit.TrackSource, trackType livekit.TrackType) StatsKey {
func StatsKeyForTrack(
country string,
streamType livekit.StreamType,
participantID livekit.ParticipantID,
trackID livekit.TrackID,
trackSource livekit.TrackSource,
trackType livekit.TrackType,
) StatsKey {
return StatsKey{
country: country,
streamType: streamType,
participantID: participantID,
trackID: trackID,
@@ -39,8 +48,14 @@ func StatsKeyForTrack(streamType livekit.StreamType, participantID livekit.Parti
}
}
func StatsKeyForData(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID) StatsKey {
func StatsKeyForData(
country string,
streamType livekit.StreamType,
participantID livekit.ParticipantID,
trackID livekit.TrackID,
) StatsKey {
return StatsKey{
country: country,
streamType: streamType,
participantID: participantID,
trackID: trackID,
@@ -76,20 +91,20 @@ func (t *telemetryService) TrackStats(key StatsKey, stat *livekit.AnalyticsStat)
bytes += stream.RetransmitBytes
}
if key.track {
prometheus.RecordPacketLoss(direction, key.trackSource, key.trackType, stream.PacketsLost, stream.PrimaryPackets+stream.PaddingPackets)
prometheus.RecordPacketOutOfOrder(direction, key.trackSource, key.trackType, stream.PacketsOutOfOrder, stream.PrimaryPackets+stream.PaddingPackets)
prometheus.RecordRTT(direction, key.trackSource, key.trackType, stream.Rtt)
prometheus.RecordJitter(direction, key.trackSource, key.trackType, stream.Jitter)
prometheus.RecordPacketLoss(key.country, direction, key.trackSource, key.trackType, stream.PacketsLost, stream.PrimaryPackets+stream.PaddingPackets)
prometheus.RecordPacketOutOfOrder(key.country, direction, key.trackSource, key.trackType, stream.PacketsOutOfOrder, stream.PrimaryPackets+stream.PaddingPackets)
prometheus.RecordRTT(key.country, direction, key.trackSource, key.trackType, stream.Rtt)
prometheus.RecordJitter(key.country, direction, key.trackSource, key.trackType, stream.Jitter)
}
}
prometheus.IncrementRTCP(direction, nacks, plis, firs)
prometheus.IncrementPackets(direction, uint64(packets), false)
prometheus.IncrementBytes(direction, bytes, false)
prometheus.IncrementRTCP(key.country, direction, nacks, plis, firs)
prometheus.IncrementPackets(key.country, direction, uint64(packets), false)
prometheus.IncrementBytes(key.country, direction, bytes, false)
if retransmitPackets != 0 {
prometheus.IncrementPackets(direction, uint64(retransmitPackets), true)
prometheus.IncrementPackets(key.country, direction, uint64(retransmitPackets), true)
}
if retransmitBytes != 0 {
prometheus.IncrementBytes(direction, retransmitBytes, true)
prometheus.IncrementBytes(key.country, direction, retransmitBytes, true)
}
if worker, ok := t.getWorker(key.participantID); ok {
+21 -21
View File
@@ -57,7 +57,7 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) {
// do
packet := 33
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet)}}}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, ""), stat)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, ""), stat)
// flush
fixture.flush()
@@ -89,7 +89,7 @@ func Test_OnDownstreamPackets(t *testing.T) {
trackID := livekit.TrackID("trackID")
for i := range packets {
stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packets[i]), PrimaryPackets: uint32(1)}}}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat)
}
// flush
@@ -119,12 +119,12 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) {
packet1 := 33
trackID1 := livekit.TrackID("trackID1")
stat1 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet1), PrimaryPackets: 1}}}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1)
packet2 := 23
trackID2 := livekit.TrackID("trackID2")
stat2 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet2), PrimaryPackets: 1}}}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat2)
// flush
fixture.flush()
@@ -175,7 +175,7 @@ func Test_OnDownStreamStat(t *testing.T) {
},
}
trackID := livekit.TrackID("trackID1")
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1)
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -191,7 +191,7 @@ func Test_OnDownStreamStat(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
// flush
fixture.flush()
@@ -230,7 +230,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) // there should be bytes reported so that stats are sent
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) // there should be bytes reported so that stats are sent
// flush
fixture.flush()
@@ -244,7 +244,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2)
// flush
fixture.flush()
@@ -282,7 +282,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) // there should be bytes reported so that stats are sent
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) // there should be bytes reported so that stats are sent
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -293,7 +293,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat2)
stat3 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -304,7 +304,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat3)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat3)
// flush
fixture.flush()
@@ -357,7 +357,7 @@ func Test_OnUpstreamStat(t *testing.T) {
}
trackID := livekit.TrackID("trackID")
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -373,7 +373,7 @@ func Test_OnUpstreamStat(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat2)
// flush
fixture.flush()
@@ -416,8 +416,8 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID1), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID2), stat1) // using same buffer is not correct but for test it is fine
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID1), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID2), stat1) // using same buffer is not correct but for test it is fine
// do
totalBytes++
@@ -431,7 +431,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID1), stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID1), stat2)
stat3 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -442,7 +442,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID2), stat3)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID2), stat3)
// flush
fixture.flush()
@@ -518,7 +518,7 @@ func Test_AddUpTrack(t *testing.T) {
},
}
trackID := livekit.TrackID("trackID")
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat)
// flush
fixture.flush()
@@ -556,7 +556,7 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat1)
// flush
fixture.flush()
@@ -591,7 +591,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, "trackID"), stat1)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, "trackID"), stat1)
// downstream bytes
stat2 := &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -601,7 +601,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
},
},
}
fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, "trackID1"), stat2)
fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, "trackID1"), stat2)
// flush
fixture.flush()
+8 -6
View File
@@ -52,7 +52,8 @@ func NewConn(c net.Conn, direction prometheus.Direction) *Conn {
func (c *Conn) Read(b []byte) (n int, err error) {
n, err = c.Conn.Read(b)
if n > 0 {
prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false)
prometheus.IncrementBytes("", prometheus.Incoming, uint64(n), false)
prometheus.IncrementPackets("", prometheus.Incoming, 1, false)
}
return
}
@@ -60,7 +61,8 @@ func (c *Conn) Read(b []byte) (n int, err error) {
func (c *Conn) Write(b []byte) (n int, err error) {
n, err = c.Conn.Write(b)
if n > 0 {
prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false)
prometheus.IncrementBytes("", prometheus.Outgoing, uint64(n), false)
prometheus.IncrementPackets("", prometheus.Outgoing, 1, false)
}
return
}
@@ -83,8 +85,8 @@ func NewPacketConn(c net.PacketConn, direction prometheus.Direction) *PacketConn
func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
n, addr, err = c.PacketConn.ReadFrom(p)
if n > 0 {
prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false)
prometheus.IncrementPackets(prometheus.Incoming, 1, false)
prometheus.IncrementBytes("", prometheus.Incoming, uint64(n), false)
prometheus.IncrementPackets("", prometheus.Incoming, 1, false)
}
return
}
@@ -92,8 +94,8 @@ func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
func (c *PacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
n, err = c.PacketConn.WriteTo(p, addr)
if n > 0 {
prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false)
prometheus.IncrementPackets(prometheus.Outgoing, 1, false)
prometheus.IncrementBytes("", prometheus.Outgoing, uint64(n), false)
prometheus.IncrementPackets("", prometheus.Outgoing, 1, false)
}
return
}