Forwarding latency measurement tweaks. (#4080)

* Forwarding latency measurement tweaks.

- prom transmission type public
- do not measure short term values as it is not used and saves some lock
  contention time in packet path potentially. Adding a separate method
  for that.
- Change latency/jitter summary reporting to `ns` also to match the
  histogram.

* add GetShortStats
This commit is contained in:
Raja Subramanian
2025-11-13 18:39:49 +05:30
committed by GitHub
parent f4929f099e
commit f8b994d491
2 changed files with 25 additions and 22 deletions
+14 -11
View File
@@ -53,7 +53,7 @@ func (s *ForwardStats) Update(arrival, left int64) (int64, bool) {
return transit, isHighForwardingLatency
}
func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, time.Duration, time.Duration, time.Duration) {
func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, time.Duration) {
s.lock.Lock()
// a dummy sample to flush the pipe to current time
now := mono.UnixNano()
@@ -62,7 +62,6 @@ func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, tim
}
wLong := s.latency.Summarize()
wShort := s.latency.SummarizeLast(shortDuration)
lowest := s.lowest
s.lowest = time.Second.Nanoseconds()
@@ -72,8 +71,7 @@ func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, tim
s.lock.Unlock()
latencyLong, jitterLong := time.Duration(wLong.Mean()), time.Duration(wLong.StdDev())
latencyShort, jitterShort := time.Duration(wShort.Mean()), time.Duration(wShort.StdDev())
if latencyShort > cHighForwardingLatency/2 && jitterLong > latencyLong*cSkewFactor {
if jitterLong > latencyLong*cSkewFactor {
logger.Infow(
"high jitter in forwarding path",
"lowest", time.Duration(lowest),
@@ -81,12 +79,17 @@ func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, tim
"countLong", wLong.Count(),
"latencyLong", latencyLong,
"jitterLong", jitterLong,
"countShort", wShort.Count(),
"latencyShort", latencyShort,
"jitterShort", jitterShort,
)
}
return latencyLong, jitterLong, latencyShort, jitterShort
return latencyLong, jitterLong
}
func (s *ForwardStats) GetShortStats(shortDuration time.Duration) (time.Duration, time.Duration) {
s.lock.Lock()
wShort := s.latency.SummarizeLast(shortDuration)
s.lock.Unlock()
return time.Duration(wShort.Mean()), time.Duration(wShort.StdDev())
}
func (s *ForwardStats) Stop() {
@@ -103,9 +106,9 @@ func (s *ForwardStats) report(reportInterval time.Duration) {
return
case <-ticker.C:
latencyLong, jitterLong, latencyShort, jitterShort := s.GetStats(reportInterval)
prometheus.RecordForwardJitter(uint32(jitterShort.Microseconds()), uint32(jitterLong.Microseconds()))
prometheus.RecordForwardLatency(uint32(latencyShort.Microseconds()), uint32(latencyLong.Microseconds()))
latencyLong, jitterLong := s.GetStats(reportInterval)
prometheus.RecordForwardJitter(uint32(jitterLong.Nanoseconds()))
prometheus.RecordForwardLatency(uint32(latencyLong.Nanoseconds()))
}
}
}
+11 -11
View File
@@ -28,11 +28,11 @@ const (
Outgoing Direction = "outgoing"
)
type transmissionType string
type TransmissionType string
const (
transmissionInitial transmissionType = "initial"
transmissionRetransmit transmissionType = "retransmit"
TransmissionInitial TransmissionType = "initial"
TransmissionRetransmit TransmissionType = "retransmit"
)
var (
@@ -214,11 +214,11 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) {
}
func IncrementPackets(country string, direction Direction, count uint64, retransmit bool) {
var transmission transmissionType
var transmission TransmissionType
if retransmit {
transmission = transmissionRetransmit
transmission = TransmissionRetransmit
} else {
transmission = transmissionInitial
transmission = TransmissionInitial
}
promPacketTotal.WithLabelValues(string(direction), string(transmission), country).Add(float64(count))
@@ -233,11 +233,11 @@ func IncrementPackets(country string, direction Direction, count uint64, retrans
}
func IncrementBytes(country string, direction Direction, count uint64, retransmit bool) {
var transmission transmissionType
var transmission TransmissionType
if retransmit {
transmission = transmissionRetransmit
transmission = TransmissionRetransmit
} else {
transmission = transmissionInitial
transmission = TransmissionInitial
}
promPacketBytes.WithLabelValues(string(direction), string(transmission), country).Add(float64(count))
@@ -340,12 +340,12 @@ func RecordForwardLatencySample(forwardLatency int64) {
promForwardLatencyHist.Observe(float64(forwardLatency))
}
func RecordForwardLatency(shortTermLatencyAvg, longTermLatencyAvg uint32) {
func RecordForwardLatency(longTermLatencyAvg uint32) {
forwardLatency.Store(longTermLatencyAvg)
promForwardLatency.Set(float64(longTermLatencyAvg))
}
func RecordForwardJitter(shortTermJitterAvg, longTermJitterAvg uint32) {
func RecordForwardJitter(longTermJitterAvg uint32) {
forwardJitter.Store(longTermJitterAvg)
promForwardJitter.Set(float64(longTermJitterAvg))
}