From cf592676310c96d9ed24ae89bdf8d790766e3368 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Fri, 11 Oct 2024 04:07:24 +0000 Subject: [PATCH] Add counter for pub&sub time metrics (#3084) * Add counter for pub&sub time metrics The pub&sub time shows large values in migration-related cases such as muted/disabled migration, where the subscription time depends on when the publisher unmutes the track (sending RTP packets after migration). Add a counter to distinguish these cases, since we can't control the time in such cases and the first subscription attempt is also more meaningful than those cases. * Add info log for high publish delay --- pkg/rtc/participant.go | 24 ++++++++++++++++++------ pkg/rtc/subscriptionmanager.go | 5 +++-- pkg/telemetry/prometheus/rooms.go | 12 ++++++------ 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 873944adf..db34de735 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2231,12 +2231,24 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei if newTrack { go func() { - p.pubLogger.Debugw( - "track published", - "trackID", mt.ID(), - "track", logger.Proto(mt.ToProto()), - "cost", pubTime.Milliseconds(), - ) + // TODO: remove this after we know where the high delay is coming from + if pubTime > 3*time.Second { + p.pubLogger.Infow( + "track published with high delay", + "trackID", mt.ID(), + "track", logger.Proto(mt.ToProto()), + "cost", pubTime.Milliseconds(), + "rid", track.RID(), + "mime", track.Codec().MimeType, + ) + } else { + p.pubLogger.Debugw( + "track published", + "trackID", mt.ID(), + "track", logger.Proto(mt.ToProto()), + "cost", pubTime.Milliseconds(), + ) + } prometheus.RecordPublishTime(mt.Source(), mt.Kind(), pubTime, p.GetClientInfo().GetSdk(), p.Kind()) p.handleTrackPublished(mt) diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 237af7ba2..33585b4f1 100644 --- a/pkg/rtc/subscriptionmanager.go +++ 
b/pkg/rtc/subscriptionmanager.go @@ -759,7 +759,8 @@ type trackSubscription struct { subStartedAt atomic.Pointer[time.Time] // the timestamp when the subscription was started, will be reset when downtrack is closed with expected resume - subscribeAt atomic.Pointer[time.Time] + subscribeAt atomic.Pointer[time.Time] + succRecordCounter atomic.Int32 } func newTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *trackSubscription { @@ -1003,7 +1004,7 @@ func (s *trackSubscription) maybeRecordSuccess(ts telemetry.TelemetryService, pI d := time.Since(*s.subscribeAt.Load()) s.logger.Debugw("track subscribed", "cost", d.Milliseconds()) subscriber := subTrack.Subscriber() - prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind()) + prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind(), int(s.succRecordCounter.Inc())) eventSent := s.eventSent.Swap(true) diff --git a/pkg/telemetry/prometheus/rooms.go b/pkg/telemetry/prometheus/rooms.go index fa7126539..e0d48479d 100644 --- a/pkg/telemetry/prometheus/rooms.go +++ b/pkg/telemetry/prometheus/rooms.go @@ -115,7 +115,7 @@ func initRoomStats(nodeID string, nodeType livekit.NodeType) { Name: "ms", ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, Buckets: []float64{100, 200, 500, 700, 1000, 5000, 10000}, - }, append(promStreamLabels, "sdk", "kind")) + }, append(promStreamLabels, "sdk", "kind", "count")) prometheus.MustRegister(promRoomCurrent) prometheus.MustRegister(promRoomDuration) @@ -173,19 +173,19 @@ func AddPublishSuccess(kind string) { } func RecordPublishTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) { - recordPubSubTime(true, source, trackType, d, sdk, kind) + recordPubSubTime(true, source, trackType, d, sdk, 
kind, 1) } -func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) { - recordPubSubTime(false, source, trackType, d, sdk, kind) +func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) { + recordPubSubTime(false, source, trackType, d, sdk, kind, count) } -func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) { +func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) { direction := "subscribe" if isPublish { direction = "publish" } - promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String()).Observe(float64(d.Milliseconds())) + promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String(), strconv.Itoa(count)).Observe(float64(d.Milliseconds())) } func RecordTrackSubscribeSuccess(kind string) {