diff --git a/go.mod b/go.mod index c93bc5d8c..c6af6151f 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 + github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9 github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -70,7 +70,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.5 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.3 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect @@ -101,7 +101,7 @@ require ( golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.15.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 909a503d1..e0ede4f38 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79 github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs= github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= +github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M= +github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9 h1:7/S8gXZCWbtx46xHdYSBg4XqnMVpAt+YfHMTZYmWzkQ= +github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -413,8 +413,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 692885a25..20883e19a 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -828,4 +828,20 @@ func (t *MediaTrackReceiver) IsEncrypted() bool { return t.trackInfo.Encryption != livekit.Encryption_NONE } +func (t *MediaTrackReceiver) GetTrackStats() *livekit.RTPStats { + t.lock.Lock() + receivers := t.receiversShadow + t.lock.Unlock() + + stats := make([]*livekit.RTPStats, 0, len(receivers)) + for _, receiver := range receivers { + receiverStats := receiver.GetTrackStats() + if receiverStats != nil { + stats = append(stats, receiverStats) + } + } + + return buffer.AggregateRTPStats(stats) +} + // --------------------------- diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 09f3c77b9..ca1d0c9f2 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -167,6 +167,7 @@ type ParticipantImpl struct { *TransportManager *UpTrackManager *SubscriptionManager + *ParticipantTrafficLoad // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo @@ -269,6 +270,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.setupUpTrackManager() p.setupSubscriptionManager() + p.setupParticipantTrafficLoad() return p, nil } @@ -784,6 +786,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea go func() { p.SubscriptionManager.Close(isExpectedToResume) p.TransportManager.Close() + p.ParticipantTrafficLoad.Close() }() p.dataChannelStats.Stop() @@ -915,10 +918,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti p.lock.Unlock() } -// -// signal connection methods -// - func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo { numTracks := 0 minQuality := livekit.ConnectionQuality_EXCELLENT @@ -1227,6 +1226,14 @@ func (p *ParticipantImpl) setupSubscriptionManager() { }) } +func (p *ParticipantImpl) setupParticipantTrafficLoad() { + p.ParticipantTrafficLoad = NewParticipantTrafficLoad(ParticipantTrafficLoadParams{ + Participant: p, + DataChannelStats: p.dataChannelStats, + Logger: p.params.Logger, + }) +} + func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { oldState := p.State() if state == oldState { diff --git a/pkg/rtc/participant_traffic_load.go b/pkg/rtc/participant_traffic_load.go new file mode 100644 index 000000000..eef44ad5e --- /dev/null +++ b/pkg/rtc/participant_traffic_load.go @@ -0,0 +1,211 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtc + +import ( + "sync" + "time" + + "github.com/frostbyte73/core" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + reportInterval = 10 * time.Second +) + +type ParticipantTrafficLoadParams struct { + Participant *ParticipantImpl + DataChannelStats *telemetry.BytesTrackStats + Logger logger.Logger +} + +type ParticipantTrafficLoad struct { + params ParticipantTrafficLoadParams + + lock sync.RWMutex + onTrafficLoad func(trafficLoad *types.TrafficLoad) + tracksStatsMedia map[livekit.TrackID]*livekit.RTPStats + dataChannelTraffic *telemetry.TrafficTotals + trafficLoad *types.TrafficLoad + + closed core.Fuse +} + +func NewParticipantTrafficLoad(params ParticipantTrafficLoadParams) *ParticipantTrafficLoad { + p := &ParticipantTrafficLoad{ + params: params, + tracksStatsMedia: make(map[livekit.TrackID]*livekit.RTPStats), + closed: core.NewFuse(), + } + go p.reporter() + return p +} + +func (p *ParticipantTrafficLoad) Close() { + p.closed.Break() +} + +func (p *ParticipantTrafficLoad) OnTrafficLoad(f func(trafficLoad *types.TrafficLoad)) { + p.lock.Lock() + p.onTrafficLoad = f + p.lock.Unlock() +} + +func (p *ParticipantTrafficLoad) getOnTrafficLoad() func(trafficLoad *types.TrafficLoad) { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.onTrafficLoad +} + +func (p *ParticipantTrafficLoad) GetTrafficLoad() *types.TrafficLoad { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.trafficLoad +} + +func (p *ParticipantTrafficLoad) updateTrafficLoad() *types.TrafficLoad { + publishedTracks := p.params.Participant.GetPublishedTracks() + subscribedTracks := p.params.Participant.SubscriptionManager.GetSubscribedTracks() + + availableTracks := make(map[livekit.TrackID]bool, len(publishedTracks)+len(subscribedTracks)) + + upstreamAudioStats := make([]*types.TrafficStats, 0, len(publishedTracks)) + upstreamVideoStats := make([]*types.TrafficStats, 0, len(publishedTracks)) + + downstreamAudioStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) + downstreamVideoStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) + + p.lock.Lock() + defer p.lock.Unlock() + for _, pt := range publishedTracks { + lmt, ok := pt.(types.LocalMediaTrack) + if !ok { + continue + } + trackID := lmt.ID() + stats := lmt.GetTrackStats() + trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) + if stats != nil { + p.tracksStatsMedia[trackID] = stats + availableTracks[trackID] = true + } + if trafficStats != nil { + switch lmt.Kind() { + case livekit.TrackType_AUDIO: + upstreamAudioStats = append(upstreamAudioStats, trafficStats) + case livekit.TrackType_VIDEO: + upstreamVideoStats = append(upstreamVideoStats, trafficStats) + } + } + } + + for _, st := range subscribedTracks { + trackID := st.ID() + stats := st.DownTrack().GetTrackStats() + trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) + if stats != nil { + p.tracksStatsMedia[trackID] = stats + availableTracks[trackID] = true + } + if trafficStats != nil { + switch st.MediaTrack().Kind() { + case livekit.TrackType_AUDIO: + downstreamAudioStats = append(downstreamAudioStats, trafficStats) + case livekit.TrackType_VIDEO: + downstreamVideoStats = append(downstreamVideoStats, trafficStats) + } + } + } + + // remove unavailable tracks from track stats cache + for trackID := range p.tracksStatsMedia { + if !availableTracks[trackID] { + delete(p.tracksStatsMedia, trackID) + } + } + + trafficTypeStats := make([]*types.TrafficTypeStats, 0, 6) + addTypeStats := func(statsList []*types.TrafficStats, trackType livekit.TrackType, streamType livekit.StreamType) { + agg := types.AggregateTrafficStats(statsList) + if agg != nil { + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: trackType, + StreamType: streamType, + TrafficStats: agg, + }) + } + } + addTypeStats(upstreamAudioStats, livekit.TrackType_AUDIO, livekit.StreamType_UPSTREAM) + addTypeStats(upstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_UPSTREAM) + addTypeStats(downstreamAudioStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) + addTypeStats(downstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) + + if p.params.DataChannelStats != nil { + dataChannelTraffic := p.params.DataChannelStats.GetTrafficTotals() + if p.dataChannelTraffic != nil { + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: livekit.TrackType_DATA, + StreamType: livekit.StreamType_UPSTREAM, + TrafficStats: &types.TrafficStats{ + StartTime: p.dataChannelTraffic.At, + EndTime: dataChannelTraffic.At, + Packets: dataChannelTraffic.RecvMessages - p.dataChannelTraffic.RecvMessages, + Bytes: dataChannelTraffic.RecvBytes - p.dataChannelTraffic.RecvBytes, + }, + }) + + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: livekit.TrackType_DATA, + StreamType: livekit.StreamType_DOWNSTREAM, + TrafficStats: &types.TrafficStats{ + StartTime: p.dataChannelTraffic.At, + EndTime: dataChannelTraffic.At, + Packets: dataChannelTraffic.SendMessages - p.dataChannelTraffic.SendMessages, + Bytes: dataChannelTraffic.SendBytes - p.dataChannelTraffic.SendBytes, + }, + }) + } + p.dataChannelTraffic = dataChannelTraffic + } + + p.trafficLoad = &types.TrafficLoad{ + TrafficTypeStats: trafficTypeStats, + } + return p.trafficLoad +} + +func (p *ParticipantTrafficLoad) reporter() { + ticker := time.NewTicker(reportInterval) + defer ticker.Stop() + + for { + select { + case <-p.closed.Watch(): + return + + case <-ticker.C: + trafficLoad := p.updateTrafficLoad() + if onTrafficLoad := p.getOnTrafficLoad(); onTrafficLoad != nil { + onTrafficLoad(trafficLoad) + } + } + } +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index bef8abb6e..266e24b8b 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -395,6 +395,7 @@ type LocalParticipant interface { OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) + OnTrafficLoad(callback func(trafficLoad *TrafficLoad)) // session migration MaybeStartMigration(force bool, onStart func()) bool @@ -420,6 +421,8 @@ type LocalParticipant interface { SetSubscriberChannelCapacity(channelCapacity int64) GetPacer() pacer.Pacer + + GetTrafficLoad() *TrafficLoad } // Room is a container of participants, and can provide room-level actions @@ -499,6 +502,7 @@ type LocalMediaTrack interface { HasSdpCid(cid string) bool GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) + GetTrackStats() *livekit.RTPStats SetRTT(rtt uint32) diff --git a/pkg/rtc/types/trafficstats.go b/pkg/rtc/types/trafficstats.go new file mode 100644 index 000000000..70b269db8 --- /dev/null +++ b/pkg/rtc/types/trafficstats.go @@ -0,0 +1,126 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "time" + + "github.com/livekit/protocol/livekit" +) + +type TrafficStats struct { + StartTime time.Time + EndTime time.Time + Packets uint32 + Bytes uint64 +} + +type TrafficTypeStats struct { + TrackType livekit.TrackType + StreamType livekit.StreamType + TrafficStats *TrafficStats +} + +type TrafficLoad struct { + TrafficTypeStats []*TrafficTypeStats +} + +func RTPStatsDiffToTrafficStats(before, after *livekit.RTPStats) *TrafficStats { + if after == nil { + return nil + } + + startTime := after.StartTime + if before != nil { + startTime = before.EndTime + } + + if before == nil { + return &TrafficStats{ + StartTime: startTime.AsTime(), + EndTime: after.EndTime.AsTime(), + Packets: after.Packets, + Bytes: after.Bytes + after.BytesDuplicate + after.BytesPadding, + } + } + + return &TrafficStats{ + StartTime: startTime.AsTime(), + EndTime: after.EndTime.AsTime(), + Packets: after.Packets - before.Packets, + Bytes: (after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding), + } +} + +func AggregateTrafficStats(statsList []*TrafficStats) *TrafficStats { + if len(statsList) == 0 { + return nil + } + + startTime := time.Time{} + endTime := time.Time{} + + packets := uint32(0) + bytes := uint64(0) + + for _, stats := range statsList { + if startTime.IsZero() || startTime.After(stats.StartTime) { + startTime = stats.StartTime + } + + if endTime.IsZero() || endTime.Before(stats.EndTime) { + endTime = stats.EndTime + } + + packets += stats.Packets + bytes += stats.Bytes + } + + if endTime.IsZero() { + endTime = time.Now() + } + return &TrafficStats{ + StartTime: startTime, + EndTime: endTime, + Packets: packets, + Bytes: bytes, + } +} + +func TrafficLoadToTrafficRate(trafficLoad *TrafficLoad) ( + packetRateIn float64, + byteRateIn float64, + packetRateOut float64, + byteRateOut float64, +) { + if trafficLoad == nil { + return + } + + for _, trafficTypeStat := range trafficLoad.TrafficTypeStats { + elapsed := trafficTypeStat.TrafficStats.EndTime.Sub(trafficTypeStat.TrafficStats.StartTime).Seconds() + packetRate := float64(trafficTypeStat.TrafficStats.Packets) / elapsed + byteRate := float64(trafficTypeStat.TrafficStats.Bytes) / elapsed + switch trafficTypeStat.StreamType { + case livekit.StreamType_UPSTREAM: + packetRateIn += packetRate + byteRateIn += byteRate + case livekit.StreamType_DOWNSTREAM: + packetRateOut += packetRate + byteRateOut += byteRate + } + } + return +} diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index 83fdb1580..a606f8ef1 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -107,6 +107,16 @@ type FakeLocalMediaTrack struct { getTemporalLayerForSpatialFpsReturnsOnCall map[int]struct { result1 int32 } + GetTrackStatsStub func() *livekit.RTPStats + getTrackStatsMutex sync.RWMutex + getTrackStatsArgsForCall []struct { + } + getTrackStatsReturns struct { + result1 *livekit.RTPStats + } + getTrackStatsReturnsOnCall map[int]struct { + result1 *livekit.RTPStats + } HasSdpCidStub func(string) bool hasSdpCidMutex sync.RWMutex hasSdpCidArgsForCall []struct { @@ -834,6 +844,59 @@ func (fake *FakeLocalMediaTrack) GetTemporalLayerForSpatialFpsReturnsOnCall(i in }{result1} } +func (fake *FakeLocalMediaTrack) GetTrackStats() *livekit.RTPStats { + fake.getTrackStatsMutex.Lock() + ret, specificReturn := fake.getTrackStatsReturnsOnCall[len(fake.getTrackStatsArgsForCall)] + fake.getTrackStatsArgsForCall = append(fake.getTrackStatsArgsForCall, struct { + }{}) + stub := fake.GetTrackStatsStub + fakeReturns := fake.getTrackStatsReturns + fake.recordInvocation("GetTrackStats", []interface{}{}) + fake.getTrackStatsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsCallCount() int { + fake.getTrackStatsMutex.RLock() + defer fake.getTrackStatsMutex.RUnlock() + return len(fake.getTrackStatsArgsForCall) +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsCalls(stub func() *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = stub +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsReturns(result1 *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = nil + fake.getTrackStatsReturns = struct { + result1 *livekit.RTPStats + }{result1} +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsReturnsOnCall(i int, result1 *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = nil + if fake.getTrackStatsReturnsOnCall == nil { + fake.getTrackStatsReturnsOnCall = make(map[int]struct { + result1 *livekit.RTPStats + }) + } + fake.getTrackStatsReturnsOnCall[i] = struct { + result1 *livekit.RTPStats + }{result1} +} + func (fake *FakeLocalMediaTrack) HasSdpCid(arg1 string) bool { fake.hasSdpCidMutex.Lock() ret, specificReturn := fake.hasSdpCidReturnsOnCall[len(fake.hasSdpCidArgsForCall)] @@ -2069,6 +2132,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} { defer fake.getQualityForDimensionMutex.RUnlock() fake.getTemporalLayerForSpatialFpsMutex.RLock() defer fake.getTemporalLayerForSpatialFpsMutex.RUnlock() + fake.getTrackStatsMutex.RLock() + defer fake.getTrackStatsMutex.RUnlock() fake.hasSdpCidMutex.RLock() defer fake.hasSdpCidMutex.RUnlock() fake.iDMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 9e11470c3..a01a3834b 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -325,6 +325,16 @@ type FakeLocalParticipant struct { getSubscribedTracksReturnsOnCall map[int]struct { result1 []types.SubscribedTrack } + GetTrafficLoadStub func() *types.TrafficLoad + getTrafficLoadMutex sync.RWMutex + getTrafficLoadArgsForCall []struct { + } + getTrafficLoadReturns struct { + result1 *types.TrafficLoad + } + getTrafficLoadReturnsOnCall map[int]struct { + result1 *types.TrafficLoad + } GetTrailerStub func() []byte getTrailerMutex sync.RWMutex getTrailerArgsForCall []struct { @@ -582,6 +592,11 @@ type FakeLocalParticipant struct { onTrackUpdatedArgsForCall []struct { arg1 func(types.LocalParticipant, types.MediaTrack) } + OnTrafficLoadStub func(func(trafficLoad *types.TrafficLoad)) + onTrafficLoadMutex sync.RWMutex + onTrafficLoadArgsForCall []struct { + arg1 func(trafficLoad *types.TrafficLoad) + } ProtocolVersionStub func() types.ProtocolVersion protocolVersionMutex sync.RWMutex protocolVersionArgsForCall []struct { @@ -2540,6 +2555,59 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result }{result1} } +func (fake *FakeLocalParticipant) GetTrafficLoad() *types.TrafficLoad { + fake.getTrafficLoadMutex.Lock() + ret, specificReturn := fake.getTrafficLoadReturnsOnCall[len(fake.getTrafficLoadArgsForCall)] + fake.getTrafficLoadArgsForCall = append(fake.getTrafficLoadArgsForCall, struct { + }{}) + stub := fake.GetTrafficLoadStub + fakeReturns := fake.getTrafficLoadReturns + fake.recordInvocation("GetTrafficLoad", []interface{}{}) + fake.getTrafficLoadMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetTrafficLoadCallCount() int { + fake.getTrafficLoadMutex.RLock() + defer fake.getTrafficLoadMutex.RUnlock() + return len(fake.getTrafficLoadArgsForCall) +} + +func (fake *FakeLocalParticipant) GetTrafficLoadCalls(stub func() *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = stub +} + +func (fake *FakeLocalParticipant) GetTrafficLoadReturns(result1 *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = nil + fake.getTrafficLoadReturns = struct { + result1 *types.TrafficLoad + }{result1} +} + +func (fake *FakeLocalParticipant) GetTrafficLoadReturnsOnCall(i int, result1 *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = nil + if fake.getTrafficLoadReturnsOnCall == nil { + fake.getTrafficLoadReturnsOnCall = make(map[int]struct { + result1 *types.TrafficLoad + }) + } + fake.getTrafficLoadReturnsOnCall[i] = struct { + result1 *types.TrafficLoad + }{result1} +} + func (fake *FakeLocalParticipant) GetTrailer() []byte { fake.getTrailerMutex.Lock() ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)] @@ -3992,6 +4060,38 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnTrafficLoad(arg1 func(trafficLoad *types.TrafficLoad)) { + fake.onTrafficLoadMutex.Lock() + fake.onTrafficLoadArgsForCall = append(fake.onTrafficLoadArgsForCall, struct { + arg1 func(trafficLoad *types.TrafficLoad) + }{arg1}) + stub := fake.OnTrafficLoadStub + fake.recordInvocation("OnTrafficLoad", []interface{}{arg1}) + fake.onTrafficLoadMutex.Unlock() + if stub != nil { + fake.OnTrafficLoadStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnTrafficLoadCallCount() int { + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() + return len(fake.onTrafficLoadArgsForCall) +} + +func (fake *FakeLocalParticipant) OnTrafficLoadCalls(stub func(func(trafficLoad *types.TrafficLoad))) { + fake.onTrafficLoadMutex.Lock() + defer fake.onTrafficLoadMutex.Unlock() + fake.OnTrafficLoadStub = stub +} + +func (fake *FakeLocalParticipant) OnTrafficLoadArgsForCall(i int) func(trafficLoad *types.TrafficLoad) { + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() + argsForCall := fake.onTrafficLoadArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion { fake.protocolVersionMutex.Lock() ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)] @@ -6074,6 +6174,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getSubscribedParticipantsMutex.RUnlock() fake.getSubscribedTracksMutex.RLock() defer fake.getSubscribedTracksMutex.RUnlock() + fake.getTrafficLoadMutex.RLock() + defer fake.getTrafficLoadMutex.RUnlock() fake.getTrailerMutex.RLock() defer fake.getTrailerMutex.RUnlock() fake.handleAnswerMutex.RLock() @@ -6142,6 +6244,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onTrackUnpublishedMutex.RUnlock() fake.onTrackUpdatedMutex.RLock() defer fake.onTrackUpdatedMutex.RUnlock() + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() fake.protocolVersionMutex.RLock() defer fake.protocolVersionMutex.RUnlock() fake.removePublishedTrackMutex.RLock() diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 593a62487..8df479ce7 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -323,3 +323,10 @@ func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, re } return 0, errors.New("receiver not available") } + +func (d *DummyReceiver) GetTrackStats() *livekit.RTPStats { + if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { + return r.GetTrackStats() + } + return nil +} diff --git a/pkg/service/ioservice_sip.go b/pkg/service/ioservice_sip.go index d427c5e2b..3f0b34042 100644 --- a/pkg/service/ioservice_sip.go +++ b/pkg/service/ioservice_sip.go @@ -322,7 +322,7 @@ func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc. return nil, err } return &rpc.GetSIPTrunkAuthenticationResponse{ - Username: trunk.Username, - Password: trunk.Password, + Username: trunk.InboundUsername, + Password: trunk.InboundPassword, }, nil } diff --git a/pkg/service/sip.go b/pkg/service/sip.go index 4a7fe411a..2118d767b 100644 --- a/pkg/service/sip.go +++ b/pkg/service/sip.go @@ -65,8 +65,10 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT OutboundAddress: req.OutboundAddress, OutboundNumber: req.OutboundNumber, InboundNumbersRegex: req.InboundNumbersRegex, - Username: req.Username, - Password: req.Password, + InboundUsername: req.InboundUsername, + InboundPassword: req.InboundPassword, + OutboundUsername: req.OutboundUsername, + OutboundPassword: req.OutboundPassword, } if err := s.store.StoreSIPTrunk(ctx, info); err != nil { diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 3dac1c4af..e2c406317 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -24,6 +24,7 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) const ( @@ -758,7 +759,7 @@ func (r *rtpStatsBase) toProto( } if gapsPresent { - p.GapHistogram = make(map[int32]uint32, cGapHistogramNumBins) + p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram)) for i := 0; i < len(r.gapHistogram); i++ { if r.gapHistogram[i] == 0 { continue @@ -915,172 +916,7 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps // ---------------------------------- func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { - if len(statsList) == 0 { - return nil - } - - startTime := time.Time{} - endTime := time.Time{} - - packets := uint32(0) - bytes := uint64(0) - headerBytes := uint64(0) - packetsLost := uint32(0) - packetsDuplicate := uint32(0) - bytesDuplicate := uint64(0) - headerBytesDuplicate := uint64(0) - packetsPadding := uint32(0) - bytesPadding := uint64(0) - headerBytesPadding := uint64(0) - packetsOutOfOrder := uint32(0) - frames := uint32(0) - keyFrames := uint32(0) - lastKeyFrame := time.Time{} - jitter := 0.0 - maxJitter := float64(0) - gapHistogram := make(map[int32]uint32, cGapHistogramNumBins) - nacks := uint32(0) - nackAcks := uint32(0) - nackMisses := uint32(0) - nackRepeated := uint32(0) - plis := uint32(0) - lastPli := time.Time{} - layerLockPlis := uint32(0) - lastLayerLockPli := time.Time{} - firs := uint32(0) - lastFir := time.Time{} - rtt := uint32(0) - maxRtt := uint32(0) - - for _, stats := range statsList { - if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) { - startTime = stats.StartTime.AsTime() - } - - if endTime.IsZero() || endTime.Before(stats.EndTime.AsTime()) { - endTime = stats.EndTime.AsTime() - } - - packets += stats.Packets - bytes += stats.Bytes - headerBytes += stats.HeaderBytes - - packetsLost += stats.PacketsLost - - packetsDuplicate += stats.PacketsDuplicate - bytesDuplicate += stats.BytesDuplicate - headerBytesDuplicate += stats.HeaderBytesDuplicate - - packetsPadding += stats.PacketsPadding - bytesPadding += stats.BytesPadding - headerBytesPadding += stats.HeaderBytesPadding - - packetsOutOfOrder += stats.PacketsOutOfOrder - - frames += stats.Frames - - keyFrames += stats.KeyFrames - if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) { - lastKeyFrame = stats.LastKeyFrame.AsTime() - } - - jitter += stats.JitterCurrent - if stats.JitterMax > maxJitter { - maxJitter = stats.JitterMax - } - - for burst, count := range stats.GapHistogram { - gapHistogram[burst] += count - } - - nacks += stats.Nacks - nackAcks += stats.NackAcks - nackMisses += stats.NackMisses - nackRepeated += stats.NackRepeated - - plis += stats.Plis - if lastPli.IsZero() || lastPli.Before(stats.LastPli.AsTime()) { - lastPli = stats.LastPli.AsTime() - } - - layerLockPlis += stats.LayerLockPlis - if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) { - lastLayerLockPli = stats.LastLayerLockPli.AsTime() - } - - firs += stats.Firs - if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) { - lastFir = stats.LastFir.AsTime() - } - - rtt += stats.RttCurrent - if stats.RttMax > maxRtt { - maxRtt = stats.RttMax - } - } - - if endTime.IsZero() { - endTime = time.Now() - } - elapsed := endTime.Sub(startTime).Seconds() - - packetLostRate := float64(packetsLost) / elapsed - packetLostPercentage := float32(packetsLost) / (float32(packets) + float32(packetsLost)) * 100.0 - - packetRate := float64(packets) / elapsed - packetDuplicateRate := float64(packetsDuplicate) / elapsed - packetPaddingRate := float64(packetsPadding) / elapsed - - bitrate := float64(bytes) * 8.0 / elapsed - bitrateDuplicate := float64(bytesDuplicate) * 8.0 / elapsed - bitratePadding := float64(bytesPadding) * 8.0 / elapsed - - frameRate := float64(frames) / elapsed - - return &livekit.RTPStats{ - StartTime: timestamppb.New(startTime), - EndTime: timestamppb.New(endTime), - Duration: elapsed, - Packets: packets, - PacketRate: packetRate, - Bytes: bytes, - HeaderBytes: headerBytes, - Bitrate: bitrate, - PacketsLost: packetsLost, - PacketLossRate: packetLostRate, - PacketLossPercentage: packetLostPercentage, - PacketsDuplicate: packetsDuplicate, - PacketDuplicateRate: packetDuplicateRate, - BytesDuplicate: bytesDuplicate, - HeaderBytesDuplicate: headerBytesDuplicate, - BitrateDuplicate: bitrateDuplicate, - PacketsPadding: packetsPadding, - PacketPaddingRate: packetPaddingRate, - BytesPadding: bytesPadding, - HeaderBytesPadding: headerBytesPadding, - BitratePadding: bitratePadding, - PacketsOutOfOrder: packetsOutOfOrder, - Frames: frames, - FrameRate: frameRate, - KeyFrames: keyFrames, - LastKeyFrame: timestamppb.New(lastKeyFrame), - JitterCurrent: jitter / float64(len(statsList)), - JitterMax: maxJitter, - GapHistogram: gapHistogram, - Nacks: nacks, - NackAcks: nackAcks, - NackMisses: nackMisses, - NackRepeated: nackRepeated, - Plis: plis, - LastPli: timestamppb.New(lastPli), - LayerLockPlis: layerLockPlis, - LastLayerLockPli: timestamppb.New(lastLayerLockPli), - Firs: firs, - LastFir: timestamppb.New(lastFir), - RttCurrent: rtt / uint32(len(statsList)), - RttMax: maxRtt, - // no aggregation for drift calculations - } + return utils.AggregateRTPStats(statsList, cGapHistogramNumBins) } func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d07a50615..9afff8e98 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -82,6 +82,8 @@ type TrackReceiver interface { GetCalculatedClockRate(layer int32) uint32 GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) + + GetTrackStats() *livekit.RTPStats } // WebRTCReceiver receives a media track @@ -558,7 +560,7 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats { w.bufferMu.RLock() defer w.bufferMu.RUnlock() - var stats []*livekit.RTPStats + stats := make([]*livekit.RTPStats, 0, len(w.buffers)) for _, buff := range w.buffers { if buff == nil { continue diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go index fb17e61e7..5ed99ebb0 100644 --- a/pkg/telemetry/signalanddatastats.go +++ b/pkg/telemetry/signalanddatastats.go @@ -32,13 +32,27 @@ const ( BytesTrackTypeSignal BytesTrackType = "SG" ) +// ------------------------------- + +type TrafficTotals struct { + At time.Time + SendBytes uint64 + SendMessages uint32 + RecvBytes uint64 + RecvMessages uint32 +} + +// -------------------------------- + // stats for signal and data channel type BytesTrackStats struct { - trackID livekit.TrackID - pID livekit.ParticipantID - send, recv atomic.Uint64 - telemetry TelemetryService - isStopped atomic.Bool + trackID livekit.TrackID + pID livekit.ParticipantID + send, recv atomic.Uint64 + totalSendBytes, totalRecvBytes atomic.Uint64 + totalSendMessages, totalRecvMessages atomic.Uint32 + telemetry TelemetryService + isStopped atomic.Bool } func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats { @@ -54,8 +68,22 @@ func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, tele func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) { if isSend { s.send.Add(bytes) + s.totalSendBytes.Add(bytes) + s.totalSendMessages.Inc() } else { s.recv.Add(bytes) + s.totalRecvBytes.Add(bytes) + s.totalRecvMessages.Inc() + } +} + +func (s *BytesTrackStats) GetTrafficTotals() *TrafficTotals { + return &TrafficTotals{ + At: time.Now(), + SendBytes: s.totalSendBytes.Load(), + SendMessages: s.totalSendMessages.Load(), + RecvBytes: s.totalRecvBytes.Load(), + RecvMessages: s.totalRecvMessages.Load(), } }