From c401ca58afe0988b8ecf45908893b748affe2969 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 31 Aug 2022 11:00:27 +0800 Subject: [PATCH] turn packet and bytes stats used for telemetry and load control (#969) * stats for turn * add connections stats * stats for standalone turn server only * wire update --- pkg/service/turn.go | 19 ++++- pkg/service/wire.go | 7 +- pkg/service/wire_gen.go | 7 +- pkg/telemetry/prometheus/packets.go | 16 ++++ pkg/telemetry/statsconn.go | 116 ++++++++++++++++++++++++++++ 5 files changed, 161 insertions(+), 4 deletions(-) create mode 100644 pkg/telemetry/statsconn.go diff --git a/pkg/service/turn.go b/pkg/service/turn.go index a11bcdb20..26a4efc44 100644 --- a/pkg/service/turn.go +++ b/pkg/service/turn.go @@ -14,6 +14,8 @@ import ( "github.com/livekit/livekit-server/pkg/config" logging "github.com/livekit/livekit-server/pkg/logger" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) const ( @@ -24,7 +26,7 @@ const ( turnMaxPort = 30000 ) -func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { +func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler, standalone bool) (*turn.Server, error) { turnConf := conf.TURN if !turnConf.Enabled { return nil, nil @@ -39,13 +41,16 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser AuthHandler: authHandler, LoggerFactory: logging.NewLoggerFactory(logger.GetLogger()), } - relayAddrGen := &turn.RelayAddressGeneratorPortRange{ + var relayAddrGen turn.RelayAddressGenerator = &turn.RelayAddressGeneratorPortRange{ RelayAddress: net.ParseIP(conf.RTC.NodeIP), Address: "0.0.0.0", MinPort: turnConf.RelayPortRangeStart, MaxPort: turnConf.RelayPortRangeEnd, MaxRetries: allocateRetries, } + if standalone { + relayAddrGen = telemetry.NewRelayAddressGenerator(relayAddrGen) + } var logValues []interface{} logValues = append(logValues, "turn.relay_range_start", turnConf.RelayPortRangeStart) @@ -74,6 +79,9 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser if err != nil { return nil, errors.Wrap(err, "could not listen on TURN TCP port") } + if standalone { + tlsListener = telemetry.NewListener(tlsListener) + } listenerConfig := turn.ListenerConfig{ Listener: tlsListener, @@ -85,6 +93,9 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser if err != nil { return nil, errors.Wrap(err, "could not listen on TURN TCP port") } + if standalone { + tcpListener = telemetry.NewListener(tcpListener) + } listenerConfig := turn.ListenerConfig{ Listener: tcpListener, @@ -101,6 +112,10 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser return nil, errors.Wrap(err, "could not listen on TURN UDP port") } + if standalone { + udpListener = telemetry.NewPacketConn(udpListener, prometheus.Incoming) + } + packetConfig := turn.PacketConnConfig{ PacketConn: udpListener, RelayAddressGenerator: relayAddrGen, diff --git a/pkg/service/wire.go b/pkg/service/wire.go index c6bf2e300..836d796c6 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -11,6 +11,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/wire" + "github.com/pion/turn/v2" "github.com/pkg/errors" "gopkg.in/yaml.v3" @@ -53,7 +54,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewRTCService, NewLocalRoomManager, newTurnAuthHandler, - NewTurnServer, + newInProcessTurnServer, NewLivekitServer, ) return &LivekitServer{}, nil @@ -195,3 +196,7 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager func getRoomConf(config *config.Config) config.RoomConfig { return config.Room } + +func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { + return NewTurnServer(conf, authHandler, false) +} diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 650b96e6f..dc3ec335f 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -21,6 +21,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" + "github.com/pion/turn/v2" "github.com/pkg/errors" "gopkg.in/yaml.v3" "os" @@ -72,7 +73,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } authHandler := newTurnAuthHandler(objectStore) - server, err := NewTurnServer(conf, authHandler) + server, err := newInProcessTurnServer(conf, authHandler) if err != nil { return nil, err } @@ -221,3 +222,7 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager func getRoomConf(config2 *config.Config) config.RoomConfig { return config2.Room } + +func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { + return NewTurnServer(conf, authHandler, false) +} diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index d939c5bb8..3097e917f 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -32,6 +32,7 @@ var ( promPliTotal *prometheus.CounterVec promFirTotal *prometheus.CounterVec promParticipantJoin *prometheus.CounterVec + promConnections *prometheus.GaugeVec ) func initPacketStats(nodeID string) { @@ -71,6 +72,12 @@ func initPacketStats(nodeID string) { Name: "total", ConstLabels: prometheus.Labels{"node_id": nodeID}, }, nil) + promConnections = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: livekitNamespace, + Subsystem: "connection", + Name: "total", + ConstLabels: prometheus.Labels{"node_id": nodeID}, + }, []string{"kind"}) prometheus.MustRegister(promPacketTotal) prometheus.MustRegister(promPacketBytes) @@ -78,6 +85,7 @@ func initPacketStats(nodeID string) { prometheus.MustRegister(promPliTotal) prometheus.MustRegister(promFirTotal) prometheus.MustRegister(promParticipantJoin) + prometheus.MustRegister(promConnections) } func IncrementPackets(direction Direction, count uint64, retransmit bool) { @@ -130,6 +138,14 @@ func IncrementParticipantJoin(join uint32) { } } +func AddConnection(direction Direction) { + promConnections.WithLabelValues(string(direction)).Add(1) +} + +func SubConnection(direction Direction) { + promConnections.WithLabelValues(string(direction)).Sub(1) +} + func transmissionLabel(retransmit bool) string { if !retransmit { return transmissionInitial diff --git a/pkg/telemetry/statsconn.go b/pkg/telemetry/statsconn.go new file mode 100644 index 000000000..91f60af94 --- /dev/null +++ b/pkg/telemetry/statsconn.go @@ -0,0 +1,116 @@ +package telemetry + +import ( + "net" + + "github.com/pion/turn/v2" + + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" +) + +type Listener struct { + net.Listener +} + +func NewListener(l net.Listener) *Listener { + return &Listener{Listener: l} +} + +func (l *Listener) Accept() (net.Conn, error) { + conn, err := l.Listener.Accept() + if err != nil { + return nil, err + } + + return NewConn(conn, prometheus.Incoming), nil +} + +type Conn struct { + net.Conn + direction prometheus.Direction +} + +func NewConn(c net.Conn, direction prometheus.Direction) *Conn { + prometheus.AddConnection(direction) + return &Conn{Conn: c, direction: direction} +} + +func (c *Conn) Read(b []byte) (n int, err error) { + n, err = c.Conn.Read(b) + if n > 0 { + prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false) + } + return +} + +func (c *Conn) Write(b []byte) (n int, err error) { + n, err = c.Conn.Write(b) + if n > 0 { + prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false) + } + return +} + +func (c *Conn) Close() error { + prometheus.SubConnection(c.direction) + return c.Conn.Close() +} + +type PacketConn struct { + net.PacketConn + direction prometheus.Direction +} + +func NewPacketConn(c net.PacketConn, direction prometheus.Direction) *PacketConn { + prometheus.AddConnection(direction) + return &PacketConn{PacketConn: c, direction: direction} +} + +func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + n, addr, err = c.PacketConn.ReadFrom(p) + if n > 0 { + prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false) + prometheus.IncrementPackets(prometheus.Incoming, 1, false) + } + return +} + +func (c *PacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + n, err = c.PacketConn.WriteTo(p, addr) + if n > 0 { + prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false) + prometheus.IncrementPackets(prometheus.Outgoing, 1, false) + } + return +} + +func (c *PacketConn) Close() error { + prometheus.SubConnection(c.direction) + return c.PacketConn.Close() +} + +type RelayAddressGenerator struct { + turn.RelayAddressGenerator +} + +func NewRelayAddressGenerator(g turn.RelayAddressGenerator) *RelayAddressGenerator { + return &RelayAddressGenerator{RelayAddressGenerator: g} +} + +func (g *RelayAddressGenerator) AllocatePacketConn(network string, requestedPort int) (net.PacketConn, net.Addr, error) { + conn, addr, err := g.RelayAddressGenerator.AllocatePacketConn(network, requestedPort) + if err != nil { + return nil, addr, err + } + + return NewPacketConn(conn, prometheus.Outgoing), addr, err +} + +func (g *RelayAddressGenerator) AllocateConn(network string, requestedPort int) (net.Conn, net.Addr, error) { + conn, addr, err := g.RelayAddressGenerator.AllocateConn(network, requestedPort) + if err != nil { + return nil, addr, err + } + + return NewConn(conn, prometheus.Outgoing), addr, err +}