diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index b5b7f3d8e..e90a524b9 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -67,6 +67,7 @@ type MediaTrackParams struct { ParticipantID func() livekit.ParticipantID ParticipantIdentity livekit.ParticipantIdentity ParticipantVersion uint32 + ParticipantCountry string BufferFactory *buffer.Factory ReceiverConfig ReceiverConfig SubscriberConfig DirectionConfig @@ -335,14 +336,21 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe }) // SIMULCAST-CODEC-TODO: these need to be receiver/mime aware, setting it up only for primary now + statsKey := telemetry.StatsKeyForTrack( + t.params.ParticipantCountry, + livekit.StreamType_UPSTREAM, + t.PublisherID(), + t.ID(), + ti.Source, + ti.Type, + ) newWR.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) { // send for only one codec, either primary (priority == 0) OR regressed codec t.lock.RLock() regressionTargetCodecReceived := t.regressionTargetCodecReceived t.lock.RUnlock() if priority == 0 || regressionTargetCodecReceived { - key := telemetry.StatsKeyForTrack(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), ti.Source, ti.Type) - t.params.Telemetry.TrackStats(key, stat) + t.params.Telemetry.TrackStats(statsKey, stat) if cs, ok := telemetry.CondenseStat(stat); ok { t.params.Reporter.Tx(func(tx roomobs.TrackTx) { diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 94ee24688..ce2aa037a 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -210,10 +210,17 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted()) }) + statsKey := telemetry.StatsKeyForTrack( + sub.GetCountry(), + livekit.StreamType_DOWNSTREAM, + subscriberID, + trackID, + t.params.MediaTrack.Source(), + t.params.MediaTrack.Kind(), + ) reporter := sub.GetReporter().WithTrack(trackID.String()) downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) { - key := telemetry.StatsKeyForTrack(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, t.params.MediaTrack.Source(), t.params.MediaTrack.Kind()) - t.params.Telemetry.TrackStats(key, stat) + t.params.Telemetry.TrackStats(statsKey, stat) if cs, ok := telemetry.CondenseStat(stat); ok { ti := wr.TrackInfo() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7e0962750..d1cb5820b 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -205,6 +205,7 @@ type ParticipantParams struct { FireOnTrackBySdp bool DisableCodecRegression bool LastPubReliableSeq uint32 + Country string } type ParticipantImpl struct { @@ -379,6 +380,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.id.Store(params.SID) p.dataChannelStats = telemetry.NewBytesTrackStats( + p.params.Country, telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, p.ID()), p.ID(), params.Telemetry, @@ -436,6 +438,10 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { return p, nil } +func (p *ParticipantImpl) GetCountry() string { + return p.params.Country +} + func (p *ParticipantImpl) GetTrailer() []byte { trailer := make([]byte, len(p.params.Trailer)) copy(trailer, p.params.Trailer) @@ -2947,7 +2953,14 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver ) } - prometheus.RecordPublishTime(mt.Source(), mt.Kind(), pubTime, p.GetClientInfo().GetSdk(), p.Kind()) + prometheus.RecordPublishTime( + p.params.Country, + mt.Source(), + mt.Kind(), + pubTime, + p.GetClientInfo().GetSdk(), + p.Kind(), + ) p.handleTrackPublished(mt, isMigrated) }() } @@ -3015,6 +3028,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv ParticipantID: p.ID, ParticipantIdentity: p.params.Identity, ParticipantVersion: p.version.Load(), + ParticipantCountry: p.params.Country, BufferFactory: p.params.Config.BufferFactory, ReceiverConfig: p.params.Config.Receiver, AudioConfig: p.params.AudioConfig, diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 0c89c0882..682b8f372 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -1140,7 +1140,15 @@ func (s *trackSubscription) maybeRecordSuccess(ts telemetry.TelemetryService, pI d := time.Since(*s.subscribeAt.Load()) s.logger.Debugw("track subscribed", "cost", d.Milliseconds()) subscriber := subTrack.Subscriber() - prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind(), int(s.succRecordCounter.Inc())) + prometheus.RecordSubscribeTime( + subscriber.GetCountry(), + mediaTrack.Source(), + mediaTrack.Kind(), + d, + subscriber.GetClientInfo().GetSdk(), + subscriber.Kind(), + int(s.succRecordCounter.Inc()), + ) eventSent := s.eventSent.Swap(true) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index f3265bc6a..132bb0f0a 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -350,6 +350,7 @@ type LocalParticipant interface { ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion) // getters + GetCountry() string GetTrailer() []byte GetLogger() logger.Logger GetLoggerResolver() logger.DeferredFieldResolver diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index d541371c3..8d61eed90 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -292,6 +292,16 @@ type FakeLocalParticipant struct { getConnectionQualityReturnsOnCall map[int]struct { result1 *livekit.ConnectionQualityInfo } + GetCountryStub func() string + getCountryMutex sync.RWMutex + getCountryArgsForCall []struct { + } + getCountryReturns struct { + result1 string + } + getCountryReturnsOnCall map[int]struct { + result1 string + } GetDisableSenderReportPassThroughStub func() bool getDisableSenderReportPassThroughMutex sync.RWMutex getDisableSenderReportPassThroughArgsForCall []struct { @@ -2832,6 +2842,59 @@ func (fake *FakeLocalParticipant) GetConnectionQualityReturnsOnCall(i int, resul }{result1} } +func (fake *FakeLocalParticipant) GetCountry() string { + fake.getCountryMutex.Lock() + ret, specificReturn := fake.getCountryReturnsOnCall[len(fake.getCountryArgsForCall)] + fake.getCountryArgsForCall = append(fake.getCountryArgsForCall, struct { + }{}) + stub := fake.GetCountryStub + fakeReturns := fake.getCountryReturns + fake.recordInvocation("GetCountry", []interface{}{}) + fake.getCountryMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetCountryCallCount() int { + fake.getCountryMutex.RLock() + defer fake.getCountryMutex.RUnlock() + return len(fake.getCountryArgsForCall) +} + +func (fake *FakeLocalParticipant) GetCountryCalls(stub func() string) { + fake.getCountryMutex.Lock() + defer fake.getCountryMutex.Unlock() + fake.GetCountryStub = stub +} + +func (fake *FakeLocalParticipant) GetCountryReturns(result1 string) { + fake.getCountryMutex.Lock() + defer fake.getCountryMutex.Unlock() + fake.GetCountryStub = nil + fake.getCountryReturns = struct { + result1 string + }{result1} +} + +func (fake *FakeLocalParticipant) GetCountryReturnsOnCall(i int, result1 string) { + fake.getCountryMutex.Lock() + defer fake.getCountryMutex.Unlock() + fake.GetCountryStub = nil + if fake.getCountryReturnsOnCall == nil { + fake.getCountryReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.getCountryReturnsOnCall[i] = struct { + result1 string + }{result1} +} + func (fake *FakeLocalParticipant) GetDisableSenderReportPassThrough() bool { fake.getDisableSenderReportPassThroughMutex.Lock() ret, specificReturn := fake.getDisableSenderReportPassThroughReturnsOnCall[len(fake.getDisableSenderReportPassThroughArgsForCall)] @@ -9262,6 +9325,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getClientInfoMutex.RUnlock() fake.getConnectionQualityMutex.RLock() defer fake.getConnectionQualityMutex.RUnlock() + fake.getCountryMutex.RLock() + defer fake.getCountryMutex.RUnlock() fake.getDisableSenderReportPassThroughMutex.RLock() defer fake.getDisableSenderReportPassThroughMutex.RUnlock() fake.getEnabledPublishCodecsMutex.RLock() diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index ce3f07c6c..760e5bda7 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -24,10 +24,15 @@ import ( type Direction string const ( - Incoming Direction = "incoming" - Outgoing Direction = "outgoing" - transmissionInitial = "initial" - transmissionRetransmit = "retransmit" + Incoming Direction = "incoming" + Outgoing Direction = "outgoing" +) + +type transmissionType string + +const ( + transmissionInitial transmissionType = "initial" + transmissionRetransmit transmissionType = "retransmit" ) var ( @@ -44,11 +49,11 @@ var ( forwardLatency atomic.Uint32 forwardJitter atomic.Uint32 - promPacketLabels = []string{"direction", "transmission"} + promPacketLabels = []string{"direction", "transmission", "country"} promPacketTotal *prometheus.CounterVec promPacketBytes *prometheus.CounterVec - promRTCPLabels = []string{"direction"} - promStreamLabels = []string{"direction", "source", "type"} + promRTCPLabels = []string{"direction", "country"} + promStreamLabels = []string{"direction", "source", "type", "country"} promNackTotal *prometheus.CounterVec promPliTotal *prometheus.CounterVec promFirTotal *prometheus.CounterVec @@ -186,31 +191,16 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) { prometheus.MustRegister(promConnections) prometheus.MustRegister(promForwardLatency) prometheus.MustRegister(promForwardJitter) - - promPacketTotalIncomingInitial = promPacketTotal.WithLabelValues(string(Incoming), transmissionInitial) - promPacketTotalIncomingRetransmit = promPacketTotal.WithLabelValues(string(Incoming), transmissionRetransmit) - promPacketTotalOutgoingInitial = promPacketTotal.WithLabelValues(string(Outgoing), transmissionInitial) - promPacketTotalOutgoingRetransmit = promPacketTotal.WithLabelValues(string(Outgoing), transmissionRetransmit) - promPacketBytesIncomingInitial = promPacketBytes.WithLabelValues(string(Incoming), transmissionInitial) - promPacketBytesIncomingRetransmit = promPacketBytes.WithLabelValues(string(Incoming), transmissionRetransmit) - promPacketBytesOutgoingInitial = promPacketBytes.WithLabelValues(string(Outgoing), transmissionInitial) - promPacketBytesOutgoingRetransmit = promPacketBytes.WithLabelValues(string(Outgoing), transmissionRetransmit) } -func IncrementPackets(direction Direction, count uint64, retransmit bool) { - if direction == Incoming { - if retransmit { - promPacketTotalIncomingRetransmit.Add(float64(count)) - } else { - promPacketTotalIncomingInitial.Add(float64(count)) - } +func IncrementPackets(country string, direction Direction, count uint64, retransmit bool) { + var transmission transmissionType + if retransmit { + transmission = transmissionRetransmit } else { - if retransmit { - promPacketTotalOutgoingRetransmit.Add(float64(count)) - } else { - promPacketTotalOutgoingInitial.Add(float64(count)) - } + transmission = transmissionInitial } + promPacketTotal.WithLabelValues(string(direction), string(transmission), country).Add(float64(count)) if direction == Incoming { packetsIn.Add(count) @@ -222,20 +212,14 @@ func IncrementPackets(direction Direction, count uint64, retransmit bool) { } } -func IncrementBytes(direction Direction, count uint64, retransmit bool) { - if direction == Incoming { - if retransmit { - promPacketBytesIncomingRetransmit.Add(float64(count)) - } else { - promPacketBytesIncomingInitial.Add(float64(count)) - } +func IncrementBytes(country string, direction Direction, count uint64, retransmit bool) { + var transmission transmissionType + if retransmit { + transmission = transmissionRetransmit } else { - if retransmit { - promPacketBytesOutgoingRetransmit.Add(float64(count)) - } else { - promPacketBytesOutgoingInitial.Add(float64(count)) - } + transmission = transmissionInitial } + promPacketBytes.WithLabelValues(string(direction), string(transmission), country).Add(float64(count)) if direction == Incoming { bytesIn.Add(count) @@ -247,46 +231,53 @@ func IncrementBytes(direction Direction, count uint64, retransmit bool) { } } -func IncrementRTCP(direction Direction, nack, pli, fir uint32) { +func IncrementRTCP(country string, direction Direction, nack, pli, fir uint32) { if nack > 0 { - promNackTotal.WithLabelValues(string(direction)).Add(float64(nack)) + promNackTotal.WithLabelValues(string(direction), country).Add(float64(nack)) nackTotal.Add(uint64(nack)) } if pli > 0 { - promPliTotal.WithLabelValues(string(direction)).Add(float64(pli)) + promPliTotal.WithLabelValues(string(direction), country).Add(float64(pli)) } if fir > 0 { - promFirTotal.WithLabelValues(string(direction)).Add(float64(fir)) + promFirTotal.WithLabelValues(string(direction), country).Add(float64(fir)) } } -func RecordPacketLoss(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, lost, total uint32) { +func RecordPacketLoss( + country string, + direction Direction, + trackSource livekit.TrackSource, + trackType livekit.TrackType, + lost uint32, + total uint32, +) { if total > 0 { - promPacketLoss.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(lost) / float64(total) * 100) + promPacketLoss.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(lost) / float64(total) * 100) } if lost > 0 { - promPacketLossTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Add(float64(lost)) + promPacketLossTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Add(float64(lost)) } } -func RecordPacketOutOfOrder(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, ooo, total uint32) { +func RecordPacketOutOfOrder(country string, direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, ooo, total uint32) { if total > 0 { - promPacketOutOfOrder.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(ooo) / float64(total) * 100) + promPacketOutOfOrder.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(ooo) / float64(total) * 100) } if ooo > 0 { - promPacketOutOfOrderTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Add(float64(ooo)) + promPacketOutOfOrderTotal.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Add(float64(ooo)) } } -func RecordJitter(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, jitter uint32) { +func RecordJitter(country string, direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, jitter uint32) { if jitter > 0 { - promJitter.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(jitter)) + promJitter.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(jitter)) } } -func RecordRTT(direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, rtt uint32) { +func RecordRTT(country string, direction Direction, trackSource livekit.TrackSource, trackType livekit.TrackType, rtt uint32) { if rtt > 0 { - promRTT.WithLabelValues(string(direction), trackSource.String(), trackType.String()).Observe(float64(rtt)) + promRTT.WithLabelValues(string(direction), trackSource.String(), trackType.String(), country).Observe(float64(rtt)) } } diff --git a/pkg/telemetry/prometheus/rooms.go b/pkg/telemetry/prometheus/rooms.go index e0d48479d..9214c7e42 100644 --- a/pkg/telemetry/prometheus/rooms.go +++ b/pkg/telemetry/prometheus/rooms.go @@ -172,20 +172,52 @@ func AddPublishSuccess(kind string) { promTrackPublishCounter.WithLabelValues(kind, "success").Inc() } -func RecordPublishTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) { - recordPubSubTime(true, source, trackType, d, sdk, kind, 1) +func RecordPublishTime( + country string, + source livekit.TrackSource, + trackType livekit.TrackType, + d time.Duration, + sdk livekit.ClientInfo_SDK, + kind livekit.ParticipantInfo_Kind, +) { + recordPubSubTime(true, country, source, trackType, d, sdk, kind, 1) } -func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) { - recordPubSubTime(false, source, trackType, d, sdk, kind, count) +func RecordSubscribeTime( + country string, + source livekit.TrackSource, + trackType livekit.TrackType, + d time.Duration, + sdk livekit.ClientInfo_SDK, + kind livekit.ParticipantInfo_Kind, + count int, +) { + recordPubSubTime(false, country, source, trackType, d, sdk, kind, count) } -func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) { +func recordPubSubTime( + isPublish bool, + country string, + source livekit.TrackSource, + trackType livekit.TrackType, + d time.Duration, + sdk livekit.ClientInfo_SDK, + kind livekit.ParticipantInfo_Kind, + count int, +) { direction := "subscribe" if isPublish { direction = "publish" } - promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String(), strconv.Itoa(count)).Observe(float64(d.Milliseconds())) + promPubSubTime.WithLabelValues( + direction, + source.String(), + trackType.String(), + country, + sdk.String(), + kind.String(), + strconv.Itoa(count), + ).Observe(float64(d.Milliseconds())) } func RecordTrackSubscribeSuccess(kind string) { diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go index f098e97d5..83f6152eb 100644 --- a/pkg/telemetry/signalanddatastats.go +++ b/pkg/telemetry/signalanddatastats.go @@ -49,6 +49,7 @@ type TrafficTotals struct { // stats for signal and data channel type BytesTrackStats struct { + country string trackID livekit.TrackID pID livekit.ParticipantID send, recv atomic.Uint64 @@ -61,12 +62,14 @@ type BytesTrackStats struct { } func NewBytesTrackStats( + country string, trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService, participantReporter roomobs.ParticipantSessionReporter, ) *BytesTrackStats { s := &BytesTrackStats{ + country: country, trackID: trackID, pID: pID, telemetry: telemetry, @@ -119,26 +122,32 @@ func (s *BytesTrackStats) Stop() { func (s *BytesTrackStats) report() { if recv := s.recv.Swap(0); recv > 0 { packets := s.recvMessages.Swap(0) - s.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_UPSTREAM, s.pID, s.trackID), &livekit.AnalyticsStat{ - Streams: []*livekit.AnalyticsStream{ - { - PrimaryBytes: recv, - PrimaryPackets: packets, + s.telemetry.TrackStats( + StatsKeyForData(s.country, livekit.StreamType_UPSTREAM, s.pID, s.trackID), + &livekit.AnalyticsStat{ + Streams: []*livekit.AnalyticsStream{ + { + PrimaryBytes: recv, + PrimaryPackets: packets, + }, }, }, - }) + ) } if send := s.send.Swap(0); send > 0 { packets := s.sendMessages.Swap(0) - s.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_DOWNSTREAM, s.pID, s.trackID), &livekit.AnalyticsStat{ - Streams: []*livekit.AnalyticsStream{ - { - PrimaryBytes: send, - PrimaryPackets: packets, + s.telemetry.TrackStats( + StatsKeyForData(s.country, livekit.StreamType_DOWNSTREAM, s.pID, s.trackID), + &livekit.AnalyticsStat{ + Streams: []*livekit.AnalyticsStream{ + { + PrimaryBytes: send, + PrimaryPackets: packets, + }, }, }, - }) + ) } } diff --git a/pkg/telemetry/stats.go b/pkg/telemetry/stats.go index b24b04445..7ff082abb 100644 --- a/pkg/telemetry/stats.go +++ b/pkg/telemetry/stats.go @@ -20,6 +20,7 @@ import ( ) type StatsKey struct { + country string streamType livekit.StreamType participantID livekit.ParticipantID trackID livekit.TrackID @@ -28,8 +29,16 @@ type StatsKey struct { track bool } -func StatsKeyForTrack(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, trackSource livekit.TrackSource, trackType livekit.TrackType) StatsKey { +func StatsKeyForTrack( + country string, + streamType livekit.StreamType, + participantID livekit.ParticipantID, + trackID livekit.TrackID, + trackSource livekit.TrackSource, + trackType livekit.TrackType, +) StatsKey { return StatsKey{ + country: country, streamType: streamType, participantID: participantID, trackID: trackID, @@ -39,8 +48,14 @@ func StatsKeyForTrack(streamType livekit.StreamType, participantID livekit.Parti } } -func StatsKeyForData(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID) StatsKey { +func StatsKeyForData( + country string, + streamType livekit.StreamType, + participantID livekit.ParticipantID, + trackID livekit.TrackID, +) StatsKey { return StatsKey{ + country: country, streamType: streamType, participantID: participantID, trackID: trackID, @@ -76,20 +91,20 @@ func (t *telemetryService) TrackStats(key StatsKey, stat *livekit.AnalyticsStat) bytes += stream.RetransmitBytes } if key.track { - prometheus.RecordPacketLoss(direction, key.trackSource, key.trackType, stream.PacketsLost, stream.PrimaryPackets+stream.PaddingPackets) - prometheus.RecordPacketOutOfOrder(direction, key.trackSource, key.trackType, stream.PacketsOutOfOrder, stream.PrimaryPackets+stream.PaddingPackets) - prometheus.RecordRTT(direction, key.trackSource, key.trackType, stream.Rtt) - prometheus.RecordJitter(direction, key.trackSource, key.trackType, stream.Jitter) + prometheus.RecordPacketLoss(key.country, direction, key.trackSource, key.trackType, stream.PacketsLost, stream.PrimaryPackets+stream.PaddingPackets) + prometheus.RecordPacketOutOfOrder(key.country, direction, key.trackSource, key.trackType, stream.PacketsOutOfOrder, stream.PrimaryPackets+stream.PaddingPackets) + prometheus.RecordRTT(key.country, direction, key.trackSource, key.trackType, stream.Rtt) + prometheus.RecordJitter(key.country, direction, key.trackSource, key.trackType, stream.Jitter) } } - prometheus.IncrementRTCP(direction, nacks, plis, firs) - prometheus.IncrementPackets(direction, uint64(packets), false) - prometheus.IncrementBytes(direction, bytes, false) + prometheus.IncrementRTCP(key.country, direction, nacks, plis, firs) + prometheus.IncrementPackets(key.country, direction, uint64(packets), false) + prometheus.IncrementBytes(key.country, direction, bytes, false) if retransmitPackets != 0 { - prometheus.IncrementPackets(direction, uint64(retransmitPackets), true) + prometheus.IncrementPackets(key.country, direction, uint64(retransmitPackets), true) } if retransmitBytes != 0 { - prometheus.IncrementBytes(direction, retransmitBytes, true) + prometheus.IncrementBytes(key.country, direction, retransmitBytes, true) } if worker, ok := t.getWorker(key.participantID); ok { diff --git a/pkg/telemetry/stats_test.go b/pkg/telemetry/stats_test.go index 89051ee92..7e33b4d9d 100644 --- a/pkg/telemetry/stats_test.go +++ b/pkg/telemetry/stats_test.go @@ -57,7 +57,7 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) { // do packet := 33 stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet)}}} - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, ""), stat) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, ""), stat) // flush fixture.flush() @@ -89,7 +89,7 @@ func Test_OnDownstreamPackets(t *testing.T) { trackID := livekit.TrackID("trackID") for i := range packets { stat := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packets[i]), PrimaryPackets: uint32(1)}}} - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat) } // flush @@ -119,12 +119,12 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) { packet1 := 33 trackID1 := livekit.TrackID("trackID1") stat1 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet1), PrimaryPackets: 1}}} - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) packet2 := 23 trackID2 := livekit.TrackID("trackID2") stat2 := &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{{PrimaryBytes: uint64(packet2), PrimaryPackets: 1}}} - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat2) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat2) // flush fixture.flush() @@ -175,7 +175,7 @@ func Test_OnDownStreamStat(t *testing.T) { }, } trackID := livekit.TrackID("trackID1") - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) stat2 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -191,7 +191,7 @@ func Test_OnDownStreamStat(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2) // flush fixture.flush() @@ -230,7 +230,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) // there should be bytes reported so that stats are sent + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat1) // there should be bytes reported so that stats are sent // flush fixture.flush() @@ -244,7 +244,7 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID), stat2) // flush fixture.flush() @@ -282,7 +282,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) // there should be bytes reported so that stats are sent + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat1) // there should be bytes reported so that stats are sent stat2 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -293,7 +293,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat2) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID1), stat2) stat3 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -304,7 +304,7 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat3) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, trackID2), stat3) // flush fixture.flush() @@ -357,7 +357,7 @@ func Test_OnUpstreamStat(t *testing.T) { } trackID := livekit.TrackID("trackID") - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat1) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat1) stat2 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -373,7 +373,7 @@ func Test_OnUpstreamStat(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat2) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat2) // flush fixture.flush() @@ -416,8 +416,8 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID1), stat1) - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID2), stat1) // using same buffer is not correct but for test it is fine + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID1), stat1) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID2), stat1) // using same buffer is not correct but for test it is fine // do totalBytes++ @@ -431,7 +431,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID1), stat2) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID1), stat2) stat3 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -442,7 +442,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID2), stat3) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID2), stat3) // flush fixture.flush() @@ -518,7 +518,7 @@ func Test_AddUpTrack(t *testing.T) { }, } trackID := livekit.TrackID("trackID") - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat) // flush fixture.flush() @@ -556,7 +556,7 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, trackID), stat1) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, trackID), stat1) // flush fixture.flush() @@ -591,7 +591,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_UPSTREAM, partSID, "trackID"), stat1) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_UPSTREAM, partSID, "trackID"), stat1) // downstream bytes stat2 := &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -601,7 +601,7 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) { }, }, } - fixture.sut.TrackStats(telemetry.StatsKeyForData(livekit.StreamType_DOWNSTREAM, partSID, "trackID1"), stat2) + fixture.sut.TrackStats(telemetry.StatsKeyForData("test", livekit.StreamType_DOWNSTREAM, partSID, "trackID1"), stat2) // flush fixture.flush() diff --git a/pkg/telemetry/statsconn.go b/pkg/telemetry/statsconn.go index b03d0bcaf..308dc3806 100644 --- a/pkg/telemetry/statsconn.go +++ b/pkg/telemetry/statsconn.go @@ -52,7 +52,8 @@ func NewConn(c net.Conn, direction prometheus.Direction) *Conn { func (c *Conn) Read(b []byte) (n int, err error) { n, err = c.Conn.Read(b) if n > 0 { - prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false) + prometheus.IncrementBytes("", prometheus.Incoming, uint64(n), false) + prometheus.IncrementPackets("", prometheus.Incoming, 1, false) } return } @@ -60,7 +61,8 @@ func (c *Conn) Read(b []byte) (n int, err error) { func (c *Conn) Write(b []byte) (n int, err error) { n, err = c.Conn.Write(b) if n > 0 { - prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false) + prometheus.IncrementBytes("", prometheus.Outgoing, uint64(n), false) + prometheus.IncrementPackets("", prometheus.Outgoing, 1, false) } return } @@ -83,8 +85,8 @@ func NewPacketConn(c net.PacketConn, direction prometheus.Direction) *PacketConn func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { n, addr, err = c.PacketConn.ReadFrom(p) if n > 0 { - prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false) - prometheus.IncrementPackets(prometheus.Incoming, 1, false) + prometheus.IncrementBytes("", prometheus.Incoming, uint64(n), false) + prometheus.IncrementPackets("", prometheus.Incoming, 1, false) } return } @@ -92,8 +94,8 @@ func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { func (c *PacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { n, err = c.PacketConn.WriteTo(p, addr) if n > 0 { - prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false) - prometheus.IncrementPackets(prometheus.Outgoing, 1, false) + prometheus.IncrementBytes("", prometheus.Outgoing, uint64(n), false) + prometheus.IncrementPackets("", prometheus.Outgoing, 1, false) } return }