include packet and system info with node stats (#92)

* include packet and system info with node stats

* add more packet and system stats

* revert magefile, clean up system stats error handling

* update procol version

* dz review: error return
This commit is contained in:
Mathew Kamkar
2021-08-25 12:04:28 -07:00
committed by GitHub
parent f5466c74e6
commit f9590afe1a
7 changed files with 133 additions and 28 deletions

View File

@@ -150,17 +150,41 @@ func listNodes(c *cli.Context) error {
}
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"ID", "IP Address", "Num CPUs", "Num Clients", "Num Rooms", "Num Tracks In", "Num Tracks Out", "Started At", "Updated At"})
table.SetHeader([]string{
"ID", "IP Address",
"CPUs", "Load",
"Clients", "Rooms", "Tracks In/Out",
"Bytes In/Out", "Packets In/Out", "Nack", "Bps In/Out", "Pps In/Out", "Nack/Sec",
"Started At", "Updated At",
})
for _, node := range nodes {
cpus := strconv.Itoa(int(node.NumCpus))
// System stats
cpus := strconv.Itoa(int(node.Stats.NumCpus))
loadAvg := fmt.Sprintf("%.2f, %.2f, %.2f", node.Stats.LoadAvgLast1Min, node.Stats.LoadAvgLast5Min, node.Stats.LoadAvgLast15Min)
// Room stats
clients := strconv.Itoa(int(node.Stats.NumClients))
rooms := strconv.Itoa(int(node.Stats.NumRooms))
tracksIn := strconv.Itoa(int(node.Stats.NumTracksIn))
tracksOut := strconv.Itoa(int(node.Stats.NumTracksOut))
tracks := fmt.Sprintf("%d / %d", node.Stats.NumTracksIn, node.Stats.NumTracksOut)
// Packet stats
bytes := fmt.Sprintf("%d / %d", node.Stats.BytesIn, node.Stats.BytesOut)
packets := fmt.Sprintf("%d / %d", node.Stats.PacketsIn, node.Stats.PacketsOut)
nack := strconv.Itoa(int(node.Stats.NackTotal))
bps := fmt.Sprintf("%.2f / %.2f", node.Stats.BytesInPerSec, node.Stats.BytesOutPerSec)
packetsPerSec := fmt.Sprintf("%.2f / %.2f", node.Stats.PacketsInPerSec, node.Stats.PacketsOutPerSec)
nackPerSec := fmt.Sprintf("%f", node.Stats.NackPerSec)
startedAt := time.Unix(node.Stats.StartedAt, 0).String()
updatedAt := time.Unix(node.Stats.UpdatedAt, 0).String()
table.Append([]string{node.Id, node.Ip, cpus, clients, rooms, tracksIn, tracksOut, startedAt, updatedAt})
table.Append([]string{
node.Id, node.Ip,
cpus, loadAvg,
clients, rooms, tracks,
bytes, packets, nack, bps, packetsPerSec, nackPerSec,
startedAt, updatedAt,
})
}
table.Render()

3
go.mod
View File

