diff --git a/go.mod b/go.mod index 01988518b..a723e88b6 100644 --- a/go.mod +++ b/go.mod @@ -137,6 +137,6 @@ require ( golang.org/x/tools v0.25.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect - google.golang.org/grpc v1.66.1 // indirect + google.golang.org/grpc v1.66.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index c3d22a889..7c6c3f8a6 100644 --- a/go.sum +++ b/go.sum @@ -488,8 +488,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1: google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.66.1 h1:hO5qAXR19+/Z44hmvIM4dQFMSYX9XcWsByfoxutBpAM= -google.golang.org/grpc v1.66.1/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= +google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= diff --git a/pkg/config/config.go b/pkg/config/config.go index fc57b23f0..9bb5a73c2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -27,6 +27,7 @@ import ( "github.com/urfave/cli/v2" "gopkg.in/yaml.v3" + "github.com/livekit/livekit-server/pkg/metric" "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -85,6 +86,8 @@ type Config struct { Limit LimitConfig `yaml:"limit,omitempty"` Development bool `yaml:"development,omitempty"` + + Metric metric.MetricConfig `yaml:"metric,omitempty"` } type RTCConfig struct { @@ -572,8 +575,9 @@ var DefaultConfig = Config{ StreamBufferSize: 1000, ConnectAttempts: 3, }, - PSRPC: rpc.DefaultPSRPCConfig, - Keys: map[string]string{}, + PSRPC: rpc.DefaultPSRPCConfig, + Keys: map[string]string{}, + Metric: metric.DefaultMetricConfig, } func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []cli.Flag) (*Config, error) { diff --git a/pkg/metric/metric_config.go b/pkg/metric/metric_config.go new file mode 100644 index 000000000..0e9e25c4c --- /dev/null +++ b/pkg/metric/metric_config.go @@ -0,0 +1,27 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +// ------------------------------------------------ + +type MetricConfig struct { + Timestamper MetricTimestamperConfig `yaml:"timestamper_config,omitempty"` +} + +var ( + DefaultMetricConfig = MetricConfig{ + Timestamper: DefaultMetricTimestamperConfig, + } +) diff --git a/pkg/metric/metric_timestamper.go b/pkg/metric/metric_timestamper.go new file mode 100644 index 000000000..6ed9637ac --- /dev/null +++ b/pkg/metric/metric_timestamper.go @@ -0,0 +1,116 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "sync" + "time" + + "github.com/livekit/livekit-server/pkg/sfu/utils" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// ------------------------------------------------ + +type MetricTimestamperConfig struct { + OneWayDelayEstimatorMinInterval time.Duration `yaml:"one_way_delay_estimator_min_interval,omitempty"` + OneWayDelayEstimatorMaxBatch int `yaml:"one_way_delay_estimator_max_batch,omitempty"` +} + +var ( + DefaultMetricTimestamperConfig = MetricTimestamperConfig{ + OneWayDelayEstimatorMinInterval: 5 * time.Second, + OneWayDelayEstimatorMaxBatch: 100, + } +) + +// ------------------------------------------------ + +type MetricTimestamperParams struct { + Config MetricTimestamperConfig + BaseTime time.Time + Logger logger.Logger +} + +type MetricTimestamper struct { + params MetricTimestamperParams + lock sync.Mutex + owdEstimator *utils.OWDEstimator + lastOWDEstimatorRunAt time.Time + batchesSinceLastOWDEstimatorRun int +} + +func NewMetricTimestamper(params MetricTimestamperParams) *MetricTimestamper { + return &MetricTimestamper{ + params: params, + owdEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault), + lastOWDEstimatorRunAt: time.Now().Add(-params.Config.OneWayDelayEstimatorMinInterval), + } +} + +func (m *MetricTimestamper) Process(batch *livekit.MetricsBatch) { + // run OWD estimation periodically + estimatedOWDNanos := m.maybeRunOWDEstimator(batch) + + // normalize all time stamps and add estimated OWD + // NOTE: all timestamps will be re-mapped. If the time series or event happened some time + // in the past and the OWD estimation has changed since, those samples will get the updated + // OWD estimation applied. So, they may have more uncertainty in addition to the uncertainty + // of OWD estimation process. + batch.NormalizedTimestamp = timestamppb.New(time.Unix(0, batch.TimestampMs*1e6+estimatedOWDNanos)) + + for _, ts := range batch.TimeSeries { + for _, sample := range ts.Samples { + sample.NormalizedTimestamp = timestamppb.New(time.Unix(0, sample.TimestampMs*1e6+estimatedOWDNanos)) + } + } + + for _, ev := range batch.Events { + ev.NormalizedStartTimestamp = timestamppb.New(time.Unix(0, ev.StartTimestampMs*1e6+estimatedOWDNanos)) + + endTimestampMs := ev.GetEndTimestampMs() + if endTimestampMs != 0 { + ev.NormalizedEndTimestamp = timestamppb.New(time.Unix(0, endTimestampMs*1e6+estimatedOWDNanos)) + } + } + + m.params.Logger.Debugw("timestamped metrics batch", "batch", logger.Proto(batch)) +} + +func (m *MetricTimestamper) maybeRunOWDEstimator(batch *livekit.MetricsBatch) int64 { + m.lock.Lock() + defer m.lock.Unlock() + + if time.Since(m.lastOWDEstimatorRunAt) < m.params.Config.OneWayDelayEstimatorMinInterval && + m.batchesSinceLastOWDEstimatorRun < m.params.Config.OneWayDelayEstimatorMaxBatch { + m.batchesSinceLastOWDEstimatorRun++ + return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds() + } + + senderClockTime := batch.GetTimestampMs() + if senderClockTime == 0 { + m.batchesSinceLastOWDEstimatorRun++ + return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds() + } + + m.lastOWDEstimatorRunAt = time.Now() + m.batchesSinceLastOWDEstimatorRun = 1 + + at := m.params.BaseTime.Add(time.Since(m.params.BaseTime)) + estimatedOWD, _ := m.owdEstimator.Update(time.UnixMilli(senderClockTime), at) + return estimatedOWD.Nanoseconds() +} diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 182f862a8..ff100eeb9 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -20,10 +20,10 @@ import ( "time" "go.uber.org/atomic" - "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) var _ Router = (*LocalRouter)(nil) @@ -58,7 +58,7 @@ func NewLocalRouter( func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error) { r.lock.Lock() defer r.lock.Unlock() - node := proto.Clone((*livekit.Node)(r.currentNode)).(*livekit.Node) + node := utils.CloneProto((*livekit.Node)(r.currentNode)) return node, nil } diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index 27bf936c3..4141f6aff 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -42,4 +42,6 @@ var ( ErrTrackNotAttached = errors.New("track is not yet attached") ErrTrackNotBound = errors.New("track not bound") ErrSubscriptionLimitExceeded = errors.New("participant has exceeded its subscription limit") + + ErrNoSubscribeMetricsPermission = errors.New("participant is not given permission to subscribe to metrics") ) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 11f306a54..9b1e8c2ea 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -56,6 +56,7 @@ type MediaTrack struct { } type MediaTrackParams struct { + BaseTime time.Time SignalCid string SdpCid string ParticipantID livekit.ParticipantID @@ -275,6 +276,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra newWR := sfu.NewWebRTCReceiver( receiver, + t.params.BaseTime, track, ti, LoggerWithCodecMime(t.params.Logger, mime), diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 363d45250..433f64aad 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -712,7 +712,7 @@ func (t *MediaTrackReceiver) TrackInfo() *livekit.TrackInfo { } func (t *MediaTrackReceiver) TrackInfoClone() *livekit.TrackInfo { - return proto.Clone(t.TrackInfo()).(*livekit.TrackInfo) + return utils.CloneProto(t.TrackInfo()) } func (t *MediaTrackReceiver) NotifyMaxLayerChange(maxLayer int32) { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 48776d0ba..e24c0cd09 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -17,7 +17,6 @@ package rtc import ( "context" "fmt" - "io" "os" "slices" "strconv" @@ -27,7 +26,6 @@ import ( lru "github.com/hashicorp/golang-lru/v2" "github.com/pion/rtcp" - "github.com/pion/sctp" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/pkg/errors" @@ -42,6 +40,7 @@ import ( "github.com/livekit/protocol/utils/guid" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/metric" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/supervisor" "github.com/livekit/livekit-server/pkg/rtc/transport" @@ -109,6 +108,7 @@ func (p participantUpdateInfo) String() string { // --------------------------------------------------------------- type ParticipantParams struct { + BaseTime time.Time Identity livekit.ParticipantIdentity Name livekit.ParticipantName SID livekit.ParticipantID @@ -157,6 +157,7 @@ type ParticipantParams struct { SyncStreams bool ForwardStats *sfu.ForwardStats DisableSenderReportPassThrough bool + MetricConfig metric.MetricConfig } type ParticipantImpl struct { @@ -234,6 +235,7 @@ type ParticipantImpl struct { onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState) onParticipantUpdate func(types.LocalParticipant) onDataPacket func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) + onMetrics func(types.LocalParticipant, *livekit.DataPacket) migrateState atomic.Value // types.MigrateState @@ -248,6 +250,8 @@ type ParticipantImpl struct { tracksQuality map[livekit.TrackID]livekit.ConnectionQuality + metricTimestamper *metric.MetricTimestamper + // loggers for publisher and subscriber pubLogger logger.Logger subLogger logger.Logger @@ -279,10 +283,16 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { dataChannelStats: telemetry.NewBytesTrackStats( telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID), params.SID, - params.Telemetry), + params.Telemetry, + ), tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality), - pubLogger: params.Logger.WithComponent(sutils.ComponentPub), - subLogger: params.Logger.WithComponent(sutils.ComponentSub), + metricTimestamper: metric.NewMetricTimestamper(metric.MetricTimestamperParams{ + Config: params.MetricConfig.Timestamper, + BaseTime: params.BaseTime, + Logger: params.Logger, + }), + pubLogger: params.Logger.WithComponent(sutils.ComponentPub), + subLogger: params.Logger.WithComponent(sutils.ComponentSub), } if !params.DisableSupervisor { p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}) @@ -749,6 +759,24 @@ func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, liv p.lock.Unlock() } +func (p *ParticipantImpl) getOnDataPacket() func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onDataPacket +} + +func (p *ParticipantImpl) OnMetrics(callback func(types.LocalParticipant, *livekit.DataPacket)) { + p.lock.Lock() + p.onMetrics = callback + p.lock.Unlock() +} + +func (p *ParticipantImpl) getOnMetrics() func(types.LocalParticipant, *livekit.DataPacket) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onMetrics +} + func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) { p.lock.Lock() p.onClose = callback @@ -1201,6 +1229,10 @@ func (p *ParticipantImpl) Hidden() bool { return p.grants.Load().Video.Hidden } +func (p *ParticipantImpl) CanSubscribeMetrics() bool { + return p.grants.Load().Video.GetCanSubscribeMetrics() +} + func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) { if !p.IsReady() { // we have not sent a JoinResponse yet. metadata would be covered in JoinResponse @@ -1333,6 +1365,10 @@ func (h SubscriberTransportHandler) OnInitialConnected() { h.p.onSubscriberInitialConnected() } +func (h SubscriberTransportHandler) OnDataSendError(err error) { + h.p.onDataSendError(err) +} + // ---------------------------------------------------------- type PrimaryTransportHandler struct { @@ -1391,6 +1427,7 @@ func (p *ParticipantImpl) setupTransportManager() error { Logger: p.params.Logger.WithComponent(sutils.ComponentTransport), PublisherHandler: pth, SubscriberHandler: sth, + DataChannelStats: p.dataChannelStats, } if p.params.SyncStreams && p.params.PlayoutDelay.GetEnabled() && p.params.ClientInfo.isFirefox() { // we will disable playout delay for Firefox if the user is expecting @@ -1601,7 +1638,9 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt dp.ParticipantIdentity = string(p.params.Identity) } - shouldForward := false + shouldForwardData := true + shouldForwardMetrics := false + isPublisher := true // only forward on user payloads switch payload := dp.Value.(type) { case *livekit.DataPacket_User: @@ -1623,28 +1662,42 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt } else { dp.DestinationIdentities = u.DestinationIdentities } - shouldForward = true case *livekit.DataPacket_SipDtmf: - shouldForward = true case *livekit.DataPacket_Transcription: if p.Kind() == livekit.ParticipantInfo_AGENT { - shouldForward = true + shouldForwardData = false } case *livekit.DataPacket_ChatMessage: - shouldForward = true + shouldForwardData = true + case *livekit.DataPacket_Metrics: + shouldForwardData = false + shouldForwardMetrics = true + isPublisher = false + // METRICS-TODO-QUESTIONS: + // 1. Should this record (and do processing/batching) metrics (i. e. publisher side) rather + // than forwarding and recording/processing/batching at every subscriber (in this case + // subscriber is defined as the other participants pushing this to edge client). + // 2. If the above is done, there could be two cadences, publisher side recording/processing/batching + // and pushing it to all subscribers on some cadence and subscribers have their own cadence of + // processing/batching and sending to edge clients. + p.metricTimestamper.Process(payload.Metrics) default: p.pubLogger.Warnw("received unsupported data packet", nil, "payload", payload) } - if shouldForward { - p.lock.RLock() - onDataPacket := p.onDataPacket - p.lock.RUnlock() - if onDataPacket != nil { + if shouldForwardData { + if onDataPacket := p.getOnDataPacket(); onDataPacket != nil { onDataPacket(p, kind, dp) } } + if shouldForwardMetrics { + if onMetrics := p.getOnMetrics(); onMetrics != nil { + onMetrics(p, dp) + } + } - p.setIsPublisher(true) + if isPublisher { + p.setIsPublisher(true) + } } func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error { @@ -2194,6 +2247,7 @@ func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *M func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *livekit.TrackInfo) *MediaTrack { mt := NewMediaTrack(MediaTrackParams{ + BaseTime: p.params.BaseTime, SignalCid: signalCid, SdpCid: sdpCid, ParticipantID: p.params.SID, @@ -2651,16 +2705,14 @@ func (p *ParticipantImpl) SendDataPacket(kind livekit.DataPacket_Kind, encoded [ return ErrDataChannelUnavailable } - err := p.TransportManager.SendDataPacket(kind, encoded) - if err != nil { - if (errors.Is(err, sctp.ErrStreamClosed) || errors.Is(err, io.ErrClosedPipe)) && p.params.ReconnectOnDataChannelError { - p.params.Logger.Infow("issuing full reconnect on data channel error", "error", err) - p.IssueFullReconnect(types.ParticipantCloseReasonDataChannelError) - } - } else { - p.dataChannelStats.AddBytes(uint64(len(encoded)), true) + return p.TransportManager.SendDataPacket(kind, encoded) +} + +func (p *ParticipantImpl) onDataSendError(err error) { + if p.params.ReconnectOnDataChannelError { + p.params.Logger.Infow("issuing full reconnect on data channel error", "error", err) + p.IssueFullReconnect(types.ParticipantCloseReasonDataChannelError) } - return err } func (p *ParticipantImpl) setupEnabledCodecs(publishEnabledCodecs []*livekit.Codec, subscribeEnabledCodecs []*livekit.Codec, disabledCodecs *livekit.DisabledCodecs) { @@ -2769,3 +2821,33 @@ func (p *ParticipantImpl) UpdateVideoTrack(update *livekit.UpdateLocalVideoTrack p.pubLogger.Debugw("could not locate track", "trackID", update.TrackSid) return errors.New("could not find track") } + +func (p *ParticipantImpl) HandleMetrics(senderParticipantID livekit.ParticipantID, metrics *livekit.MetricsBatch) error { + if p.State() != livekit.ParticipantInfo_ACTIVE { + return ErrDataChannelUnavailable + } + + if !p.CanSubscribeMetrics() { + return ErrNoSubscribeMetricsPermission + } + + if !p.SubscriptionManager.IsSubscribedTo(senderParticipantID) { + return nil + } + + // METRICS-TODO: This is just forwarding. Will have to do more, including but not limited to + // 1. Filtering: subscriber metrics from self only should be sent to that participant. + // 2. Batching: could include re-mapping labels to consolidate multiple batches. + // 3. (Maybe) Time stamps: this is done on receive, TBD if required here also + dpData, err := proto.Marshal(&livekit.DataPacket{ + Value: &livekit.DataPacket_Metrics{ + Metrics: metrics, + }, + }) + if err != nil { + p.params.Logger.Errorw("failed to marshal data packet", err) + return err + } + + return p.TransportManager.SendDataPacket(livekit.DataPacket_RELIABLE, dpData) +} diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index a64a3f028..99038451a 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -16,9 +16,7 @@ package rtc import ( "context" - "errors" "fmt" - "io" "math" "slices" "sort" @@ -30,8 +28,6 @@ import ( "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" - "github.com/pion/sctp" - "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" @@ -482,6 +478,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me participant.OnTrackUnpublished(r.onTrackUnpublished) participant.OnParticipantUpdate(r.onParticipantUpdate) participant.OnDataPacket(r.onDataPacket) + participant.OnMetrics(r.onMetrics) participant.OnSubscribeStatusChanged(func(publisherID livekit.ParticipantID, subscribed bool) { if subscribed { pub := r.GetParticipantByID(publisherID) @@ -726,6 +723,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek p.OnStateChange(nil) p.OnParticipantUpdate(nil) p.OnDataPacket(nil) + p.OnMetrics(nil) p.OnSubscribeStatusChanged(nil) // close participant as well @@ -1272,6 +1270,10 @@ func (r *Room) onDataPacket(source types.LocalParticipant, kind livekit.DataPack BroadcastDataPacketForRoom(r, source, kind, dp, r.Logger) } +func (r *Room) onMetrics(source types.LocalParticipant, dp *livekit.DataPacket) { + BroadcastMetricsForRoom(r, source, dp, r.Logger) +} + func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) { r.lock.RLock() shouldSubscribe := r.autoSubscribe(p) @@ -1769,9 +1771,6 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kin var dpData []byte for _, op := range participants { - if op.State() != livekit.ParticipantInfo_ACTIVE { - continue - } if source != nil && op.ID() == source.ID() { continue } @@ -1792,14 +1791,21 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kin } utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) { - err := op.SendDataPacket(kind, dpData) - if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) && - !errors.Is(err, ErrTransportFailure) && !errors.Is(err, ErrDataChannelBufferFull) { - op.GetLogger().Infow("send data packet error", "error", err) - } + op.SendDataPacket(kind, dpData) }) } +func BroadcastMetricsForRoom(r types.Room, source types.LocalParticipant, dp *livekit.DataPacket, logger logger.Logger) { + switch payload := dp.Value.(type) { + case *livekit.DataPacket_Metrics: + utils.ParallelExec(r.GetLocalParticipants(), dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) { + // echoing back to sender too + op.HandleMetrics(source.ID(), payload.Metrics) + }) + default: + } +} + func IsCloseNotifySkippable(closeReason types.ParticipantCloseReason) bool { return closeReason == types.ParticipantCloseReasonDuplicateIdentity } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 31d90eb6a..4cdb9e4ad 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -24,6 +24,7 @@ import ( "github.com/pion/webrtc/v3/pkg/rtcerr" "go.uber.org/atomic" + "golang.org/x/exp/maps" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" @@ -31,6 +32,7 @@ import ( "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) // using var instead of const to override in tests @@ -216,11 +218,7 @@ func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantI m.lock.RLock() defer m.lock.RUnlock() - var participantIDs []livekit.ParticipantID - for pID := range m.subscribedTo { - participantIDs = append(participantIDs, pID) - } - return participantIDs + return maps.Keys(m.subscribedTo) } func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool { @@ -526,7 +524,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { subTrack, err := track.AddSubscriber(m.params.Participant) if err != nil && !errors.Is(err, errAlreadySubscribed) { // ignore error(s): already subscribed - if !errors.Is(err, ErrTrackNotAttached) && !errors.Is(err, ErrNoReceiver) { + if !utils.ErrorIsOneOf(err, ErrTrackNotAttached, ErrNoReceiver) { // as track resolution could take some time, not logging errors due to waiting for track resolution m.params.Logger.Warnw("add subscriber failed", err, "trackID", trackID) } diff --git a/pkg/rtc/transport/handler.go b/pkg/rtc/transport/handler.go index dbad2f5b6..81ab42c03 100644 --- a/pkg/rtc/transport/handler.go +++ b/pkg/rtc/transport/handler.go @@ -39,6 +39,7 @@ type Handler interface { OnFailed(isShortLived bool) OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) OnDataPacket(kind livekit.DataPacket_Kind, data []byte) + OnDataSendError(err error) OnOffer(sd webrtc.SessionDescription) error OnAnswer(sd webrtc.SessionDescription) error OnNegotiationStateChanged(state NegotiationState) @@ -56,6 +57,7 @@ func (h UnimplementedHandler) OnFullyEstablished() func (h UnimplementedHandler) OnFailed(isShortLived bool) {} func (h UnimplementedHandler) OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {} func (h UnimplementedHandler) OnDataPacket(kind livekit.DataPacket_Kind, data []byte) {} +func (h UnimplementedHandler) OnDataSendError(err error) {} func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription) error { return ErrNoOfferHandler } diff --git a/pkg/rtc/transport/transportfakes/fake_handler.go b/pkg/rtc/transport/transportfakes/fake_handler.go index dc7ad3c3e..0afd8446b 100644 --- a/pkg/rtc/transport/transportfakes/fake_handler.go +++ b/pkg/rtc/transport/transportfakes/fake_handler.go @@ -28,6 +28,11 @@ type FakeHandler struct { arg1 livekit.DataPacket_Kind arg2 []byte } + OnDataSendErrorStub func(error) + onDataSendErrorMutex sync.RWMutex + onDataSendErrorArgsForCall []struct { + arg1 error + } OnFailedStub func(bool) onFailedMutex sync.RWMutex onFailedArgsForCall []struct { @@ -193,6 +198,38 @@ func (fake *FakeHandler) OnDataPacketArgsForCall(i int) (livekit.DataPacket_Kind return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeHandler) OnDataSendError(arg1 error) { + fake.onDataSendErrorMutex.Lock() + fake.onDataSendErrorArgsForCall = append(fake.onDataSendErrorArgsForCall, struct { + arg1 error + }{arg1}) + stub := fake.OnDataSendErrorStub + fake.recordInvocation("OnDataSendError", []interface{}{arg1}) + fake.onDataSendErrorMutex.Unlock() + if stub != nil { + fake.OnDataSendErrorStub(arg1) + } +} + +func (fake *FakeHandler) OnDataSendErrorCallCount() int { + fake.onDataSendErrorMutex.RLock() + defer fake.onDataSendErrorMutex.RUnlock() + return len(fake.onDataSendErrorArgsForCall) +} + +func (fake *FakeHandler) OnDataSendErrorCalls(stub func(error)) { + fake.onDataSendErrorMutex.Lock() + defer fake.onDataSendErrorMutex.Unlock() + fake.OnDataSendErrorStub = stub +} + +func (fake *FakeHandler) OnDataSendErrorArgsForCall(i int) error { + fake.onDataSendErrorMutex.RLock() + defer fake.onDataSendErrorMutex.RUnlock() + argsForCall := fake.onDataSendErrorArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeHandler) OnFailed(arg1 bool) { fake.onFailedMutex.Lock() fake.onFailedArgsForCall = append(fake.onFailedArgsForCall, struct { @@ -553,6 +590,8 @@ func (fake *FakeHandler) Invocations() map[string][][]interface{} { defer fake.onAnswerMutex.RUnlock() fake.onDataPacketMutex.RLock() defer fake.onDataPacketMutex.RUnlock() + fake.onDataSendErrorMutex.RLock() + defer fake.onDataSendErrorMutex.RUnlock() fake.onFailedMutex.RLock() defer fake.onFailedMutex.RUnlock() fake.onFullyEstablishedMutex.RLock() diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index e7fedf9e2..e3c812e1b 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -15,11 +15,13 @@ package rtc import ( + "io" "math/bits" "sync" "time" "github.com/pion/rtcp" + "github.com/pion/sctp" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/pkg/errors" @@ -36,6 +38,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/pacer" + "github.com/livekit/livekit-server/pkg/telemetry" ) const ( @@ -89,6 +92,7 @@ type TransportManagerParams struct { Logger logger.Logger PublisherHandler transport.Handler SubscriberHandler transport.Handler + DataChannelStats *telemetry.BytesTrackStats } type TransportManager struct { @@ -242,7 +246,19 @@ func (t *TransportManager) RemoveSubscribedTrack(subTrack types.SubscribedTrack) func (t *TransportManager) SendDataPacket(kind livekit.DataPacket_Kind, encoded []byte) error { // downstream data is sent via primary peer connection - return t.getTransport(true).SendDataPacket(kind, encoded) + err := t.getTransport(true).SendDataPacket(kind, encoded) + if err != nil { + if !utils.ErrorIsOneOf(err, io.ErrClosedPipe, sctp.ErrStreamClosed, ErrTransportFailure, ErrDataChannelBufferFull) { + t.params.Logger.Warnw("send data packet error", err) + } + if utils.ErrorIsOneOf(err, sctp.ErrStreamClosed, io.ErrClosedPipe) { + t.params.SubscriberHandler.OnDataSendError(err) + } + } else { + t.params.DataChannelStats.AddBytes(uint64(len(encoded)), true) + } + + return err } func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels []*livekit.DataChannelInfo) error { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 31f7196c3..bdf7e68ce 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -404,6 +404,7 @@ type LocalParticipant interface { // OnParticipantUpdate - metadata or permission is updated OnParticipantUpdate(callback func(LocalParticipant)) OnDataPacket(callback func(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)) + OnMetrics(callback func(LocalParticipant, *livekit.DataPacket)) OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) @@ -442,6 +443,8 @@ type LocalParticipant interface { GetPacer() pacer.Pacer GetDisableSenderReportPassThrough() bool + + HandleMetrics(senderParticipantID livekit.ParticipantID, batch *livekit.MetricsBatch) error } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index afe375785..2a3d79874 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -413,6 +413,18 @@ type FakeLocalParticipant struct { handleAnswerArgsForCall []struct { arg1 webrtc.SessionDescription } + HandleMetricsStub func(livekit.ParticipantID, *livekit.MetricsBatch) error + handleMetricsMutex sync.RWMutex + handleMetricsArgsForCall []struct { + arg1 livekit.ParticipantID + arg2 *livekit.MetricsBatch + } + handleMetricsReturns struct { + result1 error + } + handleMetricsReturnsOnCall map[int]struct { + result1 error + } HandleOfferStub func(webrtc.SessionDescription) handleOfferMutex sync.RWMutex handleOfferArgsForCall []struct { @@ -644,6 +656,11 @@ type FakeLocalParticipant struct { onICEConfigChangedArgsForCall []struct { arg1 func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) } + OnMetricsStub func(func(types.LocalParticipant, *livekit.DataPacket)) + onMetricsMutex sync.RWMutex + onMetricsArgsForCall []struct { + arg1 func(types.LocalParticipant, *livekit.DataPacket) + } OnMigrateStateChangeStub func(func(p types.LocalParticipant, migrateState types.MigrateState)) onMigrateStateChangeMutex sync.RWMutex onMigrateStateChangeArgsForCall []struct { @@ -3146,6 +3163,68 @@ func (fake *FakeLocalParticipant) HandleAnswerArgsForCall(i int) webrtc.SessionD return argsForCall.arg1 } +func (fake *FakeLocalParticipant) HandleMetrics(arg1 livekit.ParticipantID, arg2 *livekit.MetricsBatch) error { + fake.handleMetricsMutex.Lock() + ret, specificReturn := fake.handleMetricsReturnsOnCall[len(fake.handleMetricsArgsForCall)] + fake.handleMetricsArgsForCall = append(fake.handleMetricsArgsForCall, struct { + arg1 livekit.ParticipantID + arg2 *livekit.MetricsBatch + }{arg1, arg2}) + stub := fake.HandleMetricsStub + fakeReturns := fake.handleMetricsReturns + fake.recordInvocation("HandleMetrics", []interface{}{arg1, arg2}) + fake.handleMetricsMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) HandleMetricsCallCount() int { + fake.handleMetricsMutex.RLock() + defer fake.handleMetricsMutex.RUnlock() + return len(fake.handleMetricsArgsForCall) +} + +func (fake *FakeLocalParticipant) HandleMetricsCalls(stub func(livekit.ParticipantID, *livekit.MetricsBatch) error) { + fake.handleMetricsMutex.Lock() + defer fake.handleMetricsMutex.Unlock() + fake.HandleMetricsStub = stub +} + +func (fake *FakeLocalParticipant) HandleMetricsArgsForCall(i int) (livekit.ParticipantID, *livekit.MetricsBatch) { + fake.handleMetricsMutex.RLock() + defer fake.handleMetricsMutex.RUnlock() + argsForCall := fake.handleMetricsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeLocalParticipant) HandleMetricsReturns(result1 error) { + fake.handleMetricsMutex.Lock() + defer fake.handleMetricsMutex.Unlock() + fake.HandleMetricsStub = nil + fake.handleMetricsReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleMetricsReturnsOnCall(i int, result1 error) { + fake.handleMetricsMutex.Lock() + defer fake.handleMetricsMutex.Unlock() + fake.HandleMetricsStub = nil + if fake.handleMetricsReturnsOnCall == nil { + fake.handleMetricsReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.handleMetricsReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) HandleOffer(arg1 webrtc.SessionDescription) { fake.handleOfferMutex.Lock() fake.handleOfferArgsForCall = append(fake.handleOfferArgsForCall, struct { @@ -4419,6 +4498,38 @@ func (fake *FakeLocalParticipant) OnICEConfigChangedArgsForCall(i int) func(part return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnMetrics(arg1 func(types.LocalParticipant, *livekit.DataPacket)) { + fake.onMetricsMutex.Lock() + fake.onMetricsArgsForCall = append(fake.onMetricsArgsForCall, struct { + arg1 func(types.LocalParticipant, *livekit.DataPacket) + }{arg1}) + stub := fake.OnMetricsStub + fake.recordInvocation("OnMetrics", []interface{}{arg1}) + fake.onMetricsMutex.Unlock() + if stub != nil { + fake.OnMetricsStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnMetricsCallCount() int { + fake.onMetricsMutex.RLock() + defer fake.onMetricsMutex.RUnlock() + return len(fake.onMetricsArgsForCall) +} + +func (fake *FakeLocalParticipant) OnMetricsCalls(stub func(func(types.LocalParticipant, *livekit.DataPacket))) { + fake.onMetricsMutex.Lock() + defer fake.onMetricsMutex.Unlock() + fake.OnMetricsStub = stub +} + +func (fake *FakeLocalParticipant) OnMetricsArgsForCall(i int) func(types.LocalParticipant, *livekit.DataPacket) { + fake.onMetricsMutex.RLock() + defer fake.onMetricsMutex.RUnlock() + argsForCall := fake.onMetricsArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) OnMigrateStateChange(arg1 func(p types.LocalParticipant, migrateState types.MigrateState)) { fake.onMigrateStateChangeMutex.Lock() fake.onMigrateStateChangeArgsForCall = append(fake.onMigrateStateChangeArgsForCall, struct { @@ -6991,6 +7102,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getTrailerMutex.RUnlock() fake.handleAnswerMutex.RLock() defer fake.handleAnswerMutex.RUnlock() + fake.handleMetricsMutex.RLock() + defer fake.handleMetricsMutex.RUnlock() fake.handleOfferMutex.RLock() defer fake.handleOfferMutex.RUnlock() fake.handleReceiverReportMutex.RLock() @@ -7047,6 +7160,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onDataPacketMutex.RUnlock() fake.onICEConfigChangedMutex.RLock() defer fake.onICEConfigChangedMutex.RUnlock() + fake.onMetricsMutex.RLock() + defer fake.onMetricsMutex.RUnlock() fake.onMigrateStateChangeMutex.RLock() defer fake.onMigrateStateChangeMutex.RUnlock() fake.onParticipantUpdateMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index c62886028..99cc44728 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -66,6 +66,8 @@ type iceConfigCacheKey struct { type RoomManager struct { lock sync.RWMutex + baseTime time.Time + config *config.Config rtcConfig *rtc.WebRTCConfig serverInfo *livekit.ServerInfo @@ -95,6 +97,7 @@ type RoomManager struct { } func NewLocalRoomManager( + baseTime time.Time, conf *config.Config, roomStore ObjectStore, currentNode routing.LocalNode, @@ -116,6 +119,7 @@ func NewLocalRoomManager( } r := &RoomManager{ + baseTime: baseTime, config: conf, rtcConfig: rtcConf, currentNode: currentNode, @@ -444,6 +448,7 @@ func (r *RoomManager) StartSession( subscriberAllowPause = *pi.SubscriberAllowPause } participant, err = rtc.NewParticipant(rtc.ParticipantParams{ + BaseTime: r.baseTime, Identity: pi.Identity, Name: pi.Name, SID: sid, @@ -486,6 +491,7 @@ func (r *RoomManager) StartSession( PlayoutDelay: roomInternal.GetPlayoutDelay(), SyncStreams: roomInternal.GetSyncStreams(), ForwardStats: r.forwardStats, + MetricConfig: r.config.Metric, }) if err != nil { return err diff --git a/pkg/service/wire.go b/pkg/service/wire.go index dcabb5da8..8ae08d529 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -20,6 +20,7 @@ package service import ( "fmt" "os" + "time" "github.com/google/wire" "github.com/pion/turn/v2" @@ -45,6 +46,7 @@ import ( func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { wire.Build( + getTimeNow, getNodeID, createRedisClient, createStore, @@ -122,6 +124,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi return nil, nil } +func getTimeNow() time.Time { + return time.Now() +} + func getNodeID(currentNode routing.LocalNode) livekit.NodeID { return livekit.NodeID(currentNode.Id) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 27c5271d2..fb5d75aea 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -27,6 +27,7 @@ import ( "github.com/redis/go-redis/v9" "gopkg.in/yaml.v3" "os" + "time" ) import ( @@ -124,6 +125,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } + time := getTimeNow() clientConfigurationManager := createClientConfiguration() client, err := agent.NewAgentClient(messageBus) if err != nil { @@ -133,7 +135,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) forwardStats := createForwardStats(conf) - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, roomAllocator, telemetryService, clientConfigurationManager, client, agentStore, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats) + roomManager, err := NewLocalRoomManager(time, conf, objectStore, currentNode, router, roomAllocator, telemetryService, clientConfigurationManager, client, agentStore, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats) if err != nil { return nil, err } @@ -182,6 +184,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi // wire.go: +func getTimeNow() time.Time { + return time.Now() +} + func getNodeID(currentNode routing.LocalNode) livekit.NodeID { return livekit.NodeID(currentNode.Id) } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 9e015360b..e4d60dc36 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -186,6 +186,7 @@ func WithForwardStats(forwardStats *ForwardStats) ReceiverOpts { // NewWebRTCReceiver creates a new webrtc track receiver func NewWebRTCReceiver( receiver *webrtc.RTPReceiver, + baseTime time.Time, track *webrtc.TrackRemote, trackInfo *livekit.TrackInfo, logger logger.Logger, @@ -203,7 +204,6 @@ func NewWebRTCReceiver( onRTCP: onRTCP, isSVC: buffer.IsSvcCodec(track.Codec().MimeType), isRED: buffer.IsRedCodec(track.Codec().MimeType), - baseTime: time.Now(), } for _, opt := range opts { diff --git a/pkg/sfu/utils/owd_estimator.go b/pkg/sfu/utils/owd_estimator.go index 90f5fe1e2..7a5d22b91 100644 --- a/pkg/sfu/utils/owd_estimator.go +++ b/pkg/sfu/utils/owd_estimator.go @@ -173,3 +173,7 @@ func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time. o.lastSenderClockTime = senderClockTime return o.estimatedPropagationDelay, stepChange } + +func (o *OWDEstimator) EstimatedPropagationDelay() time.Duration { + return o.estimatedPropagationDelay +} diff --git a/test/client/client.go b/test/client/client.go index 9d0105b5d..70f5fb2a0 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -694,7 +694,6 @@ func (c *RTCClient) PublishData(data []byte, kind livekit.DataPacket_Kind) error } dpData, err := proto.Marshal(&livekit.DataPacket{ - Kind: kind, Value: &livekit.DataPacket_User{ User: &livekit.UserPacket{Payload: data}, },