Telemetry connection scores (#377)

* octets - total bytes needs to be uint64

uint32 wraps at 4GB

Signed-off-by: shishir gowda <shishir@livekit.io>

* Cleanup stats handler to use connectionQuality stats

remove per packet rtcp handlers, buffer stats

* cleanup connection stats

* Update mediatrack to store rtcp stats in connection stats

* Update downstream handling of connection stats and telemetry

* Update telemetry tests

Signed-off-by: shishir gowda <shishir@livekit.io>

* Misc fixes

Signed-off-by: shishir gowda <shishir@livekit.io>

* Minor fix to avoid accessing buffer before its allocated

Signed-off-by: shishir gowda <shishir@livekit.io>

* start updateStats worker in AddReciever()

Signed-off-by: shishir gowda <shishir@livekit.io>

* Use previous score to calculate avg scores

* Restructure connectionStats

Signed-off-by: shishir gowda <shishir@livekit.io>
This commit is contained in:
shishirng
2022-01-27 11:24:54 -05:00
committed by GitHub
parent 32825d2666
commit 26eea78b54
11 changed files with 266 additions and 339 deletions
+51 -24
View File
@@ -87,7 +87,7 @@ func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTra
Logger: params.Logger,
})
t.MediaTrackReceiver.OnMediaLossUpdate(func(fractionalLoss uint8) {
if t.buffer != nil {
if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO {
// ok to access buffer since receivers are added before subscribers
t.buffer.SetLastFractionLostReport(fractionalLoss)
}
@@ -108,8 +108,6 @@ func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTra
// on close signal via closing channel to workers
t.AddOnClose(t.closeChan)
go t.updateStats()
return t
}
@@ -222,16 +220,15 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
t.params.Telemetry.TrackUnpublished(context.Background(), t.PublisherID(), t.ToProto(), uint32(track.SSRC()))
})
t.params.Telemetry.TrackPublished(context.Background(), t.PublisherID(), t.ToProto())
if t.Kind() == livekit.TrackType_AUDIO {
t.buffer = buff
}
t.buffer = buff
t.MediaTrackReceiver.SetupReceiver(wr)
go t.updateStats()
}
t.lock.Unlock()
t.Receiver().(*sfu.WebRTCReceiver).AddUpTrack(track, buff)
t.params.Telemetry.AddUpTrack(t.PublisherID(), t.ID(), buff)
atomic.AddUint32(&t.numUpTracks, 1)
// LK-TODO: can remove this completely when VideoLayers protocol becomes the default as it has info from client or if we decide to use TrackInfo.Simulcast
@@ -275,9 +272,9 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) {
var jitter uint32
var totalLost uint32
var maxSeqNum uint32
// forward to telemetry
t.params.Telemetry.HandleRTCP(livekit.StreamType_UPSTREAM, t.params.ParticipantID, t.ID(), packets)
var nackCount int32
var pliCount int32
var firCount int32
for _, p := range packets {
switch pkt := p.(type) {
@@ -287,7 +284,6 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) {
if rr.FractionLost > maxLost {
maxLost = rr.FractionLost
}
if rr.Delay > delay {
delay = rr.Delay
}
@@ -297,11 +293,18 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) {
if rr.LastSequenceNumber > maxSeqNum {
maxSeqNum = rr.LastSequenceNumber
}
totalLost = rr.TotalLost
hasReport = true
}
hasReport = true
case *rtcp.TransportLayerNack:
nackCount++
hasReport = true
case *rtcp.PictureLossIndication:
pliCount++
hasReport = true
case *rtcp.FullIntraRequest:
firCount++
hasReport = true
}
}
@@ -317,8 +320,11 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) {
t.maxUpFracLost = 0
t.maxUpFracLostTs = now
}
t.statsLock.Unlock()
// update feedback stats
current := t.connectionStats.Curr
t.connectionStats.Lock.Lock()
current := t.connectionStats
if jitter > current.Jitter {
current.Jitter = jitter
}
@@ -329,7 +335,10 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) {
current.LastSeqNum = maxSeqNum
}
current.PacketsLost = totalLost
t.statsLock.Unlock()
current.NackCount += nackCount
current.PliCount += pliCount
current.FirCount += firCount
t.connectionStats.Lock.Unlock()
}
// also look for sender reports
@@ -338,8 +347,8 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) {
}
func (t *MediaTrack) GetConnectionScore() float64 {
t.statsLock.Lock()
defer t.statsLock.Unlock()
t.connectionStats.Lock.Lock()
defer t.connectionStats.Lock.Unlock()
return t.connectionStats.Score
}
@@ -348,23 +357,41 @@ func (t *MediaTrack) closeChan() {
}
func (t *MediaTrack) updateStats() {
for {
select {
case <-t.done:
return
case <-time.After(connectionQualityUpdateInterval):
t.statsLock.Lock()
t.connectionStats.Lock.Lock()
//we are accessing stats after receiver has buffer assigned
stats := t.buffer.GetStats()
delta := t.connectionStats.UpdateStats(stats.TotalByte)
if t.Kind() == livekit.TrackType_AUDIO {
t.connectionStats.CalculateAudioScore()
t.connectionStats.Score = connectionquality.AudioConnectionScore(delta, t.connectionStats.Jitter)
} else {
t.calculateVideoScore()
t.connectionStats.Score = t.calculateVideoScore()
}
t.statsLock.Unlock()
stat := &livekit.AnalyticsStat{
Jitter: float64(t.connectionStats.Jitter),
TotalPackets: uint64(t.connectionStats.TotalPackets),
PacketLost: uint64(t.connectionStats.PacketsLost),
Delay: uint64(t.connectionStats.Delay),
TotalBytes: t.connectionStats.TotalBytes,
NackCount: t.connectionStats.NackCount,
PliCount: t.connectionStats.PliCount,
FirCount: t.connectionStats.FirCount,
ConnectionScore: float32(t.connectionStats.Score),
}
t.params.Telemetry.TrackStats(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), stat)
t.connectionStats.Lock.Unlock()
}
}
}
func (t *MediaTrack) calculateVideoScore() {
func (t *MediaTrack) calculateVideoScore() float64 {
var reducedQuality bool
publishing, expected := t.getNumUpTracks()
if publishing < expected {
@@ -375,5 +402,5 @@ func (t *MediaTrack) calculateVideoScore() {
if expected == 0 {
loss = 0
}
t.connectionStats.Score = connectionquality.Loss2Score(loss, reducedQuality)
return connectionquality.VideoConnectionScore(loss, reducedQuality)
}
+3 -8
View File
@@ -185,14 +185,9 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code
go t.sendDownTrackBindingReports(sub)
})
trackID := t.params.MediaTrack.ID()
downTrack.OnPacketSent(func(_ *sfu.DownTrack, size int) {
t.params.Telemetry.OnDownstreamPacket(subscriberID, trackID, size)
})
downTrack.OnPaddingSent(func(_ *sfu.DownTrack, size int) {
t.params.Telemetry.OnDownstreamPacket(subscriberID, trackID, size)
})
downTrack.OnRTCP(func(pkts []rtcp.Packet) {
t.params.Telemetry.HandleRTCP(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, pkts)
downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) {
t.params.Telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, stat)
})
downTrack.OnCloseHandler(func() {
+9 -5
View File
@@ -250,7 +250,7 @@ func TestConnectionQuality(t *testing.T) {
if numRegistered > 0 && numPublishing != numRegistered {
reducedQuality = true
}
return connectionquality.Loss2Score(loss, reducedQuality)
return connectionquality.VideoConnectionScore(loss, reducedQuality)
}
testPublishedVideoTrack := func(loss, numPublishing, numRegistered uint32) *typesfakes.FakeLocalMediaTrack {
@@ -264,16 +264,20 @@ func TestConnectionQuality(t *testing.T) {
testPublishedAudioTrack := func(totalPackets, packetsLost uint32) *typesfakes.FakeLocalMediaTrack {
tr := &typesfakes.FakeLocalMediaTrack{}
stat := &connectionquality.ConnectionStat{
stat := connectionquality.ConnectionStat{
PacketsLost: packetsLost,
TotalPackets: totalPackets,
LastSeqNum: 0,
}
stats := &connectionquality.ConnectionStats{
Curr: stat,
Prev: &connectionquality.ConnectionStat{},
ConnectionStat: stat,
Prev: &connectionquality.ConnectionStat{},
}
stats.CalculateAudioScore()
stats.Score = connectionquality.AudioConnectionScore(connectionquality.ConnectionStat{
TotalPackets: stat.TotalPackets,
PacketsLost: stat.PacketsLost,
TotalBytes: stat.TotalBytes,
}, 0)
t.Log("audio score: ", stats.Score)
tr.GetConnectionScoreReturns(stats.Score)
return tr
+23 -16
View File
@@ -1,25 +1,28 @@
package connectionquality
import (
"github.com/livekit/protocol/livekit"
)
import "sync"
type ConnectionStat struct {
PacketsLost uint32
Delay uint32
Jitter uint32
TotalPackets uint32
LastSeqNum uint32
TotalBytes uint64
}
type ConnectionStats struct {
Curr *ConnectionStat
Prev *ConnectionStat
Score float64
Lock sync.Mutex
ConnectionStat
Prev *ConnectionStat
Delay uint32
Jitter uint32
NackCount int32
PliCount int32
FirCount int32
Score float64
}
func NewConnectionStats() *ConnectionStats {
return &ConnectionStats{Curr: &ConnectionStat{}, Prev: &ConnectionStat{}, Score: 4.0}
return &ConnectionStats{Prev: &ConnectionStat{}, Score: 4.0}
}
func getTotalPackets(curSN, prevSN uint32) uint32 {
@@ -33,18 +36,22 @@ func getTotalPackets(curSN, prevSN uint32) uint32 {
return increment
}
func (cs *ConnectionStats) CalculateAudioScore() float64 {
func (cs *ConnectionStats) UpdateStats(totalBytes uint64) ConnectionStat {
// update feedback stats
current := cs.Curr
previous := cs.Prev
// Update TotalPackets from SeqNum here
current.TotalPackets += getTotalPackets(current.LastSeqNum, previous.LastSeqNum)
cs.Score = ConnectionScore(current, previous, livekit.TrackType_AUDIO)
cs.TotalPackets += getTotalPackets(cs.LastSeqNum, previous.LastSeqNum)
cs.TotalBytes = totalBytes
var delta ConnectionStat
delta.TotalPackets = cs.TotalPackets - previous.TotalPackets
delta.PacketsLost = cs.PacketsLost - previous.PacketsLost
delta.TotalBytes = cs.TotalBytes - previous.TotalBytes
// store previous stats
cs.Prev = current
cs.Curr = &ConnectionStat{TotalPackets: cs.Prev.TotalPackets, PacketsLost: cs.Prev.PacketsLost}
cs.Prev = &ConnectionStat{TotalPackets: cs.TotalPackets, PacketsLost: cs.PacketsLost, TotalBytes: cs.TotalBytes, LastSeqNum: cs.LastSeqNum}
return cs.Score
return delta
}
+11 -16
View File
@@ -22,26 +22,20 @@ func Score2Rating(score float64) livekit.ConnectionQuality {
return livekit.ConnectionQuality_POOR
}
func mosAudioEmodel(cur, prev *ConnectionStat) float64 {
if cur == nil {
return 0.0
}
func mosAudioEmodel(delta ConnectionStat, jitter uint32) float64 {
// find percentage of lost packets in this window
deltaTotalPackets := cur.TotalPackets - prev.TotalPackets
if deltaTotalPackets == 0 {
if delta.TotalPackets == 0 {
return 0.0
}
deltaTotalLostPackets := cur.PacketsLost - prev.PacketsLost
percentageLost := (float64(deltaTotalLostPackets) / float64(deltaTotalPackets)) * 100
percentageLost := (float64(delta.PacketsLost) / float64(delta.TotalPackets)) * 100
rx := 93.2 - percentageLost
ry := 0.18*rx*rx - 27.9*rx + 1126.62
// Jitter is in MicroSecs (1/1e6) units. Convert it to MilliSecs
d := float64(rtt + (cur.Jitter / 1000))
d := float64(rtt + (jitter / 1000))
h := d - 177.3
if h < 0 {
h = 0
@@ -61,7 +55,7 @@ func mosAudioEmodel(cur, prev *ConnectionStat) float64 {
return score
}
func Loss2Score(loss uint32, reducedQuality bool) float64 {
func loss2Score(loss uint32, reducedQuality bool) float64 {
// No Loss, excellent
if loss == 0 && !reducedQuality {
return 5
@@ -78,9 +72,10 @@ func Loss2Score(loss uint32, reducedQuality bool) float64 {
return score
}
func ConnectionScore(cur, prev *ConnectionStat, kind livekit.TrackType) float64 {
if kind == livekit.TrackType_AUDIO {
return mosAudioEmodel(cur, prev)
}
return 0
func AudioConnectionScore(delta ConnectionStat, jitter uint32) float64 {
return mosAudioEmodel(delta, jitter)
}
func VideoConnectionScore(pctLoss uint32, reducedQuality bool) float64 {
return loss2Score(pctLoss, reducedQuality)
}
+69 -38
View File
@@ -66,7 +66,7 @@ var (
type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
type PacketStats struct {
octets uint32
octets uint64
packets uint32
}
@@ -103,7 +103,6 @@ type DownTrack struct {
primaryStats atomic.Value // contains *PacketStats
rtxStats atomic.Value // contains *PacketStats
paddingStats atomic.Value // contains *PacketStats
statsLock sync.Mutex
connectionStats *connectionquality.ConnectionStats
done chan struct{}
@@ -133,6 +132,9 @@ type DownTrack struct {
// padding packet sent callback
onPaddingSent []func(dt *DownTrack, size int)
// update stats
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
}
// NewDownTrack returns a DownTrack.
@@ -552,6 +554,10 @@ func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int)) {
d.onPaddingSent = append(d.onPaddingSent, fn)
}
func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) {
d.onStatsUpdate = fn
}
func (d *DownTrack) IsDeficient() bool {
return d.forwarder.IsDeficient()
}
@@ -641,19 +647,20 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
diff := (uint64(now.Sub(ntpTime(srNTP).Time())) * uint64(d.codec.ClockRate)) / uint64(time.Second)
octets, packets := d.getSRStats()
return &rtcp.SenderReport{
SSRC: d.ssrc,
NTPTime: uint64(nowNTP),
RTPTime: srRTP + uint32(diff),
PacketCount: packets,
OctetCount: octets,
OctetCount: uint32(octets),
}
}
func (d *DownTrack) UpdatePrimaryStats(packetLen uint32) {
primaryStats, _ := d.primaryStats.Load().(*PacketStats)
primaryStats.octets += packetLen
primaryStats.octets += uint64(packetLen)
primaryStats.packets += 1
d.primaryStats.Store(primaryStats)
@@ -662,7 +669,7 @@ func (d *DownTrack) UpdatePrimaryStats(packetLen uint32) {
func (d *DownTrack) UpdateRtxStats(packetLen uint32) {
rtxStats, _ := d.rtxStats.Load().(*PacketStats)
rtxStats.octets += packetLen
rtxStats.octets += uint64(packetLen)
rtxStats.packets += 1
d.rtxStats.Store(rtxStats)
@@ -671,7 +678,7 @@ func (d *DownTrack) UpdateRtxStats(packetLen uint32) {
func (d *DownTrack) UpdatePaddingStats(packetLen uint32) {
paddingStats, _ := d.paddingStats.Load().(*PacketStats)
paddingStats.octets += packetLen
paddingStats.octets += uint64(packetLen)
paddingStats.packets += 1
d.paddingStats.Store(paddingStats)
@@ -807,14 +814,21 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
var jitter uint32
var totalLost uint32
var maxSeqNum uint32
var nackCount int32
var pliCount int32
var firCount int32
maxRatePacketLoss := uint8(0)
for _, pkt := range pkts {
switch p := pkt.(type) {
case *rtcp.PictureLossIndication:
sendPliOnce()
pliCount++
hasReport = true
case *rtcp.FullIntraRequest:
sendPliOnce()
firCount++
hasReport = true
case *rtcp.ReceiverEstimatedMaximumBitrate:
if d.onREMB != nil {
d.onREMB(d, p)
@@ -854,34 +868,40 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
d.listenerLock.RUnlock()
}
if hasReport {
d.statsLock.Lock()
// update feedback stats
current := d.connectionStats.Curr
if jitter > current.Jitter {
current.Jitter = jitter
}
if delay > current.Delay {
current.Delay = delay
}
if maxSeqNum > current.LastSeqNum {
current.LastSeqNum = maxSeqNum
}
current.PacketsLost = totalLost
d.statsLock.Unlock()
}
case *rtcp.TransportLayerNack:
var nackedPackets []packetMeta
for _, pair := range p.Nacks {
nackedPackets = append(nackedPackets, d.sequencer.getSeqNoPairs(pair.PacketList())...)
}
go d.retransmitPackets(nackedPackets)
nackCount += int32(len(nackedPackets))
hasReport = true
case *rtcp.TransportLayerCC:
if p.MediaSSRC == d.ssrc && d.onTransportCCFeedback != nil {
d.onTransportCCFeedback(d, p)
}
}
}
if hasReport {
d.connectionStats.Lock.Lock()
current := d.connectionStats
// update feedback stats
if jitter > current.Jitter {
current.Jitter = jitter
}
if delay > current.Delay {
current.Delay = delay
}
if maxSeqNum > current.LastSeqNum {
current.LastSeqNum = maxSeqNum
}
current.PacketsLost = totalLost
current.NackCount += nackCount
current.PliCount += pliCount
current.FirCount += firCount
d.connectionStats.Lock.Unlock()
}
}
func (d *DownTrack) retransmitPackets(nackedPackets []packetMeta) {
@@ -959,7 +979,7 @@ func (d *DownTrack) retransmitPackets(nackedPackets []packetMeta) {
}
}
func (d *DownTrack) getSRStats() (uint32, uint32) {
func (d *DownTrack) getSRStats() (uint64, uint32) {
primary := d.primaryStats.Load().(*PacketStats)
rtx := d.rtxStats.Load().(*PacketStats)
padding := d.paddingStats.Load().(*PacketStats)
@@ -1061,8 +1081,8 @@ func (d *DownTrack) DebugInfo() map[string]interface{} {
}
func (d *DownTrack) GetConnectionScore() float64 {
d.statsLock.Lock()
defer d.statsLock.Unlock()
d.connectionStats.Lock.Lock()
defer d.connectionStats.Lock.Unlock()
return d.connectionStats.Score
}
@@ -1072,13 +1092,33 @@ func (d *DownTrack) updateStats() {
case <-d.done:
return
case <-time.After(connectionQualityUpdateInterval):
d.statsLock.Lock()
d.connectionStats.Lock.Lock()
totalBytes, _ := d.getSRStats()
delta := d.connectionStats.UpdateStats(totalBytes)
if d.Kind() == webrtc.RTPCodecTypeAudio {
d.connectionStats.CalculateAudioScore()
d.connectionStats.Score = connectionquality.AudioConnectionScore(delta, d.connectionStats.Jitter)
} else {
d.calculateVideoScore()
var reducedQuality bool
if d.GetForwardingStatus() != ForwardingStatusOptimal {
reducedQuality = true
}
d.connectionStats.Score = connectionquality.VideoConnectionScore(FixedPointToPercent(d.CurrentMaxLossFraction()), reducedQuality)
}
d.statsLock.Unlock()
stat := &livekit.AnalyticsStat{
Jitter: float64(d.connectionStats.Jitter),
TotalPackets: uint64(d.connectionStats.TotalPackets),
PacketLost: uint64(d.connectionStats.PacketsLost),
Delay: uint64(d.connectionStats.Delay),
TotalBytes: d.connectionStats.TotalBytes,
NackCount: d.connectionStats.NackCount,
PliCount: d.connectionStats.PliCount,
FirCount: d.connectionStats.FirCount,
ConnectionScore: float32(d.connectionStats.Score),
}
if d.onStatsUpdate != nil {
d.onStatsUpdate(d, stat)
}
d.connectionStats.Lock.Unlock()
}
}
}
@@ -1087,12 +1127,3 @@ func (d *DownTrack) updateStats() {
func FixedPointToPercent(frac uint8) uint32 {
return (uint32(frac) * 100) >> 8
}
func (d *DownTrack) calculateVideoScore() {
var reducedQuality bool
if d.GetForwardingStatus() != ForwardingStatusOptimal {
reducedQuality = true
}
d.connectionStats.Score = connectionquality.Loss2Score(FixedPointToPercent(d.CurrentMaxLossFraction()), reducedQuality)
return
}
+29 -51
View File
@@ -5,8 +5,6 @@ import (
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
// StatsWorker handles participant stats
@@ -17,9 +15,7 @@ type StatsWorker struct {
roomName livekit.RoomName
participantID livekit.ParticipantID
upstreamBuffers map[livekit.TrackID][]*buffer.Buffer
drainUpstreamBuffers map[livekit.TrackID]bool
drainStats map[livekit.TrackID]bool
outgoingPerTrack map[livekit.TrackID]*Stats
incomingPerTrack map[livekit.TrackID]*Stats
}
@@ -32,6 +28,7 @@ type Stats struct {
prevBytes uint64
totalPacketsLost uint64
prevPacketsLost uint64
connectionScore float32
}
func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID) *StatsWorker {
@@ -42,24 +39,13 @@ func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID livekit.Roo
roomName: roomName,
participantID: participantID,
upstreamBuffers: make(map[livekit.TrackID][]*buffer.Buffer),
drainUpstreamBuffers: make(map[livekit.TrackID]bool),
outgoingPerTrack: make(map[livekit.TrackID]*Stats),
incomingPerTrack: make(map[livekit.TrackID]*Stats),
drainStats: make(map[livekit.TrackID]bool),
}
return s
}
func (s *StatsWorker) AddBuffer(trackID livekit.TrackID, buffer *buffer.Buffer) {
s.upstreamBuffers[trackID] = append(s.upstreamBuffers[trackID], buffer)
}
func (s *StatsWorker) OnDownstreamPacket(trackID livekit.TrackID, bytes int) {
s.getOrCreateOutgoingStatsIfEmpty(trackID).totalBytes += uint64(bytes)
s.getOrCreateOutgoingStatsIfEmpty(trackID).totalPackets++
}
func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID livekit.TrackID) *Stats {
if s.outgoingPerTrack[trackID] == nil {
s.outgoingPerTrack[trackID] = &Stats{next: &livekit.AnalyticsStat{
@@ -84,7 +70,7 @@ func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID livekit.TrackID) *
return s.incomingPerTrack[trackID]
}
func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamType, stats *livekit.AnalyticsStat) {
func (s *StatsWorker) OnTrackStat(trackID livekit.TrackID, direction livekit.StreamType, stats *livekit.AnalyticsStat) {
var ds *Stats
if direction == livekit.StreamType_DOWNSTREAM {
ds = s.getOrCreateOutgoingStatsIfEmpty(trackID)
@@ -92,6 +78,8 @@ func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamTy
ds = s.getOrCreateIncomingStatsIfEmpty(trackID)
}
ds.totalPacketsLost = stats.PacketLost
ds.totalPackets = uint32(stats.TotalPackets)
ds.totalBytes = stats.TotalBytes
if stats.Rtt > ds.next.Rtt {
ds.next.Rtt = stats.Rtt
@@ -102,17 +90,12 @@ func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamTy
ds.next.NackCount += stats.NackCount
ds.next.PliCount += stats.PliCount
ds.next.FirCount += stats.FirCount
}
func (s *StatsWorker) calculateTotalBytesPackets(allBuffers []*buffer.Buffer) (totalBytes uint64, totalPackets uint32) {
totalBytes = 0
totalPackets = 0
for _, buff := range allBuffers {
totalBytes += buff.GetStats().TotalByte
totalPackets += buff.GetStats().PacketCount
// average out scores received in this interval
if ds.connectionScore == 0 {
ds.connectionScore = stats.ConnectionScore
} else {
ds.connectionScore = (ds.connectionScore + stats.ConnectionScore) / 2
}
return totalBytes, totalPackets
}
func (s *StatsWorker) Update() {
@@ -133,30 +116,25 @@ func (s *StatsWorker) collectDownstreamStats(ts *timestamppb.Timestamp, stats []
stats = append(stats, analyticsStat)
}
}
if len(s.drainStats) > 0 {
for trackID := range s.drainStats {
delete(s.outgoingPerTrack, trackID)
delete(s.incomingPerTrack, trackID)
}
s.drainStats = make(map[livekit.TrackID]bool)
}
return stats
}
func (s *StatsWorker) collectUpstreamStats(ts *timestamppb.Timestamp, stats []*livekit.AnalyticsStat) []*livekit.AnalyticsStat {
for trackID, buffers := range s.upstreamBuffers {
totalBytes, totalPackets := s.calculateTotalBytesPackets(buffers)
s.getOrCreateIncomingStatsIfEmpty(trackID).totalBytes = totalBytes
s.getOrCreateIncomingStatsIfEmpty(trackID).totalPackets = totalPackets
analyticsStats := s.update(s.incomingPerTrack[trackID], ts)
if analyticsStats != nil {
analyticsStats.TrackId = string(trackID)
stats = append(stats, analyticsStats)
for trackID, trackUpStreamStats := range s.incomingPerTrack {
analyticsStat := s.update(trackUpStreamStats, ts)
if analyticsStat != nil {
analyticsStat.TrackId = string(trackID)
stats = append(stats, analyticsStat)
}
}
if len(s.drainUpstreamBuffers) > 0 {
for trackID := range s.drainUpstreamBuffers {
delete(s.upstreamBuffers, trackID)
delete(s.incomingPerTrack, trackID)
}
s.drainUpstreamBuffers = make(map[livekit.TrackID]bool)
}
return stats
}
@@ -177,18 +155,18 @@ func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.A
next.TotalPackets = uint64(stats.totalPackets - stats.prevPackets)
next.TotalBytes = stats.totalBytes - stats.prevBytes
next.PacketLost = stats.totalPacketsLost - stats.prevPacketsLost
next.ConnectionScore = stats.connectionScore
stats.prevPackets = stats.totalPackets
stats.prevBytes = stats.totalBytes
stats.prevPacketsLost = stats.totalPacketsLost
return next
}
func (s *StatsWorker) RemoveBuffer(trackID livekit.TrackID) {
s.drainUpstreamBuffers[trackID] = true
}
func (s *StatsWorker) Close() {
s.Update()
}
func (s *StatsWorker) RemoveStats(trackID livekit.TrackID) {
s.drainStats[trackID] = true
}
+3 -20
View File
@@ -6,18 +6,13 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/webhook"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
const updateFrequency = time.Second * 10
type TelemetryService interface {
// stats
AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer)
OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int)
HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet)
TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat)
// events
RoomStarted(ctx context.Context, room *livekit.Room)
@@ -68,21 +63,9 @@ func (t *telemetryService) run() {
}
}
func (t *telemetryService) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) {
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) {
t.jobQueue <- func() {
t.internalService.AddUpTrack(participantID, trackID, buff)
}
}
func (t *telemetryService) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) {
t.jobQueue <- func() {
t.internalService.OnDownstreamPacket(participantID, trackID, bytes)
}
}
func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) {
t.jobQueue <- func() {
t.internalService.HandleRTCP(streamType, participantID, trackID, pkts)
t.internalService.TrackStats(streamType, participantID, trackID, stats)
}
}
+4 -47
View File
@@ -4,12 +4,9 @@ import (
"context"
"github.com/gammazero/workerpool"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/webhook"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
type TelemetryServiceInternal interface {
@@ -40,58 +37,18 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS
}
}
func (t *telemetryServiceInternal) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) {
w := t.workers[participantID]
if w != nil {
w.AddBuffer(trackID, buff)
}
}
func (t *telemetryServiceInternal) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) {
w := t.workers[participantID]
if w != nil {
w.OnDownstreamPacket(trackID, bytes)
}
}
func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) {
stats := &livekit.AnalyticsStat{}
for _, pkt := range pkts {
switch pkt := pkt.(type) {
case *rtcp.TransportLayerNack:
stats.NackCount++
case *rtcp.PictureLossIndication:
stats.PliCount++
case *rtcp.FullIntraRequest:
stats.FirCount++
case *rtcp.ReceiverReport:
for _, rr := range pkt.Reports {
if jitter := float64(rr.Jitter); jitter > stats.Jitter {
stats.Jitter = jitter
}
if totalLost := uint64(rr.TotalLost); totalLost > stats.PacketLost {
stats.PacketLost = totalLost
}
}
if streamType == livekit.StreamType_DOWNSTREAM {
rtt := GetRttMs(pkt)
if rtt >= 0 {
stats.Rtt = uint32(rtt)
}
}
}
}
func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
direction := prometheus.Incoming
if streamType == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount)
prometheus.IncrementRTCP(direction, stat.NackCount, stat.PliCount, stat.FirCount)
w := t.workers[participantID]
if w != nil {
w.OnRTCP(trackID, streamType, stats)
w.OnTrackStat(trackID, streamType, stat)
}
}
@@ -124,8 +124,8 @@ func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, partici
w := t.workers[participantID]
if w != nil {
roomID = w.roomID
w.RemoveBuffer(livekit.TrackID(track.GetSid()))
roomName = w.roomName
w.RemoveStats(livekit.TrackID(track.GetSid()))
}
prometheus.SubPublishedTrack(track.Type.String())
+63 -113
View File
@@ -5,10 +5,8 @@ import (
"testing"
"github.com/livekit/protocol/livekit"
"github.com/pion/rtcp"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes"
)
@@ -37,7 +35,8 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) {
// do
packet := 33
fixture.sut.OnDownstreamPacket(partSID, "", packet)
stat := &livekit.AnalyticsStat{TotalBytes: uint64(packet)}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "", stat)
fixture.sut.SendAnalytics()
// test
@@ -65,8 +64,11 @@ func Test_OnDownstreamPackets(t *testing.T) {
totalBytes := packets[0] + packets[1]
totalPackets := len(packets)
trackID := livekit.TrackID("trackID")
var bytes int
for i := range packets {
fixture.sut.OnDownstreamPacket(partSID, trackID, packets[i])
bytes += packets[i]
stat := &livekit.AnalyticsStat{TotalBytes: uint64(bytes), TotalPackets: uint64(i + 1)}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat)
}
fixture.sut.SendAnalytics()
@@ -93,10 +95,13 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) {
// do
packet1 := 33
trackID1 := livekit.TrackID("trackID1")
stat1 := &livekit.AnalyticsStat{TotalBytes: uint64(packet1), TotalPackets: 1}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat1)
packet2 := 23
trackID2 := livekit.TrackID("trackID2")
fixture.sut.OnDownstreamPacket(partSID, trackID1, packet1)
fixture.sut.OnDownstreamPacket(partSID, trackID2, packet2)
stat2 := &livekit.AnalyticsStat{TotalBytes: uint64(packet2), TotalPackets: 1}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat2)
fixture.sut.SendAnalytics()
// test
@@ -131,20 +136,12 @@ func Test_OnDownStreamRTCP(t *testing.T) {
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil)
// do
pkts := []rtcp.Packet{
&rtcp.TransportLayerNack{},
&rtcp.PictureLossIndication{},
&rtcp.FullIntraRequest{},
&rtcp.ReceiverReport{
Reports: []rtcp.ReceptionReport{
{Jitter: 5, TotalLost: 3},
{Jitter: 2, TotalLost: 4},
},
},
}
stat1 := &livekit.AnalyticsStat{NackCount: 1, PliCount: 1, Jitter: 3, PacketLost: 3, TotalBytes: 1, TotalPackets: 1}
trackID := livekit.TrackID("trackID1")
fixture.sut.OnDownstreamPacket(partSID, trackID, 1) // there should be bytes reported so that stats are sent
fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID, pkts)
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1)
stat2 := &livekit.AnalyticsStat{FirCount: 1, Jitter: 5, PacketLost: 4, TotalBytes: 2, TotalPackets: 2}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2)
fixture.sut.SendAnalytics()
// test
@@ -171,25 +168,12 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) {
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil)
// do
pkts1 := []rtcp.Packet{
&rtcp.ReceiverReport{
Reports: []rtcp.ReceptionReport{
{Delay: 0, Jitter: 0, TotalLost: 1},
},
},
}
pkts2 := []rtcp.Packet{
&rtcp.ReceiverReport{
Reports: []rtcp.ReceptionReport{
{Delay: 0, Jitter: 0, TotalLost: 4}, // diff with previous is 3, so in second call to SendAnalytics, 3 should be sent
},
},
}
trackID := livekit.TrackID("trackID1")
fixture.sut.OnDownstreamPacket(partSID, trackID, 1) // there should be bytes reported so that stats are sent
fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID, pkts1)
stat1 := &livekit.AnalyticsStat{PacketLost: 1, TotalPackets: 1, TotalBytes: 1}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1) // there should be bytes reported so that stats are sent
fixture.sut.SendAnalytics()
fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID, pkts2)
stat2 := &livekit.AnalyticsStat{PacketLost: 4, TotalPackets: 2, TotalBytes: 2}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2)
fixture.sut.SendAnalytics()
// test
@@ -215,18 +199,17 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) {
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil)
// do
pkts1 := []rtcp.Packet{
&rtcp.TransportLayerNack{},
}
pkts2 := []rtcp.Packet{
&rtcp.FullIntraRequest{},
}
trackID1 := livekit.TrackID("trackID1")
trackID2 := livekit.TrackID("trackID2")
fixture.sut.OnDownstreamPacket(partSID, trackID1, 1) // there should be bytes reported so that stats are sent
fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID1, pkts1)
fixture.sut.OnDownstreamPacket(partSID, trackID2, 1) // there should be bytes reported so that stats are sent
fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID2, pkts2)
stat1 := &livekit.AnalyticsStat{TotalBytes: 1, TotalPackets: 1}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat1) // there should be bytes reported so that stats are sent
stat2 := &livekit.AnalyticsStat{NackCount: 1, TotalPackets: 2, TotalBytes: 2}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat2)
stat3 := &livekit.AnalyticsStat{FirCount: 1, TotalPackets: 3, TotalBytes: 3}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat3)
fixture.sut.SendAnalytics()
// test
@@ -260,26 +243,14 @@ func Test_OnUpstreamRTCP(t *testing.T) {
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil)
// do
pkts := []rtcp.Packet{
&rtcp.TransportLayerNack{},
&rtcp.PictureLossIndication{},
&rtcp.FullIntraRequest{},
&rtcp.ReceiverReport{
Reports: []rtcp.ReceptionReport{
{Jitter: 5, TotalLost: 3},
{Jitter: 2, TotalLost: 4},
},
},
}
// there should be bytes reported so that stats are sent
buf := &buffer.Buffer{}
buf.SetStatsTestOnly(buffer.Stats{
PacketCount: 1,
TotalByte: 1,
})
stat1 := &livekit.AnalyticsStat{NackCount: 1, PliCount: 1, FirCount: 1, Jitter: 5, PacketLost: 3, TotalPackets: 1, TotalBytes: 1}
trackID := livekit.TrackID("trackID")
fixture.sut.AddUpTrack(partSID, trackID, buf)
fixture.sut.HandleRTCP(livekit.StreamType_UPSTREAM, partSID, trackID, pkts)
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1)
stat2 := &livekit.AnalyticsStat{Jitter: 2, PacketLost: 4, TotalPackets: 2, TotalBytes: 2}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat2)
fixture.sut.SendAnalytics()
// test
@@ -306,26 +277,22 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil)
// there should be bytes reported so that stats are sent
buf := &buffer.Buffer{}
totalBytes := 1
totalPackets := 1
buf.SetStatsTestOnly(buffer.Stats{
PacketCount: uint32(totalPackets),
TotalByte: uint64(totalBytes),
})
trackID1 := livekit.TrackID("trackID1")
trackID2 := livekit.TrackID("trackID2")
fixture.sut.AddUpTrack(partSID, trackID1, buf)
fixture.sut.AddUpTrack(partSID, trackID2, buf) // using same buffer is not correct but for test it is fine
pkts1 := []rtcp.Packet{
&rtcp.TransportLayerNack{},
}
pkts2 := []rtcp.Packet{
&rtcp.FullIntraRequest{},
}
stat1 := &livekit.AnalyticsStat{TotalBytes: uint64(totalBytes), TotalPackets: uint64(totalPackets)}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID1, stat1)
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat1) // using same buffer is not correct but for test it is fine
// do
fixture.sut.HandleRTCP(livekit.StreamType_UPSTREAM, partSID, trackID1, pkts1)
fixture.sut.HandleRTCP(livekit.StreamType_UPSTREAM, partSID, trackID2, pkts2)
totalBytes++
totalPackets++
stat2 := &livekit.AnalyticsStat{NackCount: 1, TotalBytes: uint64(totalBytes), TotalPackets: uint64(totalPackets)}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID1, stat2)
stat3 := &livekit.AnalyticsStat{FirCount: 1, TotalBytes: uint64(totalBytes), TotalPackets: uint64(totalPackets)}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat3)
fixture.sut.SendAnalytics()
// test
@@ -351,7 +318,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) {
require.True(t, found1)
require.True(t, found2)
// remove 1 buffer
// remove 1 track
fixture.sut.TrackUnpublished(context.Background(), partSID, &livekit.TrackInfo{Sid: string(trackID2)}, 0)
fixture.sut.SendAnalytics()
require.Equal(t, 2, fixture.analytics.SendStatsCallCount())
@@ -393,14 +360,10 @@ func Test_AddUpTrack(t *testing.T) {
// do
var totalBytes uint64 = 3
var totalPackets uint32 = 3
buf := &buffer.Buffer{}
bufferStats := buffer.Stats{
PacketCount: totalPackets,
TotalByte: totalBytes,
}
buf.SetStatsTestOnly(bufferStats)
stat := &livekit.AnalyticsStat{TotalPackets: uint64(totalPackets), TotalBytes: totalBytes}
trackID := livekit.TrackID("trackID")
fixture.sut.AddUpTrack(partSID, trackID, buf)
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat)
fixture.sut.SendAnalytics()
// test
@@ -423,30 +386,20 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) {
fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil)
// do
trackID := livekit.TrackID("trackID")
// buffer 1
buf1 := &buffer.Buffer{}
buf1.SetStatsTestOnly(buffer.Stats{
PacketCount: 1,
TotalByte: 1,
})
fixture.sut.AddUpTrack(partSID, trackID, buf1)
// buffer 2
buf2 := &buffer.Buffer{}
buf2.SetStatsTestOnly(buffer.Stats{
PacketCount: 2,
TotalByte: 2,
})
fixture.sut.AddUpTrack(partSID, trackID, buf2)
stat1 := &livekit.AnalyticsStat{TotalBytes: 1, TotalPackets: 1}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1)
stat2 := &livekit.AnalyticsStat{TotalPackets: 2, TotalBytes: 2}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat2)
fixture.sut.SendAnalytics()
// test
totalBytes := buf1.GetStats().TotalByte + buf2.GetStats().TotalByte
totalPackets := buf1.GetStats().PacketCount + buf2.GetStats().PacketCount
require.Equal(t, 1, fixture.analytics.SendStatsCallCount())
_, stats := fixture.analytics.SendStatsArgsForCall(0)
require.Equal(t, 1, len(stats))
require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind)
require.Equal(t, totalBytes, stats[0].TotalBytes)
require.Equal(t, totalPackets, uint32(stats[0].TotalPackets))
require.Equal(t, stat2.TotalBytes, stats[0].TotalBytes)
require.Equal(t, stat2.TotalPackets, stats[0].TotalPackets)
require.Equal(t, string(trackID), stats[0].TrackId)
}
@@ -461,14 +414,11 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) {
// do
// upstream bytes
buf := &buffer.Buffer{}
buf.SetStatsTestOnly(buffer.Stats{
PacketCount: 3,
TotalByte: 3,
})
fixture.sut.AddUpTrack(partSID, "trackID", buf)
stat1 := &livekit.AnalyticsStat{TotalPackets: 3, TotalBytes: 3}
fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, "trackID", stat1)
// downstream bytes
fixture.sut.OnDownstreamPacket(partSID, "trackID1", 1)
stat2 := &livekit.AnalyticsStat{TotalPackets: 1, TotalBytes: 1}
fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "trackID1", stat2)
fixture.sut.SendAnalytics()
// test