diff --git a/pkg/sfu/forwardstats.go b/pkg/sfu/forwardstats.go index 2922254aa..34daa0cca 100644 --- a/pkg/sfu/forwardstats.go +++ b/pkg/sfu/forwardstats.go @@ -53,7 +53,7 @@ func (s *ForwardStats) Update(arrival, left int64) (int64, bool) { return transit, isHighForwardingLatency } -func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, time.Duration, time.Duration, time.Duration) { +func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, time.Duration) { s.lock.Lock() // a dummy sample to flush the pipe to current time now := mono.UnixNano() @@ -62,7 +62,6 @@ func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, tim } wLong := s.latency.Summarize() - wShort := s.latency.SummarizeLast(shortDuration) lowest := s.lowest s.lowest = time.Second.Nanoseconds() @@ -72,8 +71,7 @@ func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, tim s.lock.Unlock() latencyLong, jitterLong := time.Duration(wLong.Mean()), time.Duration(wLong.StdDev()) - latencyShort, jitterShort := time.Duration(wShort.Mean()), time.Duration(wShort.StdDev()) - if latencyShort > cHighForwardingLatency/2 && jitterLong > latencyLong*cSkewFactor { + if jitterLong > latencyLong*cSkewFactor { logger.Infow( "high jitter in forwarding path", "lowest", time.Duration(lowest), @@ -81,12 +79,17 @@ func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, tim "countLong", wLong.Count(), "latencyLong", latencyLong, "jitterLong", jitterLong, - "countShort", wShort.Count(), - "latencyShort", latencyShort, - "jitterShort", jitterShort, ) } - return latencyLong, jitterLong, latencyShort, jitterShort + return latencyLong, jitterLong +} + +func (s *ForwardStats) GetShortStats(shortDuration time.Duration) (time.Duration, time.Duration) { + s.lock.Lock() + wShort := s.latency.SummarizeLast(shortDuration) + s.lock.Unlock() + + return time.Duration(wShort.Mean()), time.Duration(wShort.StdDev()) } func (s *ForwardStats) Stop() { @@ -103,9 +106,9 @@ func (s *ForwardStats) report(reportInterval time.Duration) { return case <-ticker.C: - latencyLong, jitterLong, latencyShort, jitterShort := s.GetStats(reportInterval) - prometheus.RecordForwardJitter(uint32(jitterShort.Microseconds()), uint32(jitterLong.Microseconds())) - prometheus.RecordForwardLatency(uint32(latencyShort.Microseconds()), uint32(latencyLong.Microseconds())) + latencyLong, jitterLong := s.GetStats(reportInterval) + prometheus.RecordForwardJitter(uint32(jitterLong.Nanoseconds())) + prometheus.RecordForwardLatency(uint32(latencyLong.Nanoseconds())) } } } diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index dcff38ceb..d9681748a 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -28,11 +28,11 @@ const ( Outgoing Direction = "outgoing" ) -type transmissionType string +type TransmissionType string const ( - transmissionInitial transmissionType = "initial" - transmissionRetransmit transmissionType = "retransmit" + TransmissionInitial TransmissionType = "initial" + TransmissionRetransmit TransmissionType = "retransmit" ) var ( @@ -214,11 +214,11 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) { } func IncrementPackets(country string, direction Direction, count uint64, retransmit bool) { - var transmission transmissionType + var transmission TransmissionType if retransmit { - transmission = transmissionRetransmit + transmission = TransmissionRetransmit } else { - transmission = transmissionInitial + transmission = TransmissionInitial } promPacketTotal.WithLabelValues(string(direction), string(transmission), country).Add(float64(count)) @@ -233,11 +233,11 @@ func IncrementPackets(country string, direction Direction, count uint64, retrans } func IncrementBytes(country string, direction Direction, count uint64, retransmit bool) { - var transmission transmissionType + var transmission TransmissionType if retransmit { - transmission = transmissionRetransmit + transmission = TransmissionRetransmit } else { - transmission = transmissionInitial + transmission = TransmissionInitial } promPacketBytes.WithLabelValues(string(direction), string(transmission), country).Add(float64(count)) @@ -340,12 +340,12 @@ func RecordForwardLatencySample(forwardLatency int64) { promForwardLatencyHist.Observe(float64(forwardLatency)) } -func RecordForwardLatency(shortTermLatencyAvg, longTermLatencyAvg uint32) { +func RecordForwardLatency(longTermLatencyAvg uint32) { forwardLatency.Store(longTermLatencyAvg) promForwardLatency.Set(float64(longTermLatencyAvg)) } -func RecordForwardJitter(shortTermJitterAvg, longTermJitterAvg uint32) { +func RecordForwardJitter(longTermJitterAvg uint32) { forwardJitter.Store(longTermJitterAvg) promForwardJitter.Set(float64(longTermJitterAvg)) }