Add some simple data track stats (#4431)

This commit is contained in:
Raja Subramanian
2026-04-04 15:23:49 +05:30
committed by GitHub
parent 050909e627
commit 55912dff7e
3 changed files with 111 additions and 1 deletions
+7
View File
@@ -48,6 +48,8 @@ type DataTrack struct {
downTrackSpreader *sfuutils.DownTrackSpreader[types.DataTrackSender]
stats *dataTrackStats
closed core.Fuse
}
@@ -60,6 +62,7 @@ func NewDataTrack(params DataTrackParams, dti *livekit.DataTrackInfo) *DataTrack
Threshold: 20,
Logger: params.Logger,
}),
stats: newDataTrackStats(dataTrackStatsParams{Logger: params.Logger}),
}
d.params.Logger.Infow("created data track", "name", d.Name())
return d
@@ -68,6 +71,8 @@ func NewDataTrack(params DataTrackParams, dti *livekit.DataTrackInfo) *DataTrack
func (d *DataTrack) Close() {
d.params.Logger.Infow("closing data track", "name", d.Name())
d.closed.Break()
d.stats.Close()
}
func (d *DataTrack) PublisherID() livekit.ParticipantID {
@@ -156,6 +161,8 @@ func (d *DataTrack) DeleteDataDownTrack(subscriberID livekit.ParticipantID) {
}
func (d *DataTrack) HandlePacket(data []byte, packet *datatrack.Packet, arrivalTime int64) {
d.stats.Update(packet, arrivalTime)
d.downTrackSpreader.Broadcast(func(dts types.DataTrackSender) {
dts.WritePacket(data, packet, arrivalTime)
})
+1 -1
View File
@@ -85,7 +85,7 @@ type Header struct {
┆* 0 1 2 3
┆* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
┆* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
┆* |V |F|L|X| reserved | handle |
┆* |V |S|F|X| reserved | handle |
┆* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
┆* | sequence number | frame number |
┆* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+103
View File
@@ -0,0 +1,103 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/rtc/datatrack"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
type dataTrackStatsParams struct {
Logger logger.Logger
}
type dataTrackStats struct {
params dataTrackStatsParams
lock sync.Mutex
startTime int64
endTime int64
highestSequenceNumber uint16
numPackets int
numPacketsLost int
numPacketsOutOfOrder int
numFrames int // count of `F` tagged packets, i. e. packets with final packet of frame marker
}
func newDataTrackStats(params dataTrackStatsParams) *dataTrackStats {
return &dataTrackStats{
params: params,
}
}
func (d *dataTrackStats) Update(packet *datatrack.Packet, arrivalTime int64) {
d.lock.Lock()
defer d.lock.Unlock()
if d.endTime != 0 {
return
}
if d.startTime == 0 {
d.startTime = arrivalTime
d.highestSequenceNumber = packet.SequenceNumber
d.numPackets = 1
} else {
diff := packet.SequenceNumber - d.highestSequenceNumber
switch {
case diff == 0: // duplicate
return
case diff > (1 << 15): // out of order
d.numPacketsOutOfOrder++
if d.numPacketsLost > 0 {
d.numPacketsLost--
}
default: // in order
d.numPackets++
d.numPacketsLost += int(diff) - 1
d.highestSequenceNumber = packet.SequenceNumber
}
}
if packet.IsFinalOfFrame {
d.numFrames++
}
}
func (d *dataTrackStats) Close() {
d.lock.Lock()
defer d.lock.Unlock()
d.endTime = mono.UnixNano()
duration := time.Duration(d.endTime - d.startTime).Seconds()
fps := float64(d.numFrames) / duration
d.params.Logger.Infow(
"data track stats",
"duration", duration,
"numPackets", d.numPackets,
"numPacketsLost", d.numPacketsLost,
"numPacketsOutOfOrder", d.numPacketsOutOfOrder,
"numFrames", d.numFrames,
"fps", fps,
)
}