Telemetry refactor (#172)

* telemetry refactor

* fix imports

* update protocol
This commit is contained in:
David Colburn
2021-11-08 18:00:34 -08:00
committed by GitHub
parent 8344466629
commit 289ebd32ff
25 changed files with 597 additions and 511 deletions
+1 -2
View File
@@ -15,7 +15,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.10.0
github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
@@ -28,7 +28,6 @@ require (
github.com/pion/rtp v1.7.2
github.com/pion/sdp/v3 v3.0.4
github.com/pion/stun v0.3.5
github.com/pion/transport v0.12.3
github.com/pion/turn/v2 v2.0.5
github.com/pion/webrtc/v3 v3.1.5
github.com/pkg/errors v0.9.1
+2 -2
View File
@@ -252,8 +252,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.16 h1:B4+z0sf4t3zZSXFIwHive8malNn6Vje+7b1OW4ETDOM=
github.com/livekit/ion-sfu v1.20.16/go.mod h1:sUjL3tZRROs3NjCm6ZLT+IsisdYVRtxfq4OhVFHVd/A=
github.com/livekit/protocol v0.10.0 h1:s2zf1+G1Tcx6UKIf8mbRzbQ4ELdyS0mlLGsFkTVT5Aw=
github.com/livekit/protocol v0.10.0/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM=
github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff h1:21SZ2sh5e7ELCVdXT01hlpdSZyNlwDv6KTOlcplBrQ8=
github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
+1 -1
View File
@@ -149,7 +149,7 @@ func Clean() {
// regenerate code
func Generate() error {
mg.Deps(installDeps)
mg.Deps(installDeps, generateWire)
fmt.Println("generating...")
+8 -8
View File
@@ -12,7 +12,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/routing/selector"
"github.com/livekit/livekit-server/pkg/utils/stats"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
const (
@@ -312,7 +312,7 @@ func (r *RedisRouter) statsWorker() {
// update periodically seconds
select {
case <-time.After(statsUpdateInterval):
if err := stats.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil {
if err := prometheus.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil {
logger.Errorw("could not update node stats", err, "nodeID", r.currentNode.Id)
}
if err := r.RegisterNode(); err != nil {
@@ -345,28 +345,28 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) {
sm := livekit.SignalNodeMessage{}
if err := proto.Unmarshal([]byte(msg.Payload), &sm); err != nil {
logger.Errorw("could not unmarshal signal message on sigchan", err)
stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1)
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
continue
}
if err := r.handleSignalMessage(&sm); err != nil {
logger.Errorw("error processing signal message", err)
stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1)
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
continue
}
stats.PromMessageCounter.WithLabelValues("signal", "success").Add(1)
prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1)
} else if msg.Channel == rtcChannel {
rm := livekit.RTCNodeMessage{}
if err := proto.Unmarshal([]byte(msg.Payload), &rm); err != nil {
logger.Errorw("could not unmarshal RTC message on rtcchan", err)
stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1)
prometheus.MessageCounter.WithLabelValues("rtc", "failure").Add(1)
continue
}
if err := r.handleRTCMessage(&rm); err != nil {
logger.Errorw("error processing RTC message", err)
stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1)
prometheus.MessageCounter.WithLabelValues("rtc", "failure").Add(1)
continue
}
stats.PromMessageCounter.WithLabelValues("rtc", "success").Add(1)
prometheus.MessageCounter.WithLabelValues("rtc", "success").Add(1)
}
}
}
+6 -9
View File
@@ -18,7 +18,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils/stats"
"github.com/livekit/livekit-server/pkg/telemetry"
)
var (
@@ -74,7 +74,7 @@ type MediaTrackParams struct {
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
AudioConfig config.AudioConfig
Stats *stats.RoomStatsReporter
Telemetry *telemetry.TelemetryService
Logger logger.Logger
}
@@ -247,7 +247,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
delete(t.subscribedTracks, sub.ID())
t.lock.Unlock()
t.params.Stats.SubSubscribedTrack(t.Kind().String())
t.params.Telemetry.UnsubscribedTrack(sub.ID(), sub.Identity(), t.ToProto())
// ignore if the subscribing sub is not connected
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
@@ -293,7 +293,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
sub.Negotiate()
}()
t.params.Stats.AddSubscribedTrack(t.Kind().String())
t.params.Telemetry.SubscribedTrack(sub.ID(), sub.Identity(), t.ToProto())
return nil
}
@@ -367,12 +367,12 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
onclose := t.onClose
t.lock.Unlock()
t.RemoveAllSubscribers()
t.params.Stats.SubPublishedTrack(t.Kind().String())
t.params.Telemetry.UnpublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto())
if onclose != nil {
onclose()
}
})
t.params.Stats.AddPublishedTrack(t.Kind().String())
t.params.Telemetry.PublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto())
if t.Kind() == livekit.TrackType_AUDIO {
t.buffer = buff
@@ -523,9 +523,6 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) {
t.fracLostLock.Unlock()
}
if t.params.Stats != nil {
t.params.Stats.Incoming.HandleRTCP(packets)
}
// also look for sender reports
// feedback for the source RTCP
t.params.RTCPChan <- packets
+25 -20
View File
@@ -22,7 +22,8 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils/stats"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/version"
)
@@ -38,7 +39,7 @@ type ParticipantParams struct {
Sink routing.MessageSink
AudioConfig config.AudioConfig
ProtocolVersion types.ProtocolVersion
Stats *stats.RoomStatsReporter
Telemetry *telemetry.TelemetryService
ThrottleConfig config.PLIThrottleConfig
EnabledCodecs []*livekit.Codec
Hidden bool
@@ -115,20 +116,24 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return nil, err
}
p.publisher, err = NewPCTransport(TransportParams{
Target: livekit.SignalTarget_PUBLISHER,
Config: params.Config,
Stats: p.params.Stats,
EnabledCodecs: p.params.EnabledCodecs,
Logger: params.Logger,
ParticipantID: p.id,
ParticipantIdentity: p.params.Identity,
Target: livekit.SignalTarget_PUBLISHER,
Config: params.Config,
Telemetry: p.params.Telemetry,
EnabledCodecs: p.params.EnabledCodecs,
Logger: params.Logger,
})
if err != nil {
return nil, err
}
p.subscriber, err = NewPCTransport(TransportParams{
Target: livekit.SignalTarget_SUBSCRIBER,
Config: params.Config,
Stats: p.params.Stats,
Logger: params.Logger,
ParticipantID: p.id,
ParticipantIdentity: p.params.Identity,
Target: livekit.SignalTarget_SUBSCRIBER,
Config: params.Config,
Telemetry: p.params.Telemetry,
Logger: params.Logger,
})
if err != nil {
return nil, err
@@ -283,7 +288,7 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
)
if err = p.publisher.SetRemoteDescription(sdp); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1)
return
}
@@ -291,13 +296,13 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
answer, err = p.publisher.pc.CreateAnswer(nil)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1)
err = errors.Wrap(err, "could not create answer")
return
}
if err = p.publisher.pc.SetLocalDescription(answer); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1)
err = errors.Wrap(err, "could not set local description")
return
}
@@ -312,14 +317,14 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
},
})
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1)
return
}
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
}
stats.PromServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1)
return
}
@@ -890,9 +895,9 @@ func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) {
},
})
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
} else {
stats.PromServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1)
}
}
@@ -938,7 +943,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
BufferFactory: p.params.Config.BufferFactory,
ReceiverConfig: p.params.Config.Receiver,
AudioConfig: p.params.AudioConfig,
Stats: p.params.Stats,
Telemetry: p.params.Telemetry,
Logger: p.params.Logger,
})
@@ -1084,7 +1089,7 @@ func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnection
// p.params.Logger.Debugw("ICE connection state changed", "state", state.String(),
// "participant", p.identity, "pID", p.ID())
if state == webrtc.ICEConnectionStateConnected {
stats.PromServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
p.updateState(livekit.ParticipantInfo_ACTIVE)
} else if state == webrtc.ICEConnectionStateFailed {
// only close when failed, to allow clients opportunity to reconnect
+12 -28
View File
@@ -8,15 +8,16 @@ import (
"time"
"github.com/go-logr/logr"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/pion/ion-sfu/pkg/buffer"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils/stats"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
const (
@@ -45,7 +46,7 @@ type Room struct {
// for active speaker updates
audioConfig *config.AudioConfig
statsReporter *stats.RoomStatsReporter
telemetry *telemetry.TelemetryService
onParticipantChanged func(p types.Participant)
onMetadataUpdate func(metadata string)
@@ -56,13 +57,13 @@ type ParticipantOptions struct {
AutoSubscribe bool
}
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig) *Room {
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry *telemetry.TelemetryService) *Room {
r := &Room{
Room: proto.Clone(room).(*livekit.Room),
Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)),
config: config,
audioConfig: audioConfig,
statsReporter: stats.NewRoomStatsReporter(),
telemetry: telemetry,
participants: make(map[string]types.Participant),
participantOpts: make(map[string]*ParticipantOptions),
bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, logr.Logger{}),
@@ -74,7 +75,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC
if r.Room.CreationTime == 0 {
r.Room.CreationTime = time.Now().Unix()
}
r.statsReporter.RoomStarted()
go r.audioUpdateWorker()
go r.connectionQualityWorker()
@@ -118,10 +119,6 @@ func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo {
return speakers
}
func (r *Room) GetStatsReporter() *stats.RoomStatsReporter {
return r.statsReporter
}
func (r *Room) GetBufferFactor() *buffer.Factory {
return r.bufferFactory
}
@@ -144,7 +141,7 @@ func (r *Room) LastLeftAt() int64 {
func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error {
if r.IsClosed() {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1)
return ErrRoomClosed
}
@@ -152,12 +149,12 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
defer r.lock.Unlock()
if r.participants[participant.Identity()] != nil {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1)
return ErrAlreadyJoined
}
if r.Room.MaxParticipants > 0 && int(r.Room.MaxParticipants) == len(r.participants) {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1)
return ErrMaxParticipantsExceeded
}
@@ -165,8 +162,6 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
r.joinedAt.Store(time.Now().Unix())
}
r.statsReporter.AddParticipant()
// it's important to set this before connection, we don't want to miss out on any publishedTracks
participant.OnTrackPublished(r.onTrackPublished)
participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) {
@@ -223,7 +218,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
})
if err := participant.SendJoinResponse(r.Room, otherParticipants, iceServers); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
return err
}
@@ -232,7 +227,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
participant.Negotiate()
}
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1)
return nil
}
@@ -266,7 +261,6 @@ func (r *Room) RemoveParticipant(identity string) {
if !ok {
return
}
r.statsReporter.SubParticipant()
// send broadcast only if it's not already closed
sendUpdates := p.State() != livekit.ParticipantInfo_DISCONNECTED
@@ -374,22 +368,12 @@ func (r *Room) Close() {
r.closeOnce.Do(func() {
close(r.closed)
r.Logger.Infow("closing room", "roomID", r.Room.Sid, "room", r.Room.Name)
r.statsReporter.RoomEnded()
if r.onClose != nil {
r.onClose()
}
})
}
func (r *Room) GetIncomingStats() stats.PacketStats {
return *r.statsReporter.Incoming
}
func (r *Room) GetOutgoingStats() stats.PacketStats {
return *r.statsReporter.Outgoing
}
func (r *Room) OnClose(f func()) {
r.onClose = f
}
+2
View File
@@ -13,6 +13,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/testutils"
)
@@ -551,6 +552,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
UpdateInterval: audioUpdateInterval,
SmoothIntervals: opts.audioSmoothIntervals,
},
telemetry.NewTelemetryService(nil),
)
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := fmt.Sprintf("p%d", i)
+14 -19
View File
@@ -10,7 +10,8 @@ import (
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/utils/stats"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
const (
@@ -40,11 +41,13 @@ type PCTransport struct {
}
type TransportParams struct {
Target livekit.SignalTarget
Config *WebRTCConfig
Stats *stats.RoomStatsReporter
EnabledCodecs []*livekit.Codec
Logger logger.Logger
ParticipantID string
ParticipantIdentity string
Target livekit.SignalTarget
Config *WebRTCConfig
Telemetry *telemetry.TelemetryService
EnabledCodecs []*livekit.Codec
Logger logger.Logger
}
func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
@@ -60,18 +63,10 @@ func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc.
}
se := params.Config.SettingEngine
se.DisableMediaEngineCopy(true)
if params.Stats != nil && se.BufferFactory != nil {
wrapper := &stats.StatsBufferWrapper{
CreateBufferFunc: se.BufferFactory,
Stats: params.Stats.Incoming,
}
se.BufferFactory = wrapper.CreateBuffer
}
ir := &interceptor.Registry{}
if params.Stats != nil && params.Target == livekit.SignalTarget_SUBSCRIBER {
// only capture subscriber for outbound streams
f := stats.NewStatsInterceptorFactory(params.Stats)
if params.Telemetry != nil {
f := params.Telemetry.NewStatsInterceptorFactory(params.ParticipantID, params.ParticipantIdentity)
ir.Add(f)
}
api := webrtc.NewAPI(
@@ -208,7 +203,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
if iceRestart && currentSD != nil {
t.logger.Debugw("recovering from client negotiation state")
if err := t.pc.SetRemoteDescription(*currentSD); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
return err
}
} else {
@@ -223,14 +218,14 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
offer, err := t.pc.CreateOffer(options)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
t.logger.Errorw("could not create offer", err)
return err
}
err = t.pc.SetLocalDescription(offer)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
t.logger.Errorw("could not set local description", err)
return err
}
+8 -6
View File
@@ -14,9 +14,10 @@ import (
func TestMissingAnswerDuringICERestart(t *testing.T) {
params := TransportParams{
Target: livekit.SignalTarget_PUBLISHER,
Config: &WebRTCConfig{},
Stats: nil,
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_PUBLISHER,
Config: &WebRTCConfig{},
}
transportA, err := NewPCTransport(params)
require.NoError(t, err)
@@ -64,9 +65,10 @@ func TestMissingAnswerDuringICERestart(t *testing.T) {
func TestNegotiationTiming(t *testing.T) {
params := TransportParams{
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &WebRTCConfig{},
Stats: nil,
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &WebRTCConfig{},
}
transportA, err := NewPCTransport(params)
require.NoError(t, err)
+11 -35
View File
@@ -3,28 +3,28 @@ package service
import (
"context"
"errors"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/recording"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/livekit-server/pkg/telemetry"
)
type RecordingService struct {
bus utils.MessageBus
notifier webhook.Notifier
shutdown chan struct{}
bus utils.MessageBus
telemetry *telemetry.TelemetryService
shutdown chan struct{}
}
func NewRecordingService(mb utils.MessageBus, notifier webhook.Notifier) *RecordingService {
func NewRecordingService(mb utils.MessageBus, telemetry *telemetry.TelemetryService) *RecordingService {
return &RecordingService{
bus: mb,
notifier: notifier,
shutdown: make(chan struct{}, 1),
bus: mb,
telemetry: telemetry,
shutdown: make(chan struct{}, 1),
}
}
@@ -62,6 +62,7 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star
return nil, err
}
s.telemetry.RecordingStarted(ctx, recordingId, req)
return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil
}
@@ -141,35 +142,10 @@ func (s *RecordingService) resultsWorker() {
logger.Errorw("failed to read results", err)
continue
}
s.notify(res)
s.telemetry.RecordingEnded(res)
case <-s.shutdown:
_ = sub.Close()
return
}
}
}
func (s *RecordingService) notify(res *livekit.RecordingResult) {
// log results
values := []interface{}{"id", res.Id}
if res.Error != "" {
values = append(values, "error", res.Error)
} else {
values = append(values, "duration", time.Duration(res.Duration*1e9))
if res.DownloadUrl != "" {
values = append(values, "url", res.DownloadUrl)
}
}
logger.Debugw("received recording result", values...)
// webhook
if s.notifier != nil {
event := webhook.EventRecordingFinished
if err := s.notifier.Notify(&livekit.WebhookEvent{
Event: event,
RecordingResult: res,
}); err != nil {
logger.Warnw("could not notify webhook", err, "event", event)
}
}
}
+15 -47
View File
@@ -6,15 +6,14 @@ import (
"sync"
"time"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/webhook"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry"
)
const (
@@ -29,15 +28,14 @@ type LocalRoomManager struct {
lock sync.RWMutex
router routing.Router
currentNode routing.LocalNode
notifier webhook.Notifier
rtcConfig *rtc.WebRTCConfig
config *config.Config
webhookPool *workerpool.WorkerPool
telemetry *telemetry.TelemetryService
rooms map[string]*rtc.Room
}
func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Router, currentNode routing.LocalNode,
notifier webhook.Notifier) (*LocalRoomManager, error) {
telemetry *telemetry.TelemetryService) (*LocalRoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
if err != nil {
@@ -50,9 +48,8 @@ func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Route
rtcConfig: rtcConf,
config: conf,
router: router,
notifier: notifier,
currentNode: currentNode,
webhookPool: workerpool.New(1),
telemetry: telemetry,
rooms: make(map[string]*rtc.Room),
}
@@ -228,7 +225,7 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi
Sink: responseSink,
AudioConfig: r.config.Audio,
ProtocolVersion: pv,
Stats: room.GetStatsReporter(),
Telemetry: r.telemetry,
ThrottleConfig: r.config.RTC.PLIThrottle,
EnabledCodecs: room.Room.EnabledCodecs,
Hidden: pi.Hidden,
@@ -255,6 +252,11 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi
return
}
r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto())
participant.OnClose(func(p types.Participant) {
r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto())
})
go r.rtcSessionWorker(room, participant, requestSource)
}
@@ -275,22 +277,16 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string)
}
// construct ice servers
room = rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio)
room = rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.telemetry)
r.telemetry.RoomStarted(ctx, room.Room)
room.OnClose(func() {
r.telemetry.RoomEnded(ctx, room.Room)
if err := r.DeleteRoom(ctx, roomName); err != nil {
logger.Errorw("could not delete room", err)
}
r.notifyEvent(&livekit.WebhookEvent{
Event: webhook.EventRoomFinished,
Room: room.Room,
})
// print stats
logger.Infow("room closed",
"incomingStats", room.GetIncomingStats().Copy(),
"outgoingStats", room.GetOutgoingStats().Copy(),
)
logger.Infow("room closed")
})
room.OnMetadataUpdate(func(metadata string) {
err := r.StoreRoom(ctx, room.Room)
@@ -313,11 +309,6 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string)
r.rooms[roomName] = room
r.lock.Unlock()
r.notifyEvent(&livekit.WebhookEvent{
Event: webhook.EventRoomStarted,
Room: room.Room,
})
return room, nil
}
@@ -331,20 +322,9 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
"roomID", room.Room.Sid,
)
_ = participant.Close()
r.notifyEvent(&livekit.WebhookEvent{
Event: webhook.EventParticipantLeft,
Room: room.Room,
Participant: participant.ToProto(),
})
}()
defer rtc.Recover()
r.notifyEvent(&livekit.WebhookEvent{
Event: webhook.EventParticipantJoined,
Room: room.Room,
Participant: participant.ToProto(),
})
for {
select {
case <-time.After(time.Millisecond * 50):
@@ -569,18 +549,6 @@ func (r *LocalRoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICESer
return iceServers
}
func (r *LocalRoomManager) notifyEvent(event *livekit.WebhookEvent) {
if r.notifier == nil {
return
}
r.webhookPool.Submit(func() {
if err := r.notifier.Notify(event); err != nil {
logger.Warnw("could not notify webhook", err, "event", event.Event)
}
})
}
func iceServerForStunServers(servers []string) *livekit.ICEServer {
iceServer := &livekit.ICEServer{}
for _, stunServer := range servers {
+6 -6
View File
@@ -16,7 +16,7 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils/stats"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
type RTCService struct {
@@ -93,7 +93,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit,
func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// reject non websocket requests
if !websocket.IsWebSocketUpgrade(r) {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1)
w.WriteHeader(404)
return
}
@@ -107,7 +107,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// create room if it doesn't exist, also assigns an RTC node for the room
rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName})
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1)
handleError(w, http.StatusInternalServerError, err.Error())
return
}
@@ -115,7 +115,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// this needs to be started first *before* using router functions on this node
connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1)
handleError(w, http.StatusInternalServerError, "could not start session: "+err.Error())
return
}
@@ -131,7 +131,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// upgrade only once the basics are good to go
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1)
logger.Warnw("could not upgrade to WS", err)
handleError(w, http.StatusInternalServerError, err.Error())
return
@@ -141,7 +141,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sigConn.useJSON = false
}
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1)
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1)
logger.Infow("new client WS connected",
"connID", connId,
"roomID", rm.Sid,
+2
View File
@@ -20,6 +20,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry"
)
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
@@ -30,6 +31,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
createKeyProvider,
createWebhookNotifier,
routing.CreateRouter,
telemetry.NewTelemetryService,
NewRecordingService,
NewRoomAllocator,
NewRoomService,
+5 -4
View File
@@ -1,8 +1,7 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//go:build !wireinject
// +build !wireinject
//+build !wireinject
package service
@@ -12,6 +11,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
@@ -46,9 +46,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
recordingService := NewRecordingService(messageBus, notifier)
telemetryService := telemetry.NewTelemetryService(notifier)
recordingService := NewRecordingService(messageBus, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, currentNode)
localRoomManager, err := NewLocalRoomManager(conf, roomStore, router, currentNode, notifier)
localRoomManager, err := NewLocalRoomManager(conf, roomStore, router, currentNode, telemetryService)
if err != nil {
return nil, err
}
+139
View File
@@ -0,0 +1,139 @@
package telemetry
import (
"context"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/webhook"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
func (s *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
s.pool.Submit(prometheus.RoomStarted)
s.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRoomStarted,
Room: room,
})
// TODO: analytics service
}
func (s *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
s.pool.Submit(func() {
prometheus.RoomEnded(time.Unix(room.CreationTime, 0))
})
s.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRoomFinished,
Room: room,
})
// TODO: analytics service
}
func (s *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
s.pool.Submit(prometheus.AddParticipant)
s.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventParticipantJoined,
Room: room,
Participant: participant,
})
// TODO: analytics service
}
func (s *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
s.pool.Submit(prometheus.SubParticipant)
s.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventParticipantLeft,
Room: room,
Participant: participant,
})
// TODO: analytics service
}
func (s *TelemetryService) PublishedTrack(SID, identity string, track *livekit.TrackInfo) {
s.pool.Submit(func() {
prometheus.AddPublishedTrack(track.Type.String())
})
// TODO: analytics service
}
func (s *TelemetryService) UnpublishedTrack(SID, identity string, track *livekit.TrackInfo) {
s.pool.Submit(func() {
prometheus.SubPublishedTrack(track.Type.String())
})
// TODO: analytics service
}
func (s *TelemetryService) SubscribedTrack(SID, identity string, track *livekit.TrackInfo) {
s.pool.Submit(func() {
prometheus.AddSubscribedTrack(track.Type.String())
})
// TODO: analytics service
}
func (s *TelemetryService) UnsubscribedTrack(SID, identity string, track *livekit.TrackInfo) {
s.pool.Submit(func() {
prometheus.SubSubscribedTrack(track.Type.String())
})
// TODO: analytics service
}
func (s *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) {
logger.Debugw("recording started", "recordingID", recordingID)
s.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingStarted,
RecordingInfo: &livekit.RecordingInfo{
Id: recordingID,
Request: req,
},
})
// TODO: analytics service
}
func (s *TelemetryService) RecordingEnded(res *livekit.RecordingResult) {
// log results
values := []interface{}{"recordingID", res.Id}
if res.Error != "" {
values = append(values, "error", res.Error)
} else {
values = append(values, "duration", time.Duration(res.Duration*1e9))
if res.DownloadUrl != "" {
values = append(values, "url", res.DownloadUrl)
}
}
logger.Debugw("recording ended", values...)
s.notifyEvent(context.Background(), &livekit.WebhookEvent{
Event: webhook.EventRecordingFinished,
RecordingResult: res,
})
// TODO: analytics service
}
func (s *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) {
if s.notifier == nil {
return
}
s.pool.Submit(func() {
if err := s.notifier.Notify(ctx, event); err != nil {
logger.Warnw("failed to notify webhook", err, "event", event.Event)
}
})
}
+86
View File
@@ -0,0 +1,86 @@
package prometheus
import (
"sync/atomic"
"time"
livekit "github.com/livekit/protocol/proto"
"github.com/prometheus/client_golang/prometheus"
)
const livekitNamespace = "livekit"
var (
MessageCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "messages",
},
[]string{"type", "status"},
)
ServiceOperationCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "service_operation",
},
[]string{"type", "status", "error_type"},
)
)
func init() {
prometheus.MustRegister(MessageCounter)
prometheus.MustRegister(ServiceOperationCounter)
initPacketStats()
initRoomStats()
}
func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) error {
updatedAtPrevious := nodeStats.UpdatedAt
nodeStats.UpdatedAt = time.Now().Unix()
secondsSinceLastUpdate := nodeStats.UpdatedAt - updatedAtPrevious
err := updateCurrentNodeSystemStats(nodeStats)
updateCurrentNodeRoomStats(nodeStats)
updateCurrentNodePacketStats(nodeStats, secondsSinceLastUpdate)
return err
}
func updateCurrentNodeRoomStats(nodeStats *livekit.NodeStats) {
nodeStats.NumClients = atomic.LoadInt32(&atomicParticipantTotal)
nodeStats.NumRooms = atomic.LoadInt32(&atomicRoomTotal)
nodeStats.NumTracksIn = atomic.LoadInt32(&atomicTrackPublishedTotal)
nodeStats.NumTracksOut = atomic.LoadInt32(&atomicTrackSubscribedTotal)
}
func updateCurrentNodePacketStats(nodeStats *livekit.NodeStats, secondsSinceLastUpdate int64) {
if secondsSinceLastUpdate == 0 {
return
}
bytesInPrevious := nodeStats.BytesIn
bytesOutPrevious := nodeStats.BytesOut
packetsInPrevious := nodeStats.PacketsIn
packetsOutPrevious := nodeStats.PacketsOut
nackTotalPrevious := nodeStats.NackTotal
nodeStats.BytesIn = atomic.LoadUint64(&atomicBytesIn)
nodeStats.BytesOut = atomic.LoadUint64(&atomicBytesOut)
nodeStats.PacketsIn = atomic.LoadUint64(&atomicPacketsIn)
nodeStats.PacketsOut = atomic.LoadUint64(&atomicPacketsOut)
nodeStats.NackTotal = atomic.LoadUint64(&atomicNackTotal)
nodeStats.BytesInPerSec = perSec(bytesInPrevious, nodeStats.BytesIn, secondsSinceLastUpdate)
nodeStats.BytesOutPerSec = perSec(bytesOutPrevious, nodeStats.BytesOut, secondsSinceLastUpdate)
nodeStats.PacketsInPerSec = perSec(packetsInPrevious, nodeStats.PacketsIn, secondsSinceLastUpdate)
nodeStats.PacketsOutPerSec = perSec(packetsOutPrevious, nodeStats.PacketsOut, secondsSinceLastUpdate)
nodeStats.NackPerSec = perSec(nackTotalPrevious, nodeStats.NackTotal, secondsSinceLastUpdate)
}
func perSec(prev, curr uint64, secs int64) float32 {
return float32(curr-prev) / float32(secs)
}
@@ -1,6 +1,7 @@
//go:build linux
// +build linux
package stats
package prometheus
import (
linuxproc "github.com/c9s/goprocinfo/linux"
@@ -1,6 +1,7 @@
//go:build !linux
// +build !linux
package stats
package prometheus
import livekit "github.com/livekit/protocol/proto"
+83
View File
@@ -0,0 +1,83 @@
package prometheus
import (
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
)
type Direction string
const (
Incoming Direction = "incoming"
Outgoing Direction = "outgoing"
)
var (
atomicBytesIn uint64
atomicBytesOut uint64
atomicPacketsIn uint64
atomicPacketsOut uint64
atomicNackTotal uint64
promPacketLabels = []string{"direction"}
promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet",
Name: "total",
}, promPacketLabels)
promPacketBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet",
Name: "bytes",
}, promPacketLabels)
promNackTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "nack",
Name: "total",
}, promPacketLabels)
promPliTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "pli",
Name: "total",
}, promPacketLabels)
promFirTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "fir",
Name: "total",
}, promPacketLabels)
)
func initPacketStats() {
prometheus.MustRegister(promPacketTotal)
prometheus.MustRegister(promPacketBytes)
prometheus.MustRegister(promNackTotal)
prometheus.MustRegister(promPliTotal)
prometheus.MustRegister(promFirTotal)
}
func IncrementPackets(direction Direction, pktLen uint64) {
promPacketTotal.WithLabelValues(string(direction)).Add(1)
promPacketBytes.WithLabelValues(string(direction)).Add(float64(pktLen))
if direction == Incoming {
atomic.AddUint64(&atomicPacketsIn, 1)
atomic.AddUint64(&atomicBytesIn, pktLen)
} else {
atomic.AddUint64(&atomicPacketsOut, 1)
atomic.AddUint64(&atomicBytesOut, pktLen)
}
}
func IncrementNack(direction Direction) {
promNackTotal.WithLabelValues(string(direction)).Add(1)
atomic.AddUint64(&atomicNackTotal, 1)
}
func IncrementPLI(direction Direction) {
promPliTotal.WithLabelValues(string(direction)).Add(1)
}
func IncrementFIR(direction Direction) {
promFirTotal.WithLabelValues(string(direction)).Add(1)
}
@@ -1,10 +1,9 @@
package stats
package prometheus
import (
"sync/atomic"
"time"
livekit "github.com/livekit/protocol/proto"
"github.com/prometheus/client_golang/prometheus"
)
@@ -44,7 +43,7 @@ var (
}, []string{"kind"})
)
func initRoomStatsReporter() {
func initRoomStats() {
prometheus.MustRegister(promRoomTotal)
prometheus.MustRegister(promRoomDuration)
prometheus.MustRegister(promParticipantTotal)
@@ -52,68 +51,45 @@ func initRoomStatsReporter() {
prometheus.MustRegister(promTrackSubscribedTotal)
}
// RoomStatsReporter is created for each room
type RoomStatsReporter struct {
roomName string
startedAt time.Time
Incoming *PacketStats
Outgoing *PacketStats
}
func NewRoomStatsReporter() *RoomStatsReporter {
return &RoomStatsReporter{
Incoming: newPacketStats("incoming"),
Outgoing: newPacketStats("outgoing"),
}
}
func (r *RoomStatsReporter) RoomStarted() {
r.startedAt = time.Now()
func RoomStarted() {
promRoomTotal.Add(1)
atomic.AddInt32(&atomicRoomTotal, 1)
}
func (r *RoomStatsReporter) RoomEnded() {
if !r.startedAt.IsZero() {
promRoomDuration.Observe(float64(time.Now().Sub(r.startedAt)) / float64(time.Second))
func RoomEnded(startedAt time.Time) {
if !startedAt.IsZero() {
promRoomDuration.Observe(float64(time.Now().Sub(startedAt)) / float64(time.Second))
}
promRoomTotal.Sub(1)
atomic.AddInt32(&atomicRoomTotal, -1)
}
func (r *RoomStatsReporter) AddParticipant() {
func AddParticipant() {
promParticipantTotal.Add(1)
atomic.AddInt32(&atomicParticipantTotal, 1)
}
func (r *RoomStatsReporter) SubParticipant() {
func SubParticipant() {
promParticipantTotal.Sub(1)
atomic.AddInt32(&atomicParticipantTotal, -1)
}
func (r *RoomStatsReporter) AddPublishedTrack(kind string) {
func AddPublishedTrack(kind string) {
promTrackPublishedTotal.WithLabelValues(kind).Add(1)
atomic.AddInt32(&atomicTrackPublishedTotal, 1)
}
func (r *RoomStatsReporter) SubPublishedTrack(kind string) {
func SubPublishedTrack(kind string) {
promTrackPublishedTotal.WithLabelValues(kind).Sub(1)
atomic.AddInt32(&atomicTrackPublishedTotal, -1)
}
func (r *RoomStatsReporter) AddSubscribedTrack(kind string) {
func AddSubscribedTrack(kind string) {
promTrackSubscribedTotal.WithLabelValues(kind).Add(1)
atomic.AddInt32(&atomicTrackSubscribedTotal, 1)
}
func (r *RoomStatsReporter) SubSubscribedTrack(kind string) {
func SubSubscribedTrack(kind string) {
promTrackSubscribedTotal.WithLabelValues(kind).Sub(1)
atomic.AddInt32(&atomicTrackSubscribedTotal, -1)
}
func updateCurrentNodeRoomStats(nodeStats *livekit.NodeStats) {
nodeStats.NumClients = atomic.LoadInt32(&atomicParticipantTotal)
nodeStats.NumRooms = atomic.LoadInt32(&atomicRoomTotal)
nodeStats.NumTracksIn = atomic.LoadInt32(&atomicTrackPublishedTotal)
nodeStats.NumTracksOut = atomic.LoadInt32(&atomicTrackSubscribedTotal)
}
+91
View File
@@ -0,0 +1,91 @@
package telemetry
import (
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/webhook"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
type TelemetryService struct {
notifier webhook.Notifier
pool *workerpool.WorkerPool
}
func NewTelemetryService(notifier webhook.Notifier) *TelemetryService {
return &TelemetryService{
notifier: notifier,
pool: workerpool.New(10),
}
}
type StatsInterceptorFactory struct {
t *TelemetryService
participantID string
identity string
}
func (s *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory {
return &StatsInterceptorFactory{
t: s,
participantID: participantID,
identity: identity,
}
}
func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
return &StatsInterceptor{
t: f.t,
participantID: f.participantID,
identity: f.identity,
}, nil
}
type StatsInterceptor struct {
interceptor.NoOp
t *TelemetryService
participantID string
identity string
}
// --- Incoming ---
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
// change in the future. The returned method will be called once per packet batch.
func (s *StatsInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
return interceptor.RTCPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
s.t.HandleIncomingRTCP(s.participantID, s.identity, bytes)
return reader.Read(bytes, attributes)
})
}
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (s *StatsInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
s.t.HandleIncomingRTP(s.participantID, s.identity, uint64(len(payload)))
return reader.Read(payload, attributes)
})
}
// --- Outgoing ---
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
s.t.HandleOutgoingRTCP(s.participantID, s.identity, pkts)
return writer.Write(pkts, attributes)
})
}
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
s.t.HandleOutgoingRTP(s.participantID, s.identity, uint64(len(payload)))
return writer.Write(header, payload, attributes)
})
}
+64
View File
@@ -0,0 +1,64 @@
package telemetry
import (
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
func (s *TelemetryService) HandleIncomingRTCP(participantID, identity string, bytes []byte) {
pkts, err := rtcp.Unmarshal(bytes)
if err != nil {
logger.Errorw("Interceptor failed to unmarshal rtcp packets", err)
return
}
s.pool.Submit(func() {
for _, pkt := range pkts {
switch pkt.(type) {
case *rtcp.TransportLayerNack:
prometheus.IncrementNack(prometheus.Incoming)
case *rtcp.PictureLossIndication:
prometheus.IncrementPLI(prometheus.Incoming)
case *rtcp.FullIntraRequest:
prometheus.IncrementFIR(prometheus.Incoming)
}
}
})
// TODO: analytics service
}
func (s *TelemetryService) HandleIncomingRTP(participantID, identity string, pktLen uint64) {
s.pool.Submit(func() {
prometheus.IncrementPackets(prometheus.Incoming, pktLen)
})
// TODO: analytics service
}
func (s *TelemetryService) HandleOutgoingRTCP(participantID, identity string, pkts []rtcp.Packet) {
s.pool.Submit(func() {
for _, pkt := range pkts {
switch pkt.(type) {
case *rtcp.TransportLayerNack:
prometheus.IncrementNack(prometheus.Outgoing)
case *rtcp.PictureLossIndication:
prometheus.IncrementPLI(prometheus.Outgoing)
case *rtcp.FullIntraRequest:
prometheus.IncrementFIR(prometheus.Outgoing)
}
}
})
// TODO: analytics service
}
func (s *TelemetryService) HandleOutgoingRTP(participantID, identity string, pktLen uint64) {
s.pool.Submit(func() {
prometheus.IncrementPackets(prometheus.Outgoing, pktLen)
})
// TODO: analytics service
}
-157
View File
@@ -1,157 +0,0 @@
package stats
import (
"sync/atomic"
livekit "github.com/livekit/protocol/proto"
"github.com/pion/rtcp"
"github.com/prometheus/client_golang/prometheus"
)
var (
atomicBytesIn uint64
atomicBytesOut uint64
atomicPacketsIn uint64
atomicPacketsOut uint64
atomicNackTotal uint64
promPacketLabels = []string{"direction"}
promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet",
Name: "total",
}, promPacketLabels)
promPacketBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet",
Name: "bytes",
}, promPacketLabels)
promNackTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "nack",
Name: "total",
}, promPacketLabels)
promPliTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "pli",
Name: "total",
}, promPacketLabels)
promFirTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "fir",
Name: "total",
}, promPacketLabels)
)
func initPacketStats() {
prometheus.MustRegister(promPacketTotal)
prometheus.MustRegister(promPacketBytes)
prometheus.MustRegister(promNackTotal)
prometheus.MustRegister(promPliTotal)
prometheus.MustRegister(promFirTotal)
}
type PacketStats struct {
direction string // incoming or outgoing
PacketBytes uint64 `json:"packetBytes"`
PacketTotal uint64 `json:"packetTotal"`
NackTotal uint64 `json:"nackTotal"`
PLITotal uint64 `json:"pliTotal"`
FIRTotal uint64 `json:"firTotal"`
}
func newPacketStats(direction string) *PacketStats {
return &PacketStats{
direction: direction,
}
}
func (s *PacketStats) IncrementBytes(bytes uint64) {
promPacketBytes.WithLabelValues(s.direction).Add(float64(bytes))
atomic.AddUint64(&s.PacketBytes, bytes)
if s.direction == "incoming" {
atomic.AddUint64(&atomicBytesIn, bytes)
} else {
atomic.AddUint64(&atomicBytesOut, bytes)
}
}
func (s *PacketStats) IncrementPackets(count uint64) {
promPacketTotal.WithLabelValues(s.direction).Add(float64(count))
atomic.AddUint64(&s.PacketTotal, count)
if s.direction == "incoming" {
atomic.AddUint64(&atomicPacketsIn, count)
} else {
atomic.AddUint64(&atomicPacketsOut, count)
}
}
func (s *PacketStats) IncrementNack(count uint64) {
promNackTotal.WithLabelValues(s.direction).Add(float64(count))
atomic.AddUint64(&s.NackTotal, count)
atomic.AddUint64(&atomicNackTotal, count)
}
func (s *PacketStats) IncrementPLI(count uint64) {
promPliTotal.WithLabelValues(s.direction).Add(float64(count))
atomic.AddUint64(&s.PLITotal, count)
}
func (s *PacketStats) IncrementFIR(count uint64) {
promFirTotal.WithLabelValues(s.direction).Add(float64(count))
atomic.AddUint64(&s.FIRTotal, count)
}
func (s *PacketStats) HandleRTCP(pkts []rtcp.Packet) {
for _, rtcpPacket := range pkts {
switch rtcpPacket.(type) {
case *rtcp.TransportLayerNack:
s.IncrementNack(1)
case *rtcp.PictureLossIndication:
s.IncrementPLI(1)
case *rtcp.FullIntraRequest:
s.IncrementFIR(1)
}
}
}
func (s PacketStats) Copy() *PacketStats {
return &PacketStats{
direction: s.direction,
PacketBytes: atomic.LoadUint64(&s.PacketBytes),
PacketTotal: atomic.LoadUint64(&s.PacketTotal),
NackTotal: atomic.LoadUint64(&s.NackTotal),
PLITotal: atomic.LoadUint64(&s.PLITotal),
FIRTotal: atomic.LoadUint64(&s.FIRTotal),
}
}
func updateCurrentNodePacketStats(nodeStats *livekit.NodeStats, secondsSinceLastUpdate int64) {
if secondsSinceLastUpdate == 0 {
return
}
bytesInPrevious := nodeStats.BytesIn
bytesOutPrevious := nodeStats.BytesOut
packetsInPrevious := nodeStats.PacketsIn
packetsOutPrevious := nodeStats.PacketsOut
nackTotalPrevious := nodeStats.NackTotal
nodeStats.BytesIn = atomic.LoadUint64(&atomicBytesIn)
nodeStats.BytesOut = atomic.LoadUint64(&atomicBytesOut)
nodeStats.PacketsIn = atomic.LoadUint64(&atomicPacketsIn)
nodeStats.PacketsOut = atomic.LoadUint64(&atomicPacketsOut)
nodeStats.NackTotal = atomic.LoadUint64(&atomicNackTotal)
nodeStats.BytesInPerSec = perSec(bytesInPrevious, nodeStats.BytesIn, secondsSinceLastUpdate)
nodeStats.BytesOutPerSec = perSec(bytesOutPrevious, nodeStats.BytesOut, secondsSinceLastUpdate)
nodeStats.PacketsInPerSec = perSec(packetsInPrevious, nodeStats.PacketsIn, secondsSinceLastUpdate)
nodeStats.PacketsOutPerSec = perSec(packetsOutPrevious, nodeStats.PacketsOut, secondsSinceLastUpdate)
nodeStats.NackPerSec = perSec(nackTotalPrevious, nodeStats.NackTotal, secondsSinceLastUpdate)
}
func perSec(prev, curr uint64, secs int64) float32 {
return float32(curr-prev) / float32(secs)
}
-129
View File
@@ -1,129 +0,0 @@
package stats
import (
"io"
"time"
livekit "github.com/livekit/protocol/proto"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/transport/packetio"
"github.com/prometheus/client_golang/prometheus"
)
const livekitNamespace = "livekit"
var (
PromMessageCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "messages",
},
[]string{"type", "status"},
)
PromServiceOperationCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "service_operation",
},
[]string{"type", "status", "error_type"},
)
)
func init() {
prometheus.MustRegister(PromMessageCounter)
prometheus.MustRegister(PromServiceOperationCounter)
initPacketStats()
initRoomStatsReporter()
}
// StatsBufferWrapper wraps a buffer factory so we could get information on
// incoming packets
type StatsBufferWrapper struct {
CreateBufferFunc func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser
Stats *PacketStats
}
func (w *StatsBufferWrapper) CreateBuffer(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
writer := w.CreateBufferFunc(packetType, ssrc)
if packetType == packetio.RTPBufferPacket {
// wrap this in a counter class
return &rtpReporterWriter{
ReadWriteCloser: writer,
stats: w.Stats,
}
}
return writer
}
type rtpReporterWriter struct {
io.ReadWriteCloser
stats *PacketStats
}
func (w *rtpReporterWriter) Write(p []byte) (n int, err error) {
w.stats.IncrementPackets(1)
w.stats.IncrementBytes(uint64(len(p)))
return w.ReadWriteCloser.Write(p)
}
// StatsInterceptorFactory is created for each participant to keep of track of outgoing stats
// it adheres to Pion interceptor.Factory interface
type StatsInterceptorFactory struct {
reporter *RoomStatsReporter
}
func NewStatsInterceptorFactory(reporter *RoomStatsReporter) *StatsInterceptorFactory {
return &StatsInterceptorFactory{
reporter: reporter,
}
}
func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
return &StatsInterceptor{
reporter: f.reporter,
}, nil
}
// StatsInterceptor is created for each participant to keep of track of outgoing stats
// it adheres to Pion interceptor interface
type StatsInterceptor struct {
interceptor.NoOp
reporter *RoomStatsReporter
}
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
s.reporter.Outgoing.HandleRTCP(pkts)
return writer.Write(pkts, attributes)
})
}
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
s.reporter.Outgoing.IncrementPackets(1)
s.reporter.Outgoing.IncrementBytes(uint64(len(payload)))
return writer.Write(header, payload, attributes)
})
}
func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) error {
updatedAtPrevious := nodeStats.UpdatedAt
nodeStats.UpdatedAt = time.Now().Unix()
secondsSinceLastUpdate := nodeStats.UpdatedAt - updatedAtPrevious
err := updateCurrentNodeSystemStats(nodeStats)
updateCurrentNodeRoomStats(nodeStats)
updateCurrentNodePacketStats(nodeStats, secondsSinceLastUpdate)
return err
}