@@ -4,6 +4,7 @@ go 1.15
require (
github.com/bep/debounce v1.2.0
github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/gammazero/workerpool v1.1.2
github.com/go-logr/logr v1.0.0
@@ -13,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.7.4
github.com/livekit/protocol v0.7.5
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

6
go.sum
View File

@@ -43,6 +43,8 @@ github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo=
github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 h1:SjZ2GvvOononHOpK84APFuMvxqsk3tEIaKH/z4Rpu3g=
github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8/go.mod h1:uEyr4WpAH4hio6LFriaPkL938XnrvLpNPmQHBdrmbIE=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -238,8 +240,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.7 h1:aAkdDC/cL7oGAfhhqltTecARdEnyUYhdDlfyX4QESB0=
github.com/livekit/ion-sfu v1.20.7/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA=
github.com/livekit/protocol v0.7.4 h1:t44jtmvYa2ENfwG/CxTqvgAmhDXeSMIysTk4HvSl3oU=
github.com/livekit/protocol v0.7.4/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74=
github.com/livekit/protocol v0.7.5 h1:DGI1RUq9nQK78BPpzzPYTV+24OCdBNht5jexFabKzv4=
github.com/livekit/protocol v0.7.5/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

View File

@@ -309,7 +309,9 @@ func (r *RedisRouter) statsWorker() {
// update periodically seconds
select {
case <-time.After(statsUpdateInterval):
stats.UpdateCurrentNodeStats(r.currentNode.Stats)
if err := stats.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil {
logger.Errorw("could not update node stats", err, "nodeID", r.currentNode.Id)
}
if err := r.RegisterNode(); err != nil {
logger.Errorw("could not update node", err, "nodeID", r.currentNode.Id)
}

View File

@@ -3,11 +3,18 @@ package stats
import (
"sync/atomic"
livekit "github.com/livekit/livekit-server/proto"
"github.com/pion/rtcp"
"github.com/prometheus/client_golang/prometheus"
)
var (
atomicBytesIn uint64
atomicBytesOut uint64
atomicPacketsIn uint64
atomicPacketsOut uint64
atomicNackTotal uint64
promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet",
@@ -64,16 +71,27 @@ func newPacketStats(room, direction string) *PacketStats {
func (s *PacketStats) IncrementBytes(bytes uint64) {
promPacketBytes.WithLabelValues(s.direction).Add(float64(bytes))
atomic.AddUint64(&s.PacketBytes, bytes)
if s.direction == "incoming" {
atomic.AddUint64(&atomicBytesIn, bytes)
} else {
atomic.AddUint64(&atomicBytesOut, bytes)
}
}
func (s *PacketStats) IncrementPackets(count uint64) {
promPacketTotal.WithLabelValues(s.direction).Add(float64(count))
atomic.AddUint64(&s.PacketTotal, count)
if s.direction == "incoming" {
atomic.AddUint64(&atomicPacketsIn, count)
} else {
atomic.AddUint64(&atomicPacketsOut, count)
}
}
func (s *PacketStats) IncrementNack(count uint64) {
promNackTotal.WithLabelValues(s.direction).Add(float64(count))
atomic.AddUint64(&s.NackTotal, count)
atomic.AddUint64(&atomicNackTotal, count)
}
func (s *PacketStats) IncrementPLI(count uint64) {
@@ -110,3 +128,31 @@ func (s PacketStats) Copy() *PacketStats {
FIRTotal: atomic.LoadUint64(&s.FIRTotal),
}
}
func updateCurrentNodePacketStats(nodeStats *livekit.NodeStats, secondsSinceLastUpdate int64) {
if secondsSinceLastUpdate == 0 {
return
}
bytesInPrevious := nodeStats.BytesIn
bytesOutPrevious := nodeStats.BytesOut
packetsInPrevious := nodeStats.PacketsIn
packetsOutPrevious := nodeStats.PacketsOut
nackTotalPrevious := nodeStats.NackTotal
nodeStats.BytesIn = atomic.LoadUint64(&atomicBytesIn)
nodeStats.BytesOut = atomic.LoadUint64(&atomicBytesOut)
nodeStats.PacketsIn = atomic.LoadUint64(&atomicPacketsIn)
nodeStats.PacketsOut = atomic.LoadUint64(&atomicPacketsOut)
nodeStats.NackTotal = atomic.LoadUint64(&atomicNackTotal)
nodeStats.BytesInPerSec = perSec(bytesInPrevious, nodeStats.BytesIn, secondsSinceLastUpdate)
nodeStats.BytesOutPerSec = perSec(bytesOutPrevious, nodeStats.BytesOut, secondsSinceLastUpdate)
nodeStats.PacketsInPerSec = perSec(packetsInPrevious, nodeStats.PacketsIn, secondsSinceLastUpdate)
nodeStats.PacketsOutPerSec = perSec(packetsOutPrevious, nodeStats.PacketsOut, secondsSinceLastUpdate)
nodeStats.NackPerSec = perSec(nackTotalPrevious, nodeStats.NackTotal, secondsSinceLastUpdate)
}
func perSec(prev, curr uint64, secs int64) float32 {
return float32(curr-prev) / float32(secs)
}

View File

@@ -4,14 +4,15 @@ import (
"sync/atomic"
"time"
livekit "github.com/livekit/livekit-server/proto"
"github.com/prometheus/client_golang/prometheus"
)
var (
roomTotal int32
participantTotal int32
trackPublishedTotal int32
trackSubscribedTotal int32
atomicRoomTotal int32
atomicParticipantTotal int32
atomicTrackPublishedTotal int32
atomicTrackSubscribedTotal int32
promRoomTotal = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: livekitNamespace,
@@ -70,7 +71,7 @@ func NewRoomStatsReporter(roomName string) *RoomStatsReporter {
func (r *RoomStatsReporter) RoomStarted() {
r.startedAt = time.Now()
promRoomTotal.Add(1)
atomic.AddInt32(&roomTotal, 1)
atomic.AddInt32(&atomicRoomTotal, 1)
}
func (r *RoomStatsReporter) RoomEnded() {
@@ -78,35 +79,42 @@ func (r *RoomStatsReporter) RoomEnded() {
promRoomDuration.Observe(float64(time.Now().Sub(r.startedAt)) / float64(time.Second))
}
promRoomTotal.Sub(1)
atomic.AddInt32(&roomTotal, -1)
atomic.AddInt32(&atomicRoomTotal, -1)
}
func (r *RoomStatsReporter) AddParticipant() {
promParticipantTotal.Add(1)
atomic.AddInt32(&participantTotal, 1)
atomic.AddInt32(&atomicParticipantTotal, 1)
}
func (r *RoomStatsReporter) SubParticipant() {
promParticipantTotal.Sub(1)
atomic.AddInt32(&participantTotal, -1)
atomic.AddInt32(&atomicParticipantTotal, -1)
}
func (r *RoomStatsReporter) AddPublishedTrack(kind string) {
promTrackPublishedTotal.WithLabelValues(kind).Add(1)
atomic.AddInt32(&trackPublishedTotal, 1)
atomic.AddInt32(&atomicTrackPublishedTotal, 1)
}
func (r *RoomStatsReporter) SubPublishedTrack(kind string) {
promTrackPublishedTotal.WithLabelValues(kind).Sub(1)
atomic.AddInt32(&trackPublishedTotal, -1)
atomic.AddInt32(&atomicTrackPublishedTotal, -1)
}
func (r *RoomStatsReporter) AddSubscribedTrack(kind string) {
promTrackSubscribedTotal.WithLabelValues(kind).Add(1)
atomic.AddInt32(&trackSubscribedTotal, 1)
atomic.AddInt32(&atomicTrackSubscribedTotal, 1)
}
func (r *RoomStatsReporter) SubSubscribedTrack(kind string) {
promTrackSubscribedTotal.WithLabelValues(kind).Sub(1)
atomic.AddInt32(&trackSubscribedTotal, -1)
atomic.AddInt32(&atomicTrackSubscribedTotal, -1)
}
func updateCurrentNodeRoomStats(nodeStats *livekit.NodeStats) {
nodeStats.NumClients = atomic.LoadInt32(&atomicParticipantTotal)
nodeStats.NumRooms = atomic.LoadInt32(&atomicRoomTotal)
nodeStats.NumTracksIn = atomic.LoadInt32(&atomicTrackPublishedTotal)
nodeStats.NumTracksOut = atomic.LoadInt32(&atomicTrackSubscribedTotal)
}

View File

@@ -2,9 +2,9 @@ package stats
import (
"io"
"sync/atomic"
"time"
linuxproc "github.com/c9s/goprocinfo/linux"
livekit "github.com/livekit/livekit-server/proto"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
@@ -85,11 +85,33 @@ func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer int
})
}
func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) {
nodeStats.NumClients = uint32(atomic.LoadInt32(&participantTotal))
nodeStats.NumRooms = uint32(atomic.LoadInt32(&roomTotal))
nodeStats.NumTracksIn = uint32(atomic.LoadInt32(&trackPublishedTotal))
nodeStats.NumTracksOut = uint32(atomic.LoadInt32(&trackSubscribedTotal))
func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) error {
updatedAtPrevious := nodeStats.UpdatedAt
nodeStats.UpdatedAt = time.Now().Unix()
secondsSinceLastUpdate := nodeStats.UpdatedAt - updatedAtPrevious
err := updateCurrentNodeSystemStats(nodeStats)
updateCurrentNodeRoomStats(nodeStats)
updateCurrentNodePacketStats(nodeStats, secondsSinceLastUpdate)
return err
}
func updateCurrentNodeSystemStats(nodeStats *livekit.NodeStats) error {
cpuInfo, err := linuxproc.ReadCPUInfo("/proc/cpuinfo")
if err != nil {
return err
}
loadAvg, err := linuxproc.ReadLoadAvg("/proc/loadavg")
if err != nil {
return err
}
nodeStats.NumCpus = uint32(cpuInfo.NumCPU())
nodeStats.LoadAvgLast1Min = float32(loadAvg.Last1Min)
nodeStats.LoadAvgLast5Min = float32(loadAvg.Last5Min)
nodeStats.LoadAvgLast15Min = float32(loadAvg.Last15Min)
return nil
}