diff --git a/go.mod b/go.mod index 872ad24d6..e3e2765cd 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290 github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c - github.com/livekit/protocol v1.3.2 + github.com/livekit/protocol v1.3.3-0.20230123192015-7c8172cb6ec0 github.com/livekit/psrpc v0.2.4 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 diff --git a/go.sum b/go.sum index c35a9b355..3375ec9a0 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,8 @@ github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c h1:wdzw github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= github.com/livekit/protocol v1.3.2 h1:3goGWbB5HFRb3tMjog8KP0nvZL1Fy6zut3W1psBzqE4= github.com/livekit/protocol v1.3.2/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= +github.com/livekit/protocol v1.3.3-0.20230123192015-7c8172cb6ec0 h1:l5VHOZj/vfKKWCp2dWpKnR9vp8USLCNd/ajQsj5FJVo= +github.com/livekit/protocol v1.3.3-0.20230123192015-7c8172cb6ec0/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= github.com/livekit/psrpc v0.2.4 h1:Fdxq56uJAIpRHCTgJsvp7ozw51dKtUmD3nxSXq9pCLs= github.com/livekit/psrpc v0.2.4/go.mod h1:+nJvbKx9DCZ6PSAsMHJPRAKjmRJ5WiyyhEmbKYqMKto= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 1e5891711..4a964d782 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -8,6 +8,7 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" @@ -82,6 +83,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) { } func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) { + prometheus.IncrementParticipantRtcInit(1) // treat it as a new participant connecting if r.onNewParticipant == nil { err = ErrHandlerNotDefined diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index d70e3857a..4d09a0469 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -207,6 +207,7 @@ func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *liv } func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) error { + prometheus.IncrementParticipantRtcInit(1) // find the node where the room is hosted at rtcNode, err := r.GetNodeForRoom(r.ctx, livekit.RoomName(ss.RoomName)) if err != nil { diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index a30a7f79e..907e81407 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -68,7 +68,7 @@ func (t *telemetryService) ParticipantJoined( shouldSendEvent bool, ) { t.enqueue(func() { - prometheus.IncrementParticipantJoin(1, true) + prometheus.IncrementParticipantRtcConnected(1) prometheus.AddParticipant() t.createWorker( diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index baedd2ed9..38a739347 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -132,8 +132,9 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats nackTotalNow := nackTotal.Load() retransmitBytesNow := retransmitBytes.Load() retransmitPacketsNow := retransmitPackets.Load() - participantJoinNow := participantJoin.Load() - participantRTCNow := participantRTC.Load() + participantSignalConnectedNow := participantSignalConnected.Load() + participantRTCInitNow := participantRTCInit.Load() + participantRTConnectedCNow := participantRTCConnected.Load() trackPublishAttemptsNow := trackPublishAttempts.Load() trackPublishSuccessNow := trackPublishSuccess.Load() trackSubscribeAttemptsNow := trackSubscribeAttempts.Load() @@ -153,47 +154,49 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats } stats := &livekit.NodeStats{ - StartedAt: prev.StartedAt, - UpdatedAt: updatedAt, - NumRooms: roomTotal.Load(), - NumClients: participantTotal.Load(), - NumTracksIn: trackPublishedTotal.Load(), - NumTracksOut: trackSubscribedTotal.Load(), - NumTrackPublishAttempts: trackPublishAttemptsNow, - NumTrackPublishSuccess: trackPublishSuccessNow, - NumTrackSubscribeAttempts: trackSubscribeAttemptsNow, - NumTrackSubscribeSuccess: trackSubscribeSuccessNow, - BytesIn: bytesInNow, - BytesOut: bytesOutNow, - PacketsIn: packetsInNow, - PacketsOut: packetsOutNow, - RetransmitBytesOut: retransmitBytesNow, - RetransmitPacketsOut: retransmitPacketsNow, - NackTotal: nackTotalNow, - ParticipantJoin: participantJoinNow, - ParticipantRtc: participantRTCNow, - BytesInPerSec: prevAverage.BytesInPerSec, - BytesOutPerSec: prevAverage.BytesOutPerSec, - PacketsInPerSec: prevAverage.PacketsInPerSec, - PacketsOutPerSec: prevAverage.PacketsOutPerSec, - RetransmitBytesOutPerSec: prevAverage.RetransmitBytesOutPerSec, - RetransmitPacketsOutPerSec: prevAverage.RetransmitPacketsOutPerSec, - NackPerSec: prevAverage.NackPerSec, - ParticipantJoinPerSec: prevAverage.ParticipantJoinPerSec, - ParticipantRtcPerSec: prevAverage.ParticipantRtcPerSec, - NumCpus: numCPUs, - CpuLoad: cpuLoad, - MemoryTotal: memTotal, - MemoryUsed: memUsed, - LoadAvgLast1Min: float32(loadAvg.Loadavg1), - LoadAvgLast5Min: float32(loadAvg.Loadavg5), - LoadAvgLast15Min: float32(loadAvg.Loadavg15), - SysPacketsOut: sysPackets, - SysPacketsDropped: sysDroppedPackets, - TrackPublishAttemptsPerSec: prevAverage.TrackPublishAttemptsPerSec, - TrackPublishSuccessPerSec: prevAverage.TrackPublishSuccessPerSec, - TrackSubscribeAttemptsPerSec: prevAverage.TrackSubscribeAttemptsPerSec, - TrackSubscribeSuccessPerSec: prevAverage.TrackSubscribeSuccessPerSec, + StartedAt: prev.StartedAt, + UpdatedAt: updatedAt, + NumRooms: roomTotal.Load(), + NumClients: participantTotal.Load(), + NumTracksIn: trackPublishedTotal.Load(), + NumTracksOut: trackSubscribedTotal.Load(), + NumTrackPublishAttempts: trackPublishAttemptsNow, + NumTrackPublishSuccess: trackPublishSuccessNow, + NumTrackSubscribeAttempts: trackSubscribeAttemptsNow, + NumTrackSubscribeSuccess: trackSubscribeSuccessNow, + BytesIn: bytesInNow, + BytesOut: bytesOutNow, + PacketsIn: packetsInNow, + PacketsOut: packetsOutNow, + RetransmitBytesOut: retransmitBytesNow, + RetransmitPacketsOut: retransmitPacketsNow, + NackTotal: nackTotalNow, + ParticipantSignalConnected: participantSignalConnectedNow, + ParticipantRtcInit: participantRTCInitNow, + ParticipantRtcConnected: participantRTConnectedCNow, + BytesInPerSec: prevAverage.BytesInPerSec, + BytesOutPerSec: prevAverage.BytesOutPerSec, + PacketsInPerSec: prevAverage.PacketsInPerSec, + PacketsOutPerSec: prevAverage.PacketsOutPerSec, + RetransmitBytesOutPerSec: prevAverage.RetransmitBytesOutPerSec, + RetransmitPacketsOutPerSec: prevAverage.RetransmitPacketsOutPerSec, + NackPerSec: prevAverage.NackPerSec, + ParticipantSignalConnectedPerSec: prevAverage.ParticipantSignalConnectedPerSec, + ParticipantRtcInitPerSec: prevAverage.ParticipantRtcInitPerSec, + ParticipantRtcConnectedPerSec: prevAverage.ParticipantRtcConnectedPerSec, + NumCpus: numCPUs, + CpuLoad: cpuLoad, + MemoryTotal: memTotal, + MemoryUsed: memUsed, + LoadAvgLast1Min: float32(loadAvg.Loadavg1), + LoadAvgLast5Min: float32(loadAvg.Loadavg5), + LoadAvgLast15Min: float32(loadAvg.Loadavg15), + SysPacketsOut: sysPackets, + SysPacketsDropped: sysDroppedPackets, + TrackPublishAttemptsPerSec: prevAverage.TrackPublishAttemptsPerSec, + TrackPublishSuccessPerSec: prevAverage.TrackPublishSuccessPerSec, + TrackSubscribeAttemptsPerSec: prevAverage.TrackSubscribeAttemptsPerSec, + TrackSubscribeSuccessPerSec: prevAverage.TrackSubscribeSuccessPerSec, } // update stats @@ -205,8 +208,9 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats stats.RetransmitBytesOutPerSec = perSec(prevAverage.RetransmitBytesOut, retransmitBytesNow, elapsed) stats.RetransmitPacketsOutPerSec = perSec(prevAverage.RetransmitPacketsOut, retransmitPacketsNow, elapsed) stats.NackPerSec = perSec(prevAverage.NackTotal, nackTotalNow, elapsed) - stats.ParticipantJoinPerSec = perSec(prevAverage.ParticipantJoin, participantJoinNow, elapsed) - stats.ParticipantRtcPerSec = perSec(prevAverage.ParticipantRtc, participantRTCNow, elapsed) + stats.ParticipantSignalConnectedPerSec = perSec(prevAverage.ParticipantSignalConnected, participantSignalConnectedNow, elapsed) + stats.ParticipantRtcInitPerSec = perSec(prevAverage.ParticipantRtcInit, participantRTCInitNow, elapsed) + stats.ParticipantRtcConnectedPerSec = perSec(prevAverage.ParticipantRtcConnected, participantRTConnectedCNow, elapsed) stats.SysPacketsOutPerSec = perSec(uint64(prevAverage.SysPacketsOut), uint64(sysPackets), elapsed) stats.SysPacketsDroppedPerSec = perSec(uint64(prevAverage.SysPacketsDropped), uint64(sysDroppedPackets), elapsed) stats.TrackPublishAttemptsPerSec = perSec(uint64(prevAverage.NumTrackPublishAttempts), uint64(trackPublishAttemptsNow), elapsed) diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index 3611af924..b51d14231 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -17,15 +17,16 @@ const ( ) var ( - bytesIn atomic.Uint64 - bytesOut atomic.Uint64 - packetsIn atomic.Uint64 - packetsOut atomic.Uint64 - nackTotal atomic.Uint64 - retransmitBytes atomic.Uint64 - retransmitPackets atomic.Uint64 - participantJoin atomic.Uint64 - participantRTC atomic.Uint64 + bytesIn atomic.Uint64 + bytesOut atomic.Uint64 + packetsIn atomic.Uint64 + packetsOut atomic.Uint64 + nackTotal atomic.Uint64 + retransmitBytes atomic.Uint64 + retransmitPackets atomic.Uint64 + participantSignalConnected atomic.Uint64 + participantRTCConnected atomic.Uint64 + participantRTCInit atomic.Uint64 promPacketLabels = []string{"direction", "transmission"} promPacketTotal *prometheus.CounterVec @@ -191,15 +192,24 @@ func RecordRTT(direction Direction, trackSource livekit.TrackSource, trackType l } } -func IncrementParticipantJoin(join uint32, rtcConnected ...bool) { +func IncrementParticipantJoin(join uint32) { if join > 0 { - if len(rtcConnected) > 0 && rtcConnected[0] { - participantRTC.Add(uint64(join)) - promParticipantJoin.WithLabelValues("rtc_connected").Add(float64(join)) - } else { - participantJoin.Add(uint64(join)) - promParticipantJoin.WithLabelValues("signal_connected").Add(float64(join)) - } + participantSignalConnected.Add(uint64(join)) + promParticipantJoin.WithLabelValues("signal_connected").Add(float64(join)) + } +} + +func IncrementParticipantRtcInit(join uint32) { + if join > 0 { + participantRTCInit.Add(uint64(join)) + promParticipantJoin.WithLabelValues("rtc_init").Add(float64(join)) + } +} + +func IncrementParticipantRtcConnected(join uint32) { + if join > 0 { + participantRTCConnected.Add(uint64(join)) + promParticipantJoin.WithLabelValues("rtc_connected").Add(float64(join)) } }