Files
livekit/pkg/telemetry/statsworker.go
shishirng 7fcb887eb8 use delta bytes in window to identify max layer (#442)
total_bytes is aggregate, when we switch from higher layer to lower
layer, it takes time for lower layers total_bytes to catch up to
stopped higher layers

Signed-off-by: shishir gowda <shishir@livekit.io>
2022-02-17 15:15:10 -05:00

382 lines
12 KiB
Go

package telemetry
import (
"context"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/types/known/timestamppb"
)
type Stat struct {
Score float32
Rtt uint32
Jitter uint32
TotalPrimaryPackets uint32
TotalPrimaryBytes uint64
TotalRetransmitPackets uint32
TotalRetransmitBytes uint64
TotalPaddingPackets uint32
TotalPaddingBytes uint64
TotalPacketsLost uint32
TotalFrames uint32
TotalNacks uint32
TotalPlis uint32
TotalFirs uint32
VideoLayers map[int32]*livekit.AnalyticsVideoLayer
TotalBytes uint64
TotalPackets uint32
MaxLayer int32
}
func (stat *Stat) ToAnalyticsStats(layers *livekit.AnalyticsVideoLayer) *livekit.AnalyticsStat {
stream := &livekit.AnalyticsStream{
TotalPrimaryPackets: stat.TotalPrimaryPackets,
TotalPrimaryBytes: stat.TotalPrimaryBytes,
TotalRetransmitPackets: stat.TotalRetransmitPackets,
TotalRetransmitBytes: stat.TotalRetransmitBytes,
TotalPaddingPackets: stat.TotalPaddingPackets,
TotalPaddingBytes: stat.TotalPaddingBytes,
TotalPacketsLost: stat.TotalPacketsLost,
TotalFrames: stat.TotalFrames,
Rtt: stat.Rtt,
Jitter: stat.Jitter,
TotalNacks: stat.TotalNacks,
TotalPlis: stat.TotalPlis,
TotalFirs: stat.TotalFirs,
}
if layers != nil {
stream.VideoLayers = []*livekit.AnalyticsVideoLayer{layers}
}
return &livekit.AnalyticsStat{Streams: []*livekit.AnalyticsStream{stream}, Score: stat.Score}
}
type Stats struct {
// local stats context used for coalesce/delta calculations
curStats *Stat
prevStats *Stat
// current stats received per stream
queue []*livekit.AnalyticsStat
}
// StatsWorker handles participant stats
type StatsWorker struct {
ctx context.Context
t TelemetryReporter
roomID livekit.RoomID
roomName livekit.RoomName
participantID livekit.ParticipantID
outgoingPerTrack map[livekit.TrackID]Stats
incomingPerTrack map[livekit.TrackID]Stats
// clean up stats for unpublished tracks
closedTracks map[livekit.TrackID]bool
}
func newStatsWorker(
ctx context.Context,
t TelemetryReporter,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
) *StatsWorker {
s := &StatsWorker{
ctx: ctx,
t: t,
roomID: roomID,
roomName: roomName,
participantID: participantID,
outgoingPerTrack: make(map[livekit.TrackID]Stats),
incomingPerTrack: make(map[livekit.TrackID]Stats),
closedTracks: make(map[livekit.TrackID]bool),
}
return s
}
func (s *StatsWorker) appendOutgoing(trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
stats := s.outgoingPerTrack[trackID]
stats.queue = append(stats.queue, stat)
s.outgoingPerTrack[trackID] = stats
}
func (s *StatsWorker) appendIncoming(trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
stats := s.incomingPerTrack[trackID]
stats.queue = append(stats.queue, stat)
s.incomingPerTrack[trackID] = stats
}
func (s *StatsWorker) OnTrackStat(trackID livekit.TrackID, direction livekit.StreamType, stat *livekit.AnalyticsStat) {
if direction == livekit.StreamType_DOWNSTREAM {
s.appendOutgoing(trackID, stat)
} else {
s.appendIncoming(trackID, stat)
}
}
func (s *StatsWorker) CleanUpTrackStats() {
if len(s.closedTracks) > 0 {
for trackID := range s.closedTracks {
delete(s.outgoingPerTrack, trackID)
delete(s.incomingPerTrack, trackID)
}
s.closedTracks = make(map[livekit.TrackID]bool)
}
}
func (s *StatsWorker) Update() {
ts := timestamppb.Now()
stats := make([]*livekit.AnalyticsStat, 0, len(s.incomingPerTrack)+len(s.outgoingPerTrack))
stats = s.collectUpstreamStats(ts, stats)
stats = s.collectDownstreamStats(ts, stats)
if len(stats) > 0 {
s.t.Report(s.ctx, stats)
}
s.CleanUpTrackStats()
}
func (s *StatsWorker) collectDownstreamStats(ts *timestamppb.Timestamp, stats []*livekit.AnalyticsStat) []*livekit.AnalyticsStat {
for trackID, analyticsStats := range s.outgoingPerTrack {
analyticsStat := s.getDeltaStats(&analyticsStats, ts, trackID, livekit.StreamType_DOWNSTREAM)
if analyticsStat != nil {
stats = append(stats, analyticsStat)
}
// clear the queue
analyticsStats.queue = nil
s.outgoingPerTrack[trackID] = analyticsStats
}
return stats
}
func (s *StatsWorker) collectUpstreamStats(ts *timestamppb.Timestamp, stats []*livekit.AnalyticsStat) []*livekit.AnalyticsStat {
for trackID, analyticsStats := range s.incomingPerTrack {
analyticsStat := s.getDeltaStats(&analyticsStats, ts, trackID, livekit.StreamType_UPSTREAM)
if analyticsStat != nil {
stats = append(stats, analyticsStat)
}
// clear the queue
analyticsStats.queue = nil
s.incomingPerTrack[trackID] = analyticsStats
}
return stats
}
func (s *StatsWorker) getDeltaStats(stats *Stats, ts *timestamppb.Timestamp, trackID livekit.TrackID, kind livekit.StreamType) *livekit.AnalyticsStat {
// merge all streams stats of track
stats.coalesce()
// create deltaStats to send
analyticsStat := stats.computeDeltaStats()
// update stats for next interval
if analyticsStat == nil {
return nil
}
stats.update()
s.patch(analyticsStat, ts, trackID, kind)
return analyticsStat
}
func (s *StatsWorker) patch(
analyticsStat *livekit.AnalyticsStat,
ts *timestamppb.Timestamp,
trackID livekit.TrackID,
kind livekit.StreamType,
) {
analyticsStat.TimeStamp = ts
analyticsStat.TrackId = string(trackID)
analyticsStat.Kind = kind
analyticsStat.RoomId = string(s.roomID)
analyticsStat.ParticipantId = string(s.participantID)
analyticsStat.RoomName = string(s.roomName)
}
func (s *StatsWorker) Close() {
s.Update()
}
// create a single stream and single video layer post aggregation
func (stats *Stats) update() {
stats.prevStats = stats.curStats
stats.curStats = nil
}
// create a single stream and single video layer post aggregation
func (stats *Stats) coalesce() {
if len(stats.queue) == 0 {
return
}
// average score of all available stats
score := float32(0.0)
for _, stat := range stats.queue {
score += stat.Score
}
score = score / float32(len(stats.queue))
// aggregate streams across all stats
maxRTT := make(map[uint32]uint32)
maxJitter := make(map[uint32]uint32)
analyticsStreams := make(map[uint32]*livekit.AnalyticsStream)
for _, stat := range stats.queue {
//
// For each stream (identified by SSRC) consolidate reports.
// For cumulative stats, take the latest report.
// For instantaneous stats, take maximum (or some other appropriate representation)
//
for _, stream := range stat.Streams {
ssrc := stream.Ssrc
analyticsStream := analyticsStreams[ssrc]
if analyticsStream == nil {
analyticsStreams[ssrc] = stream
maxRTT[ssrc] = stream.Rtt
maxJitter[ssrc] = stream.Jitter
continue
}
if stream.TotalPrimaryPackets <= analyticsStream.TotalPrimaryPackets {
// total count should be monotonically increasing
continue
}
analyticsStreams[ssrc] = stream
if stream.Rtt > maxRTT[ssrc] {
maxRTT[ssrc] = stream.Rtt
}
if stream.Jitter > maxJitter[ssrc] {
maxJitter[ssrc] = stream.Jitter
}
}
}
curStats := Stat{Score: score, VideoLayers: make(map[int32]*livekit.AnalyticsVideoLayer)}
// find aggregates across streams
for ssrc, analyticsStream := range analyticsStreams {
rtt := maxRTT[ssrc]
if rtt > curStats.Rtt {
curStats.Rtt = rtt
}
jitter := maxJitter[ssrc]
if jitter > curStats.Jitter {
curStats.Jitter = jitter
}
curStats.TotalPrimaryPackets += analyticsStream.TotalPrimaryPackets
curStats.TotalPrimaryBytes += analyticsStream.TotalPrimaryBytes
curStats.TotalRetransmitPackets += analyticsStream.TotalRetransmitPackets
curStats.TotalRetransmitBytes += analyticsStream.TotalRetransmitBytes
curStats.TotalPaddingPackets += analyticsStream.TotalPaddingPackets
curStats.TotalPaddingBytes += analyticsStream.TotalPaddingBytes
curStats.TotalPacketsLost += analyticsStream.TotalPacketsLost
curStats.TotalFrames += analyticsStream.TotalFrames
curStats.TotalNacks += analyticsStream.TotalNacks
curStats.TotalPlis += analyticsStream.TotalPlis
curStats.TotalFirs += analyticsStream.TotalFirs
// add/update new video VideoLayers data to current and sum up video layer bytes/packets
for _, videoLayer := range analyticsStream.VideoLayers {
curStats.VideoLayers[videoLayer.Layer] = proto.Clone(videoLayer).(*livekit.AnalyticsVideoLayer)
curStats.TotalPackets += videoLayer.TotalPackets
curStats.TotalBytes += videoLayer.TotalBytes
}
}
// update currentStats
stats.curStats = &curStats
return
}
// find delta between curStats and prevStats and prepare proto payload
func (stats *Stats) computeDeltaStats() *livekit.AnalyticsStat {
if stats.curStats == nil {
return nil
}
// Stats in both queue/prev contain consolidated single deltaStats
cur := stats.curStats
prev := stats.prevStats
var maxLayer int32
var maxDeltaBytes uint64
//create a map of VideoLayers - to pick max/best layer wrt current and prev
curLayers := make(map[int32]*livekit.AnalyticsVideoLayer)
// if we have prev, find max delta total bytes for each layer
if prev != nil {
// find max delta bytes
for _, layer := range cur.VideoLayers {
curLayers[layer.Layer] = layer
if prevLayer, ok := prev.VideoLayers[layer.Layer]; ok {
delta := layer.TotalBytes - prevLayer.TotalBytes
if delta > maxDeltaBytes {
maxDeltaBytes = delta
maxLayer = layer.Layer
}
}
}
} else {
// if we don't have prev layer, find max layer in current - based on totalBytes for a layer
for _, layer := range cur.VideoLayers {
curLayers[layer.Layer] = layer
// identify layer which sent max data
if layer.TotalBytes > maxDeltaBytes {
maxDeltaBytes = layer.TotalBytes
maxLayer = layer.Layer
}
}
return cur.ToAnalyticsStats(curLayers[maxLayer])
}
// we have prevStats, find delta between cur and prev
deltaStats := Stat{Score: cur.Score}
deltaStats.Rtt = cur.Rtt
deltaStats.Jitter = cur.Jitter
deltaStats.TotalPlis = cur.TotalPlis - prev.TotalPlis
deltaStats.TotalFrames = cur.TotalFrames - prev.TotalFrames
deltaStats.TotalNacks = cur.TotalNacks - prev.TotalNacks
deltaStats.TotalFirs = cur.TotalFirs - prev.TotalFirs
deltaStats.TotalPacketsLost = cur.TotalPacketsLost - prev.TotalPacketsLost
// https://datatracker.ietf.org/doc/html/rfc3550#page-83
if int32(deltaStats.TotalPacketsLost) < 0 {
deltaStats.TotalPacketsLost = 0
}
deltaStats.TotalPrimaryPackets = cur.TotalPrimaryPackets - prev.TotalPrimaryPackets
deltaStats.TotalRetransmitPackets = cur.TotalRetransmitPackets - prev.TotalRetransmitPackets
deltaStats.TotalPaddingPackets = cur.TotalPaddingPackets - prev.TotalPaddingPackets
deltaStats.TotalPaddingPackets = cur.TotalPaddingPackets - prev.TotalPaddingPackets
deltaStats.TotalPrimaryBytes = cur.TotalPrimaryBytes - prev.TotalPrimaryBytes
deltaStats.TotalPaddingBytes = cur.TotalPaddingBytes - prev.TotalPaddingBytes
deltaStats.TotalRetransmitBytes = cur.TotalRetransmitBytes - prev.TotalRetransmitBytes
var videoLayer *livekit.AnalyticsVideoLayer
if len(cur.VideoLayers) > 0 && len(prev.VideoLayers) > 0 {
videoLayer = new(livekit.AnalyticsVideoLayer)
// find the current layer for the same layer id as previous, compute current round of delta with it
if curLayer, ok := curLayers[prev.MaxLayer]; ok {
videoLayer.Layer = prev.MaxLayer
videoLayer.TotalFrames = curLayer.TotalFrames - prev.VideoLayers[prev.MaxLayer].TotalFrames
} else {
videoLayer = curLayers[maxLayer]
}
// store new max layer for next round
cur.MaxLayer = maxLayer
// we accumulate bytes/packets across layers
videoLayer.TotalBytes = cur.TotalBytes - prev.TotalBytes
videoLayer.TotalPackets = cur.TotalPackets - prev.TotalPackets
}
// if no packets from any layers, return nil to send no stats
if deltaStats.TotalPackets == 0 && deltaStats.TotalPrimaryPackets == 0 && deltaStats.TotalRetransmitPackets == 0 && deltaStats.TotalPaddingPackets == 0 {
return nil
}
return deltaStats.ToAnalyticsStats(videoLayer)
}
func (s *StatsWorker) RemoveStats(trackID livekit.TrackID) {
s.closedTracks[trackID] = true
}