From a19ca69f5f96349cbe3472a46bd2f40ff2cfaf89 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 18 Apr 2022 22:51:34 +0530 Subject: [PATCH] Prevent stats update if the deltas are empty (#619) * Prevent stats update if the deltas are empty * increase force interval * static check * Change max delay to 30 seconds --- pkg/routing/redisrouter.go | 6 ++++-- pkg/routing/selector/utils.go | 9 +++++++-- pkg/telemetry/prometheus/node.go | 14 +++++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 8a5d8f71d..db6f1f5d7 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -26,7 +26,7 @@ const ( // expire participant mappings after a day participantMappingTTL = 24 * time.Hour statsUpdateInterval = 2 * time.Second - statsMaxDelaySeconds = 10 + statsMaxDelaySeconds = 30 ) // RedisRouter uses Redis pub/sub to route signaling messages across different nodes @@ -483,7 +483,9 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error { if err != nil { logger.Errorw("could not update node stats", err) } else { - r.currentNode.Stats = updated + if updated != nil { + r.currentNode.Stats = updated + } } r.statsMu.Unlock() diff --git a/pkg/routing/selector/utils.go b/pkg/routing/selector/utils.go index 25ac25485..7ab021374 100644 --- a/pkg/routing/selector/utils.go +++ b/pkg/routing/selector/utils.go @@ -15,6 +15,11 @@ const AvailableSeconds = 5 // checks if a node has been updated recently to be considered for selection func IsAvailable(node *livekit.Node) bool { + if node.Stats == nil { + // available till stats are available + return true + } + delta := time.Now().Unix() - node.Stats.UpdatedAt return int(delta) < AvailableSeconds } @@ -82,12 +87,12 @@ func SelectSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, erro return nodes[0], nil case "tracks": sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Stats.NumTracksIn + nodes[i].Stats.NumTracksOut < nodes[j].Stats.NumTracksIn + nodes[j].Stats.NumTracksOut + return nodes[i].Stats.NumTracksIn+nodes[i].Stats.NumTracksOut < nodes[j].Stats.NumTracksIn+nodes[j].Stats.NumTracksOut }) return nodes[0], nil case "bytespersec": sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Stats.BytesInPerSec + nodes[i].Stats.BytesOutPerSec < nodes[j].Stats.BytesInPerSec + nodes[j].Stats.BytesOutPerSec + return nodes[i].Stats.BytesInPerSec+nodes[i].Stats.BytesOutPerSec < nodes[j].Stats.BytesInPerSec+nodes[j].Stats.BytesOutPerSec }) return nodes[0], nil default: diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index 4aa6676e0..011459f02 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -10,7 +10,10 @@ import ( "github.com/livekit/protocol/utils" ) -const livekitNamespace string = "livekit" +const ( + livekitNamespace string = "livekit" + forceUpdateInterval int64 = 15 +) var ( MessageCounter *prometheus.CounterVec @@ -66,6 +69,15 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats) (*livekit.NodeStats, error) { packetsOutNow := packetsOut.Load() nackTotalNow := nackTotal.Load() + if bytesInNow == prev.BytesIn && + bytesOutNow == prev.BytesOut && + packetsInNow == prev.PacketsIn && + packetsOutNow == prev.PacketsOut && + nackTotalNow == prev.NackTotal && + elapsed < forceUpdateInterval { + return nil, nil + } + return &livekit.NodeStats{ StartedAt: prev.StartedAt, UpdatedAt: updatedAt,