Participant traffic load. (#2262)

* Participant traffic load.

Capturing information about participant traffic
- Upstream/Downstream
- Audio/Video/Data
- Packets/Bytes

This captures a notion of how much traffic load a participant is
generating.

Can be used to make allocation decisions.

* Clean up

* SIP patches

* reporter goroutine

* unlock

* move traffic stats from protocol

* check type
This commit is contained in:
Raja Subramanian
2023-11-26 23:05:00 +05:30
committed by GitHub
parent 56dd399684
commit 53542b09a0
15 changed files with 598 additions and 190 deletions
+3 -3
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7
github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9
github.com/livekit/psrpc v0.5.2
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -70,7 +70,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
@@ -101,7 +101,7 @@ require (
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+6 -6
View File
@@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79
github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs=
github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw=
github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M=
github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9 h1:7/S8gXZCWbtx46xHdYSBg4XqnMVpAt+YfHMTZYmWzkQ=
github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -413,8 +413,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+16
View File
@@ -828,4 +828,20 @@ func (t *MediaTrackReceiver) IsEncrypted() bool {
return t.trackInfo.Encryption != livekit.Encryption_NONE
}
func (t *MediaTrackReceiver) GetTrackStats() *livekit.RTPStats {
t.lock.Lock()
receivers := t.receiversShadow
t.lock.Unlock()
stats := make([]*livekit.RTPStats, 0, len(receivers))
for _, receiver := range receivers {
receiverStats := receiver.GetTrackStats()
if receiverStats != nil {
stats = append(stats, receiverStats)
}
}
return buffer.AggregateRTPStats(stats)
}
// ---------------------------
+11 -4
View File
@@ -167,6 +167,7 @@ type ParticipantImpl struct {
*TransportManager
*UpTrackManager
*SubscriptionManager
*ParticipantTrafficLoad
// keeps track of unpublished tracks in order to reuse trackID
unpublishedTracks []*livekit.TrackInfo
@@ -269,6 +270,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.setupUpTrackManager()
p.setupSubscriptionManager()
p.setupParticipantTrafficLoad()
return p, nil
}
@@ -784,6 +786,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
go func() {
p.SubscriptionManager.Close(isExpectedToResume)
p.TransportManager.Close()
p.ParticipantTrafficLoad.Close()
}()
p.dataChannelStats.Stop()
@@ -915,10 +918,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti
p.lock.Unlock()
}
//
// signal connection methods
//
func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo {
numTracks := 0
minQuality := livekit.ConnectionQuality_EXCELLENT
@@ -1227,6 +1226,14 @@ func (p *ParticipantImpl) setupSubscriptionManager() {
})
}
func (p *ParticipantImpl) setupParticipantTrafficLoad() {
p.ParticipantTrafficLoad = NewParticipantTrafficLoad(ParticipantTrafficLoadParams{
Participant: p,
DataChannelStats: p.dataChannelStats,
Logger: p.params.Logger,
})
}
func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
oldState := p.State()
if state == oldState {
+211
View File
@@ -0,0 +1,211 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
import (
"sync"
"time"
"github.com/frostbyte73/core"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
const (
reportInterval = 10 * time.Second
)
type ParticipantTrafficLoadParams struct {
Participant *ParticipantImpl
DataChannelStats *telemetry.BytesTrackStats
Logger logger.Logger
}
type ParticipantTrafficLoad struct {
params ParticipantTrafficLoadParams
lock sync.RWMutex
onTrafficLoad func(trafficLoad *types.TrafficLoad)
tracksStatsMedia map[livekit.TrackID]*livekit.RTPStats
dataChannelTraffic *telemetry.TrafficTotals
trafficLoad *types.TrafficLoad
closed core.Fuse
}
func NewParticipantTrafficLoad(params ParticipantTrafficLoadParams) *ParticipantTrafficLoad {
p := &ParticipantTrafficLoad{
params: params,
tracksStatsMedia: make(map[livekit.TrackID]*livekit.RTPStats),
closed: core.NewFuse(),
}
go p.reporter()
return p
}
func (p *ParticipantTrafficLoad) Close() {
p.closed.Break()
}
func (p *ParticipantTrafficLoad) OnTrafficLoad(f func(trafficLoad *types.TrafficLoad)) {
p.lock.Lock()
p.onTrafficLoad = f
p.lock.Unlock()
}
func (p *ParticipantTrafficLoad) getOnTrafficLoad() func(trafficLoad *types.TrafficLoad) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onTrafficLoad
}
func (p *ParticipantTrafficLoad) GetTrafficLoad() *types.TrafficLoad {
p.lock.RLock()
defer p.lock.RUnlock()
return p.trafficLoad
}
func (p *ParticipantTrafficLoad) updateTrafficLoad() *types.TrafficLoad {
publishedTracks := p.params.Participant.GetPublishedTracks()
subscribedTracks := p.params.Participant.SubscriptionManager.GetSubscribedTracks()
availableTracks := make(map[livekit.TrackID]bool, len(publishedTracks)+len(subscribedTracks))
upstreamAudioStats := make([]*types.TrafficStats, 0, len(publishedTracks))
upstreamVideoStats := make([]*types.TrafficStats, 0, len(publishedTracks))
downstreamAudioStats := make([]*types.TrafficStats, 0, len(subscribedTracks))
downstreamVideoStats := make([]*types.TrafficStats, 0, len(subscribedTracks))
p.lock.Lock()
defer p.lock.Unlock()
for _, pt := range publishedTracks {
lmt, ok := pt.(types.LocalMediaTrack)
if !ok {
continue
}
trackID := lmt.ID()
stats := lmt.GetTrackStats()
trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats)
if stats != nil {
p.tracksStatsMedia[trackID] = stats
availableTracks[trackID] = true
}
if trafficStats != nil {
switch lmt.Kind() {
case livekit.TrackType_AUDIO:
upstreamAudioStats = append(upstreamAudioStats, trafficStats)
case livekit.TrackType_VIDEO:
upstreamVideoStats = append(upstreamVideoStats, trafficStats)
}
}
}
for _, st := range subscribedTracks {
trackID := st.ID()
stats := st.DownTrack().GetTrackStats()
trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats)
if stats != nil {
p.tracksStatsMedia[trackID] = stats
availableTracks[trackID] = true
}
if trafficStats != nil {
switch st.MediaTrack().Kind() {
case livekit.TrackType_AUDIO:
downstreamAudioStats = append(downstreamAudioStats, trafficStats)
case livekit.TrackType_VIDEO:
downstreamVideoStats = append(downstreamVideoStats, trafficStats)
}
}
}
// remove unavailable tracks from track stats cache
for trackID := range p.tracksStatsMedia {
if !availableTracks[trackID] {
delete(p.tracksStatsMedia, trackID)
}
}
trafficTypeStats := make([]*types.TrafficTypeStats, 0, 6)
addTypeStats := func(statsList []*types.TrafficStats, trackType livekit.TrackType, streamType livekit.StreamType) {
agg := types.AggregateTrafficStats(statsList)
if agg != nil {
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: trackType,
StreamType: streamType,
TrafficStats: agg,
})
}
}
addTypeStats(upstreamAudioStats, livekit.TrackType_AUDIO, livekit.StreamType_UPSTREAM)
addTypeStats(upstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_UPSTREAM)
addTypeStats(downstreamAudioStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM)
addTypeStats(downstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM)
if p.params.DataChannelStats != nil {
dataChannelTraffic := p.params.DataChannelStats.GetTrafficTotals()
if p.dataChannelTraffic != nil {
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: livekit.TrackType_DATA,
StreamType: livekit.StreamType_UPSTREAM,
TrafficStats: &types.TrafficStats{
StartTime: p.dataChannelTraffic.At,
EndTime: dataChannelTraffic.At,
Packets: dataChannelTraffic.RecvMessages - p.dataChannelTraffic.RecvMessages,
Bytes: dataChannelTraffic.RecvBytes - p.dataChannelTraffic.RecvBytes,
},
})
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: livekit.TrackType_DATA,
StreamType: livekit.StreamType_DOWNSTREAM,
TrafficStats: &types.TrafficStats{
StartTime: p.dataChannelTraffic.At,
EndTime: dataChannelTraffic.At,
Packets: dataChannelTraffic.SendMessages - p.dataChannelTraffic.SendMessages,
Bytes: dataChannelTraffic.SendBytes - p.dataChannelTraffic.SendBytes,
},
})
}
p.dataChannelTraffic = dataChannelTraffic
}
p.trafficLoad = &types.TrafficLoad{
TrafficTypeStats: trafficTypeStats,
}
return p.trafficLoad
}
func (p *ParticipantTrafficLoad) reporter() {
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()
for {
select {
case <-p.closed.Watch():
return
case <-ticker.C:
trafficLoad := p.updateTrafficLoad()
if onTrafficLoad := p.getOnTrafficLoad(); onTrafficLoad != nil {
onTrafficLoad(trafficLoad)
}
}
}
}
+4
View File
@@ -395,6 +395,7 @@ type LocalParticipant interface {
OnClose(callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
OnTrafficLoad(callback func(trafficLoad *TrafficLoad))
// session migration
MaybeStartMigration(force bool, onStart func()) bool
@@ -420,6 +421,8 @@ type LocalParticipant interface {
SetSubscriberChannelCapacity(channelCapacity int64)
GetPacer() pacer.Pacer
GetTrafficLoad() *TrafficLoad
}
// Room is a container of participants, and can provide room-level actions
@@ -499,6 +502,7 @@ type LocalMediaTrack interface {
HasSdpCid(cid string) bool
GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality)
GetTrackStats() *livekit.RTPStats
SetRTT(rtt uint32)
+126
View File
@@ -0,0 +1,126 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"time"
"github.com/livekit/protocol/livekit"
)
type TrafficStats struct {
StartTime time.Time
EndTime time.Time
Packets uint32
Bytes uint64
}
type TrafficTypeStats struct {
TrackType livekit.TrackType
StreamType livekit.StreamType
TrafficStats *TrafficStats
}
type TrafficLoad struct {
TrafficTypeStats []*TrafficTypeStats
}
func RTPStatsDiffToTrafficStats(before, after *livekit.RTPStats) *TrafficStats {
if after == nil {
return nil
}
startTime := after.StartTime
if before != nil {
startTime = before.EndTime
}
if before == nil {
return &TrafficStats{
StartTime: startTime.AsTime(),
EndTime: after.EndTime.AsTime(),
Packets: after.Packets,
Bytes: after.Bytes + after.BytesDuplicate + after.BytesPadding,
}
}
return &TrafficStats{
StartTime: startTime.AsTime(),
EndTime: after.EndTime.AsTime(),
Packets: after.Packets - before.Packets,
Bytes: (after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding),
}
}
func AggregateTrafficStats(statsList []*TrafficStats) *TrafficStats {
if len(statsList) == 0 {
return nil
}
startTime := time.Time{}
endTime := time.Time{}
packets := uint32(0)
bytes := uint64(0)
for _, stats := range statsList {
if startTime.IsZero() || startTime.After(stats.StartTime) {
startTime = stats.StartTime
}
if endTime.IsZero() || endTime.Before(stats.EndTime) {
endTime = stats.EndTime
}
packets += stats.Packets
bytes += stats.Bytes
}
if endTime.IsZero() {
endTime = time.Now()
}
return &TrafficStats{
StartTime: startTime,
EndTime: endTime,
Packets: packets,
Bytes: bytes,
}
}
func TrafficLoadToTrafficRate(trafficLoad *TrafficLoad) (
packetRateIn float64,
byteRateIn float64,
packetRateOut float64,
byteRateOut float64,
) {
if trafficLoad == nil {
return
}
for _, trafficTypeStat := range trafficLoad.TrafficTypeStats {
elapsed := trafficTypeStat.TrafficStats.EndTime.Sub(trafficTypeStat.TrafficStats.StartTime).Seconds()
packetRate := float64(trafficTypeStat.TrafficStats.Packets) / elapsed
byteRate := float64(trafficTypeStat.TrafficStats.Bytes) / elapsed
switch trafficTypeStat.StreamType {
case livekit.StreamType_UPSTREAM:
packetRateIn += packetRate
byteRateIn += byteRate
case livekit.StreamType_DOWNSTREAM:
packetRateOut += packetRate
byteRateOut += byteRate
}
}
return
}
@@ -107,6 +107,16 @@ type FakeLocalMediaTrack struct {
getTemporalLayerForSpatialFpsReturnsOnCall map[int]struct {
result1 int32
}
GetTrackStatsStub func() *livekit.RTPStats
getTrackStatsMutex sync.RWMutex
getTrackStatsArgsForCall []struct {
}
getTrackStatsReturns struct {
result1 *livekit.RTPStats
}
getTrackStatsReturnsOnCall map[int]struct {
result1 *livekit.RTPStats
}
HasSdpCidStub func(string) bool
hasSdpCidMutex sync.RWMutex
hasSdpCidArgsForCall []struct {
@@ -834,6 +844,59 @@ func (fake *FakeLocalMediaTrack) GetTemporalLayerForSpatialFpsReturnsOnCall(i in
}{result1}
}
func (fake *FakeLocalMediaTrack) GetTrackStats() *livekit.RTPStats {
fake.getTrackStatsMutex.Lock()
ret, specificReturn := fake.getTrackStatsReturnsOnCall[len(fake.getTrackStatsArgsForCall)]
fake.getTrackStatsArgsForCall = append(fake.getTrackStatsArgsForCall, struct {
}{})
stub := fake.GetTrackStatsStub
fakeReturns := fake.getTrackStatsReturns
fake.recordInvocation("GetTrackStats", []interface{}{})
fake.getTrackStatsMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalMediaTrack) GetTrackStatsCallCount() int {
fake.getTrackStatsMutex.RLock()
defer fake.getTrackStatsMutex.RUnlock()
return len(fake.getTrackStatsArgsForCall)
}
func (fake *FakeLocalMediaTrack) GetTrackStatsCalls(stub func() *livekit.RTPStats) {
fake.getTrackStatsMutex.Lock()
defer fake.getTrackStatsMutex.Unlock()
fake.GetTrackStatsStub = stub
}
func (fake *FakeLocalMediaTrack) GetTrackStatsReturns(result1 *livekit.RTPStats) {
fake.getTrackStatsMutex.Lock()
defer fake.getTrackStatsMutex.Unlock()
fake.GetTrackStatsStub = nil
fake.getTrackStatsReturns = struct {
result1 *livekit.RTPStats
}{result1}
}
func (fake *FakeLocalMediaTrack) GetTrackStatsReturnsOnCall(i int, result1 *livekit.RTPStats) {
fake.getTrackStatsMutex.Lock()
defer fake.getTrackStatsMutex.Unlock()
fake.GetTrackStatsStub = nil
if fake.getTrackStatsReturnsOnCall == nil {
fake.getTrackStatsReturnsOnCall = make(map[int]struct {
result1 *livekit.RTPStats
})
}
fake.getTrackStatsReturnsOnCall[i] = struct {
result1 *livekit.RTPStats
}{result1}
}
func (fake *FakeLocalMediaTrack) HasSdpCid(arg1 string) bool {
fake.hasSdpCidMutex.Lock()
ret, specificReturn := fake.hasSdpCidReturnsOnCall[len(fake.hasSdpCidArgsForCall)]
@@ -2069,6 +2132,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} {
defer fake.getQualityForDimensionMutex.RUnlock()
fake.getTemporalLayerForSpatialFpsMutex.RLock()
defer fake.getTemporalLayerForSpatialFpsMutex.RUnlock()
fake.getTrackStatsMutex.RLock()
defer fake.getTrackStatsMutex.RUnlock()
fake.hasSdpCidMutex.RLock()
defer fake.hasSdpCidMutex.RUnlock()
fake.iDMutex.RLock()
@@ -325,6 +325,16 @@ type FakeLocalParticipant struct {
getSubscribedTracksReturnsOnCall map[int]struct {
result1 []types.SubscribedTrack
}
GetTrafficLoadStub func() *types.TrafficLoad
getTrafficLoadMutex sync.RWMutex
getTrafficLoadArgsForCall []struct {
}
getTrafficLoadReturns struct {
result1 *types.TrafficLoad
}
getTrafficLoadReturnsOnCall map[int]struct {
result1 *types.TrafficLoad
}
GetTrailerStub func() []byte
getTrailerMutex sync.RWMutex
getTrailerArgsForCall []struct {
@@ -582,6 +592,11 @@ type FakeLocalParticipant struct {
onTrackUpdatedArgsForCall []struct {
arg1 func(types.LocalParticipant, types.MediaTrack)
}
OnTrafficLoadStub func(func(trafficLoad *types.TrafficLoad))
onTrafficLoadMutex sync.RWMutex
onTrafficLoadArgsForCall []struct {
arg1 func(trafficLoad *types.TrafficLoad)
}
ProtocolVersionStub func() types.ProtocolVersion
protocolVersionMutex sync.RWMutex
protocolVersionArgsForCall []struct {
@@ -2540,6 +2555,59 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result
}{result1}
}
func (fake *FakeLocalParticipant) GetTrafficLoad() *types.TrafficLoad {
fake.getTrafficLoadMutex.Lock()
ret, specificReturn := fake.getTrafficLoadReturnsOnCall[len(fake.getTrafficLoadArgsForCall)]
fake.getTrafficLoadArgsForCall = append(fake.getTrafficLoadArgsForCall, struct {
}{})
stub := fake.GetTrafficLoadStub
fakeReturns := fake.getTrafficLoadReturns
fake.recordInvocation("GetTrafficLoad", []interface{}{})
fake.getTrafficLoadMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetTrafficLoadCallCount() int {
fake.getTrafficLoadMutex.RLock()
defer fake.getTrafficLoadMutex.RUnlock()
return len(fake.getTrafficLoadArgsForCall)
}
func (fake *FakeLocalParticipant) GetTrafficLoadCalls(stub func() *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = stub
}
func (fake *FakeLocalParticipant) GetTrafficLoadReturns(result1 *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = nil
fake.getTrafficLoadReturns = struct {
result1 *types.TrafficLoad
}{result1}
}
func (fake *FakeLocalParticipant) GetTrafficLoadReturnsOnCall(i int, result1 *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = nil
if fake.getTrafficLoadReturnsOnCall == nil {
fake.getTrafficLoadReturnsOnCall = make(map[int]struct {
result1 *types.TrafficLoad
})
}
fake.getTrafficLoadReturnsOnCall[i] = struct {
result1 *types.TrafficLoad
}{result1}
}
func (fake *FakeLocalParticipant) GetTrailer() []byte {
fake.getTrailerMutex.Lock()
ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)]
@@ -3992,6 +4060,38 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnTrafficLoad(arg1 func(trafficLoad *types.TrafficLoad)) {
fake.onTrafficLoadMutex.Lock()
fake.onTrafficLoadArgsForCall = append(fake.onTrafficLoadArgsForCall, struct {
arg1 func(trafficLoad *types.TrafficLoad)
}{arg1})
stub := fake.OnTrafficLoadStub
fake.recordInvocation("OnTrafficLoad", []interface{}{arg1})
fake.onTrafficLoadMutex.Unlock()
if stub != nil {
fake.OnTrafficLoadStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnTrafficLoadCallCount() int {
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
return len(fake.onTrafficLoadArgsForCall)
}
func (fake *FakeLocalParticipant) OnTrafficLoadCalls(stub func(func(trafficLoad *types.TrafficLoad))) {
fake.onTrafficLoadMutex.Lock()
defer fake.onTrafficLoadMutex.Unlock()
fake.OnTrafficLoadStub = stub
}
func (fake *FakeLocalParticipant) OnTrafficLoadArgsForCall(i int) func(trafficLoad *types.TrafficLoad) {
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
argsForCall := fake.onTrafficLoadArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion {
fake.protocolVersionMutex.Lock()
ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)]
@@ -6074,6 +6174,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getSubscribedParticipantsMutex.RUnlock()
fake.getSubscribedTracksMutex.RLock()
defer fake.getSubscribedTracksMutex.RUnlock()
fake.getTrafficLoadMutex.RLock()
defer fake.getTrafficLoadMutex.RUnlock()
fake.getTrailerMutex.RLock()
defer fake.getTrailerMutex.RUnlock()
fake.handleAnswerMutex.RLock()
@@ -6142,6 +6244,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onTrackUnpublishedMutex.RUnlock()
fake.onTrackUpdatedMutex.RLock()
defer fake.onTrackUpdatedMutex.RUnlock()
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
fake.protocolVersionMutex.RLock()
defer fake.protocolVersionMutex.RUnlock()
fake.removePublishedTrackMutex.RLock()
+7
View File
@@ -323,3 +323,10 @@ func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, re
}
return 0, errors.New("receiver not available")
}
func (d *DummyReceiver) GetTrackStats() *livekit.RTPStats {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetTrackStats()
}
return nil
}
+2 -2
View File
@@ -322,7 +322,7 @@ func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc.
return nil, err
}
return &rpc.GetSIPTrunkAuthenticationResponse{
Username: trunk.Username,
Password: trunk.Password,
Username: trunk.InboundUsername,
Password: trunk.InboundPassword,
}, nil
}
+4 -2
View File
@@ -65,8 +65,10 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT
OutboundAddress: req.OutboundAddress,
OutboundNumber: req.OutboundNumber,
InboundNumbersRegex: req.InboundNumbersRegex,
Username: req.Username,
Password: req.Password,
InboundUsername: req.InboundUsername,
InboundPassword: req.InboundPassword,
OutboundUsername: req.OutboundUsername,
OutboundPassword: req.OutboundPassword,
}
if err := s.store.StoreSIPTrunk(ctx, info); err != nil {
+3 -167
View File
@@ -24,6 +24,7 @@ import (
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
const (
@@ -758,7 +759,7 @@ func (r *rtpStatsBase) toProto(
}
if gapsPresent {
p.GapHistogram = make(map[int32]uint32, cGapHistogramNumBins)
p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram))
for i := 0; i < len(r.gapHistogram); i++ {
if r.gapHistogram[i] == 0 {
continue
@@ -915,172 +916,7 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps
// ----------------------------------
func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
if len(statsList) == 0 {
return nil
}
startTime := time.Time{}
endTime := time.Time{}
packets := uint32(0)
bytes := uint64(0)
headerBytes := uint64(0)
packetsLost := uint32(0)
packetsDuplicate := uint32(0)
bytesDuplicate := uint64(0)
headerBytesDuplicate := uint64(0)
packetsPadding := uint32(0)
bytesPadding := uint64(0)
headerBytesPadding := uint64(0)
packetsOutOfOrder := uint32(0)
frames := uint32(0)
keyFrames := uint32(0)
lastKeyFrame := time.Time{}
jitter := 0.0
maxJitter := float64(0)
gapHistogram := make(map[int32]uint32, cGapHistogramNumBins)
nacks := uint32(0)
nackAcks := uint32(0)
nackMisses := uint32(0)
nackRepeated := uint32(0)
plis := uint32(0)
lastPli := time.Time{}
layerLockPlis := uint32(0)
lastLayerLockPli := time.Time{}
firs := uint32(0)
lastFir := time.Time{}
rtt := uint32(0)
maxRtt := uint32(0)
for _, stats := range statsList {
if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) {
startTime = stats.StartTime.AsTime()
}
if endTime.IsZero() || endTime.Before(stats.EndTime.AsTime()) {
endTime = stats.EndTime.AsTime()
}
packets += stats.Packets
bytes += stats.Bytes
headerBytes += stats.HeaderBytes
packetsLost += stats.PacketsLost
packetsDuplicate += stats.PacketsDuplicate
bytesDuplicate += stats.BytesDuplicate
headerBytesDuplicate += stats.HeaderBytesDuplicate
packetsPadding += stats.PacketsPadding
bytesPadding += stats.BytesPadding
headerBytesPadding += stats.HeaderBytesPadding
packetsOutOfOrder += stats.PacketsOutOfOrder
frames += stats.Frames
keyFrames += stats.KeyFrames
if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) {
lastKeyFrame = stats.LastKeyFrame.AsTime()
}
jitter += stats.JitterCurrent
if stats.JitterMax > maxJitter {
maxJitter = stats.JitterMax
}
for burst, count := range stats.GapHistogram {
gapHistogram[burst] += count
}
nacks += stats.Nacks
nackAcks += stats.NackAcks
nackMisses += stats.NackMisses
nackRepeated += stats.NackRepeated
plis += stats.Plis
if lastPli.IsZero() || lastPli.Before(stats.LastPli.AsTime()) {
lastPli = stats.LastPli.AsTime()
}
layerLockPlis += stats.LayerLockPlis
if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) {
lastLayerLockPli = stats.LastLayerLockPli.AsTime()
}
firs += stats.Firs
if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) {
lastFir = stats.LastFir.AsTime()
}
rtt += stats.RttCurrent
if stats.RttMax > maxRtt {
maxRtt = stats.RttMax
}
}
if endTime.IsZero() {
endTime = time.Now()
}
elapsed := endTime.Sub(startTime).Seconds()
packetLostRate := float64(packetsLost) / elapsed
packetLostPercentage := float32(packetsLost) / (float32(packets) + float32(packetsLost)) * 100.0
packetRate := float64(packets) / elapsed
packetDuplicateRate := float64(packetsDuplicate) / elapsed
packetPaddingRate := float64(packetsPadding) / elapsed
bitrate := float64(bytes) * 8.0 / elapsed
bitrateDuplicate := float64(bytesDuplicate) * 8.0 / elapsed
bitratePadding := float64(bytesPadding) * 8.0 / elapsed
frameRate := float64(frames) / elapsed
return &livekit.RTPStats{
StartTime: timestamppb.New(startTime),
EndTime: timestamppb.New(endTime),
Duration: elapsed,
Packets: packets,
PacketRate: packetRate,
Bytes: bytes,
HeaderBytes: headerBytes,
Bitrate: bitrate,
PacketsLost: packetsLost,
PacketLossRate: packetLostRate,
PacketLossPercentage: packetLostPercentage,
PacketsDuplicate: packetsDuplicate,
PacketDuplicateRate: packetDuplicateRate,
BytesDuplicate: bytesDuplicate,
HeaderBytesDuplicate: headerBytesDuplicate,
BitrateDuplicate: bitrateDuplicate,
PacketsPadding: packetsPadding,
PacketPaddingRate: packetPaddingRate,
BytesPadding: bytesPadding,
HeaderBytesPadding: headerBytesPadding,
BitratePadding: bitratePadding,
PacketsOutOfOrder: packetsOutOfOrder,
Frames: frames,
FrameRate: frameRate,
KeyFrames: keyFrames,
LastKeyFrame: timestamppb.New(lastKeyFrame),
JitterCurrent: jitter / float64(len(statsList)),
JitterMax: maxJitter,
GapHistogram: gapHistogram,
Nacks: nacks,
NackAcks: nackAcks,
NackMisses: nackMisses,
NackRepeated: nackRepeated,
Plis: plis,
LastPli: timestamppb.New(lastPli),
LayerLockPlis: layerLockPlis,
LastLayerLockPli: timestamppb.New(lastLayerLockPli),
Firs: firs,
LastFir: timestamppb.New(lastFir),
RttCurrent: rtt / uint32(len(statsList)),
RttMax: maxRtt,
// no aggregation for drift calculations
}
return utils.AggregateRTPStats(statsList, cGapHistogramNumBins)
}
func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
+3 -1
View File
@@ -82,6 +82,8 @@ type TrackReceiver interface {
GetCalculatedClockRate(layer int32) uint32
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
GetTrackStats() *livekit.RTPStats
}
// WebRTCReceiver receives a media track
@@ -558,7 +560,7 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
var stats []*livekit.RTPStats
stats := make([]*livekit.RTPStats, 0, len(w.buffers))
for _, buff := range w.buffers {
if buff == nil {
continue
+33 -5
View File
@@ -32,13 +32,27 @@ const (
BytesTrackTypeSignal BytesTrackType = "SG"
)
// -------------------------------
type TrafficTotals struct {
At time.Time
SendBytes uint64
SendMessages uint32
RecvBytes uint64
RecvMessages uint32
}
// --------------------------------
// stats for signal and data channel
type BytesTrackStats struct {
trackID livekit.TrackID
pID livekit.ParticipantID
send, recv atomic.Uint64
telemetry TelemetryService
isStopped atomic.Bool
trackID livekit.TrackID
pID livekit.ParticipantID
send, recv atomic.Uint64
totalSendBytes, totalRecvBytes atomic.Uint64
totalSendMessages, totalRecvMessages atomic.Uint32
telemetry TelemetryService
isStopped atomic.Bool
}
func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats {
@@ -54,8 +68,22 @@ func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, tele
func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) {
if isSend {
s.send.Add(bytes)
s.totalSendBytes.Add(bytes)
s.totalSendMessages.Inc()
} else {
s.recv.Add(bytes)
s.totalRecvBytes.Add(bytes)
s.totalRecvMessages.Inc()
}
}
func (s *BytesTrackStats) GetTrafficTotals() *TrafficTotals {
return &TrafficTotals{
At: time.Now(),
SendBytes: s.totalSendBytes.Load(),
SendMessages: s.totalSendMessages.Load(),
RecvBytes: s.totalRecvBytes.Load(),
RecvMessages: s.totalRecvMessages.Load(),
}
}