diff --git a/go.mod b/go.mod index 6d94e0d05..c789b5ba9 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e - github.com/livekit/protocol v1.16.1-0.20240523171447-90dfd668f42f + github.com/livekit/protocol v1.16.1-0.20240524061435-6410d008bf7b github.com/livekit/psrpc v0.5.3-0.20240426045048-8ba067a45715 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -66,7 +66,7 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect @@ -80,7 +80,7 @@ require ( github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect - github.com/nats-io/nats.go v1.34.1 // indirect + github.com/nats-io/nats.go v1.35.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect @@ -105,7 +105,7 @@ require ( golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect golang.org/x/tools v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect - google.golang.org/grpc v1.63.2 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect + google.golang.org/grpc v1.64.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 6a8e67882..b1f2205bc 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,8 @@ github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44 github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -120,8 +120,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e h1:ss4VwrouYiDpuNJ9BUTH+WsW+GDdJS70iZp8ii3/0Lc= github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.16.1-0.20240523171447-90dfd668f42f h1:T4Twu81WbBQ7HyJWv3j/D8VJ//7M4yo4s3EmdPJqcGY= -github.com/livekit/protocol v1.16.1-0.20240523171447-90dfd668f42f/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= +github.com/livekit/protocol v1.16.1-0.20240524061435-6410d008bf7b h1:TQOoMQqruWwgbhMoxY2ZCksge88NesmWSn8eBZuWKFs= +github.com/livekit/protocol v1.16.1-0.20240524061435-6410d008bf7b/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= github.com/livekit/psrpc v0.5.3-0.20240426045048-8ba067a45715 h1:vhDMOe8fxEc/amYTFo799LySPM12Fk3vc+Nc6o4gYZQ= github.com/livekit/psrpc v0.5.3-0.20240426045048-8ba067a45715/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -153,8 +153,8 @@ github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= -github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk= +github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -402,10 +402,10 @@ golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= -google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= -google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e h1:Elxv5MwEkCI9f5SkoL6afed6NTdxaGoAo39eANBwHL8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= +google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/config/config.go b/pkg/config/config.go index 67006eb33..c1b40cc12 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -117,6 +117,8 @@ type RTCConfig struct { // max number of bytes to buffer for data channel. 0 means unlimited DataChannelMaxBufferedAmount uint64 `yaml:"data_channel_max_buffered_amount,omitempty"` + + ForwardStats ForwardStatsConfig `yaml:"forward_stats,omitempty"` } type TURNServer struct { @@ -318,6 +320,12 @@ type APIConfig struct { MaxCheckInterval time.Duration `yaml:"max_check_interval,omitempty"` } +type ForwardStatsConfig struct { + SummaryInterval time.Duration `yaml:"summary_interval,omitempty"` + ReportInterval time.Duration `yaml:"report_interval,omitempty"` + ReportWindow time.Duration `yaml:"report_window,omitempty"` +} + func DefaultAPIConfig() APIConfig { return APIConfig{ ExecutionTimeout: 2 * time.Second, diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index f575b4e13..21191a924 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -70,6 +70,7 @@ type MediaTrackParams struct { Logger logger.Logger SimTracks map[uint32]SimulcastTrackInfo OnRTCP func([]rtcp.Packet) + ForwardStats *sfu.ForwardStats } func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { @@ -281,6 +282,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra sfu.WithAudioConfig(t.params.AudioConfig), sfu.WithLoadBalanceThreshold(20), sfu.WithStreamTrackers(), + sfu.WithForwardStats(t.params.ForwardStats), ) newWR.OnCloseHandler(func() { t.MediaTrackReceiver.SetClosing() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7cb7db17f..3042e0843 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -141,6 +141,7 @@ type ParticipantParams struct { PlayoutDelay *livekit.PlayoutDelay SyncStreams bool EnableTrafficLoadTracking bool + ForwardStats *sfu.ForwardStats } type ParticipantImpl struct { @@ -2126,6 +2127,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv PLIThrottleConfig: p.params.PLIThrottleConfig, SimTracks: p.params.SimTracks, OnRTCP: p.postRtcp, + ForwardStats: p.params.ForwardStats, }, ti) mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 3dfa146d5..16437ff38 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -25,6 +25,7 @@ import ( "golang.org/x/exp/maps" "github.com/livekit/livekit-server/pkg/agent" + "github.com/livekit/livekit-server/pkg/sfu" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/auth" @@ -83,6 +84,8 @@ type RoomManager struct { participantServers utils.MultitonService[rpc.ParticipantTopic] iceConfigCache *sutils.IceConfigCache[iceConfigCacheKey] + + forwardStats *sfu.ForwardStats } func NewLocalRoomManager( @@ -97,6 +100,7 @@ func NewLocalRoomManager( versionGenerator utils.TimedVersionGenerator, turnAuthHandler *TURNAuthHandler, bus psrpc.MessageBus, + forwardStats *sfu.ForwardStats, ) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf) if err != nil { @@ -116,6 +120,7 @@ func NewLocalRoomManager( versionGenerator: versionGenerator, turnAuthHandler: turnAuthHandler, bus: bus, + forwardStats: forwardStats, rooms: make(map[livekit.RoomName]*rtc.Room), @@ -232,6 +237,10 @@ func (r *RoomManager) Stop() { } r.iceConfigCache.Stop() + + if r.forwardStats != nil { + r.forwardStats.Stop() + } } // StartSession starts WebRTC session when a new participant is connected, takes place on RTC node @@ -440,6 +449,7 @@ func (r *RoomManager) StartSession( SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo, PlayoutDelay: roomInternal.GetPlayoutDelay(), SyncStreams: roomInternal.GetSyncStreams(), + ForwardStats: r.forwardStats, }) if err != nil { return err diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 3eb4a64c6..91d6df7d1 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -27,10 +27,11 @@ import ( "github.com/redis/go-redis/v9" "gopkg.in/yaml.v3" + "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/pkg/agent" + "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" @@ -51,6 +52,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live createKeyProvider, createWebhookNotifier, createClientConfiguration, + createForwardStats, routing.CreateRouter, getRoomConf, config.DefaultAPIConfig, @@ -235,6 +237,13 @@ func getPSRPCClientParams(config rpc.PSRPCConfig, bus psrpc.MessageBus) rpc.Clie return rpc.NewClientParams(config, bus, logger.GetLogger(), rpc.PSRPCMetricsObserver{}) } +func createForwardStats(conf *config.Config) *sfu.ForwardStats { + if conf.RTC.ForwardStats.SummaryInterval == 0 || conf.RTC.ForwardStats.ReportInterval == 0 || conf.RTC.ForwardStats.ReportWindow == 0 { + return nil + } + return sfu.NewForwardStats(conf.RTC.ForwardStats.SummaryInterval, conf.RTC.ForwardStats.ReportInterval, conf.RTC.ForwardStats.ReportWindow) +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 08d493b13..f731de75e 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" @@ -120,7 +121,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live clientConfigurationManager := createClientConfiguration() timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus) + forwardStats := createForwardStats(conf) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats) if err != nil { return nil, err } @@ -286,6 +288,13 @@ func getPSRPCClientParams(config2 rpc.PSRPCConfig, bus psrpc.MessageBus) rpc.Cli return rpc.NewClientParams(config2, bus, logger.GetLogger(), rpc.PSRPCMetricsObserver{}) } +func createForwardStats(conf *config.Config) *sfu.ForwardStats { + if conf.RTC.ForwardStats.SummaryInterval == 0 || conf.RTC.ForwardStats.ReportInterval == 0 || conf.RTC.ForwardStats.ReportWindow == 0 { + return nil + } + return sfu.NewForwardStats(conf.RTC.ForwardStats.SummaryInterval, conf.RTC.ForwardStats.ReportInterval, conf.RTC.ForwardStats.ReportWindow) +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } diff --git a/pkg/sfu/downtrackspreader.go b/pkg/sfu/downtrackspreader.go index 68fc7b411..98057a0d4 100644 --- a/pkg/sfu/downtrackspreader.go +++ b/pkg/sfu/downtrackspreader.go @@ -86,8 +86,12 @@ func (d *DownTrackSpreader) HasDownTrack(subscriberID livekit.ParticipantID) boo return ok } -func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) { +func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) int { downTracks := d.GetDownTracks() + if len(downTracks) == 0 { + return 0 + } + threshold := uint64(d.params.Threshold) if threshold == 0 { threshold = 1000000 @@ -97,6 +101,7 @@ func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) { // WriteRTP takes about 50µs on average, so we write to 2 down tracks per loop. step := uint64(2) utils.ParallelExec(downTracks, threshold, step, writer) + return len(downTracks) } func (d *DownTrackSpreader) DownTrackCount() int { diff --git a/pkg/sfu/forwardstats.go b/pkg/sfu/forwardstats.go new file mode 100644 index 000000000..62544e3a8 --- /dev/null +++ b/pkg/sfu/forwardstats.go @@ -0,0 +1,74 @@ +package sfu + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/protocol/utils" +) + +type ForwardStats struct { + lock sync.Mutex + lastLeftMs atomic.Int64 + latency *utils.LatencyAggregate + closeCh chan struct{} +} + +func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength time.Duration) *ForwardStats { + s := &ForwardStats{ + latency: utils.NewLatencyAggregate(latencyUpdateInterval, latencyWindowLength), + closeCh: make(chan struct{}), + } + + go s.report(reportInterval) + return s +} + +func (s *ForwardStats) Update(arrival, left time.Time) { + leftMs := left.UnixMilli() + lastMs := s.lastLeftMs.Load() + if leftMs < lastMs || !s.lastLeftMs.CompareAndSwap(lastMs, leftMs) { + return + } + + transit := left.Sub(arrival) + s.lock.Lock() + defer s.lock.Unlock() + s.latency.Update(time.Duration(arrival.UnixNano()), float64(transit)) +} + +func (s *ForwardStats) GetStats() (latency, jitter time.Duration) { + s.lock.Lock() + defer s.lock.Unlock() + w := s.latency.Summarize() + return time.Duration(w.Mean()), time.Duration(w.StdDev()) +} + +func (s *ForwardStats) GetLastStats(duration time.Duration) (latency, jitter time.Duration) { + s.lock.Lock() + defer s.lock.Unlock() + w := s.latency.SummarizeLast(duration) + return time.Duration(w.Mean()), time.Duration(w.StdDev()) +} + +func (s *ForwardStats) Stop() { + close(s.closeCh) +} + +func (s *ForwardStats) report(reportInterval time.Duration) { + ticker := time.NewTicker(reportInterval) + defer ticker.Stop() + for { + select { + case <-s.closeCh: + return + case <-ticker.C: + latency, jitter := s.GetLastStats(reportInterval) + latencySlow, jitterSlow := s.GetStats() + prometheus.RecordForwardJitter(uint32(jitter/time.Millisecond), uint32(jitterSlow/time.Millisecond)) + prometheus.RecordForwardLatency(uint32(latency/time.Millisecond), uint32(latencySlow/time.Millisecond)) + } + } +} diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 0cb210273..337bb3021 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -27,7 +27,6 @@ import ( "google.golang.org/protobuf/proto" "github.com/livekit/mediatransportutil/pkg/bucket" - "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -108,8 +107,6 @@ type WebRTCReceiver struct { onRTCP func([]rtcp.Packet) - twcc *twcc.Responder - bufferMu sync.RWMutex buffers [buffer.DefaultMaxLayerSpatial + 1]*buffer.Buffer upTracks [buffer.DefaultMaxLayerSpatial + 1]*webrtc.TrackRemote @@ -128,7 +125,9 @@ type WebRTCReceiver struct { primaryReceiver atomic.Pointer[RedPrimaryReceiver] redReceiver atomic.Pointer[RedReceiver] - redPktWriter func(pkt *buffer.ExtPacket, spatialLayer int32) + redPktWriter func(pkt *buffer.ExtPacket, spatialLayer int32) int + + forwardStats *ForwardStats } // SVC-TODO: Have to use more conditions to differentiate between @@ -187,6 +186,13 @@ func WithLoadBalanceThreshold(downTracks int) ReceiverOpts { } } +func WithForwardStats(forwardStats *ForwardStats) ReceiverOpts { + return func(w *WebRTCReceiver) *WebRTCReceiver { + w.forwardStats = forwardStats + return w + } +} + // NewWebRTCReceiver creates a new webrtc track receiver func NewWebRTCReceiver( receiver *webrtc.RTPReceiver, @@ -709,12 +715,16 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } } - w.downTrackSpreader.Broadcast(func(dt TrackSender) { + writeCount := w.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.WriteRTP(pkt, spatialLayer) }) if redPktWriter != nil { - redPktWriter(pkt, spatialLayer) + writeCount += redPktWriter(pkt, spatialLayer) + } + + if writeCount > 0 && w.forwardStats != nil { + w.forwardStats.Update(pkt.Arrival, time.Now()) } if spatialTracker != nil { diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index e9e2642f3..70cc1945e 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -57,18 +57,19 @@ func NewRedPrimaryReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) } } -func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) { +func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int { // extract primary payload from RED and forward to downtracks if r.downTrackSpreader.DownTrackCount() == 0 { - return + return 0 } pkts, err := r.getSendPktsFromRed(pkt.Packet) if err != nil { r.logger.Errorw("get encoding for red failed", err, "payloadtype", pkt.Packet.PayloadType) - return + return 0 } + var count int for i, sendPkt := range pkts { pPkt := *pkt if i != len(pkts)-1 { @@ -81,10 +82,11 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 // not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack, // otherwise it should be set to the correct value (marshal the primary rtp packet) - r.downTrackSpreader.Broadcast(func(dt TrackSender) { + count += r.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.WriteRTP(&pPkt, spatialLayer) }) } + return count } func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error { diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index fe254a972..d9b637c89 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -55,15 +55,15 @@ func NewRedReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) *RedRec } } -func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) { +func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int { // extract primary payload from RED and forward to downtracks if r.downTrackSpreader.DownTrackCount() == 0 { - return + return 0 } redLen, err := r.encodeRedForPrimary(pkt.Packet, r.redPayloadBuf[:]) if err != nil { r.logger.Errorw("red encoding failed", err) - return + return 0 } pPkt := *pkt @@ -73,7 +73,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) { // not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack, // otherwise it should be set to the correct value (marshal the primary rtp packet) - r.downTrackSpreader.Broadcast(func(dt TrackSender) { + return r.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.WriteRTP(&pPkt, spatialLayer) }) } diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index 703615693..fe186e1dd 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -165,6 +165,8 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats trackPublishSuccessNow := trackPublishSuccess.Load() trackSubscribeAttemptsNow := trackSubscribeAttempts.Load() trackSubscribeSuccessNow := trackSubscribeSuccess.Load() + forwardLatencyNow := forwardLatency.Load() + forwardJitterNow := forwardJitter.Load() updatedAt := time.Now().Unix() elapsed := updatedAt - prevAverage.UpdatedAt @@ -207,6 +209,8 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats RetransmitBytesOutPerSec: prevAverage.RetransmitBytesOutPerSec, RetransmitPacketsOutPerSec: prevAverage.RetransmitPacketsOutPerSec, NackPerSec: prevAverage.NackPerSec, + ForwardLatency: forwardLatencyNow, + ForwardJitter: forwardJitterNow, ParticipantSignalConnectedPerSec: prevAverage.ParticipantSignalConnectedPerSec, ParticipantRtcInitPerSec: prevAverage.ParticipantRtcInitPerSec, ParticipantRtcConnectedPerSec: prevAverage.ParticipantRtcConnectedPerSec, diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index 3efd1eb28..7242f803f 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -41,6 +41,8 @@ var ( participantSignalConnected atomic.Uint64 participantRTCConnected atomic.Uint64 participantRTCInit atomic.Uint64 + forwardLatency atomic.Uint32 + forwardJitter atomic.Uint32 promPacketLabels = []string{"direction", "transmission"} promPacketTotal *prometheus.CounterVec @@ -56,6 +58,8 @@ var ( promRTT *prometheus.HistogramVec promParticipantJoin *prometheus.CounterVec promConnections *prometheus.GaugeVec + promForwardLatency prometheus.Gauge + promForwardJitter prometheus.Gauge promPacketTotalIncomingInitial prometheus.Counter promPacketTotalIncomingRetransmit prometheus.Counter @@ -139,6 +143,18 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) { Name: "total", ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, }, []string{"kind"}) + promForwardLatency = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: livekitNamespace, + Subsystem: "forward", + Name: "latency", + ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, + }) + promForwardJitter = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: livekitNamespace, + Subsystem: "forward", + Name: "jitter", + ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, + }) prometheus.MustRegister(promPacketTotal) prometheus.MustRegister(promPacketBytes) @@ -280,3 +296,13 @@ func AddConnection(direction Direction) { func SubConnection(direction Direction) { promConnections.WithLabelValues(string(direction)).Sub(1) } + +func RecordForwardLatency(latencyInstant, latencyAvg uint32) { + forwardLatency.Store(latencyAvg) + promForwardLatency.Set(float64(latencyInstant)) +} + +func RecordForwardJitter(jitterInstant, jitterAvg uint32) { + forwardJitter.Store(jitterAvg) + promForwardJitter.Set(float64(jitterInstant)) +}