mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 13:25:42 +00:00
turn packet and bytes stats used for telemetry and load control (#969)
* stats for turn * add connections stats * stats for standalone turn server only * wire update
This commit is contained in:
@@ -14,6 +14,8 @@ import (
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
logging "github.com/livekit/livekit-server/pkg/logger"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -24,7 +26,7 @@ const (
|
||||
turnMaxPort = 30000
|
||||
)
|
||||
|
||||
func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
|
||||
func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler, standalone bool) (*turn.Server, error) {
|
||||
turnConf := conf.TURN
|
||||
if !turnConf.Enabled {
|
||||
return nil, nil
|
||||
@@ -39,13 +41,16 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser
|
||||
AuthHandler: authHandler,
|
||||
LoggerFactory: logging.NewLoggerFactory(logger.GetLogger()),
|
||||
}
|
||||
relayAddrGen := &turn.RelayAddressGeneratorPortRange{
|
||||
var relayAddrGen turn.RelayAddressGenerator = &turn.RelayAddressGeneratorPortRange{
|
||||
RelayAddress: net.ParseIP(conf.RTC.NodeIP),
|
||||
Address: "0.0.0.0",
|
||||
MinPort: turnConf.RelayPortRangeStart,
|
||||
MaxPort: turnConf.RelayPortRangeEnd,
|
||||
MaxRetries: allocateRetries,
|
||||
}
|
||||
if standalone {
|
||||
relayAddrGen = telemetry.NewRelayAddressGenerator(relayAddrGen)
|
||||
}
|
||||
var logValues []interface{}
|
||||
|
||||
logValues = append(logValues, "turn.relay_range_start", turnConf.RelayPortRangeStart)
|
||||
@@ -74,6 +79,9 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not listen on TURN TCP port")
|
||||
}
|
||||
if standalone {
|
||||
tlsListener = telemetry.NewListener(tlsListener)
|
||||
}
|
||||
|
||||
listenerConfig := turn.ListenerConfig{
|
||||
Listener: tlsListener,
|
||||
@@ -85,6 +93,9 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not listen on TURN TCP port")
|
||||
}
|
||||
if standalone {
|
||||
tcpListener = telemetry.NewListener(tcpListener)
|
||||
}
|
||||
|
||||
listenerConfig := turn.ListenerConfig{
|
||||
Listener: tcpListener,
|
||||
@@ -101,6 +112,10 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Ser
|
||||
return nil, errors.Wrap(err, "could not listen on TURN UDP port")
|
||||
}
|
||||
|
||||
if standalone {
|
||||
udpListener = telemetry.NewPacketConn(udpListener, prometheus.Incoming)
|
||||
}
|
||||
|
||||
packetConfig := turn.PacketConnConfig{
|
||||
PacketConn: udpListener,
|
||||
RelayAddressGenerator: relayAddrGen,
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/wire"
|
||||
"github.com/pion/turn/v2"
|
||||
"github.com/pkg/errors"
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
@@ -53,7 +54,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
NewRTCService,
|
||||
NewLocalRoomManager,
|
||||
newTurnAuthHandler,
|
||||
NewTurnServer,
|
||||
newInProcessTurnServer,
|
||||
NewLivekitServer,
|
||||
)
|
||||
return &LivekitServer{}, nil
|
||||
@@ -195,3 +196,7 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager
|
||||
func getRoomConf(config *config.Config) config.RoomConfig {
|
||||
return config.Room
|
||||
}
|
||||
|
||||
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
|
||||
return NewTurnServer(conf, authHandler, false)
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/webhook"
|
||||
"github.com/pion/turn/v2"
|
||||
"github.com/pkg/errors"
|
||||
"gopkg.in/yaml.v3"
|
||||
"os"
|
||||
@@ -72,7 +73,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
return nil, err
|
||||
}
|
||||
authHandler := newTurnAuthHandler(objectStore)
|
||||
server, err := NewTurnServer(conf, authHandler)
|
||||
server, err := newInProcessTurnServer(conf, authHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -221,3 +222,7 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager
|
||||
func getRoomConf(config2 *config.Config) config.RoomConfig {
|
||||
return config2.Room
|
||||
}
|
||||
|
||||
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
|
||||
return NewTurnServer(conf, authHandler, false)
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ var (
|
||||
promPliTotal *prometheus.CounterVec
|
||||
promFirTotal *prometheus.CounterVec
|
||||
promParticipantJoin *prometheus.CounterVec
|
||||
promConnections *prometheus.GaugeVec
|
||||
)
|
||||
|
||||
func initPacketStats(nodeID string) {
|
||||
@@ -71,6 +72,12 @@ func initPacketStats(nodeID string) {
|
||||
Name: "total",
|
||||
ConstLabels: prometheus.Labels{"node_id": nodeID},
|
||||
}, nil)
|
||||
promConnections = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: livekitNamespace,
|
||||
Subsystem: "connection",
|
||||
Name: "total",
|
||||
ConstLabels: prometheus.Labels{"node_id": nodeID},
|
||||
}, []string{"kind"})
|
||||
|
||||
prometheus.MustRegister(promPacketTotal)
|
||||
prometheus.MustRegister(promPacketBytes)
|
||||
@@ -78,6 +85,7 @@ func initPacketStats(nodeID string) {
|
||||
prometheus.MustRegister(promPliTotal)
|
||||
prometheus.MustRegister(promFirTotal)
|
||||
prometheus.MustRegister(promParticipantJoin)
|
||||
prometheus.MustRegister(promConnections)
|
||||
}
|
||||
|
||||
func IncrementPackets(direction Direction, count uint64, retransmit bool) {
|
||||
@@ -130,6 +138,14 @@ func IncrementParticipantJoin(join uint32) {
|
||||
}
|
||||
}
|
||||
|
||||
func AddConnection(direction Direction) {
|
||||
promConnections.WithLabelValues(string(direction)).Add(1)
|
||||
}
|
||||
|
||||
func SubConnection(direction Direction) {
|
||||
promConnections.WithLabelValues(string(direction)).Sub(1)
|
||||
}
|
||||
|
||||
func transmissionLabel(retransmit bool) string {
|
||||
if !retransmit {
|
||||
return transmissionInitial
|
||||
|
||||
116
pkg/telemetry/statsconn.go
Normal file
116
pkg/telemetry/statsconn.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/pion/turn/v2"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
|
||||
type Listener struct {
|
||||
net.Listener
|
||||
}
|
||||
|
||||
func NewListener(l net.Listener) *Listener {
|
||||
return &Listener{Listener: l}
|
||||
}
|
||||
|
||||
func (l *Listener) Accept() (net.Conn, error) {
|
||||
conn, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewConn(conn, prometheus.Incoming), nil
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
net.Conn
|
||||
direction prometheus.Direction
|
||||
}
|
||||
|
||||
func NewConn(c net.Conn, direction prometheus.Direction) *Conn {
|
||||
prometheus.AddConnection(direction)
|
||||
return &Conn{Conn: c, direction: direction}
|
||||
}
|
||||
|
||||
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||
n, err = c.Conn.Read(b)
|
||||
if n > 0 {
|
||||
prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) Write(b []byte) (n int, err error) {
|
||||
n, err = c.Conn.Write(b)
|
||||
if n > 0 {
|
||||
prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
prometheus.SubConnection(c.direction)
|
||||
return c.Conn.Close()
|
||||
}
|
||||
|
||||
type PacketConn struct {
|
||||
net.PacketConn
|
||||
direction prometheus.Direction
|
||||
}
|
||||
|
||||
func NewPacketConn(c net.PacketConn, direction prometheus.Direction) *PacketConn {
|
||||
prometheus.AddConnection(direction)
|
||||
return &PacketConn{PacketConn: c, direction: direction}
|
||||
}
|
||||
|
||||
func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
n, addr, err = c.PacketConn.ReadFrom(p)
|
||||
if n > 0 {
|
||||
prometheus.IncrementBytes(prometheus.Incoming, uint64(n), false)
|
||||
prometheus.IncrementPackets(prometheus.Incoming, 1, false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *PacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||
n, err = c.PacketConn.WriteTo(p, addr)
|
||||
if n > 0 {
|
||||
prometheus.IncrementBytes(prometheus.Outgoing, uint64(n), false)
|
||||
prometheus.IncrementPackets(prometheus.Outgoing, 1, false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *PacketConn) Close() error {
|
||||
prometheus.SubConnection(c.direction)
|
||||
return c.PacketConn.Close()
|
||||
}
|
||||
|
||||
type RelayAddressGenerator struct {
|
||||
turn.RelayAddressGenerator
|
||||
}
|
||||
|
||||
func NewRelayAddressGenerator(g turn.RelayAddressGenerator) *RelayAddressGenerator {
|
||||
return &RelayAddressGenerator{RelayAddressGenerator: g}
|
||||
}
|
||||
|
||||
func (g *RelayAddressGenerator) AllocatePacketConn(network string, requestedPort int) (net.PacketConn, net.Addr, error) {
|
||||
conn, addr, err := g.RelayAddressGenerator.AllocatePacketConn(network, requestedPort)
|
||||
if err != nil {
|
||||
return nil, addr, err
|
||||
}
|
||||
|
||||
return NewPacketConn(conn, prometheus.Outgoing), addr, err
|
||||
}
|
||||
|
||||
func (g *RelayAddressGenerator) AllocateConn(network string, requestedPort int) (net.Conn, net.Addr, error) {
|
||||
conn, addr, err := g.RelayAddressGenerator.AllocateConn(network, requestedPort)
|
||||
if err != nil {
|
||||
return nil, addr, err
|
||||
}
|
||||
|
||||
return NewConn(conn, prometheus.Outgoing), addr, err
|
||||
}
|
||||
Reference in New Issue
Block a user