From 53542b09a033110c0949bf88ea0976cf099a1466 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 26 Nov 2023 23:05:00 +0530 Subject: [PATCH 01/13] Participant traffic load. (#2262) * Participant traffic load. Capturing information about participant traffic - Upstream/Downstream - Audio/Video/Data - Packets/Bytes This captures a notion of how much traffic load a participant is generating. Can be used to make allocation decisions. * Clean up * SIP patches * reporter goroutine * unlock * move traffic stats from protocol * check type --- go.mod | 6 +- go.sum | 12 +- pkg/rtc/mediatrackreceiver.go | 16 ++ pkg/rtc/participant.go | 15 +- pkg/rtc/participant_traffic_load.go | 211 ++++++++++++++++++ pkg/rtc/types/interfaces.go | 4 + pkg/rtc/types/trafficstats.go | 126 +++++++++++ .../typesfakes/fake_local_media_track.go | 65 ++++++ .../typesfakes/fake_local_participant.go | 104 +++++++++ pkg/rtc/wrappedreceiver.go | 7 + pkg/service/ioservice_sip.go | 4 +- pkg/service/sip.go | 6 +- pkg/sfu/buffer/rtpstats_base.go | 170 +------------- pkg/sfu/receiver.go | 4 +- pkg/telemetry/signalanddatastats.go | 38 +++- 15 files changed, 598 insertions(+), 190 deletions(-) create mode 100644 pkg/rtc/participant_traffic_load.go create mode 100644 pkg/rtc/types/trafficstats.go diff --git a/go.mod b/go.mod index c93bc5d8c..c6af6151f 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 + github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9 github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -70,7 +70,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.5 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.3 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect @@ -101,7 +101,7 @@ require ( golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.15.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 909a503d1..e0ede4f38 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79 github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs= github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= +github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M= +github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9 h1:7/S8gXZCWbtx46xHdYSBg4XqnMVpAt+YfHMTZYmWzkQ= +github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -413,8 +413,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 692885a25..20883e19a 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -828,4 +828,20 @@ func (t *MediaTrackReceiver) IsEncrypted() bool { return t.trackInfo.Encryption != livekit.Encryption_NONE } +func (t *MediaTrackReceiver) GetTrackStats() *livekit.RTPStats { + t.lock.Lock() + receivers := t.receiversShadow + t.lock.Unlock() + + stats := make([]*livekit.RTPStats, 0, len(receivers)) + for _, receiver := range receivers { + receiverStats := receiver.GetTrackStats() + if receiverStats != nil { + stats = append(stats, receiverStats) + } + } + + return buffer.AggregateRTPStats(stats) +} + // --------------------------- diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 09f3c77b9..ca1d0c9f2 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -167,6 +167,7 @@ type ParticipantImpl struct { *TransportManager *UpTrackManager *SubscriptionManager + *ParticipantTrafficLoad // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo @@ -269,6 +270,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.setupUpTrackManager() p.setupSubscriptionManager() + p.setupParticipantTrafficLoad() return p, nil } @@ -784,6 +786,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea go func() { p.SubscriptionManager.Close(isExpectedToResume) p.TransportManager.Close() + p.ParticipantTrafficLoad.Close() }() p.dataChannelStats.Stop() @@ -915,10 +918,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti p.lock.Unlock() } -// -// signal connection methods -// - func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo { numTracks := 0 minQuality := livekit.ConnectionQuality_EXCELLENT @@ -1227,6 +1226,14 @@ func (p *ParticipantImpl) setupSubscriptionManager() { }) } +func (p *ParticipantImpl) setupParticipantTrafficLoad() { + p.ParticipantTrafficLoad = NewParticipantTrafficLoad(ParticipantTrafficLoadParams{ + Participant: p, + DataChannelStats: p.dataChannelStats, + Logger: p.params.Logger, + }) +} + func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { oldState := p.State() if state == oldState { diff --git a/pkg/rtc/participant_traffic_load.go b/pkg/rtc/participant_traffic_load.go new file mode 100644 index 000000000..eef44ad5e --- /dev/null +++ b/pkg/rtc/participant_traffic_load.go @@ -0,0 +1,211 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtc + +import ( + "sync" + "time" + + "github.com/frostbyte73/core" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + reportInterval = 10 * time.Second +) + +type ParticipantTrafficLoadParams struct { + Participant *ParticipantImpl + DataChannelStats *telemetry.BytesTrackStats + Logger logger.Logger +} + +type ParticipantTrafficLoad struct { + params ParticipantTrafficLoadParams + + lock sync.RWMutex + onTrafficLoad func(trafficLoad *types.TrafficLoad) + tracksStatsMedia map[livekit.TrackID]*livekit.RTPStats + dataChannelTraffic *telemetry.TrafficTotals + trafficLoad *types.TrafficLoad + + closed core.Fuse +} + +func NewParticipantTrafficLoad(params ParticipantTrafficLoadParams) *ParticipantTrafficLoad { + p := &ParticipantTrafficLoad{ + params: params, + tracksStatsMedia: make(map[livekit.TrackID]*livekit.RTPStats), + closed: core.NewFuse(), + } + go p.reporter() + return p +} + +func (p *ParticipantTrafficLoad) Close() { + p.closed.Break() +} + +func (p *ParticipantTrafficLoad) OnTrafficLoad(f func(trafficLoad *types.TrafficLoad)) { + p.lock.Lock() + p.onTrafficLoad = f + p.lock.Unlock() +} + +func (p *ParticipantTrafficLoad) getOnTrafficLoad() func(trafficLoad *types.TrafficLoad) { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.onTrafficLoad +} + +func (p *ParticipantTrafficLoad) GetTrafficLoad() *types.TrafficLoad { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.trafficLoad +} + +func (p *ParticipantTrafficLoad) updateTrafficLoad() *types.TrafficLoad { + publishedTracks := p.params.Participant.GetPublishedTracks() + subscribedTracks := p.params.Participant.SubscriptionManager.GetSubscribedTracks() + + availableTracks := make(map[livekit.TrackID]bool, len(publishedTracks)+len(subscribedTracks)) + + upstreamAudioStats := make([]*types.TrafficStats, 0, len(publishedTracks)) + upstreamVideoStats := make([]*types.TrafficStats, 0, len(publishedTracks)) + + downstreamAudioStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) + downstreamVideoStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) + + p.lock.Lock() + defer p.lock.Unlock() + for _, pt := range publishedTracks { + lmt, ok := pt.(types.LocalMediaTrack) + if !ok { + continue + } + trackID := lmt.ID() + stats := lmt.GetTrackStats() + trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) + if stats != nil { + p.tracksStatsMedia[trackID] = stats + availableTracks[trackID] = true + } + if trafficStats != nil { + switch lmt.Kind() { + case livekit.TrackType_AUDIO: + upstreamAudioStats = append(upstreamAudioStats, trafficStats) + case livekit.TrackType_VIDEO: + upstreamVideoStats = append(upstreamVideoStats, trafficStats) + } + } + } + + for _, st := range subscribedTracks { + trackID := st.ID() + stats := st.DownTrack().GetTrackStats() + trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) + if stats != nil { + p.tracksStatsMedia[trackID] = stats + availableTracks[trackID] = true + } + if trafficStats != nil { + switch st.MediaTrack().Kind() { + case livekit.TrackType_AUDIO: + downstreamAudioStats = append(downstreamAudioStats, trafficStats) + case livekit.TrackType_VIDEO: + downstreamVideoStats = append(downstreamVideoStats, trafficStats) + } + } + } + + // remove unavailable tracks from track stats cache + for trackID := range p.tracksStatsMedia { + if !availableTracks[trackID] { + delete(p.tracksStatsMedia, trackID) + } + } + + trafficTypeStats := make([]*types.TrafficTypeStats, 0, 6) + addTypeStats := func(statsList []*types.TrafficStats, trackType livekit.TrackType, streamType livekit.StreamType) { + agg := types.AggregateTrafficStats(statsList) + if agg != nil { + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: trackType, + StreamType: streamType, + TrafficStats: agg, + }) + } + } + addTypeStats(upstreamAudioStats, livekit.TrackType_AUDIO, livekit.StreamType_UPSTREAM) + addTypeStats(upstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_UPSTREAM) + addTypeStats(downstreamAudioStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) + addTypeStats(downstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) + + if p.params.DataChannelStats != nil { + dataChannelTraffic := p.params.DataChannelStats.GetTrafficTotals() + if p.dataChannelTraffic != nil { + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: livekit.TrackType_DATA, + StreamType: livekit.StreamType_UPSTREAM, + TrafficStats: &types.TrafficStats{ + StartTime: p.dataChannelTraffic.At, + EndTime: dataChannelTraffic.At, + Packets: dataChannelTraffic.RecvMessages - p.dataChannelTraffic.RecvMessages, + Bytes: dataChannelTraffic.RecvBytes - p.dataChannelTraffic.RecvBytes, + }, + }) + + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: livekit.TrackType_DATA, + StreamType: livekit.StreamType_DOWNSTREAM, + TrafficStats: &types.TrafficStats{ + StartTime: p.dataChannelTraffic.At, + EndTime: dataChannelTraffic.At, + Packets: dataChannelTraffic.SendMessages - p.dataChannelTraffic.SendMessages, + Bytes: dataChannelTraffic.SendBytes - p.dataChannelTraffic.SendBytes, + }, + }) + } + p.dataChannelTraffic = dataChannelTraffic + } + + p.trafficLoad = &types.TrafficLoad{ + TrafficTypeStats: trafficTypeStats, + } + return p.trafficLoad +} + +func (p *ParticipantTrafficLoad) reporter() { + ticker := time.NewTicker(reportInterval) + defer ticker.Stop() + + for { + select { + case <-p.closed.Watch(): + return + + case <-ticker.C: + trafficLoad := p.updateTrafficLoad() + if onTrafficLoad := p.getOnTrafficLoad(); onTrafficLoad != nil { + onTrafficLoad(trafficLoad) + } + } + } +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index bef8abb6e..266e24b8b 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -395,6 +395,7 @@ type LocalParticipant interface { OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) + OnTrafficLoad(callback func(trafficLoad *TrafficLoad)) // session migration MaybeStartMigration(force bool, onStart func()) bool @@ -420,6 +421,8 @@ type LocalParticipant interface { SetSubscriberChannelCapacity(channelCapacity int64) GetPacer() pacer.Pacer + + GetTrafficLoad() *TrafficLoad } // Room is a container of participants, and can provide room-level actions @@ -499,6 +502,7 @@ type LocalMediaTrack interface { HasSdpCid(cid string) bool GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) + GetTrackStats() *livekit.RTPStats SetRTT(rtt uint32) diff --git a/pkg/rtc/types/trafficstats.go b/pkg/rtc/types/trafficstats.go new file mode 100644 index 000000000..70b269db8 --- /dev/null +++ b/pkg/rtc/types/trafficstats.go @@ -0,0 +1,126 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "time" + + "github.com/livekit/protocol/livekit" +) + +type TrafficStats struct { + StartTime time.Time + EndTime time.Time + Packets uint32 + Bytes uint64 +} + +type TrafficTypeStats struct { + TrackType livekit.TrackType + StreamType livekit.StreamType + TrafficStats *TrafficStats +} + +type TrafficLoad struct { + TrafficTypeStats []*TrafficTypeStats +} + +func RTPStatsDiffToTrafficStats(before, after *livekit.RTPStats) *TrafficStats { + if after == nil { + return nil + } + + startTime := after.StartTime + if before != nil { + startTime = before.EndTime + } + + if before == nil { + return &TrafficStats{ + StartTime: startTime.AsTime(), + EndTime: after.EndTime.AsTime(), + Packets: after.Packets, + Bytes: after.Bytes + after.BytesDuplicate + after.BytesPadding, + } + } + + return &TrafficStats{ + StartTime: startTime.AsTime(), + EndTime: after.EndTime.AsTime(), + Packets: after.Packets - before.Packets, + Bytes: (after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding), + } +} + +func AggregateTrafficStats(statsList []*TrafficStats) *TrafficStats { + if len(statsList) == 0 { + return nil + } + + startTime := time.Time{} + endTime := time.Time{} + + packets := uint32(0) + bytes := uint64(0) + + for _, stats := range statsList { + if startTime.IsZero() || startTime.After(stats.StartTime) { + startTime = stats.StartTime + } + + if endTime.IsZero() || endTime.Before(stats.EndTime) { + endTime = stats.EndTime + } + + packets += stats.Packets + bytes += stats.Bytes + } + + if endTime.IsZero() { + endTime = time.Now() + } + return &TrafficStats{ + StartTime: startTime, + EndTime: endTime, + Packets: packets, + Bytes: bytes, + } +} + +func TrafficLoadToTrafficRate(trafficLoad *TrafficLoad) ( + packetRateIn float64, + byteRateIn float64, + packetRateOut float64, + byteRateOut float64, +) { + if trafficLoad == nil { + return + } + + for _, trafficTypeStat := range trafficLoad.TrafficTypeStats { + elapsed := trafficTypeStat.TrafficStats.EndTime.Sub(trafficTypeStat.TrafficStats.StartTime).Seconds() + packetRate := float64(trafficTypeStat.TrafficStats.Packets) / elapsed + byteRate := float64(trafficTypeStat.TrafficStats.Bytes) / elapsed + switch trafficTypeStat.StreamType { + case livekit.StreamType_UPSTREAM: + packetRateIn += packetRate + byteRateIn += byteRate + case livekit.StreamType_DOWNSTREAM: + packetRateOut += packetRate + byteRateOut += byteRate + } + } + return +} diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index 83fdb1580..a606f8ef1 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -107,6 +107,16 @@ type FakeLocalMediaTrack struct { getTemporalLayerForSpatialFpsReturnsOnCall map[int]struct { result1 int32 } + GetTrackStatsStub func() *livekit.RTPStats + getTrackStatsMutex sync.RWMutex + getTrackStatsArgsForCall []struct { + } + getTrackStatsReturns struct { + result1 *livekit.RTPStats + } + getTrackStatsReturnsOnCall map[int]struct { + result1 *livekit.RTPStats + } HasSdpCidStub func(string) bool hasSdpCidMutex sync.RWMutex hasSdpCidArgsForCall []struct { @@ -834,6 +844,59 @@ func (fake *FakeLocalMediaTrack) GetTemporalLayerForSpatialFpsReturnsOnCall(i in }{result1} } +func (fake *FakeLocalMediaTrack) GetTrackStats() *livekit.RTPStats { + fake.getTrackStatsMutex.Lock() + ret, specificReturn := fake.getTrackStatsReturnsOnCall[len(fake.getTrackStatsArgsForCall)] + fake.getTrackStatsArgsForCall = append(fake.getTrackStatsArgsForCall, struct { + }{}) + stub := fake.GetTrackStatsStub + fakeReturns := fake.getTrackStatsReturns + fake.recordInvocation("GetTrackStats", []interface{}{}) + fake.getTrackStatsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsCallCount() int { + fake.getTrackStatsMutex.RLock() + defer fake.getTrackStatsMutex.RUnlock() + return len(fake.getTrackStatsArgsForCall) +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsCalls(stub func() *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = stub +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsReturns(result1 *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = nil + fake.getTrackStatsReturns = struct { + result1 *livekit.RTPStats + }{result1} +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsReturnsOnCall(i int, result1 *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = nil + if fake.getTrackStatsReturnsOnCall == nil { + fake.getTrackStatsReturnsOnCall = make(map[int]struct { + result1 *livekit.RTPStats + }) + } + fake.getTrackStatsReturnsOnCall[i] = struct { + result1 *livekit.RTPStats + }{result1} +} + func (fake *FakeLocalMediaTrack) HasSdpCid(arg1 string) bool { fake.hasSdpCidMutex.Lock() ret, specificReturn := fake.hasSdpCidReturnsOnCall[len(fake.hasSdpCidArgsForCall)] @@ -2069,6 +2132,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} { defer fake.getQualityForDimensionMutex.RUnlock() fake.getTemporalLayerForSpatialFpsMutex.RLock() defer fake.getTemporalLayerForSpatialFpsMutex.RUnlock() + fake.getTrackStatsMutex.RLock() + defer fake.getTrackStatsMutex.RUnlock() fake.hasSdpCidMutex.RLock() defer fake.hasSdpCidMutex.RUnlock() fake.iDMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 9e11470c3..a01a3834b 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -325,6 +325,16 @@ type FakeLocalParticipant struct { getSubscribedTracksReturnsOnCall map[int]struct { result1 []types.SubscribedTrack } + GetTrafficLoadStub func() *types.TrafficLoad + getTrafficLoadMutex sync.RWMutex + getTrafficLoadArgsForCall []struct { + } + getTrafficLoadReturns struct { + result1 *types.TrafficLoad + } + getTrafficLoadReturnsOnCall map[int]struct { + result1 *types.TrafficLoad + } GetTrailerStub func() []byte getTrailerMutex sync.RWMutex getTrailerArgsForCall []struct { @@ -582,6 +592,11 @@ type FakeLocalParticipant struct { onTrackUpdatedArgsForCall []struct { arg1 func(types.LocalParticipant, types.MediaTrack) } + OnTrafficLoadStub func(func(trafficLoad *types.TrafficLoad)) + onTrafficLoadMutex sync.RWMutex + onTrafficLoadArgsForCall []struct { + arg1 func(trafficLoad *types.TrafficLoad) + } ProtocolVersionStub func() types.ProtocolVersion protocolVersionMutex sync.RWMutex protocolVersionArgsForCall []struct { @@ -2540,6 +2555,59 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result }{result1} } +func (fake *FakeLocalParticipant) GetTrafficLoad() *types.TrafficLoad { + fake.getTrafficLoadMutex.Lock() + ret, specificReturn := fake.getTrafficLoadReturnsOnCall[len(fake.getTrafficLoadArgsForCall)] + fake.getTrafficLoadArgsForCall = append(fake.getTrafficLoadArgsForCall, struct { + }{}) + stub := fake.GetTrafficLoadStub + fakeReturns := fake.getTrafficLoadReturns + fake.recordInvocation("GetTrafficLoad", []interface{}{}) + fake.getTrafficLoadMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetTrafficLoadCallCount() int { + fake.getTrafficLoadMutex.RLock() + defer fake.getTrafficLoadMutex.RUnlock() + return len(fake.getTrafficLoadArgsForCall) +} + +func (fake *FakeLocalParticipant) GetTrafficLoadCalls(stub func() *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = stub +} + +func (fake *FakeLocalParticipant) GetTrafficLoadReturns(result1 *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = nil + fake.getTrafficLoadReturns = struct { + result1 *types.TrafficLoad + }{result1} +} + +func (fake *FakeLocalParticipant) GetTrafficLoadReturnsOnCall(i int, result1 *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = nil + if fake.getTrafficLoadReturnsOnCall == nil { + fake.getTrafficLoadReturnsOnCall = make(map[int]struct { + result1 *types.TrafficLoad + }) + } + fake.getTrafficLoadReturnsOnCall[i] = struct { + result1 *types.TrafficLoad + }{result1} +} + func (fake *FakeLocalParticipant) GetTrailer() []byte { fake.getTrailerMutex.Lock() ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)] @@ -3992,6 +4060,38 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnTrafficLoad(arg1 func(trafficLoad *types.TrafficLoad)) { + fake.onTrafficLoadMutex.Lock() + fake.onTrafficLoadArgsForCall = append(fake.onTrafficLoadArgsForCall, struct { + arg1 func(trafficLoad *types.TrafficLoad) + }{arg1}) + stub := fake.OnTrafficLoadStub + fake.recordInvocation("OnTrafficLoad", []interface{}{arg1}) + fake.onTrafficLoadMutex.Unlock() + if stub != nil { + fake.OnTrafficLoadStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnTrafficLoadCallCount() int { + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() + return len(fake.onTrafficLoadArgsForCall) +} + +func (fake *FakeLocalParticipant) OnTrafficLoadCalls(stub func(func(trafficLoad *types.TrafficLoad))) { + fake.onTrafficLoadMutex.Lock() + defer fake.onTrafficLoadMutex.Unlock() + fake.OnTrafficLoadStub = stub +} + +func (fake *FakeLocalParticipant) OnTrafficLoadArgsForCall(i int) func(trafficLoad *types.TrafficLoad) { + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() + argsForCall := fake.onTrafficLoadArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion { fake.protocolVersionMutex.Lock() ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)] @@ -6074,6 +6174,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getSubscribedParticipantsMutex.RUnlock() fake.getSubscribedTracksMutex.RLock() defer fake.getSubscribedTracksMutex.RUnlock() + fake.getTrafficLoadMutex.RLock() + defer fake.getTrafficLoadMutex.RUnlock() fake.getTrailerMutex.RLock() defer fake.getTrailerMutex.RUnlock() fake.handleAnswerMutex.RLock() @@ -6142,6 +6244,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onTrackUnpublishedMutex.RUnlock() fake.onTrackUpdatedMutex.RLock() defer fake.onTrackUpdatedMutex.RUnlock() + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() fake.protocolVersionMutex.RLock() defer fake.protocolVersionMutex.RUnlock() fake.removePublishedTrackMutex.RLock() diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 593a62487..8df479ce7 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -323,3 +323,10 @@ func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, re } return 0, errors.New("receiver not available") } + +func (d *DummyReceiver) GetTrackStats() *livekit.RTPStats { + if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { + return r.GetTrackStats() + } + return nil +} diff --git a/pkg/service/ioservice_sip.go b/pkg/service/ioservice_sip.go index d427c5e2b..3f0b34042 100644 --- a/pkg/service/ioservice_sip.go +++ b/pkg/service/ioservice_sip.go @@ -322,7 +322,7 @@ func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc. return nil, err } return &rpc.GetSIPTrunkAuthenticationResponse{ - Username: trunk.Username, - Password: trunk.Password, + Username: trunk.InboundUsername, + Password: trunk.InboundPassword, }, nil } diff --git a/pkg/service/sip.go b/pkg/service/sip.go index 4a7fe411a..2118d767b 100644 --- a/pkg/service/sip.go +++ b/pkg/service/sip.go @@ -65,8 +65,10 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT OutboundAddress: req.OutboundAddress, OutboundNumber: req.OutboundNumber, InboundNumbersRegex: req.InboundNumbersRegex, - Username: req.Username, - Password: req.Password, + InboundUsername: req.InboundUsername, + InboundPassword: req.InboundPassword, + OutboundUsername: req.OutboundUsername, + OutboundPassword: req.OutboundPassword, } if err := s.store.StoreSIPTrunk(ctx, info); err != nil { diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 3dac1c4af..e2c406317 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -24,6 +24,7 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) const ( @@ -758,7 +759,7 @@ func (r *rtpStatsBase) toProto( } if gapsPresent { - p.GapHistogram = make(map[int32]uint32, cGapHistogramNumBins) + p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram)) for i := 0; i < len(r.gapHistogram); i++ { if r.gapHistogram[i] == 0 { continue @@ -915,172 +916,7 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps // ---------------------------------- func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { - if len(statsList) == 0 { - return nil - } - - startTime := time.Time{} - endTime := time.Time{} - - packets := uint32(0) - bytes := uint64(0) - headerBytes := uint64(0) - packetsLost := uint32(0) - packetsDuplicate := uint32(0) - bytesDuplicate := uint64(0) - headerBytesDuplicate := uint64(0) - packetsPadding := uint32(0) - bytesPadding := uint64(0) - headerBytesPadding := uint64(0) - packetsOutOfOrder := uint32(0) - frames := uint32(0) - keyFrames := uint32(0) - lastKeyFrame := time.Time{} - jitter := 0.0 - maxJitter := float64(0) - gapHistogram := make(map[int32]uint32, cGapHistogramNumBins) - nacks := uint32(0) - nackAcks := uint32(0) - nackMisses := uint32(0) - nackRepeated := uint32(0) - plis := uint32(0) - lastPli := time.Time{} - layerLockPlis := uint32(0) - lastLayerLockPli := time.Time{} - firs := uint32(0) - lastFir := time.Time{} - rtt := uint32(0) - maxRtt := uint32(0) - - for _, stats := range statsList { - if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) { - startTime = stats.StartTime.AsTime() - } - - if endTime.IsZero() || endTime.Before(stats.EndTime.AsTime()) { - endTime = stats.EndTime.AsTime() - } - - packets += stats.Packets - bytes += stats.Bytes - headerBytes += stats.HeaderBytes - - packetsLost += stats.PacketsLost - - packetsDuplicate += stats.PacketsDuplicate - bytesDuplicate += stats.BytesDuplicate - headerBytesDuplicate += stats.HeaderBytesDuplicate - - packetsPadding += stats.PacketsPadding - bytesPadding += stats.BytesPadding - headerBytesPadding += stats.HeaderBytesPadding - - packetsOutOfOrder += stats.PacketsOutOfOrder - - frames += stats.Frames - - keyFrames += stats.KeyFrames - if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) { - lastKeyFrame = stats.LastKeyFrame.AsTime() - } - - jitter += stats.JitterCurrent - if stats.JitterMax > maxJitter { - maxJitter = stats.JitterMax - } - - for burst, count := range stats.GapHistogram { - gapHistogram[burst] += count - } - - nacks += stats.Nacks - nackAcks += stats.NackAcks - nackMisses += stats.NackMisses - nackRepeated += stats.NackRepeated - - plis += stats.Plis - if lastPli.IsZero() || lastPli.Before(stats.LastPli.AsTime()) { - lastPli = stats.LastPli.AsTime() - } - - layerLockPlis += stats.LayerLockPlis - if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) { - lastLayerLockPli = stats.LastLayerLockPli.AsTime() - } - - firs += stats.Firs - if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) { - lastFir = stats.LastFir.AsTime() - } - - rtt += stats.RttCurrent - if stats.RttMax > maxRtt { - maxRtt = stats.RttMax - } - } - - if endTime.IsZero() { - endTime = time.Now() - } - elapsed := endTime.Sub(startTime).Seconds() - - packetLostRate := float64(packetsLost) / elapsed - packetLostPercentage := float32(packetsLost) / (float32(packets) + float32(packetsLost)) * 100.0 - - packetRate := float64(packets) / elapsed - packetDuplicateRate := float64(packetsDuplicate) / elapsed - packetPaddingRate := float64(packetsPadding) / elapsed - - bitrate := float64(bytes) * 8.0 / elapsed - bitrateDuplicate := float64(bytesDuplicate) * 8.0 / elapsed - bitratePadding := float64(bytesPadding) * 8.0 / elapsed - - frameRate := float64(frames) / elapsed - - return &livekit.RTPStats{ - StartTime: timestamppb.New(startTime), - EndTime: timestamppb.New(endTime), - Duration: elapsed, - Packets: packets, - PacketRate: packetRate, - Bytes: bytes, - HeaderBytes: headerBytes, - Bitrate: bitrate, - PacketsLost: packetsLost, - PacketLossRate: packetLostRate, - PacketLossPercentage: packetLostPercentage, - PacketsDuplicate: packetsDuplicate, - PacketDuplicateRate: packetDuplicateRate, - BytesDuplicate: bytesDuplicate, - HeaderBytesDuplicate: headerBytesDuplicate, - BitrateDuplicate: bitrateDuplicate, - PacketsPadding: packetsPadding, - PacketPaddingRate: packetPaddingRate, - BytesPadding: bytesPadding, - HeaderBytesPadding: headerBytesPadding, - BitratePadding: bitratePadding, - PacketsOutOfOrder: packetsOutOfOrder, - Frames: frames, - FrameRate: frameRate, - KeyFrames: keyFrames, - LastKeyFrame: timestamppb.New(lastKeyFrame), - JitterCurrent: jitter / float64(len(statsList)), - JitterMax: maxJitter, - GapHistogram: gapHistogram, - Nacks: nacks, - NackAcks: nackAcks, - NackMisses: nackMisses, - NackRepeated: nackRepeated, - Plis: plis, - LastPli: timestamppb.New(lastPli), - LayerLockPlis: layerLockPlis, - LastLayerLockPli: timestamppb.New(lastLayerLockPli), - Firs: firs, - LastFir: timestamppb.New(lastFir), - RttCurrent: rtt / uint32(len(statsList)), - RttMax: maxRtt, - // no aggregation for drift calculations - } + return utils.AggregateRTPStats(statsList, cGapHistogramNumBins) } func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d07a50615..9afff8e98 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -82,6 +82,8 @@ type TrackReceiver interface { GetCalculatedClockRate(layer int32) uint32 GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) + + GetTrackStats() *livekit.RTPStats } // WebRTCReceiver receives a media track @@ -558,7 +560,7 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats { w.bufferMu.RLock() defer w.bufferMu.RUnlock() - var stats []*livekit.RTPStats + stats := make([]*livekit.RTPStats, 0, len(w.buffers)) for _, buff := range w.buffers { if buff == nil { continue diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go index fb17e61e7..5ed99ebb0 100644 --- a/pkg/telemetry/signalanddatastats.go +++ b/pkg/telemetry/signalanddatastats.go @@ -32,13 +32,27 @@ const ( BytesTrackTypeSignal BytesTrackType = "SG" ) +// ------------------------------- + +type TrafficTotals struct { + At time.Time + SendBytes uint64 + SendMessages uint32 + RecvBytes uint64 + RecvMessages uint32 +} + +// -------------------------------- + // stats for signal and data channel type BytesTrackStats struct { - trackID livekit.TrackID - pID livekit.ParticipantID - send, recv atomic.Uint64 - telemetry TelemetryService - isStopped atomic.Bool + trackID livekit.TrackID + pID livekit.ParticipantID + send, recv atomic.Uint64 + totalSendBytes, totalRecvBytes atomic.Uint64 + totalSendMessages, totalRecvMessages atomic.Uint32 + telemetry TelemetryService + isStopped atomic.Bool } func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats { @@ -54,8 +68,22 @@ func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, tele func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) { if isSend { s.send.Add(bytes) + s.totalSendBytes.Add(bytes) + s.totalSendMessages.Inc() } else { s.recv.Add(bytes) + s.totalRecvBytes.Add(bytes) + s.totalRecvMessages.Inc() + } +} + +func (s *BytesTrackStats) GetTrafficTotals() *TrafficTotals { + return &TrafficTotals{ + At: time.Now(), + SendBytes: s.totalSendBytes.Load(), + SendMessages: s.totalSendMessages.Load(), + RecvBytes: s.totalRecvBytes.Load(), + RecvMessages: s.totalRecvMessages.Load(), } } From 5f76d1adccd8fa72a8109f7dcaf283266a819905 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 27 Nov 2023 23:06:53 +0530 Subject: [PATCH 02/13] Introduce `DISCONNECTED` connection quality. (#2265) * Introduce `DISCONNECTED` connection quality. Currently, this state happens when any up stream track does not send any packets in an analysis window when it is expected to send packets. This can be used by participants to know the quality of a potentially disconnected participant. Previously, it took 20 - 30 seconds for the stale timeout to kick in and disconnect the limbo participant which triggered a participant update through which other participants knew about it. Previously, `POOR` quality was also overloaded to denote that the up stream is not sending any packets. With this change, that is a separate indicator, i. e. `DISCONNECTED`. * clean up * Update deps * spelling --- go.mod | 6 +-- go.sum | 12 ++--- pkg/rtc/participant.go | 12 ++--- .../connectionquality/connectionstats_test.go | 30 +++++++++-- pkg/sfu/connectionquality/scorer.go | 53 +++++++++++-------- 5 files changed, 71 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index c6af6151f..52241c921 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9 + github.com/livekit/protocol v1.9.2-0.20231127172639-1dda6d8eb064 github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -95,10 +95,10 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect - golang.org/x/crypto v0.15.0 // indirect + golang.org/x/crypto v0.16.0 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.18.0 // indirect - golang.org/x/sys v0.14.0 // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.15.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect diff --git a/go.sum b/go.sum index e0ede4f38..caa11b8e3 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9 h1:7/S8gXZCWbtx46xHdYSBg4XqnMVpAt+YfHMTZYmWzkQ= -github.com/livekit/protocol v1.9.2-0.20231126171322-3f612979d8c9/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= +github.com/livekit/protocol v1.9.2-0.20231127172639-1dda6d8eb064 h1:sse1bDW+/I3vnuw003u+M6TjpiIER9d9KsRWXn7xtRY= +github.com/livekit/protocol v1.9.2-0.20231127172639-1dda6d8eb064/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -291,8 +291,8 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= -golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= -golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -378,8 +378,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ca1d0c9f2..8362932b7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -931,8 +931,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo numTracks++ score, quality := pt.(types.LocalMediaTrack).GetConnectionScoreAndQuality() - if quality < minQuality { - // WARNING NOTE: comparing protobuf enums directly + if utils.IsConnectionQualityLower(minQuality, quality) { minQuality = quality minScore = score } else if quality == minQuality && score < minScore { @@ -942,8 +941,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo p.lock.Lock() trackID := pt.ID() if prevQuality, ok := p.tracksQuality[trackID]; ok { - // WARNING NOTE: comparing protobuf enums directly - if prevQuality > quality { + if utils.IsConnectionQualityLower(prevQuality, quality) { numUpDrops++ } } @@ -958,8 +956,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo numTracks++ score, quality := subTrack.DownTrack().GetConnectionScoreAndQuality() - if quality < minQuality { - // WARNING NOTE: comparing protobuf enums directly + if utils.IsConnectionQualityLower(minQuality, quality) { minQuality = quality minScore = score } else if quality == minQuality && score < minScore { @@ -969,8 +966,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo p.lock.Lock() trackID := subTrack.ID() if prevQuality, ok := p.tracksQuality[trackID]; ok { - // WARNING NOTE: comparing protobuf enums directly - if prevQuality > quality { + if utils.IsConnectionQualityLower(prevQuality, quality) { numDownDrops++ } } diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 73c9c649b..5af316ac2 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -259,7 +259,7 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - // next update with no packets should knock quality down + // next update with no packets should knock quality down to DISCONNECTED now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { @@ -273,12 +273,36 @@ func TestConnectionQuality(t *testing.T) { cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) - require.Equal(t, livekit.ConnectionQuality_POOR, quality) + require.Equal(t, livekit.ConnectionQuality_DISCONNECTED, quality) - // mute/unmute to bring quality back up + // mute when DISCONNECTED should not bump up score/quality now = now.Add(duration) cs.UpdateMuteAt(true, now.Add(1*time.Second)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(2.1), mos) + require.Equal(t, livekit.ConnectionQuality_DISCONNECTED, quality) + + // unmute and send packets to bring quality back up + now = now.Add(duration) cs.UpdateMuteAt(false, now.Add(2*time.Second)) + for i := 0; i < 3; i++ { + trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ + 1: { + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + PacketsLost: 0, + }, + }, + }) + cs.updateScoreAt(now.Add(duration)) + now = now.Add(duration) + } + cs.updateScoreAt(now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.6), mos) + require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) // with lesser number of packet (simulating DTX). // even higher loss (like 10%) should not knock down quality due to quadratic weighting of packet loss ratio diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 340f32368..d2d1f4e5b 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -29,9 +29,10 @@ const ( MaxMOS = float32(4.5) MinMOS = float32(1.0) - maxScore = float64(100.0) - poorScore = float64(30.0) - minScore = float64(20.0) + cMaxScore = float64(100.0) + cPoorScore = float64(30.0) + cMinScore = float64(20.0) + cDisconnectedScore = cMinScore increaseFactor = float64(0.4) // slower increase, i. e. when score is recovering move up slower -> conservative decreaseFactor = float64(0.7) // faster decrease, i. e. when score is dropping move down faster -> aggressive to be responsive to quality drops @@ -107,7 +108,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ } lossEffect *= plw - score := maxScore - delayEffect - lossEffect + score := cMaxScore - delayEffect - lossEffect if score < 0.0 { score = 0.0 } @@ -118,7 +119,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 { if expectedBitrate == 0 { // unsupported mode OR all layers stopped - return maxScore + return cMaxScore } var score float64 @@ -126,9 +127,9 @@ func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 { // using the ratio of expectedBitrate / actualBitrate // the quality inflection points are approximately // GOOD at ~2.7x, POOR at ~20.1x - score = maxScore - 20*math.Log(float64(expectedBitrate)/float64(w.bytes*8)) - if score > maxScore { - score = maxScore + score = cMaxScore - 20*math.Log(float64(expectedBitrate)/float64(w.bytes*8)) + if score > cMaxScore { + score = cMaxScore } if score < 0.0 { score = 0.0 @@ -188,7 +189,7 @@ type qualityScorer struct { func newQualityScorer(params qualityScorerParams) *qualityScorer { return &qualityScorer{ params: params, - score: maxScore, + score: cMaxScore, aggregateBitrate: utils.NewTimedAggregator[int64](utils.TimedAggregatorParams{ CapNegativeValues: true, }), @@ -219,7 +220,10 @@ func (q *qualityScorer) Start() { func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at - q.score = maxScore + // muting when DISCONNECTED should not push quality to EXCELLENT + if q.score != cDisconnectedScore { + q.score = cMaxScore + } } else { q.unmutedAt = at } @@ -264,7 +268,7 @@ func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) { q.layerDistance.Reset() q.layerMutedAt = at - q.score = maxScore + q.score = cMaxScore } } else { if q.isLayerMuted() { @@ -294,7 +298,7 @@ func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) { q.layerDistance.Reset() q.pausedAt = at - q.score = poorScore + q.score = cPoorScore } } else { if q.isPaused() { @@ -353,7 +357,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { // considered (as long as enough time has passed since unmute). // // Similarly, when paused (possibly due to congestion), score is immediately - // set to poorScore for responsiveness. The layer transision is reest. + // set to cPoorScore for responsiveness. The layer transision is reest. // On a resume, quality climbs back up using normal operation. if q.isMuted() || !q.isUnmutedEnough(at) || q.isLayerMuted() || q.isPaused() { q.lastUpdateAt = at @@ -365,11 +369,11 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { var score float64 if stat.packetsExpected == 0 { reason = "dry" - score = poorScore + score = cDisconnectedScore } else { packetScore := stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter) bitrateScore := stat.calculateBitrateScore(expectedBitrate) - layerScore := math.Max(math.Min(maxScore, maxScore-(expectedDistance*distanceWeight)), 0.0) + layerScore := math.Max(math.Min(cMaxScore, cMaxScore-(expectedDistance*distanceWeight)), 0.0) minScore := math.Min(packetScore, bitrateScore) minScore = math.Min(minScore, layerScore) @@ -394,22 +398,23 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { } score = factor*score + (1.0-factor)*q.score } - if score < minScore { + if score < cMinScore { // lower bound to prevent score from becoming very small values due to extreme conditions. // Without a lower bound, it can get so low that it takes a long time to climb back to // better quality even under excellent conditions. - score = minScore + score = cMinScore } - // WARNING NOTE: comparing protobuf enum values directly (livekit.ConnectionQuality) - if scoreToConnectionQuality(q.score) > scoreToConnectionQuality(score) { + prevCQ := scoreToConnectionQuality(q.score) + currCQ := scoreToConnectionQuality(score) + if utils.IsConnectionQualityLower(prevCQ, currCQ) { q.params.Logger.Infow( "quality drop", "reason", reason, "prevScore", q.score, - "prevQuality", scoreToConnectionQuality(q.score), + "prevQuality", prevCQ, "prevStat", &q.stat, "score", score, - "quality", scoreToConnectionQuality(score), + "quality", currCQ, "stat", stat, "packetLossWeight", plw, "maxPPS", q.maxPPS, @@ -531,7 +536,11 @@ func scoreToConnectionQuality(score float64) livekit.ConnectionQuality { return livekit.ConnectionQuality_GOOD } - return livekit.ConnectionQuality_POOR + if score > 20.0 { + return livekit.ConnectionQuality_POOR + } + + return livekit.ConnectionQuality_DISCONNECTED } // ------------------------------------------ From ca71b486a14888ec352c40ece8ec1c9fff3d6ebc Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 27 Nov 2023 20:57:45 -0800 Subject: [PATCH 03/13] Update golang.org/x/exp digest to 6522937 (#2266) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 6 +++--- go.sum | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 52241c921..66dd2becf 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa + golang.org/x/exp v0.0.0-20231127185646-65229373498e golang.org/x/sync v0.5.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 @@ -97,10 +97,10 @@ require ( go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.16.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.18.0 // indirect + golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.15.0 // indirect + golang.org/x/tools v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index caa11b8e3..270760e5f 100644 --- a/go.sum +++ b/go.sum @@ -293,8 +293,8 @@ golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIi golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= -golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -327,8 +327,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= -golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= -golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -407,8 +407,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8= -golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= +golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= +golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= From 890f0bfc677bdc62e78337ea2ef8f1648eff1861 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Mon, 27 Nov 2023 21:31:39 -0800 Subject: [PATCH 04/13] initialize prometheus metrics in test files (#2267) --- pkg/rtc/room_test.go | 5 +++++ pkg/rtc/testutils.go | 5 ----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 36322cbc1..81dfe90c8 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -31,10 +31,15 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" "github.com/livekit/livekit-server/pkg/testutils" ) +func init() { + prometheus.Init("test", livekit.NodeType_SERVER, "test") +} + const ( numParticipants = 3 defaultDelay = 10 * time.Millisecond diff --git a/pkg/rtc/testutils.go b/pkg/rtc/testutils.go index fe5c10ad6..845cc4629 100644 --- a/pkg/rtc/testutils.go +++ b/pkg/rtc/testutils.go @@ -21,13 +21,8 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -func init() { - prometheus.Init("test", livekit.NodeType_SERVER, "test") -} - func NewMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool, publisher bool) *typesfakes.FakeLocalParticipant { p := &typesfakes.FakeLocalParticipant{} sid := utils.NewGuid(utils.ParticipantPrefix) From 396371312b9c058c769346152d3c61f65632e3b8 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 28 Nov 2023 11:51:21 +0530 Subject: [PATCH 05/13] Use variables for score -> quality mapping (#2268) * Use variables for score -> quality mapping * spelling --- pkg/sfu/connectionquality/scorer.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index d2d1f4e5b..5649c20d2 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -29,10 +29,9 @@ const ( MaxMOS = float32(4.5) MinMOS = float32(1.0) - cMaxScore = float64(100.0) - cPoorScore = float64(30.0) - cMinScore = float64(20.0) - cDisconnectedScore = cMinScore + cMaxScore = float64(100.0) + cMinScore = float64(20.0) + cPausedPoorScore = float64(30.0) increaseFactor = float64(0.4) // slower increase, i. e. when score is recovering move up slower -> conservative decreaseFactor = float64(0.7) // faster decrease, i. e. when score is dropping move down faster -> aggressive to be responsive to quality drops @@ -42,6 +41,14 @@ const ( unmuteTimeThreshold = float64(0.5) ) +var ( + qualityTransitionScore = map[livekit.ConnectionQuality]float64{ + livekit.ConnectionQuality_GOOD: 80, + livekit.ConnectionQuality_POOR: 40, + livekit.ConnectionQuality_DISCONNECTED: 20, + } +) + // ------------------------------------------ type windowStat struct { @@ -221,7 +228,7 @@ func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at // muting when DISCONNECTED should not push quality to EXCELLENT - if q.score != cDisconnectedScore { + if q.score != qualityTransitionScore[livekit.ConnectionQuality_DISCONNECTED] { q.score = cMaxScore } } else { @@ -298,7 +305,7 @@ func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) { q.layerDistance.Reset() q.pausedAt = at - q.score = cPoorScore + q.score = cPausedPoorScore } } else { if q.isPaused() { @@ -357,7 +364,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { // considered (as long as enough time has passed since unmute). // // Similarly, when paused (possibly due to congestion), score is immediately - // set to cPoorScore for responsiveness. The layer transision is reest. + // set to cPausedPoorScore for responsiveness. The layer transision is reest. // On a resume, quality climbs back up using normal operation. if q.isMuted() || !q.isUnmutedEnough(at) || q.isLayerMuted() || q.isPaused() { q.lastUpdateAt = at @@ -369,7 +376,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { var score float64 if stat.packetsExpected == 0 { reason = "dry" - score = cDisconnectedScore + score = qualityTransitionScore[livekit.ConnectionQuality_DISCONNECTED] } else { packetScore := stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter) bitrateScore := stat.calculateBitrateScore(expectedBitrate) @@ -528,15 +535,15 @@ func scoreToConnectionQuality(score float64) livekit.ConnectionQuality { // that a score of 60 does not correspond to `POOR` quality. Repair // mechanisms and use of algorithms like de-jittering makes the experience // better even under harsh conditions. - if score > 80.0 { + if score > qualityTransitionScore[livekit.ConnectionQuality_GOOD] { return livekit.ConnectionQuality_EXCELLENT } - if score > 40.0 { + if score > qualityTransitionScore[livekit.ConnectionQuality_POOR] { return livekit.ConnectionQuality_GOOD } - if score > 20.0 { + if score > qualityTransitionScore[livekit.ConnectionQuality_DISCONNECTED] { return livekit.ConnectionQuality_POOR } From 4ba4f5310b002085a4b19145a73e7da5168aca87 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Mon, 27 Nov 2023 23:20:16 -0800 Subject: [PATCH 06/13] Use default psrpc client parameters (#2269) --- go.mod | 2 +- go.sum | 4 ++-- pkg/service/wire_gen.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 66dd2becf..8143c37f2 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.2-0.20231127172639-1dda6d8eb064 + github.com/livekit/protocol v1.9.2 github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 270760e5f..876fa1585 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.2-0.20231127172639-1dda6d8eb064 h1:sse1bDW+/I3vnuw003u+M6TjpiIER9d9KsRWXn7xtRY= -github.com/livekit/protocol v1.9.2-0.20231127172639-1dda6d8eb064/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= +github.com/livekit/protocol v1.9.2 h1:gkKNjVwTbRNO1d5ZxTDHoZrOv4CMvGa1/oPwmCHp8oE= +github.com/livekit/protocol v1.9.2/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 0f0eb6e87..9c9a741e5 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -60,7 +60,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - egressClient, err := rpc.NewEgressClient(messageBus) + clientParams := getPSRPCClientParams(psrpcConfig, messageBus) + egressClient, err := rpc.NewEgressClient(clientParams) if err != nil { return nil, err } @@ -83,7 +84,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) topicFormatter := rpc.NewTopicFormatter() - clientParams := getPSRPCClientParams(psrpcConfig, messageBus) roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err @@ -98,7 +98,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) - ingressClient, err := rpc.NewIngressClient(messageBus) + ingressClient, err := rpc.NewIngressClient(clientParams) if err != nil { return nil, err } From bfc4f19c74a6469415ee4e42ea2f1887f1c0b175 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 28 Nov 2023 22:28:30 +0530 Subject: [PATCH 07/13] Guard against bad quality in trackInfo (#2271) --- pkg/sfu/buffer/videolayerutils.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/videolayerutils.go b/pkg/sfu/buffer/videolayerutils.go index b18c83d84..9028283fc 100644 --- a/pkg/sfu/buffer/videolayerutils.go +++ b/pkg/sfu/buffer/videolayerutils.go @@ -32,7 +32,12 @@ func LayerPresenceFromTrackInfo(trackInfo *livekit.TrackInfo) *[livekit.VideoQua var layerPresence [livekit.VideoQuality_HIGH + 1]bool for _, layer := range trackInfo.Layers { - layerPresence[layer.Quality] = true + // WARNING: comparing protobuf enum + if layer.Quality <= livekit.VideoQuality_HIGH { + layerPresence[layer.Quality] = true + } else { + logger.Warnw("unexpected quality in track info", nil, "trackInfo", logger.Proto(trackInfo)) + } } return &layerPresence From c4da7f5995c3ab78d80296bc3e03aaec5e538cc4 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 28 Nov 2023 21:15:22 -0800 Subject: [PATCH 08/13] Update github.com/livekit/mediatransportutil digest to 05525c8 (#2254) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 8143c37f2..4aa4c3fa8 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e + github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb github.com/livekit/protocol v1.9.2 github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 diff --git a/go.sum b/go.sum index 876fa1585..52dccb1e6 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= -github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= +github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb h1:KiGg4k+kYQD9NjKixaSDMMeYOO2//XBM4IROTI1Itjo= +github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= github.com/livekit/protocol v1.9.2 h1:gkKNjVwTbRNO1d5ZxTDHoZrOv4CMvGa1/oPwmCHp8oE= github.com/livekit/protocol v1.9.2/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= From fa061b47fc7cc458c849ea9989ea7dac7e1da660 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 29 Nov 2023 15:40:01 +0530 Subject: [PATCH 09/13] Logging adjustnments (#2273) --- pkg/rtc/room.go | 23 +++++++++++++++-------- pkg/rtc/signalhandler.go | 2 +- pkg/service/roommanager.go | 10 ++++------ 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index e35fbc738..09904506f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -24,6 +24,7 @@ import ( "time" "go.uber.org/atomic" + "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" "github.com/pion/sctp" @@ -219,17 +220,21 @@ func (r *Room) GetParticipantByID(participantID livekit.ParticipantID) types.Loc func (r *Room) GetParticipants() []types.LocalParticipant { r.lock.RLock() defer r.lock.RUnlock() - participants := make([]types.LocalParticipant, 0, len(r.participants)) - for _, p := range r.participants { - participants = append(participants, p) - } - return participants + + return maps.Values(r.participants) } func (r *Room) GetLocalParticipants() []types.LocalParticipant { return r.GetParticipants() } +func (r *Room) GetParticipantCount() int { + r.lock.RLock() + defer r.lock.RUnlock() + + return len(r.participants) +} + func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo { participants := r.GetParticipants() speakers := make([]*livekit.SpeakerInfo, 0, len(participants)) @@ -391,11 +396,13 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me } }) - r.Logger.Infow("new participant joined", + r.Logger.Debugw("new participant joined", "pID", participant.ID(), "participant", participant.Identity(), - "protocol", participant.ProtocolVersion(), - "options", opts) + "clientInfo", logger.Proto(participant.GetClientInfo()), + "options", opts, + "numParticipants", len(r.participants), + ) if participant.IsRecorder() && !r.protoRoom.ActiveRecording { r.protoRoom.ActiveRecording = true diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 6f8ba407e..e835cf6ba 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -55,7 +55,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant participant.UpdateSubscribedTrackSettings(sid, msg.TrackSetting) } case *livekit.SignalRequest_Leave: - pLogger.Infow("client leaving room") + pLogger.Debugw("client leaving room") room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonClientRequestLeave) case *livekit.SignalRequest_UpdateLayers: err := room.UpdateVideoLayers(participant, msg.UpdateLayers) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index d26eec0d5..a6fa679f3 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -290,11 +290,10 @@ func (r *RoomManager) StartSession( return errors.New("could not restart closed participant") } - logger.Infow("resuming RTC session", - "room", roomName, + participant.GetLogger().Infow("resuming RTC session", "nodeID", r.currentNode.Id, - "participant", pi.Identity, "reason", pi.ReconnectReason, + "numParticipants", room.GetParticipantCount(), ) iceConfig := r.getIceConfig(participant) if iceConfig == nil { @@ -340,12 +339,11 @@ func (r *RoomManager) StartSession( "room", roomName, "nodeID", r.currentNode.Id, "participant", pi.Identity, - "sdk", pi.Client.Sdk, - "sdkVersion", pi.Client.Version, - "protocol", pi.Client.Protocol, + "clientInfo", logger.Proto(pi.Client), "reconnect", pi.Reconnect, "reconnectReason", pi.ReconnectReason, "adaptiveStream", pi.AdaptiveStream, + "numParticipants", room.GetParticipantCount(), ) clientConf := r.clientConfManager.GetConfiguration(pi.Client) From b9ab057c6bbbce4dc263ddc19df6d6d5f880b150 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Wed, 29 Nov 2023 16:13:33 +0200 Subject: [PATCH 10/13] SIP: Update protocol for outbound calls. (#2263) --- pkg/service/sip.go | 54 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/pkg/service/sip.go b/pkg/service/sip.go index 2118d767b..c69540eed 100644 --- a/pkg/service/sip.go +++ b/pkg/service/sip.go @@ -16,14 +16,15 @@ package service import ( "context" - "fmt" - "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" "github.com/livekit/psrpc" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/telemetry" ) type SIPService struct { @@ -161,15 +162,44 @@ func (s *SIPService) CreateSIPParticipant(ctx context.Context, req *livekit.Crea } info := &livekit.SIPParticipantInfo{ - SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix), + SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix), + SipTrunkId: req.SipTrunkId, + SipCallTo: req.SipCallTo, + RoomName: req.RoomName, + ParticipantIdentity: req.ParticipantIdentity, } if err := s.store.StoreSIPParticipant(ctx, info); err != nil { return nil, err } + s.updateParticipant(ctx, info) return info, nil } +func (s *SIPService) updateParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) { + AppendLogFields(ctx, "participantId", info.SipParticipantId, "room", info.RoomName, "trunk", info.SipTrunkId, "to", info.SipCallTo) + req := &rpc.InternalUpdateSIPParticipantRequest{ + ParticipantId: info.SipParticipantId, + CallTo: info.SipCallTo, + RoomName: info.RoomName, + ParticipantIdentity: info.ParticipantIdentity, + } + if info.SipTrunkId != "" { + trunk, err := s.store.LoadSIPTrunk(ctx, info.SipTrunkId) + if err != nil { + logger.Errorw("cannot get trunk to update sip participant", err) + return + } + req.Address = trunk.OutboundAddress + req.Number = trunk.OutboundNumber + req.Username = trunk.OutboundUsername + req.Password = trunk.OutboundPassword + } + if _, err := s.psrpcClient.UpdateSIPParticipant(ctx, req); err != nil { + logger.Errorw("cannot update sip participant", err) + } +} + func (s *SIPService) ListSIPParticipant(ctx context.Context, req *livekit.ListSIPParticipantRequest) (*livekit.ListSIPParticipantResponse, error) { if s.store == nil { return nil, ErrSIPNotConnected @@ -196,7 +226,10 @@ func (s *SIPService) DeleteSIPParticipant(ctx context.Context, req *livekit.Dele if err = s.store.DeleteSIPParticipant(ctx, info); err != nil { return nil, err } - + // These indicate that the call should be disconnected + info.SipTrunkId = "" + info.SipCallTo = "" + s.updateParticipant(ctx, info) return info, nil } @@ -204,6 +237,13 @@ func (s *SIPService) SendSIPParticipantDTMF(ctx context.Context, req *livekit.Se if s.store == nil { return nil, ErrSIPNotConnected } - - return nil, fmt.Errorf("TODO") + AppendLogFields(ctx, "participantId", req.SipParticipantId) + _, err := s.psrpcClient.SendSIPParticipantDTMF(ctx, &rpc.InternalSendSIPParticipantDTMFRequest{ + ParticipantId: req.SipParticipantId, + Digits: req.Digits, + }) + if err != nil { + logger.Errorw("cannot send dtmf to sip participant", err) + } + return nil, err } From a2053dfd945cf0e85660a82101fd025b6526c2fd Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 29 Nov 2023 23:17:17 +0530 Subject: [PATCH 11/13] ConnectionQuality DISCONNECTED -> LOST (#2276) --- go.mod | 4 ++-- go.sum | 8 ++++---- .../connectionquality/connectionstats_test.go | 8 ++++---- pkg/sfu/connectionquality/scorer.go | 16 ++++++++-------- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 4aa4c3fa8..558fd67d9 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb - github.com/livekit/protocol v1.9.2 + github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0 github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -101,7 +101,7 @@ require ( golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.16.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 52dccb1e6..a1b81d76c 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb h1:KiGg4k+kYQD9NjKixaSDMMeYOO2//XBM4IROTI1Itjo= github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= -github.com/livekit/protocol v1.9.2 h1:gkKNjVwTbRNO1d5ZxTDHoZrOv4CMvGa1/oPwmCHp8oE= -github.com/livekit/protocol v1.9.2/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= +github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0 h1:AhJlQejQ+Ma9Q+EPqCNt2S7h6ETJXDiO7qsQdTq9VvM= +github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -413,8 +413,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 5af316ac2..7b07ba8bc 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -259,7 +259,7 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - // next update with no packets should knock quality down to DISCONNECTED + // next update with no packets should knock quality down to LOST now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { @@ -273,14 +273,14 @@ func TestConnectionQuality(t *testing.T) { cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) - require.Equal(t, livekit.ConnectionQuality_DISCONNECTED, quality) + require.Equal(t, livekit.ConnectionQuality_LOST, quality) - // mute when DISCONNECTED should not bump up score/quality + // mute when LOST should not bump up score/quality now = now.Add(duration) cs.UpdateMuteAt(true, now.Add(1*time.Second)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) - require.Equal(t, livekit.ConnectionQuality_DISCONNECTED, quality) + require.Equal(t, livekit.ConnectionQuality_LOST, quality) // unmute and send packets to bring quality back up now = now.Add(duration) diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 5649c20d2..ffc3961da 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -43,9 +43,9 @@ const ( var ( qualityTransitionScore = map[livekit.ConnectionQuality]float64{ - livekit.ConnectionQuality_GOOD: 80, - livekit.ConnectionQuality_POOR: 40, - livekit.ConnectionQuality_DISCONNECTED: 20, + livekit.ConnectionQuality_GOOD: 80, + livekit.ConnectionQuality_POOR: 40, + livekit.ConnectionQuality_LOST: 20, } ) @@ -227,8 +227,8 @@ func (q *qualityScorer) Start() { func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at - // muting when DISCONNECTED should not push quality to EXCELLENT - if q.score != qualityTransitionScore[livekit.ConnectionQuality_DISCONNECTED] { + // muting when LOST should not push quality to EXCELLENT + if q.score != qualityTransitionScore[livekit.ConnectionQuality_LOST] { q.score = cMaxScore } } else { @@ -376,7 +376,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { var score float64 if stat.packetsExpected == 0 { reason = "dry" - score = qualityTransitionScore[livekit.ConnectionQuality_DISCONNECTED] + score = qualityTransitionScore[livekit.ConnectionQuality_LOST] } else { packetScore := stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter) bitrateScore := stat.calculateBitrateScore(expectedBitrate) @@ -543,11 +543,11 @@ func scoreToConnectionQuality(score float64) livekit.ConnectionQuality { return livekit.ConnectionQuality_GOOD } - if score > qualityTransitionScore[livekit.ConnectionQuality_DISCONNECTED] { + if score > qualityTransitionScore[livekit.ConnectionQuality_LOST] { return livekit.ConnectionQuality_POOR } - return livekit.ConnectionQuality_DISCONNECTED + return livekit.ConnectionQuality_LOST } // ------------------------------------------ From 0f1c1ec2249496539982bdbcf4d60d8a201abd0d Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Thu, 30 Nov 2023 04:12:29 +0800 Subject: [PATCH 12/13] clean dd log (#2275) * clean dd log * Implemented Raja's feedback --------- Co-authored-by: David Zhao --- pkg/sfu/buffer/dependencydescriptorparser.go | 5 ++--- pkg/sfu/videolayerselector/dependencydescriptor.go | 11 ++++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index 8cc648b7f..5b2744716 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -47,7 +47,6 @@ type DependencyDescriptorParser struct { } func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser { - logger.Infow("creating dependency descriptor parser", "ddExtID", ddExtID) return &DependencyDescriptorParser{ ddExtID: ddExtID, logger: logger, @@ -86,7 +85,7 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr _, err := ext.Unmarshal(ddBuf) if err != nil { if err != dd.ErrDDReaderNoStructure { - r.logger.Warnw("failed to parse generic dependency descriptor", err, "payload", pkt.PayloadType, "ddbufLen", len(ddBuf)) + r.logger.Infow("failed to parse generic dependency descriptor", err, "payload", pkt.PayloadType, "ddbufLen", len(ddBuf)) } return nil, videoLayer, err } @@ -119,7 +118,7 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr } if r.structure == nil || ddVal.AttachedStructure.StructureId != r.structure.StructureId { - r.logger.Infow("structure updated", "structureID", ddVal.AttachedStructure.StructureId, "extSeq", extSeq, "extFN", extFN, "descriptor", ddVal.String()) + r.logger.Debugw("structure updated", "structureID", ddVal.AttachedStructure.StructureId, "extSeq", extSeq, "extFN", extFN, "descriptor", ddVal.String()) } r.structure = ddVal.AttachedStructure r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure) diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 3c091160e..9adc54b87 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -111,8 +111,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r } if ddwdt.StructureUpdated { - // TODO-REMOVE: remove this log after stable - d.logger.Infow("update dependency structure", + d.logger.Debugw("update dependency structure", "structureID", dd.AttachedStructure.StructureId, "structure", dd.AttachedStructure, "decodeTargets", ddwdt.DecodeTargets, @@ -127,8 +126,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r if ddwdt.ExtKeyFrameNum != d.extKeyFrameNum { // keyframe mismatch, drop and reset chains - // TODO-REMOVE: remove this log after stable - d.logger.Infow("drop packet for keyframe mismatch", "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, "requiredKeyFrame", ddwdt.ExtKeyFrameNum, "structureKeyFrame", d.extKeyFrameNum) + d.logger.Debugw("drop packet for keyframe mismatch", "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, "requiredKeyFrame", ddwdt.ExtKeyFrameNum, "structureKeyFrame", d.extKeyFrameNum) d.decisions.AddDropped(extFrameNum) d.invalidateKeyFrame() return @@ -138,9 +136,8 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r d.updateActiveDecodeTargets(*dd.ActiveDecodeTargetsBitmask) } - // TODO-REMOVE: remove this log after stable if len(fd.ChainDiffs) != len(d.chains) { - d.logger.Warnw("frame chain diff length mismatch", nil, + d.logger.Debugw("frame chain diff length mismatch", nil, "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, @@ -255,7 +252,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r result.IsSwitching = true if !d.currentLayer.IsValid() { result.IsResuming = true - d.logger.Infow( + d.logger.Debugw( "resuming at layer", "current", incomingLayer, "target", d.targetLayer, From 2ee5aa7c985f7555ee6055dbfc9f663489eebcc0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 30 Nov 2023 13:04:31 +0530 Subject: [PATCH 13/13] Add optional supervisor disable. (#2277) * Add optional supervisor disable. Used `DisableSupervisor` so that default can be enabled and it can be disabled explicity. But, open to defaulting to disable (i. e. change param to `EnableSupervisor`). * Move nil check to call site --- pkg/rtc/participant.go | 41 ++++++++++++++++++++++--------- pkg/sfu/buffer/rtpstats_sender.go | 1 + 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 8362932b7..6e45fcef9 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -117,6 +117,7 @@ type ParticipantParams struct { AllowUDPUnstableFallback bool TURNSEnabled bool GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo + DisableSupervisor bool ReconnectOnPublicationError bool ReconnectOnSubscriptionError bool ReconnectOnDataChannelError bool @@ -241,11 +242,13 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID), params.SID, params.Telemetry), - supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality), pubLogger: params.Logger.WithComponent(sutils.ComponentPub), subLogger: params.Logger.WithComponent(sutils.ComponentSub), } + if !params.DisableSupervisor { + p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}) + } p.version.Store(params.InitialVersion) p.timedVersion.Update(params.VersionGenerator.New()) p.migrateState.Store(types.MigrateStateInit) @@ -255,7 +258,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.SetResponseSink(params.Sink) p.setupEnabledCodecs(params.PublishEnabledCodecs, params.SubscribeEnabledCodecs, params.ClientConf.GetDisabledCodecs()) - p.supervisor.OnPublicationError(p.onPublicationError) + if p.supervisor != nil { + p.supervisor.OnPublicationError(p.onPublicationError) + } var err error // keep last participants and when updates were sent @@ -712,8 +717,10 @@ func (p *ParticipantImpl) SetMigrateInfo( for _, t := range mediaTracks { ti := t.GetTrack() - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) - p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.supervisor != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + } p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} p.pubLogger.Infow("pending track added (migration)", "trackID", ti.Sid, "track", logger.Proto(ti)) @@ -756,7 +763,9 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea }) } - p.supervisor.Stop() + if p.supervisor != nil { + p.supervisor.Stop() + } p.pendingTracksLock.Lock() p.pendingTracks = make(map[string]*pendingTrackInfo) @@ -1393,7 +1402,9 @@ func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit. } func (p *ParticipantImpl) onPublisherInitialConnected() { - p.supervisor.SetPublisherPeerConnectionConnected(true) + if p.supervisor != nil { + p.supervisor.SetPublisherPeerConnectionConnected(true) + } go p.publisherRTCPWorker() } @@ -1663,8 +1674,10 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } p.params.Telemetry.TrackPublishRequested(context.Background(), p.ID(), p.Identity(), ti) - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) - p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.supervisor != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + } if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil { if p.pendingTracks[req.Cid] == nil { p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} @@ -1716,7 +1729,9 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo { p.dirty.Store(true) - p.supervisor.SetPublicationMute(trackID, muted) + if p.supervisor != nil { + p.supervisor.SetPublicationMute(trackID, muted) + } track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted) var trackInfo *livekit.TrackInfo @@ -1901,7 +1916,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) // add to published and clean up pending - p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) + if p.supervisor != nil { + p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) + } p.UpTrackManager.AddPublishedTrack(mt) pti := p.pendingTracks[signalCid] @@ -1922,7 +1939,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv trackID := livekit.TrackID(ti.Sid) mt.AddOnClose(func() { - p.supervisor.ClearPublishedTrack(trackID, mt) + if p.supervisor != nil { + p.supervisor.ClearPublishedTrack(trackID, mt) + } // not logged when closing p.params.Telemetry.TrackUnpublished( diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index fd2e9dd58..1cb0ff1bd 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -544,6 +544,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt "extLastRRSN", s.extLastRRSN, "firstTime", r.firstTime.String(), "startTime", r.startTime.String(), + "highestTime", r.highestTime.String(), "extReceivedRRSN", extReceivedRRSN, "packetsInInterval", extReceivedRRSN-s.extLastRRSN, "intervalStats", is.ToString(),