diff --git a/go.mod b/go.mod index c93bc5d8c..558fd67d9 100644 --- a/go.mod +++ b/go.mod @@ -17,8 +17,8 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 + github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb + github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0 github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -46,7 +46,7 @@ require ( github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa + golang.org/x/exp v0.0.0-20231127185646-65229373498e golang.org/x/sync v0.5.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 @@ -70,7 +70,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.5 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.3 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect @@ -95,13 +95,13 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect - golang.org/x/crypto v0.15.0 // indirect + golang.org/x/crypto v0.16.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.18.0 // indirect - golang.org/x/sys v0.14.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.15.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + golang.org/x/tools v0.16.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 909a503d1..a1b81d76c 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79 github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs= github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= +github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -123,10 +123,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= -github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M= +github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb h1:KiGg4k+kYQD9NjKixaSDMMeYOO2//XBM4IROTI1Itjo= +github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= +github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0 h1:AhJlQejQ+Ma9Q+EPqCNt2S7h6ETJXDiO7qsQdTq9VvM= +github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -291,10 +291,10 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= -golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= -golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= -golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= -golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -327,8 +327,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= -golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= -golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -378,8 +378,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -407,14 +407,14 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8= -golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= +golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= +golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 692885a25..20883e19a 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -828,4 +828,20 @@ func (t *MediaTrackReceiver) IsEncrypted() bool { return t.trackInfo.Encryption != livekit.Encryption_NONE } +func (t *MediaTrackReceiver) GetTrackStats() *livekit.RTPStats { + t.lock.Lock() + receivers := t.receiversShadow + t.lock.Unlock() + + stats := make([]*livekit.RTPStats, 0, len(receivers)) + for _, receiver := range receivers { + receiverStats := receiver.GetTrackStats() + if receiverStats != nil { + stats = append(stats, receiverStats) + } + } + + return buffer.AggregateRTPStats(stats) +} + // --------------------------- diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 09f3c77b9..6e45fcef9 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -117,6 +117,7 @@ type ParticipantParams struct { AllowUDPUnstableFallback bool TURNSEnabled bool GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo + DisableSupervisor bool ReconnectOnPublicationError bool ReconnectOnSubscriptionError bool ReconnectOnDataChannelError bool @@ -167,6 +168,7 @@ type ParticipantImpl struct { *TransportManager *UpTrackManager *SubscriptionManager + *ParticipantTrafficLoad // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo @@ -240,11 +242,13 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID), params.SID, params.Telemetry), - supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality), pubLogger: params.Logger.WithComponent(sutils.ComponentPub), subLogger: params.Logger.WithComponent(sutils.ComponentSub), } + if !params.DisableSupervisor { + p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}) + } p.version.Store(params.InitialVersion) p.timedVersion.Update(params.VersionGenerator.New()) p.migrateState.Store(types.MigrateStateInit) @@ -254,7 +258,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.SetResponseSink(params.Sink) p.setupEnabledCodecs(params.PublishEnabledCodecs, params.SubscribeEnabledCodecs, params.ClientConf.GetDisabledCodecs()) - p.supervisor.OnPublicationError(p.onPublicationError) + if p.supervisor != nil { + p.supervisor.OnPublicationError(p.onPublicationError) + } var err error // keep last participants and when updates were sent @@ -269,6 +275,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.setupUpTrackManager() p.setupSubscriptionManager() + p.setupParticipantTrafficLoad() return p, nil } @@ -710,8 +717,10 @@ func (p *ParticipantImpl) SetMigrateInfo( for _, t := range mediaTracks { ti := t.GetTrack() - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) - p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.supervisor != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + } p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} p.pubLogger.Infow("pending track added (migration)", "trackID", ti.Sid, "track", logger.Proto(ti)) @@ -754,7 +763,9 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea }) } - p.supervisor.Stop() + if p.supervisor != nil { + p.supervisor.Stop() + } p.pendingTracksLock.Lock() p.pendingTracks = make(map[string]*pendingTrackInfo) @@ -784,6 +795,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea go func() { p.SubscriptionManager.Close(isExpectedToResume) p.TransportManager.Close() + p.ParticipantTrafficLoad.Close() }() p.dataChannelStats.Stop() @@ -915,10 +927,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti p.lock.Unlock() } -// -// signal connection methods -// - func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo { numTracks := 0 minQuality := livekit.ConnectionQuality_EXCELLENT @@ -932,8 +940,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo numTracks++ score, quality := pt.(types.LocalMediaTrack).GetConnectionScoreAndQuality() - if quality < minQuality { - // WARNING NOTE: comparing protobuf enums directly + if utils.IsConnectionQualityLower(minQuality, quality) { minQuality = quality minScore = score } else if quality == minQuality && score < minScore { @@ -943,8 +950,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo p.lock.Lock() trackID := pt.ID() if prevQuality, ok := p.tracksQuality[trackID]; ok { - // WARNING NOTE: comparing protobuf enums directly - if prevQuality > quality { + if utils.IsConnectionQualityLower(prevQuality, quality) { numUpDrops++ } } @@ -959,8 +965,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo numTracks++ score, quality := subTrack.DownTrack().GetConnectionScoreAndQuality() - if quality < minQuality { - // WARNING NOTE: comparing protobuf enums directly + if utils.IsConnectionQualityLower(minQuality, quality) { minQuality = quality minScore = score } else if quality == minQuality && score < minScore { @@ -970,8 +975,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo p.lock.Lock() trackID := subTrack.ID() if prevQuality, ok := p.tracksQuality[trackID]; ok { - // WARNING NOTE: comparing protobuf enums directly - if prevQuality > quality { + if utils.IsConnectionQualityLower(prevQuality, quality) { numDownDrops++ } } @@ -1227,6 +1231,14 @@ func (p *ParticipantImpl) setupSubscriptionManager() { }) } +func (p *ParticipantImpl) setupParticipantTrafficLoad() { + p.ParticipantTrafficLoad = NewParticipantTrafficLoad(ParticipantTrafficLoadParams{ + Participant: p, + DataChannelStats: p.dataChannelStats, + Logger: p.params.Logger, + }) +} + func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { oldState := p.State() if state == oldState { @@ -1390,7 +1402,9 @@ func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit. } func (p *ParticipantImpl) onPublisherInitialConnected() { - p.supervisor.SetPublisherPeerConnectionConnected(true) + if p.supervisor != nil { + p.supervisor.SetPublisherPeerConnectionConnected(true) + } go p.publisherRTCPWorker() } @@ -1660,8 +1674,10 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } p.params.Telemetry.TrackPublishRequested(context.Background(), p.ID(), p.Identity(), ti) - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) - p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.supervisor != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + } if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil { if p.pendingTracks[req.Cid] == nil { p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} @@ -1713,7 +1729,9 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo { p.dirty.Store(true) - p.supervisor.SetPublicationMute(trackID, muted) + if p.supervisor != nil { + p.supervisor.SetPublicationMute(trackID, muted) + } track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted) var trackInfo *livekit.TrackInfo @@ -1898,7 +1916,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) // add to published and clean up pending - p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) + if p.supervisor != nil { + p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) + } p.UpTrackManager.AddPublishedTrack(mt) pti := p.pendingTracks[signalCid] @@ -1919,7 +1939,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv trackID := livekit.TrackID(ti.Sid) mt.AddOnClose(func() { - p.supervisor.ClearPublishedTrack(trackID, mt) + if p.supervisor != nil { + p.supervisor.ClearPublishedTrack(trackID, mt) + } // not logged when closing p.params.Telemetry.TrackUnpublished( diff --git a/pkg/rtc/participant_traffic_load.go b/pkg/rtc/participant_traffic_load.go new file mode 100644 index 000000000..eef44ad5e --- /dev/null +++ b/pkg/rtc/participant_traffic_load.go @@ -0,0 +1,211 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtc + +import ( + "sync" + "time" + + "github.com/frostbyte73/core" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + reportInterval = 10 * time.Second +) + +type ParticipantTrafficLoadParams struct { + Participant *ParticipantImpl + DataChannelStats *telemetry.BytesTrackStats + Logger logger.Logger +} + +type ParticipantTrafficLoad struct { + params ParticipantTrafficLoadParams + + lock sync.RWMutex + onTrafficLoad func(trafficLoad *types.TrafficLoad) + tracksStatsMedia map[livekit.TrackID]*livekit.RTPStats + dataChannelTraffic *telemetry.TrafficTotals + trafficLoad *types.TrafficLoad + + closed core.Fuse +} + +func NewParticipantTrafficLoad(params ParticipantTrafficLoadParams) *ParticipantTrafficLoad { + p := &ParticipantTrafficLoad{ + params: params, + tracksStatsMedia: make(map[livekit.TrackID]*livekit.RTPStats), + closed: core.NewFuse(), + } + go p.reporter() + return p +} + +func (p *ParticipantTrafficLoad) Close() { + p.closed.Break() +} + +func (p *ParticipantTrafficLoad) OnTrafficLoad(f func(trafficLoad *types.TrafficLoad)) { + p.lock.Lock() + p.onTrafficLoad = f + p.lock.Unlock() +} + +func (p *ParticipantTrafficLoad) getOnTrafficLoad() func(trafficLoad *types.TrafficLoad) { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.onTrafficLoad +} + +func (p *ParticipantTrafficLoad) GetTrafficLoad() *types.TrafficLoad { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.trafficLoad +} + +func (p *ParticipantTrafficLoad) updateTrafficLoad() *types.TrafficLoad { + publishedTracks := p.params.Participant.GetPublishedTracks() + subscribedTracks := p.params.Participant.SubscriptionManager.GetSubscribedTracks() + + availableTracks := make(map[livekit.TrackID]bool, len(publishedTracks)+len(subscribedTracks)) + + upstreamAudioStats := make([]*types.TrafficStats, 0, len(publishedTracks)) + upstreamVideoStats := make([]*types.TrafficStats, 0, len(publishedTracks)) + + downstreamAudioStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) + downstreamVideoStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) + + p.lock.Lock() + defer p.lock.Unlock() + for _, pt := range publishedTracks { + lmt, ok := pt.(types.LocalMediaTrack) + if !ok { + continue + } + trackID := lmt.ID() + stats := lmt.GetTrackStats() + trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) + if stats != nil { + p.tracksStatsMedia[trackID] = stats + availableTracks[trackID] = true + } + if trafficStats != nil { + switch lmt.Kind() { + case livekit.TrackType_AUDIO: + upstreamAudioStats = append(upstreamAudioStats, trafficStats) + case livekit.TrackType_VIDEO: + upstreamVideoStats = append(upstreamVideoStats, trafficStats) + } + } + } + + for _, st := range subscribedTracks { + trackID := st.ID() + stats := st.DownTrack().GetTrackStats() + trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) + if stats != nil { + p.tracksStatsMedia[trackID] = stats + availableTracks[trackID] = true + } + if trafficStats != nil { + switch st.MediaTrack().Kind() { + case livekit.TrackType_AUDIO: + downstreamAudioStats = append(downstreamAudioStats, trafficStats) + case livekit.TrackType_VIDEO: + downstreamVideoStats = append(downstreamVideoStats, trafficStats) + } + } + } + + // remove unavailable tracks from track stats cache + for trackID := range p.tracksStatsMedia { + if !availableTracks[trackID] { + delete(p.tracksStatsMedia, trackID) + } + } + + trafficTypeStats := make([]*types.TrafficTypeStats, 0, 6) + addTypeStats := func(statsList []*types.TrafficStats, trackType livekit.TrackType, streamType livekit.StreamType) { + agg := types.AggregateTrafficStats(statsList) + if agg != nil { + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: trackType, + StreamType: streamType, + TrafficStats: agg, + }) + } + } + addTypeStats(upstreamAudioStats, livekit.TrackType_AUDIO, livekit.StreamType_UPSTREAM) + addTypeStats(upstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_UPSTREAM) + addTypeStats(downstreamAudioStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) + addTypeStats(downstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) + + if p.params.DataChannelStats != nil { + dataChannelTraffic := p.params.DataChannelStats.GetTrafficTotals() + if p.dataChannelTraffic != nil { + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: livekit.TrackType_DATA, + StreamType: livekit.StreamType_UPSTREAM, + TrafficStats: &types.TrafficStats{ + StartTime: p.dataChannelTraffic.At, + EndTime: dataChannelTraffic.At, + Packets: dataChannelTraffic.RecvMessages - p.dataChannelTraffic.RecvMessages, + Bytes: dataChannelTraffic.RecvBytes - p.dataChannelTraffic.RecvBytes, + }, + }) + + trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ + TrackType: livekit.TrackType_DATA, + StreamType: livekit.StreamType_DOWNSTREAM, + TrafficStats: &types.TrafficStats{ + StartTime: p.dataChannelTraffic.At, + EndTime: dataChannelTraffic.At, + Packets: dataChannelTraffic.SendMessages - p.dataChannelTraffic.SendMessages, + Bytes: dataChannelTraffic.SendBytes - p.dataChannelTraffic.SendBytes, + }, + }) + } + p.dataChannelTraffic = dataChannelTraffic + } + + p.trafficLoad = &types.TrafficLoad{ + TrafficTypeStats: trafficTypeStats, + } + return p.trafficLoad +} + +func (p *ParticipantTrafficLoad) reporter() { + ticker := time.NewTicker(reportInterval) + defer ticker.Stop() + + for { + select { + case <-p.closed.Watch(): + return + + case <-ticker.C: + trafficLoad := p.updateTrafficLoad() + if onTrafficLoad := p.getOnTrafficLoad(); onTrafficLoad != nil { + onTrafficLoad(trafficLoad) + } + } + } +} diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index e35fbc738..09904506f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -24,6 +24,7 @@ import ( "time" "go.uber.org/atomic" + "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" "github.com/pion/sctp" @@ -219,17 +220,21 @@ func (r *Room) GetParticipantByID(participantID livekit.ParticipantID) types.Loc func (r *Room) GetParticipants() []types.LocalParticipant { r.lock.RLock() defer r.lock.RUnlock() - participants := make([]types.LocalParticipant, 0, len(r.participants)) - for _, p := range r.participants { - participants = append(participants, p) - } - return participants + + return maps.Values(r.participants) } func (r *Room) GetLocalParticipants() []types.LocalParticipant { return r.GetParticipants() } +func (r *Room) GetParticipantCount() int { + r.lock.RLock() + defer r.lock.RUnlock() + + return len(r.participants) +} + func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo { participants := r.GetParticipants() speakers := make([]*livekit.SpeakerInfo, 0, len(participants)) @@ -391,11 +396,13 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me } }) - r.Logger.Infow("new participant joined", + r.Logger.Debugw("new participant joined", "pID", participant.ID(), "participant", participant.Identity(), - "protocol", participant.ProtocolVersion(), - "options", opts) + "clientInfo", logger.Proto(participant.GetClientInfo()), + "options", opts, + "numParticipants", len(r.participants), + ) if participant.IsRecorder() && !r.protoRoom.ActiveRecording { r.protoRoom.ActiveRecording = true diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 36322cbc1..81dfe90c8 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -31,10 +31,15 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" "github.com/livekit/livekit-server/pkg/testutils" ) +func init() { + prometheus.Init("test", livekit.NodeType_SERVER, "test") +} + const ( numParticipants = 3 defaultDelay = 10 * time.Millisecond diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 6f8ba407e..e835cf6ba 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -55,7 +55,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant participant.UpdateSubscribedTrackSettings(sid, msg.TrackSetting) } case *livekit.SignalRequest_Leave: - pLogger.Infow("client leaving room") + pLogger.Debugw("client leaving room") room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonClientRequestLeave) case *livekit.SignalRequest_UpdateLayers: err := room.UpdateVideoLayers(participant, msg.UpdateLayers) diff --git a/pkg/rtc/testutils.go b/pkg/rtc/testutils.go index fe5c10ad6..845cc4629 100644 --- a/pkg/rtc/testutils.go +++ b/pkg/rtc/testutils.go @@ -21,13 +21,8 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -func init() { - prometheus.Init("test", livekit.NodeType_SERVER, "test") -} - func NewMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool, publisher bool) *typesfakes.FakeLocalParticipant { p := &typesfakes.FakeLocalParticipant{} sid := utils.NewGuid(utils.ParticipantPrefix) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index bef8abb6e..266e24b8b 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -395,6 +395,7 @@ type LocalParticipant interface { OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) + OnTrafficLoad(callback func(trafficLoad *TrafficLoad)) // session migration MaybeStartMigration(force bool, onStart func()) bool @@ -420,6 +421,8 @@ type LocalParticipant interface { SetSubscriberChannelCapacity(channelCapacity int64) GetPacer() pacer.Pacer + + GetTrafficLoad() *TrafficLoad } // Room is a container of participants, and can provide room-level actions @@ -499,6 +502,7 @@ type LocalMediaTrack interface { HasSdpCid(cid string) bool GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) + GetTrackStats() *livekit.RTPStats SetRTT(rtt uint32) diff --git a/pkg/rtc/types/trafficstats.go b/pkg/rtc/types/trafficstats.go new file mode 100644 index 000000000..70b269db8 --- /dev/null +++ b/pkg/rtc/types/trafficstats.go @@ -0,0 +1,126 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "time" + + "github.com/livekit/protocol/livekit" +) + +type TrafficStats struct { + StartTime time.Time + EndTime time.Time + Packets uint32 + Bytes uint64 +} + +type TrafficTypeStats struct { + TrackType livekit.TrackType + StreamType livekit.StreamType + TrafficStats *TrafficStats +} + +type TrafficLoad struct { + TrafficTypeStats []*TrafficTypeStats +} + +func RTPStatsDiffToTrafficStats(before, after *livekit.RTPStats) *TrafficStats { + if after == nil { + return nil + } + + startTime := after.StartTime + if before != nil { + startTime = before.EndTime + } + + if before == nil { + return &TrafficStats{ + StartTime: startTime.AsTime(), + EndTime: after.EndTime.AsTime(), + Packets: after.Packets, + Bytes: after.Bytes + after.BytesDuplicate + after.BytesPadding, + } + } + + return &TrafficStats{ + StartTime: startTime.AsTime(), + EndTime: after.EndTime.AsTime(), + Packets: after.Packets - before.Packets, + Bytes: (after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding), + } +} + +func AggregateTrafficStats(statsList []*TrafficStats) *TrafficStats { + if len(statsList) == 0 { + return nil + } + + startTime := time.Time{} + endTime := time.Time{} + + packets := uint32(0) + bytes := uint64(0) + + for _, stats := range statsList { + if startTime.IsZero() || startTime.After(stats.StartTime) { + startTime = stats.StartTime + } + + if endTime.IsZero() || endTime.Before(stats.EndTime) { + endTime = stats.EndTime + } + + packets += stats.Packets + bytes += stats.Bytes + } + + if endTime.IsZero() { + endTime = time.Now() + } + return &TrafficStats{ + StartTime: startTime, + EndTime: endTime, + Packets: packets, + Bytes: bytes, + } +} + +func TrafficLoadToTrafficRate(trafficLoad *TrafficLoad) ( + packetRateIn float64, + byteRateIn float64, + packetRateOut float64, + byteRateOut float64, +) { + if trafficLoad == nil { + return + } + + for _, trafficTypeStat := range trafficLoad.TrafficTypeStats { + elapsed := trafficTypeStat.TrafficStats.EndTime.Sub(trafficTypeStat.TrafficStats.StartTime).Seconds() + packetRate := float64(trafficTypeStat.TrafficStats.Packets) / elapsed + byteRate := float64(trafficTypeStat.TrafficStats.Bytes) / elapsed + switch trafficTypeStat.StreamType { + case livekit.StreamType_UPSTREAM: + packetRateIn += packetRate + byteRateIn += byteRate + case livekit.StreamType_DOWNSTREAM: + packetRateOut += packetRate + byteRateOut += byteRate + } + } + return +} diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index 83fdb1580..a606f8ef1 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -107,6 +107,16 @@ type FakeLocalMediaTrack struct { getTemporalLayerForSpatialFpsReturnsOnCall map[int]struct { result1 int32 } + GetTrackStatsStub func() *livekit.RTPStats + getTrackStatsMutex sync.RWMutex + getTrackStatsArgsForCall []struct { + } + getTrackStatsReturns struct { + result1 *livekit.RTPStats + } + getTrackStatsReturnsOnCall map[int]struct { + result1 *livekit.RTPStats + } HasSdpCidStub func(string) bool hasSdpCidMutex sync.RWMutex hasSdpCidArgsForCall []struct { @@ -834,6 +844,59 @@ func (fake *FakeLocalMediaTrack) GetTemporalLayerForSpatialFpsReturnsOnCall(i in }{result1} } +func (fake *FakeLocalMediaTrack) GetTrackStats() *livekit.RTPStats { + fake.getTrackStatsMutex.Lock() + ret, specificReturn := fake.getTrackStatsReturnsOnCall[len(fake.getTrackStatsArgsForCall)] + fake.getTrackStatsArgsForCall = append(fake.getTrackStatsArgsForCall, struct { + }{}) + stub := fake.GetTrackStatsStub + fakeReturns := fake.getTrackStatsReturns + fake.recordInvocation("GetTrackStats", []interface{}{}) + fake.getTrackStatsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsCallCount() int { + fake.getTrackStatsMutex.RLock() + defer fake.getTrackStatsMutex.RUnlock() + return len(fake.getTrackStatsArgsForCall) +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsCalls(stub func() *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = stub +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsReturns(result1 *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = nil + fake.getTrackStatsReturns = struct { + result1 *livekit.RTPStats + }{result1} +} + +func (fake *FakeLocalMediaTrack) GetTrackStatsReturnsOnCall(i int, result1 *livekit.RTPStats) { + fake.getTrackStatsMutex.Lock() + defer fake.getTrackStatsMutex.Unlock() + fake.GetTrackStatsStub = nil + if fake.getTrackStatsReturnsOnCall == nil { + fake.getTrackStatsReturnsOnCall = make(map[int]struct { + result1 *livekit.RTPStats + }) + } + fake.getTrackStatsReturnsOnCall[i] = struct { + result1 *livekit.RTPStats + }{result1} +} + func (fake *FakeLocalMediaTrack) HasSdpCid(arg1 string) bool { fake.hasSdpCidMutex.Lock() ret, specificReturn := fake.hasSdpCidReturnsOnCall[len(fake.hasSdpCidArgsForCall)] @@ -2069,6 +2132,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} { defer fake.getQualityForDimensionMutex.RUnlock() fake.getTemporalLayerForSpatialFpsMutex.RLock() defer fake.getTemporalLayerForSpatialFpsMutex.RUnlock() + fake.getTrackStatsMutex.RLock() + defer fake.getTrackStatsMutex.RUnlock() fake.hasSdpCidMutex.RLock() defer fake.hasSdpCidMutex.RUnlock() fake.iDMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 9e11470c3..a01a3834b 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -325,6 +325,16 @@ type FakeLocalParticipant struct { getSubscribedTracksReturnsOnCall map[int]struct { result1 []types.SubscribedTrack } + GetTrafficLoadStub func() *types.TrafficLoad + getTrafficLoadMutex sync.RWMutex + getTrafficLoadArgsForCall []struct { + } + getTrafficLoadReturns struct { + result1 *types.TrafficLoad + } + getTrafficLoadReturnsOnCall map[int]struct { + result1 *types.TrafficLoad + } GetTrailerStub func() []byte getTrailerMutex sync.RWMutex getTrailerArgsForCall []struct { @@ -582,6 +592,11 @@ type FakeLocalParticipant struct { onTrackUpdatedArgsForCall []struct { arg1 func(types.LocalParticipant, types.MediaTrack) } + OnTrafficLoadStub func(func(trafficLoad *types.TrafficLoad)) + onTrafficLoadMutex sync.RWMutex + onTrafficLoadArgsForCall []struct { + arg1 func(trafficLoad *types.TrafficLoad) + } ProtocolVersionStub func() types.ProtocolVersion protocolVersionMutex sync.RWMutex protocolVersionArgsForCall []struct { @@ -2540,6 +2555,59 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result }{result1} } +func (fake *FakeLocalParticipant) GetTrafficLoad() *types.TrafficLoad { + fake.getTrafficLoadMutex.Lock() + ret, specificReturn := fake.getTrafficLoadReturnsOnCall[len(fake.getTrafficLoadArgsForCall)] + fake.getTrafficLoadArgsForCall = append(fake.getTrafficLoadArgsForCall, struct { + }{}) + stub := fake.GetTrafficLoadStub + fakeReturns := fake.getTrafficLoadReturns + fake.recordInvocation("GetTrafficLoad", []interface{}{}) + fake.getTrafficLoadMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetTrafficLoadCallCount() int { + fake.getTrafficLoadMutex.RLock() + defer fake.getTrafficLoadMutex.RUnlock() + return len(fake.getTrafficLoadArgsForCall) +} + +func (fake *FakeLocalParticipant) GetTrafficLoadCalls(stub func() *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = stub +} + +func (fake *FakeLocalParticipant) GetTrafficLoadReturns(result1 *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = nil + fake.getTrafficLoadReturns = struct { + result1 *types.TrafficLoad + }{result1} +} + +func (fake *FakeLocalParticipant) GetTrafficLoadReturnsOnCall(i int, result1 *types.TrafficLoad) { + fake.getTrafficLoadMutex.Lock() + defer fake.getTrafficLoadMutex.Unlock() + fake.GetTrafficLoadStub = nil + if fake.getTrafficLoadReturnsOnCall == nil { + fake.getTrafficLoadReturnsOnCall = make(map[int]struct { + result1 *types.TrafficLoad + }) + } + fake.getTrafficLoadReturnsOnCall[i] = struct { + result1 *types.TrafficLoad + }{result1} +} + func (fake *FakeLocalParticipant) GetTrailer() []byte { fake.getTrailerMutex.Lock() ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)] @@ -3992,6 +4060,38 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnTrafficLoad(arg1 func(trafficLoad *types.TrafficLoad)) { + fake.onTrafficLoadMutex.Lock() + fake.onTrafficLoadArgsForCall = append(fake.onTrafficLoadArgsForCall, struct { + arg1 func(trafficLoad *types.TrafficLoad) + }{arg1}) + stub := fake.OnTrafficLoadStub + fake.recordInvocation("OnTrafficLoad", []interface{}{arg1}) + fake.onTrafficLoadMutex.Unlock() + if stub != nil { + fake.OnTrafficLoadStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnTrafficLoadCallCount() int { + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() + return len(fake.onTrafficLoadArgsForCall) +} + +func (fake *FakeLocalParticipant) OnTrafficLoadCalls(stub func(func(trafficLoad *types.TrafficLoad))) { + fake.onTrafficLoadMutex.Lock() + defer fake.onTrafficLoadMutex.Unlock() + fake.OnTrafficLoadStub = stub +} + +func (fake *FakeLocalParticipant) OnTrafficLoadArgsForCall(i int) func(trafficLoad *types.TrafficLoad) { + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() + argsForCall := fake.onTrafficLoadArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion { fake.protocolVersionMutex.Lock() ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)] @@ -6074,6 +6174,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getSubscribedParticipantsMutex.RUnlock() fake.getSubscribedTracksMutex.RLock() defer fake.getSubscribedTracksMutex.RUnlock() + fake.getTrafficLoadMutex.RLock() + defer fake.getTrafficLoadMutex.RUnlock() fake.getTrailerMutex.RLock() defer fake.getTrailerMutex.RUnlock() fake.handleAnswerMutex.RLock() @@ -6142,6 +6244,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onTrackUnpublishedMutex.RUnlock() fake.onTrackUpdatedMutex.RLock() defer fake.onTrackUpdatedMutex.RUnlock() + fake.onTrafficLoadMutex.RLock() + defer fake.onTrafficLoadMutex.RUnlock() fake.protocolVersionMutex.RLock() defer fake.protocolVersionMutex.RUnlock() fake.removePublishedTrackMutex.RLock() diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 593a62487..8df479ce7 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -323,3 +323,10 @@ func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, re } return 0, errors.New("receiver not available") } + +func (d *DummyReceiver) GetTrackStats() *livekit.RTPStats { + if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { + return r.GetTrackStats() + } + return nil +} diff --git a/pkg/service/ioservice_sip.go b/pkg/service/ioservice_sip.go index d427c5e2b..3f0b34042 100644 --- a/pkg/service/ioservice_sip.go +++ b/pkg/service/ioservice_sip.go @@ -322,7 +322,7 @@ func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc. return nil, err } return &rpc.GetSIPTrunkAuthenticationResponse{ - Username: trunk.Username, - Password: trunk.Password, + Username: trunk.InboundUsername, + Password: trunk.InboundPassword, }, nil } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index d26eec0d5..a6fa679f3 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -290,11 +290,10 @@ func (r *RoomManager) StartSession( return errors.New("could not restart closed participant") } - logger.Infow("resuming RTC session", - "room", roomName, + participant.GetLogger().Infow("resuming RTC session", "nodeID", r.currentNode.Id, - "participant", pi.Identity, "reason", pi.ReconnectReason, + "numParticipants", room.GetParticipantCount(), ) iceConfig := r.getIceConfig(participant) if iceConfig == nil { @@ -340,12 +339,11 @@ func (r *RoomManager) StartSession( "room", roomName, "nodeID", r.currentNode.Id, "participant", pi.Identity, - "sdk", pi.Client.Sdk, - "sdkVersion", pi.Client.Version, - "protocol", pi.Client.Protocol, + "clientInfo", logger.Proto(pi.Client), "reconnect", pi.Reconnect, "reconnectReason", pi.ReconnectReason, "adaptiveStream", pi.AdaptiveStream, + "numParticipants", room.GetParticipantCount(), ) clientConf := r.clientConfManager.GetConfiguration(pi.Client) diff --git a/pkg/service/sip.go b/pkg/service/sip.go index 4a7fe411a..c69540eed 100644 --- a/pkg/service/sip.go +++ b/pkg/service/sip.go @@ -16,14 +16,15 @@ package service import ( "context" - "fmt" - "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" "github.com/livekit/psrpc" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/telemetry" ) type SIPService struct { @@ -65,8 +66,10 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT OutboundAddress: req.OutboundAddress, OutboundNumber: req.OutboundNumber, InboundNumbersRegex: req.InboundNumbersRegex, - Username: req.Username, - Password: req.Password, + InboundUsername: req.InboundUsername, + InboundPassword: req.InboundPassword, + OutboundUsername: req.OutboundUsername, + OutboundPassword: req.OutboundPassword, } if err := s.store.StoreSIPTrunk(ctx, info); err != nil { @@ -159,15 +162,44 @@ func (s *SIPService) CreateSIPParticipant(ctx context.Context, req *livekit.Crea } info := &livekit.SIPParticipantInfo{ - SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix), + SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix), + SipTrunkId: req.SipTrunkId, + SipCallTo: req.SipCallTo, + RoomName: req.RoomName, + ParticipantIdentity: req.ParticipantIdentity, } if err := s.store.StoreSIPParticipant(ctx, info); err != nil { return nil, err } + s.updateParticipant(ctx, info) return info, nil } +func (s *SIPService) updateParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) { + AppendLogFields(ctx, "participantId", info.SipParticipantId, "room", info.RoomName, "trunk", info.SipTrunkId, "to", info.SipCallTo) + req := &rpc.InternalUpdateSIPParticipantRequest{ + ParticipantId: info.SipParticipantId, + CallTo: info.SipCallTo, + RoomName: info.RoomName, + ParticipantIdentity: info.ParticipantIdentity, + } + if info.SipTrunkId != "" { + trunk, err := s.store.LoadSIPTrunk(ctx, info.SipTrunkId) + if err != nil { + logger.Errorw("cannot get trunk to update sip participant", err) + return + } + req.Address = trunk.OutboundAddress + req.Number = trunk.OutboundNumber + req.Username = trunk.OutboundUsername + req.Password = trunk.OutboundPassword + } + if _, err := s.psrpcClient.UpdateSIPParticipant(ctx, req); err != nil { + logger.Errorw("cannot update sip participant", err) + } +} + func (s *SIPService) ListSIPParticipant(ctx context.Context, req *livekit.ListSIPParticipantRequest) (*livekit.ListSIPParticipantResponse, error) { if s.store == nil { return nil, ErrSIPNotConnected @@ -194,7 +226,10 @@ func (s *SIPService) DeleteSIPParticipant(ctx context.Context, req *livekit.Dele if err = s.store.DeleteSIPParticipant(ctx, info); err != nil { return nil, err } - + // These indicate that the call should be disconnected + info.SipTrunkId = "" + info.SipCallTo = "" + s.updateParticipant(ctx, info) return info, nil } @@ -202,6 +237,13 @@ func (s *SIPService) SendSIPParticipantDTMF(ctx context.Context, req *livekit.Se if s.store == nil { return nil, ErrSIPNotConnected } - - return nil, fmt.Errorf("TODO") + AppendLogFields(ctx, "participantId", req.SipParticipantId) + _, err := s.psrpcClient.SendSIPParticipantDTMF(ctx, &rpc.InternalSendSIPParticipantDTMFRequest{ + ParticipantId: req.SipParticipantId, + Digits: req.Digits, + }) + if err != nil { + logger.Errorw("cannot send dtmf to sip participant", err) + } + return nil, err } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 0f0eb6e87..9c9a741e5 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -60,7 +60,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - egressClient, err := rpc.NewEgressClient(messageBus) + clientParams := getPSRPCClientParams(psrpcConfig, messageBus) + egressClient, err := rpc.NewEgressClient(clientParams) if err != nil { return nil, err } @@ -83,7 +84,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) topicFormatter := rpc.NewTopicFormatter() - clientParams := getPSRPCClientParams(psrpcConfig, messageBus) roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err @@ -98,7 +98,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) - ingressClient, err := rpc.NewIngressClient(messageBus) + ingressClient, err := rpc.NewIngressClient(clientParams) if err != nil { return nil, err } diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index 8cc648b7f..5b2744716 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -47,7 +47,6 @@ type DependencyDescriptorParser struct { } func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser { - logger.Infow("creating dependency descriptor parser", "ddExtID", ddExtID) return &DependencyDescriptorParser{ ddExtID: ddExtID, logger: logger, @@ -86,7 +85,7 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr _, err := ext.Unmarshal(ddBuf) if err != nil { if err != dd.ErrDDReaderNoStructure { - r.logger.Warnw("failed to parse generic dependency descriptor", err, "payload", pkt.PayloadType, "ddbufLen", len(ddBuf)) + r.logger.Infow("failed to parse generic dependency descriptor", err, "payload", pkt.PayloadType, "ddbufLen", len(ddBuf)) } return nil, videoLayer, err } @@ -119,7 +118,7 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr } if r.structure == nil || ddVal.AttachedStructure.StructureId != r.structure.StructureId { - r.logger.Infow("structure updated", "structureID", ddVal.AttachedStructure.StructureId, "extSeq", extSeq, "extFN", extFN, "descriptor", ddVal.String()) + r.logger.Debugw("structure updated", "structureID", ddVal.AttachedStructure.StructureId, "extSeq", extSeq, "extFN", extFN, "descriptor", ddVal.String()) } r.structure = ddVal.AttachedStructure r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure) diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 3dac1c4af..e2c406317 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -24,6 +24,7 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) const ( @@ -758,7 +759,7 @@ func (r *rtpStatsBase) toProto( } if gapsPresent { - p.GapHistogram = make(map[int32]uint32, cGapHistogramNumBins) + p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram)) for i := 0; i < len(r.gapHistogram); i++ { if r.gapHistogram[i] == 0 { continue @@ -915,172 +916,7 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps // ---------------------------------- func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { - if len(statsList) == 0 { - return nil - } - - startTime := time.Time{} - endTime := time.Time{} - - packets := uint32(0) - bytes := uint64(0) - headerBytes := uint64(0) - packetsLost := uint32(0) - packetsDuplicate := uint32(0) - bytesDuplicate := uint64(0) - headerBytesDuplicate := uint64(0) - packetsPadding := uint32(0) - bytesPadding := uint64(0) - headerBytesPadding := uint64(0) - packetsOutOfOrder := uint32(0) - frames := uint32(0) - keyFrames := uint32(0) - lastKeyFrame := time.Time{} - jitter := 0.0 - maxJitter := float64(0) - gapHistogram := make(map[int32]uint32, cGapHistogramNumBins) - nacks := uint32(0) - nackAcks := uint32(0) - nackMisses := uint32(0) - nackRepeated := uint32(0) - plis := uint32(0) - lastPli := time.Time{} - layerLockPlis := uint32(0) - lastLayerLockPli := time.Time{} - firs := uint32(0) - lastFir := time.Time{} - rtt := uint32(0) - maxRtt := uint32(0) - - for _, stats := range statsList { - if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) { - startTime = stats.StartTime.AsTime() - } - - if endTime.IsZero() || endTime.Before(stats.EndTime.AsTime()) { - endTime = stats.EndTime.AsTime() - } - - packets += stats.Packets - bytes += stats.Bytes - headerBytes += stats.HeaderBytes - - packetsLost += stats.PacketsLost - - packetsDuplicate += stats.PacketsDuplicate - bytesDuplicate += stats.BytesDuplicate - headerBytesDuplicate += stats.HeaderBytesDuplicate - - packetsPadding += stats.PacketsPadding - bytesPadding += stats.BytesPadding - headerBytesPadding += stats.HeaderBytesPadding - - packetsOutOfOrder += stats.PacketsOutOfOrder - - frames += stats.Frames - - keyFrames += stats.KeyFrames - if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) { - lastKeyFrame = stats.LastKeyFrame.AsTime() - } - - jitter += stats.JitterCurrent - if stats.JitterMax > maxJitter { - maxJitter = stats.JitterMax - } - - for burst, count := range stats.GapHistogram { - gapHistogram[burst] += count - } - - nacks += stats.Nacks - nackAcks += stats.NackAcks - nackMisses += stats.NackMisses - nackRepeated += stats.NackRepeated - - plis += stats.Plis - if lastPli.IsZero() || lastPli.Before(stats.LastPli.AsTime()) { - lastPli = stats.LastPli.AsTime() - } - - layerLockPlis += stats.LayerLockPlis - if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) { - lastLayerLockPli = stats.LastLayerLockPli.AsTime() - } - - firs += stats.Firs - if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) { - lastFir = stats.LastFir.AsTime() - } - - rtt += stats.RttCurrent - if stats.RttMax > maxRtt { - maxRtt = stats.RttMax - } - } - - if endTime.IsZero() { - endTime = time.Now() - } - elapsed := endTime.Sub(startTime).Seconds() - - packetLostRate := float64(packetsLost) / elapsed - packetLostPercentage := float32(packetsLost) / (float32(packets) + float32(packetsLost)) * 100.0 - - packetRate := float64(packets) / elapsed - packetDuplicateRate := float64(packetsDuplicate) / elapsed - packetPaddingRate := float64(packetsPadding) / elapsed - - bitrate := float64(bytes) * 8.0 / elapsed - bitrateDuplicate := float64(bytesDuplicate) * 8.0 / elapsed - bitratePadding := float64(bytesPadding) * 8.0 / elapsed - - frameRate := float64(frames) / elapsed - - return &livekit.RTPStats{ - StartTime: timestamppb.New(startTime), - EndTime: timestamppb.New(endTime), - Duration: elapsed, - Packets: packets, - PacketRate: packetRate, - Bytes: bytes, - HeaderBytes: headerBytes, - Bitrate: bitrate, - PacketsLost: packetsLost, - PacketLossRate: packetLostRate, - PacketLossPercentage: packetLostPercentage, - PacketsDuplicate: packetsDuplicate, - PacketDuplicateRate: packetDuplicateRate, - BytesDuplicate: bytesDuplicate, - HeaderBytesDuplicate: headerBytesDuplicate, - BitrateDuplicate: bitrateDuplicate, - PacketsPadding: packetsPadding, - PacketPaddingRate: packetPaddingRate, - BytesPadding: bytesPadding, - HeaderBytesPadding: headerBytesPadding, - BitratePadding: bitratePadding, - PacketsOutOfOrder: packetsOutOfOrder, - Frames: frames, - FrameRate: frameRate, - KeyFrames: keyFrames, - LastKeyFrame: timestamppb.New(lastKeyFrame), - JitterCurrent: jitter / float64(len(statsList)), - JitterMax: maxJitter, - GapHistogram: gapHistogram, - Nacks: nacks, - NackAcks: nackAcks, - NackMisses: nackMisses, - NackRepeated: nackRepeated, - Plis: plis, - LastPli: timestamppb.New(lastPli), - LayerLockPlis: layerLockPlis, - LastLayerLockPli: timestamppb.New(lastLayerLockPli), - Firs: firs, - LastFir: timestamppb.New(lastFir), - RttCurrent: rtt / uint32(len(statsList)), - RttMax: maxRtt, - // no aggregation for drift calculations - } + return utils.AggregateRTPStats(statsList, cGapHistogramNumBins) } func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index fd2e9dd58..1cb0ff1bd 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -544,6 +544,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt "extLastRRSN", s.extLastRRSN, "firstTime", r.firstTime.String(), "startTime", r.startTime.String(), + "highestTime", r.highestTime.String(), "extReceivedRRSN", extReceivedRRSN, "packetsInInterval", extReceivedRRSN-s.extLastRRSN, "intervalStats", is.ToString(), diff --git a/pkg/sfu/buffer/videolayerutils.go b/pkg/sfu/buffer/videolayerutils.go index b18c83d84..9028283fc 100644 --- a/pkg/sfu/buffer/videolayerutils.go +++ b/pkg/sfu/buffer/videolayerutils.go @@ -32,7 +32,12 @@ func LayerPresenceFromTrackInfo(trackInfo *livekit.TrackInfo) *[livekit.VideoQua var layerPresence [livekit.VideoQuality_HIGH + 1]bool for _, layer := range trackInfo.Layers { - layerPresence[layer.Quality] = true + // WARNING: comparing protobuf enum + if layer.Quality <= livekit.VideoQuality_HIGH { + layerPresence[layer.Quality] = true + } else { + logger.Warnw("unexpected quality in track info", nil, "trackInfo", logger.Proto(trackInfo)) + } } return &layerPresence diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 73c9c649b..7b07ba8bc 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -259,7 +259,7 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - // next update with no packets should knock quality down + // next update with no packets should knock quality down to LOST now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { @@ -273,12 +273,36 @@ func TestConnectionQuality(t *testing.T) { cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) - require.Equal(t, livekit.ConnectionQuality_POOR, quality) + require.Equal(t, livekit.ConnectionQuality_LOST, quality) - // mute/unmute to bring quality back up + // mute when LOST should not bump up score/quality now = now.Add(duration) cs.UpdateMuteAt(true, now.Add(1*time.Second)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(2.1), mos) + require.Equal(t, livekit.ConnectionQuality_LOST, quality) + + // unmute and send packets to bring quality back up + now = now.Add(duration) cs.UpdateMuteAt(false, now.Add(2*time.Second)) + for i := 0; i < 3; i++ { + trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ + 1: { + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + PacketsLost: 0, + }, + }, + }) + cs.updateScoreAt(now.Add(duration)) + now = now.Add(duration) + } + cs.updateScoreAt(now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.6), mos) + require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) // with lesser number of packet (simulating DTX). // even higher loss (like 10%) should not knock down quality due to quadratic weighting of packet loss ratio diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 340f32368..ffc3961da 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -29,9 +29,9 @@ const ( MaxMOS = float32(4.5) MinMOS = float32(1.0) - maxScore = float64(100.0) - poorScore = float64(30.0) - minScore = float64(20.0) + cMaxScore = float64(100.0) + cMinScore = float64(20.0) + cPausedPoorScore = float64(30.0) increaseFactor = float64(0.4) // slower increase, i. e. when score is recovering move up slower -> conservative decreaseFactor = float64(0.7) // faster decrease, i. e. when score is dropping move down faster -> aggressive to be responsive to quality drops @@ -41,6 +41,14 @@ const ( unmuteTimeThreshold = float64(0.5) ) +var ( + qualityTransitionScore = map[livekit.ConnectionQuality]float64{ + livekit.ConnectionQuality_GOOD: 80, + livekit.ConnectionQuality_POOR: 40, + livekit.ConnectionQuality_LOST: 20, + } +) + // ------------------------------------------ type windowStat struct { @@ -107,7 +115,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ } lossEffect *= plw - score := maxScore - delayEffect - lossEffect + score := cMaxScore - delayEffect - lossEffect if score < 0.0 { score = 0.0 } @@ -118,7 +126,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 { if expectedBitrate == 0 { // unsupported mode OR all layers stopped - return maxScore + return cMaxScore } var score float64 @@ -126,9 +134,9 @@ func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 { // using the ratio of expectedBitrate / actualBitrate // the quality inflection points are approximately // GOOD at ~2.7x, POOR at ~20.1x - score = maxScore - 20*math.Log(float64(expectedBitrate)/float64(w.bytes*8)) - if score > maxScore { - score = maxScore + score = cMaxScore - 20*math.Log(float64(expectedBitrate)/float64(w.bytes*8)) + if score > cMaxScore { + score = cMaxScore } if score < 0.0 { score = 0.0 @@ -188,7 +196,7 @@ type qualityScorer struct { func newQualityScorer(params qualityScorerParams) *qualityScorer { return &qualityScorer{ params: params, - score: maxScore, + score: cMaxScore, aggregateBitrate: utils.NewTimedAggregator[int64](utils.TimedAggregatorParams{ CapNegativeValues: true, }), @@ -219,7 +227,10 @@ func (q *qualityScorer) Start() { func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at - q.score = maxScore + // muting when LOST should not push quality to EXCELLENT + if q.score != qualityTransitionScore[livekit.ConnectionQuality_LOST] { + q.score = cMaxScore + } } else { q.unmutedAt = at } @@ -264,7 +275,7 @@ func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) { q.layerDistance.Reset() q.layerMutedAt = at - q.score = maxScore + q.score = cMaxScore } } else { if q.isLayerMuted() { @@ -294,7 +305,7 @@ func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) { q.layerDistance.Reset() q.pausedAt = at - q.score = poorScore + q.score = cPausedPoorScore } } else { if q.isPaused() { @@ -353,7 +364,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { // considered (as long as enough time has passed since unmute). // // Similarly, when paused (possibly due to congestion), score is immediately - // set to poorScore for responsiveness. The layer transision is reest. + // set to cPausedPoorScore for responsiveness. The layer transision is reest. // On a resume, quality climbs back up using normal operation. if q.isMuted() || !q.isUnmutedEnough(at) || q.isLayerMuted() || q.isPaused() { q.lastUpdateAt = at @@ -365,11 +376,11 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { var score float64 if stat.packetsExpected == 0 { reason = "dry" - score = poorScore + score = qualityTransitionScore[livekit.ConnectionQuality_LOST] } else { packetScore := stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter) bitrateScore := stat.calculateBitrateScore(expectedBitrate) - layerScore := math.Max(math.Min(maxScore, maxScore-(expectedDistance*distanceWeight)), 0.0) + layerScore := math.Max(math.Min(cMaxScore, cMaxScore-(expectedDistance*distanceWeight)), 0.0) minScore := math.Min(packetScore, bitrateScore) minScore = math.Min(minScore, layerScore) @@ -394,22 +405,23 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { } score = factor*score + (1.0-factor)*q.score } - if score < minScore { + if score < cMinScore { // lower bound to prevent score from becoming very small values due to extreme conditions. // Without a lower bound, it can get so low that it takes a long time to climb back to // better quality even under excellent conditions. - score = minScore + score = cMinScore } - // WARNING NOTE: comparing protobuf enum values directly (livekit.ConnectionQuality) - if scoreToConnectionQuality(q.score) > scoreToConnectionQuality(score) { + prevCQ := scoreToConnectionQuality(q.score) + currCQ := scoreToConnectionQuality(score) + if utils.IsConnectionQualityLower(prevCQ, currCQ) { q.params.Logger.Infow( "quality drop", "reason", reason, "prevScore", q.score, - "prevQuality", scoreToConnectionQuality(q.score), + "prevQuality", prevCQ, "prevStat", &q.stat, "score", score, - "quality", scoreToConnectionQuality(score), + "quality", currCQ, "stat", stat, "packetLossWeight", plw, "maxPPS", q.maxPPS, @@ -523,15 +535,19 @@ func scoreToConnectionQuality(score float64) livekit.ConnectionQuality { // that a score of 60 does not correspond to `POOR` quality. Repair // mechanisms and use of algorithms like de-jittering makes the experience // better even under harsh conditions. - if score > 80.0 { + if score > qualityTransitionScore[livekit.ConnectionQuality_GOOD] { return livekit.ConnectionQuality_EXCELLENT } - if score > 40.0 { + if score > qualityTransitionScore[livekit.ConnectionQuality_POOR] { return livekit.ConnectionQuality_GOOD } - return livekit.ConnectionQuality_POOR + if score > qualityTransitionScore[livekit.ConnectionQuality_LOST] { + return livekit.ConnectionQuality_POOR + } + + return livekit.ConnectionQuality_LOST } // ------------------------------------------ diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d07a50615..9afff8e98 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -82,6 +82,8 @@ type TrackReceiver interface { GetCalculatedClockRate(layer int32) uint32 GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) + + GetTrackStats() *livekit.RTPStats } // WebRTCReceiver receives a media track @@ -558,7 +560,7 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats { w.bufferMu.RLock() defer w.bufferMu.RUnlock() - var stats []*livekit.RTPStats + stats := make([]*livekit.RTPStats, 0, len(w.buffers)) for _, buff := range w.buffers { if buff == nil { continue diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 3c091160e..9adc54b87 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -111,8 +111,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r } if ddwdt.StructureUpdated { - // TODO-REMOVE: remove this log after stable - d.logger.Infow("update dependency structure", + d.logger.Debugw("update dependency structure", "structureID", dd.AttachedStructure.StructureId, "structure", dd.AttachedStructure, "decodeTargets", ddwdt.DecodeTargets, @@ -127,8 +126,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r if ddwdt.ExtKeyFrameNum != d.extKeyFrameNum { // keyframe mismatch, drop and reset chains - // TODO-REMOVE: remove this log after stable - d.logger.Infow("drop packet for keyframe mismatch", "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, "requiredKeyFrame", ddwdt.ExtKeyFrameNum, "structureKeyFrame", d.extKeyFrameNum) + d.logger.Debugw("drop packet for keyframe mismatch", "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, "requiredKeyFrame", ddwdt.ExtKeyFrameNum, "structureKeyFrame", d.extKeyFrameNum) d.decisions.AddDropped(extFrameNum) d.invalidateKeyFrame() return @@ -138,9 +136,8 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r d.updateActiveDecodeTargets(*dd.ActiveDecodeTargetsBitmask) } - // TODO-REMOVE: remove this log after stable if len(fd.ChainDiffs) != len(d.chains) { - d.logger.Warnw("frame chain diff length mismatch", nil, + d.logger.Debugw("frame chain diff length mismatch", nil, "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, @@ -255,7 +252,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r result.IsSwitching = true if !d.currentLayer.IsValid() { result.IsResuming = true - d.logger.Infow( + d.logger.Debugw( "resuming at layer", "current", incomingLayer, "target", d.targetLayer, diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go index fb17e61e7..5ed99ebb0 100644 --- a/pkg/telemetry/signalanddatastats.go +++ b/pkg/telemetry/signalanddatastats.go @@ -32,13 +32,27 @@ const ( BytesTrackTypeSignal BytesTrackType = "SG" ) +// ------------------------------- + +type TrafficTotals struct { + At time.Time + SendBytes uint64 + SendMessages uint32 + RecvBytes uint64 + RecvMessages uint32 +} + +// -------------------------------- + // stats for signal and data channel type BytesTrackStats struct { - trackID livekit.TrackID - pID livekit.ParticipantID - send, recv atomic.Uint64 - telemetry TelemetryService - isStopped atomic.Bool + trackID livekit.TrackID + pID livekit.ParticipantID + send, recv atomic.Uint64 + totalSendBytes, totalRecvBytes atomic.Uint64 + totalSendMessages, totalRecvMessages atomic.Uint32 + telemetry TelemetryService + isStopped atomic.Bool } func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats { @@ -54,8 +68,22 @@ func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, tele func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) { if isSend { s.send.Add(bytes) + s.totalSendBytes.Add(bytes) + s.totalSendMessages.Inc() } else { s.recv.Add(bytes) + s.totalRecvBytes.Add(bytes) + s.totalRecvMessages.Inc() + } +} + +func (s *BytesTrackStats) GetTrafficTotals() *TrafficTotals { + return &TrafficTotals{ + At: time.Now(), + SendBytes: s.totalSendBytes.Load(), + SendMessages: s.totalSendMessages.Load(), + RecvBytes: s.totalRecvBytes.Load(), + RecvMessages: s.totalRecvMessages.Load(), } }