From 55912dff7e1ff3e36a4f673189f94d305758d8dd Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 4 Apr 2026 15:23:49 +0530 Subject: [PATCH] Add some simple data track stats (#4431) --- pkg/rtc/datatrack.go | 7 +++ pkg/rtc/datatrack/packet.go | 2 +- pkg/rtc/datatrack_stats.go | 103 ++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 pkg/rtc/datatrack_stats.go diff --git a/pkg/rtc/datatrack.go b/pkg/rtc/datatrack.go index 0401e5d78..d1e1cfc16 100644 --- a/pkg/rtc/datatrack.go +++ b/pkg/rtc/datatrack.go @@ -48,6 +48,8 @@ type DataTrack struct { downTrackSpreader *sfuutils.DownTrackSpreader[types.DataTrackSender] + stats *dataTrackStats + closed core.Fuse } @@ -60,6 +62,7 @@ func NewDataTrack(params DataTrackParams, dti *livekit.DataTrackInfo) *DataTrack Threshold: 20, Logger: params.Logger, }), + stats: newDataTrackStats(dataTrackStatsParams{Logger: params.Logger}), } d.params.Logger.Infow("created data track", "name", d.Name()) return d @@ -68,6 +71,8 @@ func NewDataTrack(params DataTrackParams, dti *livekit.DataTrackInfo) *DataTrack func (d *DataTrack) Close() { d.params.Logger.Infow("closing data track", "name", d.Name()) d.closed.Break() + + d.stats.Close() } func (d *DataTrack) PublisherID() livekit.ParticipantID { @@ -156,6 +161,8 @@ func (d *DataTrack) DeleteDataDownTrack(subscriberID livekit.ParticipantID) { } func (d *DataTrack) HandlePacket(data []byte, packet *datatrack.Packet, arrivalTime int64) { + d.stats.Update(packet, arrivalTime) + d.downTrackSpreader.Broadcast(func(dts types.DataTrackSender) { dts.WritePacket(data, packet, arrivalTime) }) diff --git a/pkg/rtc/datatrack/packet.go b/pkg/rtc/datatrack/packet.go index be36318a9..487a1c6c9 100644 --- a/pkg/rtc/datatrack/packet.go +++ b/pkg/rtc/datatrack/packet.go @@ -85,7 +85,7 @@ type Header struct { ┆* 0 1 2 3 ┆* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ┆* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - ┆* |V |F|L|X| reserved | handle | + ┆* |V |S|F|X| reserved | handle | ┆* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ┆* | sequence number | frame number | ┆* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ diff --git a/pkg/rtc/datatrack_stats.go b/pkg/rtc/datatrack_stats.go new file mode 100644 index 000000000..7c3f2034c --- /dev/null +++ b/pkg/rtc/datatrack_stats.go @@ -0,0 +1,103 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtc + +import ( + "sync" + "time" + + "github.com/livekit/livekit-server/pkg/rtc/datatrack" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/mono" +) + +type dataTrackStatsParams struct { + Logger logger.Logger +} + +type dataTrackStats struct { + params dataTrackStatsParams + + lock sync.Mutex + startTime int64 + endTime int64 + highestSequenceNumber uint16 + numPackets int + numPacketsLost int + numPacketsOutOfOrder int + numFrames int // count of `F` tagged packets, i. e. packets with final packet of frame marker +} + +func newDataTrackStats(params dataTrackStatsParams) *dataTrackStats { + return &dataTrackStats{ + params: params, + } +} + +func (d *dataTrackStats) Update(packet *datatrack.Packet, arrivalTime int64) { + d.lock.Lock() + defer d.lock.Unlock() + + if d.endTime != 0 { + return + } + + if d.startTime == 0 { + d.startTime = arrivalTime + d.highestSequenceNumber = packet.SequenceNumber + d.numPackets = 1 + } else { + diff := packet.SequenceNumber - d.highestSequenceNumber + switch { + case diff == 0: // duplicate + return + + case diff > (1 << 15): // out of order + d.numPacketsOutOfOrder++ + if d.numPacketsLost > 0 { + d.numPacketsLost-- + } + + default: // in order + d.numPackets++ + d.numPacketsLost += int(diff) - 1 + d.highestSequenceNumber = packet.SequenceNumber + } + } + + if packet.IsFinalOfFrame { + d.numFrames++ + } +} + +func (d *dataTrackStats) Close() { + d.lock.Lock() + defer d.lock.Unlock() + + d.endTime = mono.UnixNano() + + duration := time.Duration(d.endTime - d.startTime).Seconds() + fps := float64(d.numFrames) / duration + + d.params.Logger.Infow( + "data track stats", + "duration", duration, + "numPackets", d.numPackets, + "numPacketsLost", d.numPacketsLost, + "numPacketsOutOfOrder", d.numPacketsOutOfOrder, + "numFrames", d.numFrames, + "fps", fps, + ) +}