diff --git a/cmd/server/commands.go b/cmd/server/commands.go index b7732a468..2a0a52ec9 100644 --- a/cmd/server/commands.go +++ b/cmd/server/commands.go @@ -150,17 +150,41 @@ func listNodes(c *cli.Context) error { } table := tablewriter.NewWriter(os.Stdout) - table.SetHeader([]string{"ID", "IP Address", "Num CPUs", "Num Clients", "Num Rooms", "Num Tracks In", "Num Tracks Out", "Started At", "Updated At"}) + table.SetHeader([]string{ + "ID", "IP Address", + "CPUs", "Load", + "Clients", "Rooms", "Tracks In/Out", + "Bytes In/Out", "Packets In/Out", "Nack", "Bps In/Out", "Pps In/Out", "Nack/Sec", + "Started At", "Updated At", + }) for _, node := range nodes { - cpus := strconv.Itoa(int(node.NumCpus)) + // System stats + cpus := strconv.Itoa(int(node.Stats.NumCpus)) + loadAvg := fmt.Sprintf("%.2f, %.2f, %.2f", node.Stats.LoadAvgLast1Min, node.Stats.LoadAvgLast5Min, node.Stats.LoadAvgLast15Min) + + // Room stats clients := strconv.Itoa(int(node.Stats.NumClients)) rooms := strconv.Itoa(int(node.Stats.NumRooms)) - tracksIn := strconv.Itoa(int(node.Stats.NumTracksIn)) - tracksOut := strconv.Itoa(int(node.Stats.NumTracksOut)) + tracks := fmt.Sprintf("%d / %d", node.Stats.NumTracksIn, node.Stats.NumTracksOut) + + // Packet stats + bytes := fmt.Sprintf("%d / %d", node.Stats.BytesIn, node.Stats.BytesOut) + packets := fmt.Sprintf("%d / %d", node.Stats.PacketsIn, node.Stats.PacketsOut) + nack := strconv.Itoa(int(node.Stats.NackTotal)) + bps := fmt.Sprintf("%.2f / %.2f", node.Stats.BytesInPerSec, node.Stats.BytesOutPerSec) + packetsPerSec := fmt.Sprintf("%.2f / %.2f", node.Stats.PacketsInPerSec, node.Stats.PacketsOutPerSec) + nackPerSec := fmt.Sprintf("%f", node.Stats.NackPerSec) startedAt := time.Unix(node.Stats.StartedAt, 0).String() updatedAt := time.Unix(node.Stats.UpdatedAt, 0).String() - table.Append([]string{node.Id, node.Ip, cpus, clients, rooms, tracksIn, tracksOut, startedAt, updatedAt}) + + table.Append([]string{ + node.Id, node.Ip, + cpus, loadAvg, + clients, rooms, tracks, + bytes, packets, nack, bps, packetsPerSec, nackPerSec, + startedAt, updatedAt, + }) } table.Render() diff --git a/go.mod b/go.mod index 55db9ea7b..d49ad196c 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/bep/debounce v1.2.0 + github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/gammazero/workerpool v1.1.2 github.com/go-logr/logr v1.0.0 @@ -13,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.7.4 + github.com/livekit/protocol v0.7.5 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index b2e2eb113..e86695304 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo= github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 h1:SjZ2GvvOononHOpK84APFuMvxqsk3tEIaKH/z4Rpu3g= +github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8/go.mod h1:uEyr4WpAH4hio6LFriaPkL938XnrvLpNPmQHBdrmbIE= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -238,8 +240,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.7 h1:aAkdDC/cL7oGAfhhqltTecARdEnyUYhdDlfyX4QESB0= github.com/livekit/ion-sfu v1.20.7/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA= -github.com/livekit/protocol v0.7.4 h1:t44jtmvYa2ENfwG/CxTqvgAmhDXeSMIysTk4HvSl3oU= -github.com/livekit/protocol v0.7.4/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74= +github.com/livekit/protocol v0.7.5 h1:DGI1RUq9nQK78BPpzzPYTV+24OCdBNht5jexFabKzv4= +github.com/livekit/protocol v0.7.5/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index cd4b8da5d..e8ce10a97 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -309,7 +309,9 @@ func (r *RedisRouter) statsWorker() { // update periodically seconds select { case <-time.After(statsUpdateInterval): - stats.UpdateCurrentNodeStats(r.currentNode.Stats) + if err := stats.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil { + logger.Errorw("could not update node stats", err, "nodeID", r.currentNode.Id) + } if err := r.RegisterNode(); err != nil { logger.Errorw("could not update node", err, "nodeID", r.currentNode.Id) } diff --git a/pkg/utils/stats/packetstats.go b/pkg/utils/stats/packetstats.go index 6a2f9d105..e4601d261 100644 --- a/pkg/utils/stats/packetstats.go +++ b/pkg/utils/stats/packetstats.go @@ -3,11 +3,18 @@ package stats import ( "sync/atomic" + livekit "github.com/livekit/livekit-server/proto" "github.com/pion/rtcp" "github.com/prometheus/client_golang/prometheus" ) var ( + atomicBytesIn uint64 + atomicBytesOut uint64 + atomicPacketsIn uint64 + atomicPacketsOut uint64 + atomicNackTotal uint64 + promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "packet", @@ -64,16 +71,27 @@ func newPacketStats(room, direction string) *PacketStats { func (s *PacketStats) IncrementBytes(bytes uint64) { promPacketBytes.WithLabelValues(s.direction).Add(float64(bytes)) atomic.AddUint64(&s.PacketBytes, bytes) + if s.direction == "incoming" { + atomic.AddUint64(&atomicBytesIn, bytes) + } else { + atomic.AddUint64(&atomicBytesOut, bytes) + } } func (s *PacketStats) IncrementPackets(count uint64) { promPacketTotal.WithLabelValues(s.direction).Add(float64(count)) atomic.AddUint64(&s.PacketTotal, count) + if s.direction == "incoming" { + atomic.AddUint64(&atomicPacketsIn, count) + } else { + atomic.AddUint64(&atomicPacketsOut, count) + } } func (s *PacketStats) IncrementNack(count uint64) { promNackTotal.WithLabelValues(s.direction).Add(float64(count)) atomic.AddUint64(&s.NackTotal, count) + atomic.AddUint64(&atomicNackTotal, count) } func (s *PacketStats) IncrementPLI(count uint64) { @@ -110,3 +128,31 @@ func (s PacketStats) Copy() *PacketStats { FIRTotal: atomic.LoadUint64(&s.FIRTotal), } } + +func updateCurrentNodePacketStats(nodeStats *livekit.NodeStats, secondsSinceLastUpdate int64) { + if secondsSinceLastUpdate == 0 { + return + } + + bytesInPrevious := nodeStats.BytesIn + bytesOutPrevious := nodeStats.BytesOut + packetsInPrevious := nodeStats.PacketsIn + packetsOutPrevious := nodeStats.PacketsOut + nackTotalPrevious := nodeStats.NackTotal + + nodeStats.BytesIn = atomic.LoadUint64(&atomicBytesIn) + nodeStats.BytesOut = atomic.LoadUint64(&atomicBytesOut) + nodeStats.PacketsIn = atomic.LoadUint64(&atomicPacketsIn) + nodeStats.PacketsOut = atomic.LoadUint64(&atomicPacketsOut) + nodeStats.NackTotal = atomic.LoadUint64(&atomicNackTotal) + + nodeStats.BytesInPerSec = perSec(bytesInPrevious, nodeStats.BytesIn, secondsSinceLastUpdate) + nodeStats.BytesOutPerSec = perSec(bytesOutPrevious, nodeStats.BytesOut, secondsSinceLastUpdate) + nodeStats.PacketsInPerSec = perSec(packetsInPrevious, nodeStats.PacketsIn, secondsSinceLastUpdate) + nodeStats.PacketsOutPerSec = perSec(packetsOutPrevious, nodeStats.PacketsOut, secondsSinceLastUpdate) + nodeStats.NackPerSec = perSec(nackTotalPrevious, nodeStats.NackTotal, secondsSinceLastUpdate) +} + +func perSec(prev, curr uint64, secs int64) float32 { + return float32(curr-prev) / float32(secs) +} diff --git a/pkg/utils/stats/roomstatsreporter.go b/pkg/utils/stats/roomstatsreporter.go index aba8c53b9..6018bab13 100644 --- a/pkg/utils/stats/roomstatsreporter.go +++ b/pkg/utils/stats/roomstatsreporter.go @@ -4,14 +4,15 @@ import ( "sync/atomic" "time" + livekit "github.com/livekit/livekit-server/proto" "github.com/prometheus/client_golang/prometheus" ) var ( - roomTotal int32 - participantTotal int32 - trackPublishedTotal int32 - trackSubscribedTotal int32 + atomicRoomTotal int32 + atomicParticipantTotal int32 + atomicTrackPublishedTotal int32 + atomicTrackSubscribedTotal int32 promRoomTotal = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: livekitNamespace, @@ -70,7 +71,7 @@ func NewRoomStatsReporter(roomName string) *RoomStatsReporter { func (r *RoomStatsReporter) RoomStarted() { r.startedAt = time.Now() promRoomTotal.Add(1) - atomic.AddInt32(&roomTotal, 1) + atomic.AddInt32(&atomicRoomTotal, 1) } func (r *RoomStatsReporter) RoomEnded() { @@ -78,35 +79,42 @@ func (r *RoomStatsReporter) RoomEnded() { promRoomDuration.Observe(float64(time.Now().Sub(r.startedAt)) / float64(time.Second)) } promRoomTotal.Sub(1) - atomic.AddInt32(&roomTotal, -1) + atomic.AddInt32(&atomicRoomTotal, -1) } func (r *RoomStatsReporter) AddParticipant() { promParticipantTotal.Add(1) - atomic.AddInt32(&participantTotal, 1) + atomic.AddInt32(&atomicParticipantTotal, 1) } func (r *RoomStatsReporter) SubParticipant() { promParticipantTotal.Sub(1) - atomic.AddInt32(&participantTotal, -1) + atomic.AddInt32(&atomicParticipantTotal, -1) } func (r *RoomStatsReporter) AddPublishedTrack(kind string) { promTrackPublishedTotal.WithLabelValues(kind).Add(1) - atomic.AddInt32(&trackPublishedTotal, 1) + atomic.AddInt32(&atomicTrackPublishedTotal, 1) } func (r *RoomStatsReporter) SubPublishedTrack(kind string) { promTrackPublishedTotal.WithLabelValues(kind).Sub(1) - atomic.AddInt32(&trackPublishedTotal, -1) + atomic.AddInt32(&atomicTrackPublishedTotal, -1) } func (r *RoomStatsReporter) AddSubscribedTrack(kind string) { promTrackSubscribedTotal.WithLabelValues(kind).Add(1) - atomic.AddInt32(&trackSubscribedTotal, 1) + atomic.AddInt32(&atomicTrackSubscribedTotal, 1) } func (r *RoomStatsReporter) SubSubscribedTrack(kind string) { promTrackSubscribedTotal.WithLabelValues(kind).Sub(1) - atomic.AddInt32(&trackSubscribedTotal, -1) + atomic.AddInt32(&atomicTrackSubscribedTotal, -1) +} + +func updateCurrentNodeRoomStats(nodeStats *livekit.NodeStats) { + nodeStats.NumClients = atomic.LoadInt32(&atomicParticipantTotal) + nodeStats.NumRooms = atomic.LoadInt32(&atomicRoomTotal) + nodeStats.NumTracksIn = atomic.LoadInt32(&atomicTrackPublishedTotal) + nodeStats.NumTracksOut = atomic.LoadInt32(&atomicTrackSubscribedTotal) } diff --git a/pkg/utils/stats/stats.go b/pkg/utils/stats/stats.go index fc779b09e..a0430c672 100644 --- a/pkg/utils/stats/stats.go +++ b/pkg/utils/stats/stats.go @@ -2,9 +2,9 @@ package stats import ( "io" - "sync/atomic" "time" + linuxproc "github.com/c9s/goprocinfo/linux" livekit "github.com/livekit/livekit-server/proto" "github.com/pion/interceptor" "github.com/pion/rtcp" @@ -85,11 +85,33 @@ func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer int }) } -func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) { - nodeStats.NumClients = uint32(atomic.LoadInt32(&participantTotal)) - nodeStats.NumRooms = uint32(atomic.LoadInt32(&roomTotal)) - nodeStats.NumTracksIn = uint32(atomic.LoadInt32(&trackPublishedTotal)) - nodeStats.NumTracksOut = uint32(atomic.LoadInt32(&trackSubscribedTotal)) - +func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) error { + updatedAtPrevious := nodeStats.UpdatedAt nodeStats.UpdatedAt = time.Now().Unix() + secondsSinceLastUpdate := nodeStats.UpdatedAt - updatedAtPrevious + + err := updateCurrentNodeSystemStats(nodeStats) + updateCurrentNodeRoomStats(nodeStats) + updateCurrentNodePacketStats(nodeStats, secondsSinceLastUpdate) + + return err +} + +func updateCurrentNodeSystemStats(nodeStats *livekit.NodeStats) error { + cpuInfo, err := linuxproc.ReadCPUInfo("/proc/cpuinfo") + if err != nil { + return err + } + + loadAvg, err := linuxproc.ReadLoadAvg("/proc/loadavg") + if err != nil { + return err + } + + nodeStats.NumCpus = uint32(cpuInfo.NumCPU()) + nodeStats.LoadAvgLast1Min = float32(loadAvg.Last1Min) + nodeStats.LoadAvgLast5Min = float32(loadAvg.Last5Min) + nodeStats.LoadAvgLast15Min = float32(loadAvg.Last15Min) + + return nil }