Initial plumbing for metrics. (#2950)

* Initial plumbing for metrics.

This implements
- metrics received from participant.
- callback to room.
- room distributes it to all other participants (excluding the sending
  participant).
- other participants forward to client.
- counting metrics bytes in data channel stats

TODO:
  - recording/processing/batching
  - should recording/processing/batching happen on publisher side or
    subscriber side?
  - should metrics be echoed back to publisher?
  - grants to publish/subscribe metrics.

* mage generate

* clear OnMetrics on close

* - CanSubscribeMetrics permission.
- Echo back to sender.

* update deps

* No destination identities for metrics

* WIP

* use normalized timestamp for server injected timestamps

* compile

* debug log metrics batch

* correct comment

* add baseTime to wire

* protocol dep

* Scope metrics forwarding to only participants that a participant is
subscribed to.

Also remove the participant_metrics.go file as it was not doing anything
useful.

* update comment

* utils.ErrorIsOneOf

* use utils.CloneProto in a couple more places
This commit is contained in:
Raja Subramanian
2024-09-19 11:42:31 +05:30
committed by GitHub
parent 4deaac2f3f
commit 7df6f86693
23 changed files with 488 additions and 55 deletions
+1 -1
View File
@@ -137,6 +137,6 @@ require (
golang.org/x/tools v0.25.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.1 // indirect
google.golang.org/grpc v1.66.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+2 -2
View File
@@ -488,8 +488,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.1 h1:hO5qAXR19+/Z44hmvIM4dQFMSYX9XcWsByfoxutBpAM=
google.golang.org/grpc v1.66.1/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo=
google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
+6 -2
View File
@@ -27,6 +27,7 @@ import (
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v3"
"github.com/livekit/livekit-server/pkg/metric"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -85,6 +86,8 @@ type Config struct {
Limit LimitConfig `yaml:"limit,omitempty"`
Development bool `yaml:"development,omitempty"`
Metric metric.MetricConfig `yaml:"metric,omitempty"`
}
type RTCConfig struct {
@@ -572,8 +575,9 @@ var DefaultConfig = Config{
StreamBufferSize: 1000,
ConnectAttempts: 3,
},
PSRPC: rpc.DefaultPSRPCConfig,
Keys: map[string]string{},
PSRPC: rpc.DefaultPSRPCConfig,
Keys: map[string]string{},
Metric: metric.DefaultMetricConfig,
}
func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []cli.Flag) (*Config, error) {
+27
View File
@@ -0,0 +1,27 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
// ------------------------------------------------
// MetricConfig holds configuration for the metric package.
type MetricConfig struct {
// Timestamper configures server-side timestamp normalization of client metrics batches.
Timestamper MetricTimestamperConfig `yaml:"timestamper_config,omitempty"`
}
// DefaultMetricConfig is the configuration used when none is provided.
var (
DefaultMetricConfig = MetricConfig{
Timestamper: DefaultMetricTimestamperConfig,
}
)
+116
View File
@@ -0,0 +1,116 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"google.golang.org/protobuf/types/known/timestamppb"
)
// ------------------------------------------------
// MetricTimestamperConfig controls how often the one-way-delay (OWD)
// estimator is re-run while timestamping metrics batches.
type MetricTimestamperConfig struct {
// OneWayDelayEstimatorMinInterval is the minimum wall-clock interval between estimator runs.
OneWayDelayEstimatorMinInterval time.Duration `yaml:"one_way_delay_estimator_min_interval,omitempty"`
// OneWayDelayEstimatorMaxBatch is the maximum number of batches processed
// before forcing another estimator run, even within the minimum interval.
OneWayDelayEstimatorMaxBatch int `yaml:"one_way_delay_estimator_max_batch,omitempty"`
}
// DefaultMetricTimestamperConfig re-estimates OWD at most every 5s or every 100 batches.
var (
DefaultMetricTimestamperConfig = MetricTimestamperConfig{
OneWayDelayEstimatorMinInterval: 5 * time.Second,
OneWayDelayEstimatorMaxBatch: 100,
}
)
// ------------------------------------------------
// MetricTimestamperParams are the construction parameters for MetricTimestamper.
type MetricTimestamperParams struct {
Config MetricTimestamperConfig
// BaseTime is the reference time used when feeding the OWD estimator.
BaseTime time.Time
Logger logger.Logger
}
// MetricTimestamper normalizes client-reported timestamps in metrics batches
// by adding an estimated one-way propagation delay (sender -> server).
type MetricTimestamper struct {
params MetricTimestamperParams
// lock guards the estimator and its run bookkeeping below.
lock sync.Mutex
owdEstimator *utils.OWDEstimator
lastOWDEstimatorRunAt time.Time
batchesSinceLastOWDEstimatorRun int
}
// NewMetricTimestamper creates a MetricTimestamper.
// lastOWDEstimatorRunAt is back-dated by one minimum interval so the
// first batch processed triggers an OWD estimation immediately.
func NewMetricTimestamper(params MetricTimestamperParams) *MetricTimestamper {
return &MetricTimestamper{
params: params,
owdEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault),
lastOWDEstimatorRunAt: time.Now().Add(-params.Config.OneWayDelayEstimatorMinInterval),
}
}
// Process stamps a normalized (server-clock) timestamp onto the batch, each
// time-series sample, and each event, by shifting the client-reported
// millisecond timestamps by the current OWD estimate.
func (m *MetricTimestamper) Process(batch *livekit.MetricsBatch) {
// run OWD estimation periodically
estimatedOWDNanos := m.maybeRunOWDEstimator(batch)
// normalize all time stamps and add estimated OWD
// NOTE: all timestamps will be re-mapped. If the time series or event happened some time
// in the past and the OWD estimation has changed since, those samples will get the updated
// OWD estimation applied. So, they may have more uncertainty in addition to the uncertainty
// of OWD estimation process.
batch.NormalizedTimestamp = timestamppb.New(time.Unix(0, batch.TimestampMs*1e6+estimatedOWDNanos))
for _, ts := range batch.TimeSeries {
for _, sample := range ts.Samples {
sample.NormalizedTimestamp = timestamppb.New(time.Unix(0, sample.TimestampMs*1e6+estimatedOWDNanos))
}
}
for _, ev := range batch.Events {
ev.NormalizedStartTimestamp = timestamppb.New(time.Unix(0, ev.StartTimestampMs*1e6+estimatedOWDNanos))
// end timestamp is optional; 0 means the event has no end
endTimestampMs := ev.GetEndTimestampMs()
if endTimestampMs != 0 {
ev.NormalizedEndTimestamp = timestamppb.New(time.Unix(0, endTimestampMs*1e6+estimatedOWDNanos))
}
}
m.params.Logger.Debugw("timestamped metrics batch", "batch", logger.Proto(batch))
}
// maybeRunOWDEstimator returns the current OWD estimate in nanoseconds,
// re-running the estimator only when both the minimum interval has elapsed
// and the batch budget since the last run is exhausted (or either limit trips).
func (m *MetricTimestamper) maybeRunOWDEstimator(batch *livekit.MetricsBatch) int64 {
m.lock.Lock()
defer m.lock.Unlock()
// within both the time and batch-count budgets: reuse the cached estimate
if time.Since(m.lastOWDEstimatorRunAt) < m.params.Config.OneWayDelayEstimatorMinInterval &&
m.batchesSinceLastOWDEstimatorRun < m.params.Config.OneWayDelayEstimatorMaxBatch {
m.batchesSinceLastOWDEstimatorRun++
return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds()
}
// cannot estimate without a sender clock reading; keep the previous estimate
senderClockTime := batch.GetTimestampMs()
if senderClockTime == 0 {
m.batchesSinceLastOWDEstimatorRun++
return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds()
}
m.lastOWDEstimatorRunAt = time.Now()
m.batchesSinceLastOWDEstimatorRun = 1
// NOTE(review): BaseTime.Add(time.Since(BaseTime)) is numerically "now";
// presumably written this way to derive the receive time from BaseTime's
// (monotonic) clock reading rather than a fresh wall-clock read — confirm intent.
at := m.params.BaseTime.Add(time.Since(m.params.BaseTime))
estimatedOWD, _ := m.owdEstimator.Update(time.UnixMilli(senderClockTime), at)
return estimatedOWD.Nanoseconds()
}
+2 -2
View File
@@ -20,10 +20,10 @@ import (
"time"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
var _ Router = (*LocalRouter)(nil)
@@ -58,7 +58,7 @@ func NewLocalRouter(
func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error) {
r.lock.Lock()
defer r.lock.Unlock()
node := proto.Clone((*livekit.Node)(r.currentNode)).(*livekit.Node)
node := utils.CloneProto((*livekit.Node)(r.currentNode))
return node, nil
}
+2
View File
@@ -42,4 +42,6 @@ var (
ErrTrackNotAttached = errors.New("track is not yet attached")
ErrTrackNotBound = errors.New("track not bound")
ErrSubscriptionLimitExceeded = errors.New("participant has exceeded its subscription limit")
ErrNoSubscribeMetricsPermission = errors.New("participant is not given permission to subscribe to metrics")
)
+2
View File
@@ -56,6 +56,7 @@ type MediaTrack struct {
}
type MediaTrackParams struct {
BaseTime time.Time
SignalCid string
SdpCid string
ParticipantID livekit.ParticipantID
@@ -275,6 +276,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
newWR := sfu.NewWebRTCReceiver(
receiver,
t.params.BaseTime,
track,
ti,
LoggerWithCodecMime(t.params.Logger, mime),
+1 -1
View File
@@ -712,7 +712,7 @@ func (t *MediaTrackReceiver) TrackInfo() *livekit.TrackInfo {
}
func (t *MediaTrackReceiver) TrackInfoClone() *livekit.TrackInfo {
return proto.Clone(t.TrackInfo()).(*livekit.TrackInfo)
return utils.CloneProto(t.TrackInfo())
}
func (t *MediaTrackReceiver) NotifyMaxLayerChange(maxLayer int32) {
+107 -25
View File
@@ -17,7 +17,6 @@ package rtc
import (
"context"
"fmt"
"io"
"os"
"slices"
"strconv"
@@ -27,7 +26,6 @@ import (
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pion/rtcp"
"github.com/pion/sctp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/pkg/errors"
@@ -42,6 +40,7 @@ import (
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/metric"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/supervisor"
"github.com/livekit/livekit-server/pkg/rtc/transport"
@@ -109,6 +108,7 @@ func (p participantUpdateInfo) String() string {
// ---------------------------------------------------------------
type ParticipantParams struct {
BaseTime time.Time
Identity livekit.ParticipantIdentity
Name livekit.ParticipantName
SID livekit.ParticipantID
@@ -157,6 +157,7 @@ type ParticipantParams struct {
SyncStreams bool
ForwardStats *sfu.ForwardStats
DisableSenderReportPassThrough bool
MetricConfig metric.MetricConfig
}
type ParticipantImpl struct {
@@ -234,6 +235,7 @@ type ParticipantImpl struct {
onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState)
onParticipantUpdate func(types.LocalParticipant)
onDataPacket func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
onMetrics func(types.LocalParticipant, *livekit.DataPacket)
migrateState atomic.Value // types.MigrateState
@@ -248,6 +250,8 @@ type ParticipantImpl struct {
tracksQuality map[livekit.TrackID]livekit.ConnectionQuality
metricTimestamper *metric.MetricTimestamper
// loggers for publisher and subscriber
pubLogger logger.Logger
subLogger logger.Logger
@@ -279,10 +283,16 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
dataChannelStats: telemetry.NewBytesTrackStats(
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID),
params.SID,
params.Telemetry),
params.Telemetry,
),
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality),
pubLogger: params.Logger.WithComponent(sutils.ComponentPub),
subLogger: params.Logger.WithComponent(sutils.ComponentSub),
metricTimestamper: metric.NewMetricTimestamper(metric.MetricTimestamperParams{
Config: params.MetricConfig.Timestamper,
BaseTime: params.BaseTime,
Logger: params.Logger,
}),
pubLogger: params.Logger.WithComponent(sutils.ComponentPub),
subLogger: params.Logger.WithComponent(sutils.ComponentSub),
}
if !params.DisableSupervisor {
p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger})
@@ -749,6 +759,24 @@ func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, liv
p.lock.Unlock()
}
// getOnDataPacket returns the data-packet callback under the read lock.
func (p *ParticipantImpl) getOnDataPacket() func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onDataPacket
}
// OnMetrics registers the callback invoked when this participant publishes a
// metrics data packet; pass nil to clear it (done on close).
func (p *ParticipantImpl) OnMetrics(callback func(types.LocalParticipant, *livekit.DataPacket)) {
p.lock.Lock()
p.onMetrics = callback
p.lock.Unlock()
}
// getOnMetrics returns the metrics callback under the read lock.
func (p *ParticipantImpl) getOnMetrics() func(types.LocalParticipant, *livekit.DataPacket) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onMetrics
}
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) {
p.lock.Lock()
p.onClose = callback
@@ -1201,6 +1229,10 @@ func (p *ParticipantImpl) Hidden() bool {
return p.grants.Load().Video.Hidden
}
// CanSubscribeMetrics reports whether this participant's current video grants
// permit receiving forwarded metrics.
func (p *ParticipantImpl) CanSubscribeMetrics() bool {
return p.grants.Load().Video.GetCanSubscribeMetrics()
}
func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) {
if !p.IsReady() {
// we have not sent a JoinResponse yet. metadata would be covered in JoinResponse
@@ -1333,6 +1365,10 @@ func (h SubscriberTransportHandler) OnInitialConnected() {
h.p.onSubscriberInitialConnected()
}
// OnDataSendError forwards subscriber-transport data send errors to the participant.
func (h SubscriberTransportHandler) OnDataSendError(err error) {
h.p.onDataSendError(err)
}
// ----------------------------------------------------------
type PrimaryTransportHandler struct {
@@ -1391,6 +1427,7 @@ func (p *ParticipantImpl) setupTransportManager() error {
Logger: p.params.Logger.WithComponent(sutils.ComponentTransport),
PublisherHandler: pth,
SubscriberHandler: sth,
DataChannelStats: p.dataChannelStats,
}
if p.params.SyncStreams && p.params.PlayoutDelay.GetEnabled() && p.params.ClientInfo.isFirefox() {
// we will disable playout delay for Firefox if the user is expecting
@@ -1601,7 +1638,9 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
dp.ParticipantIdentity = string(p.params.Identity)
}
shouldForward := false
shouldForwardData := true
shouldForwardMetrics := false
isPublisher := true
// only forward on user payloads
switch payload := dp.Value.(type) {
case *livekit.DataPacket_User:
@@ -1623,28 +1662,42 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
} else {
dp.DestinationIdentities = u.DestinationIdentities
}
shouldForward = true
case *livekit.DataPacket_SipDtmf:
shouldForward = true
case *livekit.DataPacket_Transcription:
if p.Kind() == livekit.ParticipantInfo_AGENT {
shouldForward = true
shouldForwardData = false
}
case *livekit.DataPacket_ChatMessage:
shouldForward = true
shouldForwardData = true
case *livekit.DataPacket_Metrics:
shouldForwardData = false
shouldForwardMetrics = true
isPublisher = false
// METRICS-TODO-QUESTIONS:
// 1. Should this record (and do processing/batching) metrics (i. e. publisher side) rather
// than forwarding and recording/processing/batching at every subscriber (in this case
// subscriber is defined as the other participants pushing this to edge client).
// 2. If the above is done, there could be two cadences, publisher side recording/processing/batching
// and pushing it to all subscribers on some cadence and subscribers have their own cadence of
// processing/batching and sending to edge clients.
p.metricTimestamper.Process(payload.Metrics)
default:
p.pubLogger.Warnw("received unsupported data packet", nil, "payload", payload)
}
if shouldForward {
p.lock.RLock()
onDataPacket := p.onDataPacket
p.lock.RUnlock()
if onDataPacket != nil {
if shouldForwardData {
if onDataPacket := p.getOnDataPacket(); onDataPacket != nil {
onDataPacket(p, kind, dp)
}
}
if shouldForwardMetrics {
if onMetrics := p.getOnMetrics(); onMetrics != nil {
onMetrics(p, dp)
}
}
p.setIsPublisher(true)
if isPublisher {
p.setIsPublisher(true)
}
}
func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error {
@@ -2194,6 +2247,7 @@ func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *M
func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *livekit.TrackInfo) *MediaTrack {
mt := NewMediaTrack(MediaTrackParams{
BaseTime: p.params.BaseTime,
SignalCid: signalCid,
SdpCid: sdpCid,
ParticipantID: p.params.SID,
@@ -2651,16 +2705,14 @@ func (p *ParticipantImpl) SendDataPacket(kind livekit.DataPacket_Kind, encoded [
return ErrDataChannelUnavailable
}
err := p.TransportManager.SendDataPacket(kind, encoded)
if err != nil {
if (errors.Is(err, sctp.ErrStreamClosed) || errors.Is(err, io.ErrClosedPipe)) && p.params.ReconnectOnDataChannelError {
p.params.Logger.Infow("issuing full reconnect on data channel error", "error", err)
p.IssueFullReconnect(types.ParticipantCloseReasonDataChannelError)
}
} else {
p.dataChannelStats.AddBytes(uint64(len(encoded)), true)
return p.TransportManager.SendDataPacket(kind, encoded)
}
func (p *ParticipantImpl) onDataSendError(err error) {
if p.params.ReconnectOnDataChannelError {
p.params.Logger.Infow("issuing full reconnect on data channel error", "error", err)
p.IssueFullReconnect(types.ParticipantCloseReasonDataChannelError)
}
return err
}
func (p *ParticipantImpl) setupEnabledCodecs(publishEnabledCodecs []*livekit.Codec, subscribeEnabledCodecs []*livekit.Codec, disabledCodecs *livekit.DisabledCodecs) {
@@ -2769,3 +2821,33 @@ func (p *ParticipantImpl) UpdateVideoTrack(update *livekit.UpdateLocalVideoTrack
p.pubLogger.Debugw("could not locate track", "trackID", update.TrackSid)
return errors.New("could not find track")
}
// HandleMetrics forwards a metrics batch originating from senderParticipantID
// to this participant over the reliable data channel.
// It is a no-op (nil error) when this participant is not subscribed to the
// sender, and errors when the participant is not active or lacks the
// subscribe-metrics permission.
func (p *ParticipantImpl) HandleMetrics(senderParticipantID livekit.ParticipantID, metrics *livekit.MetricsBatch) error {
if p.State() != livekit.ParticipantInfo_ACTIVE {
return ErrDataChannelUnavailable
}
if !p.CanSubscribeMetrics() {
return ErrNoSubscribeMetricsPermission
}
// scope forwarding: only deliver metrics from participants this one is subscribed to
if !p.SubscriptionManager.IsSubscribedTo(senderParticipantID) {
return nil
}
// METRICS-TODO: This is just forwarding. Will have to do more, including but not limited to
// 1. Filtering: subscriber metrics from self only should be sent to that participant.
// 2. Batching: could include re-mapping labels to consolidate multiple batches.
// 3. (Maybe) Time stamps: this is done on receive, TBD if required here also
dpData, err := proto.Marshal(&livekit.DataPacket{
Value: &livekit.DataPacket_Metrics{
Metrics: metrics,
},
})
if err != nil {
p.params.Logger.Errorw("failed to marshal data packet", err)
return err
}
return p.TransportManager.SendDataPacket(livekit.DataPacket_RELIABLE, dpData)
}
+18 -12
View File
@@ -16,9 +16,7 @@ package rtc
import (
"context"
"errors"
"fmt"
"io"
"math"
"slices"
"sort"
@@ -30,8 +28,6 @@ import (
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"github.com/pion/sctp"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
@@ -482,6 +478,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
participant.OnTrackUnpublished(r.onTrackUnpublished)
participant.OnParticipantUpdate(r.onParticipantUpdate)
participant.OnDataPacket(r.onDataPacket)
participant.OnMetrics(r.onMetrics)
participant.OnSubscribeStatusChanged(func(publisherID livekit.ParticipantID, subscribed bool) {
if subscribed {
pub := r.GetParticipantByID(publisherID)
@@ -726,6 +723,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
p.OnStateChange(nil)
p.OnParticipantUpdate(nil)
p.OnDataPacket(nil)
p.OnMetrics(nil)
p.OnSubscribeStatusChanged(nil)
// close participant as well
@@ -1272,6 +1270,10 @@ func (r *Room) onDataPacket(source types.LocalParticipant, kind livekit.DataPack
BroadcastDataPacketForRoom(r, source, kind, dp, r.Logger)
}
// onMetrics is the room-level callback for a participant's metrics packets;
// it fans the packet out to the room's local participants.
func (r *Room) onMetrics(source types.LocalParticipant, dp *livekit.DataPacket) {
BroadcastMetricsForRoom(r, source, dp, r.Logger)
}
func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) {
r.lock.RLock()
shouldSubscribe := r.autoSubscribe(p)
@@ -1769,9 +1771,6 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kin
var dpData []byte
for _, op := range participants {
if op.State() != livekit.ParticipantInfo_ACTIVE {
continue
}
if source != nil && op.ID() == source.ID() {
continue
}
@@ -1792,14 +1791,21 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kin
}
utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) {
err := op.SendDataPacket(kind, dpData)
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) &&
!errors.Is(err, ErrTransportFailure) && !errors.Is(err, ErrDataChannelBufferFull) {
op.GetLogger().Infow("send data packet error", "error", err)
}
op.SendDataPacket(kind, dpData)
})
}
// BroadcastMetricsForRoom delivers a metrics data packet to every local
// participant in the room (including the sender — metrics are echoed back).
// Non-metrics packets are ignored. Each participant's HandleMetrics applies
// its own permission and subscription checks; errors are not surfaced here.
// NOTE(review): source is dereferenced unconditionally — assumes callers
// always pass a non-nil source (Room.onMetrics does); confirm for other callers.
func BroadcastMetricsForRoom(r types.Room, source types.LocalParticipant, dp *livekit.DataPacket, logger logger.Logger) {
switch payload := dp.Value.(type) {
case *livekit.DataPacket_Metrics:
utils.ParallelExec(r.GetLocalParticipants(), dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) {
// echoing back to sender too
op.HandleMetrics(source.ID(), payload.Metrics)
})
default:
}
}
func IsCloseNotifySkippable(closeReason types.ParticipantCloseReason) bool {
return closeReason == types.ParticipantCloseReasonDuplicateIdentity
}
+4 -6
View File
@@ -24,6 +24,7 @@ import (
"github.com/pion/webrtc/v3/pkg/rtcerr"
"go.uber.org/atomic"
"golang.org/x/exp/maps"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
@@ -31,6 +32,7 @@ import (
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
// using var instead of const to override in tests
@@ -216,11 +218,7 @@ func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantI
m.lock.RLock()
defer m.lock.RUnlock()
var participantIDs []livekit.ParticipantID
for pID := range m.subscribedTo {
participantIDs = append(participantIDs, pID)
}
return participantIDs
return maps.Keys(m.subscribedTo)
}
func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool {
@@ -526,7 +524,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
subTrack, err := track.AddSubscriber(m.params.Participant)
if err != nil && !errors.Is(err, errAlreadySubscribed) {
// ignore error(s): already subscribed
if !errors.Is(err, ErrTrackNotAttached) && !errors.Is(err, ErrNoReceiver) {
if !utils.ErrorIsOneOf(err, ErrTrackNotAttached, ErrNoReceiver) {
// as track resolution could take some time, not logging errors due to waiting for track resolution
m.params.Logger.Warnw("add subscriber failed", err, "trackID", trackID)
}
+2
View File
@@ -39,6 +39,7 @@ type Handler interface {
OnFailed(isShortLived bool)
OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver)
OnDataPacket(kind livekit.DataPacket_Kind, data []byte)
OnDataSendError(err error)
OnOffer(sd webrtc.SessionDescription) error
OnAnswer(sd webrtc.SessionDescription) error
OnNegotiationStateChanged(state NegotiationState)
@@ -56,6 +57,7 @@ func (h UnimplementedHandler) OnFullyEstablished()
func (h UnimplementedHandler) OnFailed(isShortLived bool) {}
func (h UnimplementedHandler) OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {}
func (h UnimplementedHandler) OnDataPacket(kind livekit.DataPacket_Kind, data []byte) {}
func (h UnimplementedHandler) OnDataSendError(err error) {}
func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription) error {
return ErrNoOfferHandler
}
@@ -28,6 +28,11 @@ type FakeHandler struct {
arg1 livekit.DataPacket_Kind
arg2 []byte
}
OnDataSendErrorStub func(error)
onDataSendErrorMutex sync.RWMutex
onDataSendErrorArgsForCall []struct {
arg1 error
}
OnFailedStub func(bool)
onFailedMutex sync.RWMutex
onFailedArgsForCall []struct {
@@ -193,6 +198,38 @@ func (fake *FakeHandler) OnDataPacketArgsForCall(i int) (livekit.DataPacket_Kind
return argsForCall.arg1, argsForCall.arg2
}
// OnDataSendError records the call and invokes the configured stub, if any.
// (counterfeiter-generated test fake)
func (fake *FakeHandler) OnDataSendError(arg1 error) {
fake.onDataSendErrorMutex.Lock()
fake.onDataSendErrorArgsForCall = append(fake.onDataSendErrorArgsForCall, struct {
arg1 error
}{arg1})
stub := fake.OnDataSendErrorStub
fake.recordInvocation("OnDataSendError", []interface{}{arg1})
fake.onDataSendErrorMutex.Unlock()
if stub != nil {
fake.OnDataSendErrorStub(arg1)
}
}
// OnDataSendErrorCallCount returns how many times OnDataSendError was called.
func (fake *FakeHandler) OnDataSendErrorCallCount() int {
fake.onDataSendErrorMutex.RLock()
defer fake.onDataSendErrorMutex.RUnlock()
return len(fake.onDataSendErrorArgsForCall)
}
// OnDataSendErrorCalls sets the stub invoked on each OnDataSendError call.
func (fake *FakeHandler) OnDataSendErrorCalls(stub func(error)) {
fake.onDataSendErrorMutex.Lock()
defer fake.onDataSendErrorMutex.Unlock()
fake.OnDataSendErrorStub = stub
}
// OnDataSendErrorArgsForCall returns the argument of the i-th recorded call.
func (fake *FakeHandler) OnDataSendErrorArgsForCall(i int) error {
fake.onDataSendErrorMutex.RLock()
defer fake.onDataSendErrorMutex.RUnlock()
argsForCall := fake.onDataSendErrorArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeHandler) OnFailed(arg1 bool) {
fake.onFailedMutex.Lock()
fake.onFailedArgsForCall = append(fake.onFailedArgsForCall, struct {
@@ -553,6 +590,8 @@ func (fake *FakeHandler) Invocations() map[string][][]interface{} {
defer fake.onAnswerMutex.RUnlock()
fake.onDataPacketMutex.RLock()
defer fake.onDataPacketMutex.RUnlock()
fake.onDataSendErrorMutex.RLock()
defer fake.onDataSendErrorMutex.RUnlock()
fake.onFailedMutex.RLock()
defer fake.onFailedMutex.RUnlock()
fake.onFullyEstablishedMutex.RLock()
+17 -1
View File
@@ -15,11 +15,13 @@
package rtc
import (
"io"
"math/bits"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/sctp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/pkg/errors"
@@ -36,6 +38,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/telemetry"
)
const (
@@ -89,6 +92,7 @@ type TransportManagerParams struct {
Logger logger.Logger
PublisherHandler transport.Handler
SubscriberHandler transport.Handler
DataChannelStats *telemetry.BytesTrackStats
}
type TransportManager struct {
@@ -242,7 +246,19 @@ func (t *TransportManager) RemoveSubscribedTrack(subTrack types.SubscribedTrack)
func (t *TransportManager) SendDataPacket(kind livekit.DataPacket_Kind, encoded []byte) error {
// downstream data is sent via primary peer connection
return t.getTransport(true).SendDataPacket(kind, encoded)
err := t.getTransport(true).SendDataPacket(kind, encoded)
if err != nil {
if !utils.ErrorIsOneOf(err, io.ErrClosedPipe, sctp.ErrStreamClosed, ErrTransportFailure, ErrDataChannelBufferFull) {
t.params.Logger.Warnw("send data packet error", err)
}
if utils.ErrorIsOneOf(err, sctp.ErrStreamClosed, io.ErrClosedPipe) {
t.params.SubscriberHandler.OnDataSendError(err)
}
} else {
t.params.DataChannelStats.AddBytes(uint64(len(encoded)), true)
}
return err
}
func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels []*livekit.DataChannelInfo) error {
+3
View File
@@ -404,6 +404,7 @@ type LocalParticipant interface {
// OnParticipantUpdate - metadata or permission is updated
OnParticipantUpdate(callback func(LocalParticipant))
OnDataPacket(callback func(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket))
OnMetrics(callback func(LocalParticipant, *livekit.DataPacket))
OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool))
OnClose(callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
@@ -442,6 +443,8 @@ type LocalParticipant interface {
GetPacer() pacer.Pacer
GetDisableSenderReportPassThrough() bool
HandleMetrics(senderParticipantID livekit.ParticipantID, batch *livekit.MetricsBatch) error
}
// Room is a container of participants, and can provide room-level actions
@@ -413,6 +413,18 @@ type FakeLocalParticipant struct {
handleAnswerArgsForCall []struct {
arg1 webrtc.SessionDescription
}
HandleMetricsStub func(livekit.ParticipantID, *livekit.MetricsBatch) error
handleMetricsMutex sync.RWMutex
handleMetricsArgsForCall []struct {
arg1 livekit.ParticipantID
arg2 *livekit.MetricsBatch
}
handleMetricsReturns struct {
result1 error
}
handleMetricsReturnsOnCall map[int]struct {
result1 error
}
HandleOfferStub func(webrtc.SessionDescription)
handleOfferMutex sync.RWMutex
handleOfferArgsForCall []struct {
@@ -644,6 +656,11 @@ type FakeLocalParticipant struct {
onICEConfigChangedArgsForCall []struct {
arg1 func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)
}
OnMetricsStub func(func(types.LocalParticipant, *livekit.DataPacket))
onMetricsMutex sync.RWMutex
onMetricsArgsForCall []struct {
arg1 func(types.LocalParticipant, *livekit.DataPacket)
}
OnMigrateStateChangeStub func(func(p types.LocalParticipant, migrateState types.MigrateState))
onMigrateStateChangeMutex sync.RWMutex
onMigrateStateChangeArgsForCall []struct {
@@ -3146,6 +3163,68 @@ func (fake *FakeLocalParticipant) HandleAnswerArgsForCall(i int) webrtc.SessionD
return argsForCall.arg1
}
// HandleMetrics records the call, then returns (in priority order) the
// per-call return, the stub's result, or the default return.
// (counterfeiter-generated test fake)
func (fake *FakeLocalParticipant) HandleMetrics(arg1 livekit.ParticipantID, arg2 *livekit.MetricsBatch) error {
fake.handleMetricsMutex.Lock()
ret, specificReturn := fake.handleMetricsReturnsOnCall[len(fake.handleMetricsArgsForCall)]
fake.handleMetricsArgsForCall = append(fake.handleMetricsArgsForCall, struct {
arg1 livekit.ParticipantID
arg2 *livekit.MetricsBatch
}{arg1, arg2})
stub := fake.HandleMetricsStub
fakeReturns := fake.handleMetricsReturns
fake.recordInvocation("HandleMetrics", []interface{}{arg1, arg2})
fake.handleMetricsMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
// HandleMetricsCallCount returns how many times HandleMetrics was called.
func (fake *FakeLocalParticipant) HandleMetricsCallCount() int {
fake.handleMetricsMutex.RLock()
defer fake.handleMetricsMutex.RUnlock()
return len(fake.handleMetricsArgsForCall)
}
// HandleMetricsCalls sets the stub invoked on each HandleMetrics call.
func (fake *FakeLocalParticipant) HandleMetricsCalls(stub func(livekit.ParticipantID, *livekit.MetricsBatch) error) {
fake.handleMetricsMutex.Lock()
defer fake.handleMetricsMutex.Unlock()
fake.HandleMetricsStub = stub
}
// HandleMetricsArgsForCall returns the arguments of the i-th recorded call.
func (fake *FakeLocalParticipant) HandleMetricsArgsForCall(i int) (livekit.ParticipantID, *livekit.MetricsBatch) {
fake.handleMetricsMutex.RLock()
defer fake.handleMetricsMutex.RUnlock()
argsForCall := fake.handleMetricsArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
// HandleMetricsReturns sets the default return value (clears any stub).
func (fake *FakeLocalParticipant) HandleMetricsReturns(result1 error) {
fake.handleMetricsMutex.Lock()
defer fake.handleMetricsMutex.Unlock()
fake.HandleMetricsStub = nil
fake.handleMetricsReturns = struct {
result1 error
}{result1}
}
// HandleMetricsReturnsOnCall sets the return value for the i-th call (clears any stub).
func (fake *FakeLocalParticipant) HandleMetricsReturnsOnCall(i int, result1 error) {
fake.handleMetricsMutex.Lock()
defer fake.handleMetricsMutex.Unlock()
fake.HandleMetricsStub = nil
if fake.handleMetricsReturnsOnCall == nil {
fake.handleMetricsReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.handleMetricsReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleOffer(arg1 webrtc.SessionDescription) {
fake.handleOfferMutex.Lock()
fake.handleOfferArgsForCall = append(fake.handleOfferArgsForCall, struct {
@@ -4419,6 +4498,38 @@ func (fake *FakeLocalParticipant) OnICEConfigChangedArgsForCall(i int) func(part
return argsForCall.arg1
}
// OnMetrics records the invocation and forwards arg1 to the configured stub,
// if any.
//
// NOTE(review): the generated code snapshotted the stub into a local while
// holding the mutex but then invoked fake.OnMetricsStub directly after
// unlocking, which races with a concurrent OnMetricsCalls. Invoke the
// captured snapshot instead. (This file is counterfeiter-generated; prefer
// regenerating with a version that emits the race-free form.)
func (fake *FakeLocalParticipant) OnMetrics(arg1 func(types.LocalParticipant, *livekit.DataPacket)) {
	fake.onMetricsMutex.Lock()
	fake.onMetricsArgsForCall = append(fake.onMetricsArgsForCall, struct {
		arg1 func(types.LocalParticipant, *livekit.DataPacket)
	}{arg1})
	stub := fake.OnMetricsStub
	fake.recordInvocation("OnMetrics", []interface{}{arg1})
	fake.onMetricsMutex.Unlock()
	if stub != nil {
		// Call the snapshot taken under the lock, not the live field.
		stub(arg1)
	}
}
// OnMetricsCallCount reports how many times OnMetrics has been invoked.
func (fake *FakeLocalParticipant) OnMetricsCallCount() int {
	fake.onMetricsMutex.RLock()
	count := len(fake.onMetricsArgsForCall)
	fake.onMetricsMutex.RUnlock()
	return count
}
// OnMetricsCalls installs stub as the implementation used by all future
// OnMetrics invocations.
func (fake *FakeLocalParticipant) OnMetricsCalls(stub func(func(types.LocalParticipant, *livekit.DataPacket))) {
	fake.onMetricsMutex.Lock()
	fake.OnMetricsStub = stub
	fake.onMetricsMutex.Unlock()
}
// OnMetricsArgsForCall returns the callback passed to the i-th recorded
// OnMetrics call.
func (fake *FakeLocalParticipant) OnMetricsArgsForCall(i int) func(types.LocalParticipant, *livekit.DataPacket) {
	fake.onMetricsMutex.RLock()
	defer fake.onMetricsMutex.RUnlock()
	return fake.onMetricsArgsForCall[i].arg1
}
func (fake *FakeLocalParticipant) OnMigrateStateChange(arg1 func(p types.LocalParticipant, migrateState types.MigrateState)) {
fake.onMigrateStateChangeMutex.Lock()
fake.onMigrateStateChangeArgsForCall = append(fake.onMigrateStateChangeArgsForCall, struct {
@@ -6991,6 +7102,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getTrailerMutex.RUnlock()
fake.handleAnswerMutex.RLock()
defer fake.handleAnswerMutex.RUnlock()
fake.handleMetricsMutex.RLock()
defer fake.handleMetricsMutex.RUnlock()
fake.handleOfferMutex.RLock()
defer fake.handleOfferMutex.RUnlock()
fake.handleReceiverReportMutex.RLock()
@@ -7047,6 +7160,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onDataPacketMutex.RUnlock()
fake.onICEConfigChangedMutex.RLock()
defer fake.onICEConfigChangedMutex.RUnlock()
fake.onMetricsMutex.RLock()
defer fake.onMetricsMutex.RUnlock()
fake.onMigrateStateChangeMutex.RLock()
defer fake.onMigrateStateChangeMutex.RUnlock()
fake.onParticipantUpdateMutex.RLock()
+6
View File
@@ -66,6 +66,8 @@ type iceConfigCacheKey struct {
type RoomManager struct {
lock sync.RWMutex
baseTime time.Time
config *config.Config
rtcConfig *rtc.WebRTCConfig
serverInfo *livekit.ServerInfo
@@ -95,6 +97,7 @@ type RoomManager struct {
}
func NewLocalRoomManager(
baseTime time.Time,
conf *config.Config,
roomStore ObjectStore,
currentNode routing.LocalNode,
@@ -116,6 +119,7 @@ func NewLocalRoomManager(
}
r := &RoomManager{
baseTime: baseTime,
config: conf,
rtcConfig: rtcConf,
currentNode: currentNode,
@@ -444,6 +448,7 @@ func (r *RoomManager) StartSession(
subscriberAllowPause = *pi.SubscriberAllowPause
}
participant, err = rtc.NewParticipant(rtc.ParticipantParams{
BaseTime: r.baseTime,
Identity: pi.Identity,
Name: pi.Name,
SID: sid,
@@ -486,6 +491,7 @@ func (r *RoomManager) StartSession(
PlayoutDelay: roomInternal.GetPlayoutDelay(),
SyncStreams: roomInternal.GetSyncStreams(),
ForwardStats: r.forwardStats,
MetricConfig: r.config.Metric,
})
if err != nil {
return err
+6
View File
@@ -20,6 +20,7 @@ package service
import (
"fmt"
"os"
"time"
"github.com/google/wire"
"github.com/pion/turn/v2"
@@ -45,6 +46,7 @@ import (
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
wire.Build(
getTimeNow,
getNodeID,
createRedisClient,
createStore,
@@ -122,6 +124,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi
return nil, nil
}
func getTimeNow() time.Time {
return time.Now()
}
// getNodeID is a wire provider that derives the typed node ID from the
// current local routing node.
func getNodeID(currentNode routing.LocalNode) livekit.NodeID {
	id := currentNode.Id
	return livekit.NodeID(id)
}
+7 -1
View File
@@ -27,6 +27,7 @@ import (
"github.com/redis/go-redis/v9"
"gopkg.in/yaml.v3"
"os"
"time"
)
import (
@@ -124,6 +125,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
time := getTimeNow()
clientConfigurationManager := createClientConfiguration()
client, err := agent.NewAgentClient(messageBus)
if err != nil {
@@ -133,7 +135,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
turnAuthHandler := NewTURNAuthHandler(keyProvider)
forwardStats := createForwardStats(conf)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, roomAllocator, telemetryService, clientConfigurationManager, client, agentStore, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats)
roomManager, err := NewLocalRoomManager(time, conf, objectStore, currentNode, router, roomAllocator, telemetryService, clientConfigurationManager, client, agentStore, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats)
if err != nil {
return nil, err
}
@@ -182,6 +184,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi
// wire.go:
func getTimeNow() time.Time {
return time.Now()
}
// getNodeID derives the typed node ID from the current local routing node.
func getNodeID(currentNode routing.LocalNode) livekit.NodeID {
	id := currentNode.Id
	return livekit.NodeID(id)
}
+1 -1
View File
@@ -186,6 +186,7 @@ func WithForwardStats(forwardStats *ForwardStats) ReceiverOpts {
// NewWebRTCReceiver creates a new webrtc track receiver
func NewWebRTCReceiver(
receiver *webrtc.RTPReceiver,
baseTime time.Time,
track *webrtc.TrackRemote,
trackInfo *livekit.TrackInfo,
logger logger.Logger,
@@ -203,7 +204,6 @@ func NewWebRTCReceiver(
onRTCP: onRTCP,
isSVC: buffer.IsSvcCodec(track.Codec().MimeType),
isRED: buffer.IsRedCodec(track.Codec().MimeType),
baseTime: time.Now(),
}
for _, opt := range opts {
+4
View File
@@ -173,3 +173,7 @@ func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time.
o.lastSenderClockTime = senderClockTime
return o.estimatedPropagationDelay, stepChange
}
// EstimatedPropagationDelay returns the most recently computed
// propagation-delay estimate (the same value Update reports).
func (o *OWDEstimator) EstimatedPropagationDelay() time.Duration {
	delay := o.estimatedPropagationDelay
	return delay
}
-1
View File
@@ -694,7 +694,6 @@ func (c *RTCClient) PublishData(data []byte, kind livekit.DataPacket_Kind) error
}
dpData, err := proto.Marshal(&livekit.DataPacket{
Kind: kind,
Value: &livekit.DataPacket_User{
User: &livekit.UserPacket{Payload: data},
},