Merge remote-tracking branch 'origin/master' into raja_1833

This commit is contained in:
boks1971
2023-11-30 14:32:29 +05:30
27 changed files with 805 additions and 295 deletions
+9 -9
View File
@@ -17,8 +17,8 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb
github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0
github.com/livekit/psrpc v0.5.2
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -46,7 +46,7 @@ require (
github.com/urfave/cli/v2 v2.25.7
github.com/urfave/negroni/v3 v3.0.0
go.uber.org/atomic v1.11.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/exp v0.0.0-20231127185646-65229373498e
golang.org/x/sync v0.5.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v3 v3.0.1
@@ -70,7 +70,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
@@ -95,13 +95,13 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
golang.org/x/tools v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect
google.golang.org/grpc v1.59.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+18 -18
View File
@@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79
github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs=
github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw=
github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -123,10 +123,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M=
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb h1:KiGg4k+kYQD9NjKixaSDMMeYOO2//XBM4IROTI1Itjo=
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0 h1:AhJlQejQ+Ma9Q+EPqCNt2S7h6ETJXDiO7qsQdTq9VvM=
github.com/livekit/protocol v1.9.3-0.20231129173544-1c3f5fe919b0/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -291,10 +291,10 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
@@ -327,8 +327,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -378,8 +378,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -407,14 +407,14 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8=
golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+16
View File
@@ -828,4 +828,20 @@ func (t *MediaTrackReceiver) IsEncrypted() bool {
return t.trackInfo.Encryption != livekit.Encryption_NONE
}
func (t *MediaTrackReceiver) GetTrackStats() *livekit.RTPStats {
t.lock.Lock()
receivers := t.receiversShadow
t.lock.Unlock()
stats := make([]*livekit.RTPStats, 0, len(receivers))
for _, receiver := range receivers {
receiverStats := receiver.GetTrackStats()
if receiverStats != nil {
stats = append(stats, receiverStats)
}
}
return buffer.AggregateRTPStats(stats)
}
// ---------------------------
+45 -23
View File
@@ -117,6 +117,7 @@ type ParticipantParams struct {
AllowUDPUnstableFallback bool
TURNSEnabled bool
GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo
DisableSupervisor bool
ReconnectOnPublicationError bool
ReconnectOnSubscriptionError bool
ReconnectOnDataChannelError bool
@@ -167,6 +168,7 @@ type ParticipantImpl struct {
*TransportManager
*UpTrackManager
*SubscriptionManager
*ParticipantTrafficLoad
// keeps track of unpublished tracks in order to reuse trackID
unpublishedTracks []*livekit.TrackInfo
@@ -240,11 +242,13 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID),
params.SID,
params.Telemetry),
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality),
pubLogger: params.Logger.WithComponent(sutils.ComponentPub),
subLogger: params.Logger.WithComponent(sutils.ComponentSub),
}
if !params.DisableSupervisor {
p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger})
}
p.version.Store(params.InitialVersion)
p.timedVersion.Update(params.VersionGenerator.New())
p.migrateState.Store(types.MigrateStateInit)
@@ -254,7 +258,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.SetResponseSink(params.Sink)
p.setupEnabledCodecs(params.PublishEnabledCodecs, params.SubscribeEnabledCodecs, params.ClientConf.GetDisabledCodecs())
p.supervisor.OnPublicationError(p.onPublicationError)
if p.supervisor != nil {
p.supervisor.OnPublicationError(p.onPublicationError)
}
var err error
// keep last participants and when updates were sent
@@ -269,6 +275,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.setupUpTrackManager()
p.setupSubscriptionManager()
p.setupParticipantTrafficLoad()
return p, nil
}
@@ -710,8 +717,10 @@ func (p *ParticipantImpl) SetMigrateInfo(
for _, t := range mediaTracks {
ti := t.GetTrack()
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
if p.supervisor != nil {
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
}
p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true}
p.pubLogger.Infow("pending track added (migration)", "trackID", ti.Sid, "track", logger.Proto(ti))
@@ -754,7 +763,9 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
})
}
p.supervisor.Stop()
if p.supervisor != nil {
p.supervisor.Stop()
}
p.pendingTracksLock.Lock()
p.pendingTracks = make(map[string]*pendingTrackInfo)
@@ -784,6 +795,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
go func() {
p.SubscriptionManager.Close(isExpectedToResume)
p.TransportManager.Close()
p.ParticipantTrafficLoad.Close()
}()
p.dataChannelStats.Stop()
@@ -915,10 +927,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti
p.lock.Unlock()
}
//
// signal connection methods
//
func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo {
numTracks := 0
minQuality := livekit.ConnectionQuality_EXCELLENT
@@ -932,8 +940,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
numTracks++
score, quality := pt.(types.LocalMediaTrack).GetConnectionScoreAndQuality()
if quality < minQuality {
// WARNING NOTE: comparing protobuf enums directly
if utils.IsConnectionQualityLower(minQuality, quality) {
minQuality = quality
minScore = score
} else if quality == minQuality && score < minScore {
@@ -943,8 +950,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
p.lock.Lock()
trackID := pt.ID()
if prevQuality, ok := p.tracksQuality[trackID]; ok {
// WARNING NOTE: comparing protobuf enums directly
if prevQuality > quality {
if utils.IsConnectionQualityLower(prevQuality, quality) {
numUpDrops++
}
}
@@ -959,8 +965,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
numTracks++
score, quality := subTrack.DownTrack().GetConnectionScoreAndQuality()
if quality < minQuality {
// WARNING NOTE: comparing protobuf enums directly
if utils.IsConnectionQualityLower(minQuality, quality) {
minQuality = quality
minScore = score
} else if quality == minQuality && score < minScore {
@@ -970,8 +975,7 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
p.lock.Lock()
trackID := subTrack.ID()
if prevQuality, ok := p.tracksQuality[trackID]; ok {
// WARNING NOTE: comparing protobuf enums directly
if prevQuality > quality {
if utils.IsConnectionQualityLower(prevQuality, quality) {
numDownDrops++
}
}
@@ -1227,6 +1231,14 @@ func (p *ParticipantImpl) setupSubscriptionManager() {
})
}
func (p *ParticipantImpl) setupParticipantTrafficLoad() {
p.ParticipantTrafficLoad = NewParticipantTrafficLoad(ParticipantTrafficLoadParams{
Participant: p,
DataChannelStats: p.dataChannelStats,
Logger: p.params.Logger,
})
}
func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
oldState := p.State()
if state == oldState {
@@ -1390,7 +1402,9 @@ func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.
}
func (p *ParticipantImpl) onPublisherInitialConnected() {
p.supervisor.SetPublisherPeerConnectionConnected(true)
if p.supervisor != nil {
p.supervisor.SetPublisherPeerConnectionConnected(true)
}
go p.publisherRTCPWorker()
}
@@ -1660,8 +1674,10 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
}
p.params.Telemetry.TrackPublishRequested(context.Background(), p.ID(), p.Identity(), ti)
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
if p.supervisor != nil {
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
}
if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil {
if p.pendingTracks[req.Cid] == nil {
p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}
@@ -1713,7 +1729,9 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro
func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo {
p.dirty.Store(true)
p.supervisor.SetPublicationMute(trackID, muted)
if p.supervisor != nil {
p.supervisor.SetPublicationMute(trackID, muted)
}
track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted)
var trackInfo *livekit.TrackInfo
@@ -1898,7 +1916,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange)
// add to published and clean up pending
p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt)
if p.supervisor != nil {
p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt)
}
p.UpTrackManager.AddPublishedTrack(mt)
pti := p.pendingTracks[signalCid]
@@ -1919,7 +1939,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
trackID := livekit.TrackID(ti.Sid)
mt.AddOnClose(func() {
p.supervisor.ClearPublishedTrack(trackID, mt)
if p.supervisor != nil {
p.supervisor.ClearPublishedTrack(trackID, mt)
}
// not logged when closing
p.params.Telemetry.TrackUnpublished(
+211
View File
@@ -0,0 +1,211 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
import (
"sync"
"time"
"github.com/frostbyte73/core"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
const (
reportInterval = 10 * time.Second
)
type ParticipantTrafficLoadParams struct {
Participant *ParticipantImpl
DataChannelStats *telemetry.BytesTrackStats
Logger logger.Logger
}
type ParticipantTrafficLoad struct {
params ParticipantTrafficLoadParams
lock sync.RWMutex
onTrafficLoad func(trafficLoad *types.TrafficLoad)
tracksStatsMedia map[livekit.TrackID]*livekit.RTPStats
dataChannelTraffic *telemetry.TrafficTotals
trafficLoad *types.TrafficLoad
closed core.Fuse
}
func NewParticipantTrafficLoad(params ParticipantTrafficLoadParams) *ParticipantTrafficLoad {
p := &ParticipantTrafficLoad{
params: params,
tracksStatsMedia: make(map[livekit.TrackID]*livekit.RTPStats),
closed: core.NewFuse(),
}
go p.reporter()
return p
}
func (p *ParticipantTrafficLoad) Close() {
p.closed.Break()
}
func (p *ParticipantTrafficLoad) OnTrafficLoad(f func(trafficLoad *types.TrafficLoad)) {
p.lock.Lock()
p.onTrafficLoad = f
p.lock.Unlock()
}
func (p *ParticipantTrafficLoad) getOnTrafficLoad() func(trafficLoad *types.TrafficLoad) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onTrafficLoad
}
func (p *ParticipantTrafficLoad) GetTrafficLoad() *types.TrafficLoad {
p.lock.RLock()
defer p.lock.RUnlock()
return p.trafficLoad
}
func (p *ParticipantTrafficLoad) updateTrafficLoad() *types.TrafficLoad {
publishedTracks := p.params.Participant.GetPublishedTracks()
subscribedTracks := p.params.Participant.SubscriptionManager.GetSubscribedTracks()
availableTracks := make(map[livekit.TrackID]bool, len(publishedTracks)+len(subscribedTracks))
upstreamAudioStats := make([]*types.TrafficStats, 0, len(publishedTracks))
upstreamVideoStats := make([]*types.TrafficStats, 0, len(publishedTracks))
downstreamAudioStats := make([]*types.TrafficStats, 0, len(subscribedTracks))
downstreamVideoStats := make([]*types.TrafficStats, 0, len(subscribedTracks))
p.lock.Lock()
defer p.lock.Unlock()
for _, pt := range publishedTracks {
lmt, ok := pt.(types.LocalMediaTrack)
if !ok {
continue
}
trackID := lmt.ID()
stats := lmt.GetTrackStats()
trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats)
if stats != nil {
p.tracksStatsMedia[trackID] = stats
availableTracks[trackID] = true
}
if trafficStats != nil {
switch lmt.Kind() {
case livekit.TrackType_AUDIO:
upstreamAudioStats = append(upstreamAudioStats, trafficStats)
case livekit.TrackType_VIDEO:
upstreamVideoStats = append(upstreamVideoStats, trafficStats)
}
}
}
for _, st := range subscribedTracks {
trackID := st.ID()
stats := st.DownTrack().GetTrackStats()
trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats)
if stats != nil {
p.tracksStatsMedia[trackID] = stats
availableTracks[trackID] = true
}
if trafficStats != nil {
switch st.MediaTrack().Kind() {
case livekit.TrackType_AUDIO:
downstreamAudioStats = append(downstreamAudioStats, trafficStats)
case livekit.TrackType_VIDEO:
downstreamVideoStats = append(downstreamVideoStats, trafficStats)
}
}
}
// remove unavailable tracks from track stats cache
for trackID := range p.tracksStatsMedia {
if !availableTracks[trackID] {
delete(p.tracksStatsMedia, trackID)
}
}
trafficTypeStats := make([]*types.TrafficTypeStats, 0, 6)
addTypeStats := func(statsList []*types.TrafficStats, trackType livekit.TrackType, streamType livekit.StreamType) {
agg := types.AggregateTrafficStats(statsList)
if agg != nil {
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: trackType,
StreamType: streamType,
TrafficStats: agg,
})
}
}
addTypeStats(upstreamAudioStats, livekit.TrackType_AUDIO, livekit.StreamType_UPSTREAM)
addTypeStats(upstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_UPSTREAM)
addTypeStats(downstreamAudioStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM)
addTypeStats(downstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM)
if p.params.DataChannelStats != nil {
dataChannelTraffic := p.params.DataChannelStats.GetTrafficTotals()
if p.dataChannelTraffic != nil {
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: livekit.TrackType_DATA,
StreamType: livekit.StreamType_UPSTREAM,
TrafficStats: &types.TrafficStats{
StartTime: p.dataChannelTraffic.At,
EndTime: dataChannelTraffic.At,
Packets: dataChannelTraffic.RecvMessages - p.dataChannelTraffic.RecvMessages,
Bytes: dataChannelTraffic.RecvBytes - p.dataChannelTraffic.RecvBytes,
},
})
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: livekit.TrackType_DATA,
StreamType: livekit.StreamType_DOWNSTREAM,
TrafficStats: &types.TrafficStats{
StartTime: p.dataChannelTraffic.At,
EndTime: dataChannelTraffic.At,
Packets: dataChannelTraffic.SendMessages - p.dataChannelTraffic.SendMessages,
Bytes: dataChannelTraffic.SendBytes - p.dataChannelTraffic.SendBytes,
},
})
}
p.dataChannelTraffic = dataChannelTraffic
}
p.trafficLoad = &types.TrafficLoad{
TrafficTypeStats: trafficTypeStats,
}
return p.trafficLoad
}
func (p *ParticipantTrafficLoad) reporter() {
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()
for {
select {
case <-p.closed.Watch():
return
case <-ticker.C:
trafficLoad := p.updateTrafficLoad()
if onTrafficLoad := p.getOnTrafficLoad(); onTrafficLoad != nil {
onTrafficLoad(trafficLoad)
}
}
}
}
+15 -8
View File
@@ -24,6 +24,7 @@ import (
"time"
"go.uber.org/atomic"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"github.com/pion/sctp"
@@ -219,17 +220,21 @@ func (r *Room) GetParticipantByID(participantID livekit.ParticipantID) types.Loc
func (r *Room) GetParticipants() []types.LocalParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
participants := make([]types.LocalParticipant, 0, len(r.participants))
for _, p := range r.participants {
participants = append(participants, p)
}
return participants
return maps.Values(r.participants)
}
func (r *Room) GetLocalParticipants() []types.LocalParticipant {
return r.GetParticipants()
}
func (r *Room) GetParticipantCount() int {
r.lock.RLock()
defer r.lock.RUnlock()
return len(r.participants)
}
func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo {
participants := r.GetParticipants()
speakers := make([]*livekit.SpeakerInfo, 0, len(participants))
@@ -391,11 +396,13 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
}
})
r.Logger.Infow("new participant joined",
r.Logger.Debugw("new participant joined",
"pID", participant.ID(),
"participant", participant.Identity(),
"protocol", participant.ProtocolVersion(),
"options", opts)
"clientInfo", logger.Proto(participant.GetClientInfo()),
"options", opts,
"numParticipants", len(r.participants),
)
if participant.IsRecorder() && !r.protoRoom.ActiveRecording {
r.protoRoom.ActiveRecording = true
+5
View File
@@ -31,10 +31,15 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
"github.com/livekit/livekit-server/pkg/sfu/audio"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes"
"github.com/livekit/livekit-server/pkg/testutils"
)
func init() {
prometheus.Init("test", livekit.NodeType_SERVER, "test")
}
const (
numParticipants = 3
defaultDelay = 10 * time.Millisecond
+1 -1
View File
@@ -55,7 +55,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
participant.UpdateSubscribedTrackSettings(sid, msg.TrackSetting)
}
case *livekit.SignalRequest_Leave:
pLogger.Infow("client leaving room")
pLogger.Debugw("client leaving room")
room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonClientRequestLeave)
case *livekit.SignalRequest_UpdateLayers:
err := room.UpdateVideoLayers(participant, msg.UpdateLayers)
-5
View File
@@ -21,13 +21,8 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
func init() {
prometheus.Init("test", livekit.NodeType_SERVER, "test")
}
func NewMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool, publisher bool) *typesfakes.FakeLocalParticipant {
p := &typesfakes.FakeLocalParticipant{}
sid := utils.NewGuid(utils.ParticipantPrefix)
+4
View File
@@ -395,6 +395,7 @@ type LocalParticipant interface {
OnClose(callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
OnTrafficLoad(callback func(trafficLoad *TrafficLoad))
// session migration
MaybeStartMigration(force bool, onStart func()) bool
@@ -420,6 +421,8 @@ type LocalParticipant interface {
SetSubscriberChannelCapacity(channelCapacity int64)
GetPacer() pacer.Pacer
GetTrafficLoad() *TrafficLoad
}
// Room is a container of participants, and can provide room-level actions
@@ -499,6 +502,7 @@ type LocalMediaTrack interface {
HasSdpCid(cid string) bool
GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality)
GetTrackStats() *livekit.RTPStats
SetRTT(rtt uint32)
+126
View File
@@ -0,0 +1,126 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"time"
"github.com/livekit/protocol/livekit"
)
type TrafficStats struct {
StartTime time.Time
EndTime time.Time
Packets uint32
Bytes uint64
}
type TrafficTypeStats struct {
TrackType livekit.TrackType
StreamType livekit.StreamType
TrafficStats *TrafficStats
}
type TrafficLoad struct {
TrafficTypeStats []*TrafficTypeStats
}
func RTPStatsDiffToTrafficStats(before, after *livekit.RTPStats) *TrafficStats {
if after == nil {
return nil
}
startTime := after.StartTime
if before != nil {
startTime = before.EndTime
}
if before == nil {
return &TrafficStats{
StartTime: startTime.AsTime(),
EndTime: after.EndTime.AsTime(),
Packets: after.Packets,
Bytes: after.Bytes + after.BytesDuplicate + after.BytesPadding,
}
}
return &TrafficStats{
StartTime: startTime.AsTime(),
EndTime: after.EndTime.AsTime(),
Packets: after.Packets - before.Packets,
Bytes: (after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding),
}
}
func AggregateTrafficStats(statsList []*TrafficStats) *TrafficStats {
if len(statsList) == 0 {
return nil
}
startTime := time.Time{}
endTime := time.Time{}
packets := uint32(0)
bytes := uint64(0)
for _, stats := range statsList {
if startTime.IsZero() || startTime.After(stats.StartTime) {
startTime = stats.StartTime
}
if endTime.IsZero() || endTime.Before(stats.EndTime) {
endTime = stats.EndTime
}
packets += stats.Packets
bytes += stats.Bytes
}
if endTime.IsZero() {
endTime = time.Now()
}
return &TrafficStats{
StartTime: startTime,
EndTime: endTime,
Packets: packets,
Bytes: bytes,
}
}
func TrafficLoadToTrafficRate(trafficLoad *TrafficLoad) (
packetRateIn float64,
byteRateIn float64,
packetRateOut float64,
byteRateOut float64,
) {
if trafficLoad == nil {
return
}
for _, trafficTypeStat := range trafficLoad.TrafficTypeStats {
elapsed := trafficTypeStat.TrafficStats.EndTime.Sub(trafficTypeStat.TrafficStats.StartTime).Seconds()
packetRate := float64(trafficTypeStat.TrafficStats.Packets) / elapsed
byteRate := float64(trafficTypeStat.TrafficStats.Bytes) / elapsed
switch trafficTypeStat.StreamType {
case livekit.StreamType_UPSTREAM:
packetRateIn += packetRate
byteRateIn += byteRate
case livekit.StreamType_DOWNSTREAM:
packetRateOut += packetRate
byteRateOut += byteRate
}
}
return
}
@@ -107,6 +107,16 @@ type FakeLocalMediaTrack struct {
getTemporalLayerForSpatialFpsReturnsOnCall map[int]struct {
result1 int32
}
GetTrackStatsStub func() *livekit.RTPStats
getTrackStatsMutex sync.RWMutex
getTrackStatsArgsForCall []struct {
}
getTrackStatsReturns struct {
result1 *livekit.RTPStats
}
getTrackStatsReturnsOnCall map[int]struct {
result1 *livekit.RTPStats
}
HasSdpCidStub func(string) bool
hasSdpCidMutex sync.RWMutex
hasSdpCidArgsForCall []struct {
@@ -834,6 +844,59 @@ func (fake *FakeLocalMediaTrack) GetTemporalLayerForSpatialFpsReturnsOnCall(i in
}{result1}
}
func (fake *FakeLocalMediaTrack) GetTrackStats() *livekit.RTPStats {
fake.getTrackStatsMutex.Lock()
ret, specificReturn := fake.getTrackStatsReturnsOnCall[len(fake.getTrackStatsArgsForCall)]
fake.getTrackStatsArgsForCall = append(fake.getTrackStatsArgsForCall, struct {
}{})
stub := fake.GetTrackStatsStub
fakeReturns := fake.getTrackStatsReturns
fake.recordInvocation("GetTrackStats", []interface{}{})
fake.getTrackStatsMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalMediaTrack) GetTrackStatsCallCount() int {
fake.getTrackStatsMutex.RLock()
defer fake.getTrackStatsMutex.RUnlock()
return len(fake.getTrackStatsArgsForCall)
}
func (fake *FakeLocalMediaTrack) GetTrackStatsCalls(stub func() *livekit.RTPStats) {
fake.getTrackStatsMutex.Lock()
defer fake.getTrackStatsMutex.Unlock()
fake.GetTrackStatsStub = stub
}
func (fake *FakeLocalMediaTrack) GetTrackStatsReturns(result1 *livekit.RTPStats) {
fake.getTrackStatsMutex.Lock()
defer fake.getTrackStatsMutex.Unlock()
fake.GetTrackStatsStub = nil
fake.getTrackStatsReturns = struct {
result1 *livekit.RTPStats
}{result1}
}
func (fake *FakeLocalMediaTrack) GetTrackStatsReturnsOnCall(i int, result1 *livekit.RTPStats) {
fake.getTrackStatsMutex.Lock()
defer fake.getTrackStatsMutex.Unlock()
fake.GetTrackStatsStub = nil
if fake.getTrackStatsReturnsOnCall == nil {
fake.getTrackStatsReturnsOnCall = make(map[int]struct {
result1 *livekit.RTPStats
})
}
fake.getTrackStatsReturnsOnCall[i] = struct {
result1 *livekit.RTPStats
}{result1}
}
func (fake *FakeLocalMediaTrack) HasSdpCid(arg1 string) bool {
fake.hasSdpCidMutex.Lock()
ret, specificReturn := fake.hasSdpCidReturnsOnCall[len(fake.hasSdpCidArgsForCall)]
@@ -2069,6 +2132,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} {
defer fake.getQualityForDimensionMutex.RUnlock()
fake.getTemporalLayerForSpatialFpsMutex.RLock()
defer fake.getTemporalLayerForSpatialFpsMutex.RUnlock()
fake.getTrackStatsMutex.RLock()
defer fake.getTrackStatsMutex.RUnlock()
fake.hasSdpCidMutex.RLock()
defer fake.hasSdpCidMutex.RUnlock()
fake.iDMutex.RLock()
@@ -325,6 +325,16 @@ type FakeLocalParticipant struct {
getSubscribedTracksReturnsOnCall map[int]struct {
result1 []types.SubscribedTrack
}
GetTrafficLoadStub func() *types.TrafficLoad
getTrafficLoadMutex sync.RWMutex
getTrafficLoadArgsForCall []struct {
}
getTrafficLoadReturns struct {
result1 *types.TrafficLoad
}
getTrafficLoadReturnsOnCall map[int]struct {
result1 *types.TrafficLoad
}
GetTrailerStub func() []byte
getTrailerMutex sync.RWMutex
getTrailerArgsForCall []struct {
@@ -582,6 +592,11 @@ type FakeLocalParticipant struct {
onTrackUpdatedArgsForCall []struct {
arg1 func(types.LocalParticipant, types.MediaTrack)
}
OnTrafficLoadStub func(func(trafficLoad *types.TrafficLoad))
onTrafficLoadMutex sync.RWMutex
onTrafficLoadArgsForCall []struct {
arg1 func(trafficLoad *types.TrafficLoad)
}
ProtocolVersionStub func() types.ProtocolVersion
protocolVersionMutex sync.RWMutex
protocolVersionArgsForCall []struct {
@@ -2540,6 +2555,59 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result
}{result1}
}
func (fake *FakeLocalParticipant) GetTrafficLoad() *types.TrafficLoad {
fake.getTrafficLoadMutex.Lock()
ret, specificReturn := fake.getTrafficLoadReturnsOnCall[len(fake.getTrafficLoadArgsForCall)]
fake.getTrafficLoadArgsForCall = append(fake.getTrafficLoadArgsForCall, struct {
}{})
stub := fake.GetTrafficLoadStub
fakeReturns := fake.getTrafficLoadReturns
fake.recordInvocation("GetTrafficLoad", []interface{}{})
fake.getTrafficLoadMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetTrafficLoadCallCount() int {
fake.getTrafficLoadMutex.RLock()
defer fake.getTrafficLoadMutex.RUnlock()
return len(fake.getTrafficLoadArgsForCall)
}
func (fake *FakeLocalParticipant) GetTrafficLoadCalls(stub func() *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = stub
}
func (fake *FakeLocalParticipant) GetTrafficLoadReturns(result1 *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = nil
fake.getTrafficLoadReturns = struct {
result1 *types.TrafficLoad
}{result1}
}
func (fake *FakeLocalParticipant) GetTrafficLoadReturnsOnCall(i int, result1 *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = nil
if fake.getTrafficLoadReturnsOnCall == nil {
fake.getTrafficLoadReturnsOnCall = make(map[int]struct {
result1 *types.TrafficLoad
})
}
fake.getTrafficLoadReturnsOnCall[i] = struct {
result1 *types.TrafficLoad
}{result1}
}
func (fake *FakeLocalParticipant) GetTrailer() []byte {
fake.getTrailerMutex.Lock()
ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)]
@@ -3992,6 +4060,38 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnTrafficLoad(arg1 func(trafficLoad *types.TrafficLoad)) {
fake.onTrafficLoadMutex.Lock()
fake.onTrafficLoadArgsForCall = append(fake.onTrafficLoadArgsForCall, struct {
arg1 func(trafficLoad *types.TrafficLoad)
}{arg1})
stub := fake.OnTrafficLoadStub
fake.recordInvocation("OnTrafficLoad", []interface{}{arg1})
fake.onTrafficLoadMutex.Unlock()
if stub != nil {
fake.OnTrafficLoadStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnTrafficLoadCallCount() int {
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
return len(fake.onTrafficLoadArgsForCall)
}
func (fake *FakeLocalParticipant) OnTrafficLoadCalls(stub func(func(trafficLoad *types.TrafficLoad))) {
fake.onTrafficLoadMutex.Lock()
defer fake.onTrafficLoadMutex.Unlock()
fake.OnTrafficLoadStub = stub
}
func (fake *FakeLocalParticipant) OnTrafficLoadArgsForCall(i int) func(trafficLoad *types.TrafficLoad) {
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
argsForCall := fake.onTrafficLoadArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion {
fake.protocolVersionMutex.Lock()
ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)]
@@ -6074,6 +6174,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getSubscribedParticipantsMutex.RUnlock()
fake.getSubscribedTracksMutex.RLock()
defer fake.getSubscribedTracksMutex.RUnlock()
fake.getTrafficLoadMutex.RLock()
defer fake.getTrafficLoadMutex.RUnlock()
fake.getTrailerMutex.RLock()
defer fake.getTrailerMutex.RUnlock()
fake.handleAnswerMutex.RLock()
@@ -6142,6 +6244,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onTrackUnpublishedMutex.RUnlock()
fake.onTrackUpdatedMutex.RLock()
defer fake.onTrackUpdatedMutex.RUnlock()
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
fake.protocolVersionMutex.RLock()
defer fake.protocolVersionMutex.RUnlock()
fake.removePublishedTrackMutex.RLock()
+7
View File
@@ -323,3 +323,10 @@ func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, re
}
return 0, errors.New("receiver not available")
}
func (d *DummyReceiver) GetTrackStats() *livekit.RTPStats {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetTrackStats()
}
return nil
}
+2 -2
View File
@@ -322,7 +322,7 @@ func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc.
return nil, err
}
return &rpc.GetSIPTrunkAuthenticationResponse{
Username: trunk.Username,
Password: trunk.Password,
Username: trunk.InboundUsername,
Password: trunk.InboundPassword,
}, nil
}
+4 -6
View File
@@ -290,11 +290,10 @@ func (r *RoomManager) StartSession(
return errors.New("could not restart closed participant")
}
logger.Infow("resuming RTC session",
"room", roomName,
participant.GetLogger().Infow("resuming RTC session",
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
"reason", pi.ReconnectReason,
"numParticipants", room.GetParticipantCount(),
)
iceConfig := r.getIceConfig(participant)
if iceConfig == nil {
@@ -340,12 +339,11 @@ func (r *RoomManager) StartSession(
"room", roomName,
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
"sdk", pi.Client.Sdk,
"sdkVersion", pi.Client.Version,
"protocol", pi.Client.Protocol,
"clientInfo", logger.Proto(pi.Client),
"reconnect", pi.Reconnect,
"reconnectReason", pi.ReconnectReason,
"adaptiveStream", pi.AdaptiveStream,
"numParticipants", room.GetParticipantCount(),
)
clientConf := r.clientConfManager.GetConfiguration(pi.Client)
+51 -9
View File
@@ -16,14 +16,15 @@ package service
import (
"context"
"fmt"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry"
)
type SIPService struct {
@@ -65,8 +66,10 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT
OutboundAddress: req.OutboundAddress,
OutboundNumber: req.OutboundNumber,
InboundNumbersRegex: req.InboundNumbersRegex,
Username: req.Username,
Password: req.Password,
InboundUsername: req.InboundUsername,
InboundPassword: req.InboundPassword,
OutboundUsername: req.OutboundUsername,
OutboundPassword: req.OutboundPassword,
}
if err := s.store.StoreSIPTrunk(ctx, info); err != nil {
@@ -159,15 +162,44 @@ func (s *SIPService) CreateSIPParticipant(ctx context.Context, req *livekit.Crea
}
info := &livekit.SIPParticipantInfo{
SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix),
SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix),
SipTrunkId: req.SipTrunkId,
SipCallTo: req.SipCallTo,
RoomName: req.RoomName,
ParticipantIdentity: req.ParticipantIdentity,
}
if err := s.store.StoreSIPParticipant(ctx, info); err != nil {
return nil, err
}
s.updateParticipant(ctx, info)
return info, nil
}
func (s *SIPService) updateParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) {
AppendLogFields(ctx, "participantId", info.SipParticipantId, "room", info.RoomName, "trunk", info.SipTrunkId, "to", info.SipCallTo)
req := &rpc.InternalUpdateSIPParticipantRequest{
ParticipantId: info.SipParticipantId,
CallTo: info.SipCallTo,
RoomName: info.RoomName,
ParticipantIdentity: info.ParticipantIdentity,
}
if info.SipTrunkId != "" {
trunk, err := s.store.LoadSIPTrunk(ctx, info.SipTrunkId)
if err != nil {
logger.Errorw("cannot get trunk to update sip participant", err)
return
}
req.Address = trunk.OutboundAddress
req.Number = trunk.OutboundNumber
req.Username = trunk.OutboundUsername
req.Password = trunk.OutboundPassword
}
if _, err := s.psrpcClient.UpdateSIPParticipant(ctx, req); err != nil {
logger.Errorw("cannot update sip participant", err)
}
}
func (s *SIPService) ListSIPParticipant(ctx context.Context, req *livekit.ListSIPParticipantRequest) (*livekit.ListSIPParticipantResponse, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
@@ -194,7 +226,10 @@ func (s *SIPService) DeleteSIPParticipant(ctx context.Context, req *livekit.Dele
if err = s.store.DeleteSIPParticipant(ctx, info); err != nil {
return nil, err
}
// These indicate that the call should be disconnected
info.SipTrunkId = ""
info.SipCallTo = ""
s.updateParticipant(ctx, info)
return info, nil
}
@@ -202,6 +237,13 @@ func (s *SIPService) SendSIPParticipantDTMF(ctx context.Context, req *livekit.Se
if s.store == nil {
return nil, ErrSIPNotConnected
}
return nil, fmt.Errorf("TODO")
AppendLogFields(ctx, "participantId", req.SipParticipantId)
_, err := s.psrpcClient.SendSIPParticipantDTMF(ctx, &rpc.InternalSendSIPParticipantDTMFRequest{
ParticipantId: req.SipParticipantId,
Digits: req.Digits,
})
if err != nil {
logger.Errorw("cannot send dtmf to sip participant", err)
}
return nil, err
}
+3 -3
View File
@@ -60,7 +60,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
egressClient, err := rpc.NewEgressClient(messageBus)
clientParams := getPSRPCClientParams(psrpcConfig, messageBus)
egressClient, err := rpc.NewEgressClient(clientParams)
if err != nil {
return nil, err
}
@@ -83,7 +84,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService)
topicFormatter := rpc.NewTopicFormatter()
clientParams := getPSRPCClientParams(psrpcConfig, messageBus)
roomClient, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
@@ -98,7 +98,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(messageBus)
ingressClient, err := rpc.NewIngressClient(clientParams)
if err != nil {
return nil, err
}
+2 -3
View File
@@ -47,7 +47,6 @@ type DependencyDescriptorParser struct {
}
func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser {
logger.Infow("creating dependency descriptor parser", "ddExtID", ddExtID)
return &DependencyDescriptorParser{
ddExtID: ddExtID,
logger: logger,
@@ -86,7 +85,7 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
_, err := ext.Unmarshal(ddBuf)
if err != nil {
if err != dd.ErrDDReaderNoStructure {
r.logger.Warnw("failed to parse generic dependency descriptor", err, "payload", pkt.PayloadType, "ddbufLen", len(ddBuf))
r.logger.Infow("failed to parse generic dependency descriptor", err, "payload", pkt.PayloadType, "ddbufLen", len(ddBuf))
}
return nil, videoLayer, err
}
@@ -119,7 +118,7 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
}
if r.structure == nil || ddVal.AttachedStructure.StructureId != r.structure.StructureId {
r.logger.Infow("structure updated", "structureID", ddVal.AttachedStructure.StructureId, "extSeq", extSeq, "extFN", extFN, "descriptor", ddVal.String())
r.logger.Debugw("structure updated", "structureID", ddVal.AttachedStructure.StructureId, "extSeq", extSeq, "extFN", extFN, "descriptor", ddVal.String())
}
r.structure = ddVal.AttachedStructure
r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure)
+3 -167
View File
@@ -24,6 +24,7 @@ import (
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
const (
@@ -758,7 +759,7 @@ func (r *rtpStatsBase) toProto(
}
if gapsPresent {
p.GapHistogram = make(map[int32]uint32, cGapHistogramNumBins)
p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram))
for i := 0; i < len(r.gapHistogram); i++ {
if r.gapHistogram[i] == 0 {
continue
@@ -915,172 +916,7 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps
// ----------------------------------
func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
if len(statsList) == 0 {
return nil
}
startTime := time.Time{}
endTime := time.Time{}
packets := uint32(0)
bytes := uint64(0)
headerBytes := uint64(0)
packetsLost := uint32(0)
packetsDuplicate := uint32(0)
bytesDuplicate := uint64(0)
headerBytesDuplicate := uint64(0)
packetsPadding := uint32(0)
bytesPadding := uint64(0)
headerBytesPadding := uint64(0)
packetsOutOfOrder := uint32(0)
frames := uint32(0)
keyFrames := uint32(0)
lastKeyFrame := time.Time{}
jitter := 0.0
maxJitter := float64(0)
gapHistogram := make(map[int32]uint32, cGapHistogramNumBins)
nacks := uint32(0)
nackAcks := uint32(0)
nackMisses := uint32(0)
nackRepeated := uint32(0)
plis := uint32(0)
lastPli := time.Time{}
layerLockPlis := uint32(0)
lastLayerLockPli := time.Time{}
firs := uint32(0)
lastFir := time.Time{}
rtt := uint32(0)
maxRtt := uint32(0)
for _, stats := range statsList {
if startTime.IsZero() || startTime.After(stats.StartTime.AsTime()) {
startTime = stats.StartTime.AsTime()
}
if endTime.IsZero() || endTime.Before(stats.EndTime.AsTime()) {
endTime = stats.EndTime.AsTime()
}
packets += stats.Packets
bytes += stats.Bytes
headerBytes += stats.HeaderBytes
packetsLost += stats.PacketsLost
packetsDuplicate += stats.PacketsDuplicate
bytesDuplicate += stats.BytesDuplicate
headerBytesDuplicate += stats.HeaderBytesDuplicate
packetsPadding += stats.PacketsPadding
bytesPadding += stats.BytesPadding
headerBytesPadding += stats.HeaderBytesPadding
packetsOutOfOrder += stats.PacketsOutOfOrder
frames += stats.Frames
keyFrames += stats.KeyFrames
if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) {
lastKeyFrame = stats.LastKeyFrame.AsTime()
}
jitter += stats.JitterCurrent
if stats.JitterMax > maxJitter {
maxJitter = stats.JitterMax
}
for burst, count := range stats.GapHistogram {
gapHistogram[burst] += count
}
nacks += stats.Nacks
nackAcks += stats.NackAcks
nackMisses += stats.NackMisses
nackRepeated += stats.NackRepeated
plis += stats.Plis
if lastPli.IsZero() || lastPli.Before(stats.LastPli.AsTime()) {
lastPli = stats.LastPli.AsTime()
}
layerLockPlis += stats.LayerLockPlis
if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) {
lastLayerLockPli = stats.LastLayerLockPli.AsTime()
}
firs += stats.Firs
if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) {
lastFir = stats.LastFir.AsTime()
}
rtt += stats.RttCurrent
if stats.RttMax > maxRtt {
maxRtt = stats.RttMax
}
}
if endTime.IsZero() {
endTime = time.Now()
}
elapsed := endTime.Sub(startTime).Seconds()
packetLostRate := float64(packetsLost) / elapsed
packetLostPercentage := float32(packetsLost) / (float32(packets) + float32(packetsLost)) * 100.0
packetRate := float64(packets) / elapsed
packetDuplicateRate := float64(packetsDuplicate) / elapsed
packetPaddingRate := float64(packetsPadding) / elapsed
bitrate := float64(bytes) * 8.0 / elapsed
bitrateDuplicate := float64(bytesDuplicate) * 8.0 / elapsed
bitratePadding := float64(bytesPadding) * 8.0 / elapsed
frameRate := float64(frames) / elapsed
return &livekit.RTPStats{
StartTime: timestamppb.New(startTime),
EndTime: timestamppb.New(endTime),
Duration: elapsed,
Packets: packets,
PacketRate: packetRate,
Bytes: bytes,
HeaderBytes: headerBytes,
Bitrate: bitrate,
PacketsLost: packetsLost,
PacketLossRate: packetLostRate,
PacketLossPercentage: packetLostPercentage,
PacketsDuplicate: packetsDuplicate,
PacketDuplicateRate: packetDuplicateRate,
BytesDuplicate: bytesDuplicate,
HeaderBytesDuplicate: headerBytesDuplicate,
BitrateDuplicate: bitrateDuplicate,
PacketsPadding: packetsPadding,
PacketPaddingRate: packetPaddingRate,
BytesPadding: bytesPadding,
HeaderBytesPadding: headerBytesPadding,
BitratePadding: bitratePadding,
PacketsOutOfOrder: packetsOutOfOrder,
Frames: frames,
FrameRate: frameRate,
KeyFrames: keyFrames,
LastKeyFrame: timestamppb.New(lastKeyFrame),
JitterCurrent: jitter / float64(len(statsList)),
JitterMax: maxJitter,
GapHistogram: gapHistogram,
Nacks: nacks,
NackAcks: nackAcks,
NackMisses: nackMisses,
NackRepeated: nackRepeated,
Plis: plis,
LastPli: timestamppb.New(lastPli),
LayerLockPlis: layerLockPlis,
LastLayerLockPli: timestamppb.New(lastLayerLockPli),
Firs: firs,
LastFir: timestamppb.New(lastFir),
RttCurrent: rtt / uint32(len(statsList)),
RttMax: maxRtt,
// no aggregation for drift calculations
}
return utils.AggregateRTPStats(statsList, cGapHistogramNumBins)
}
func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
+1
View File
@@ -544,6 +544,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
"extLastRRSN", s.extLastRRSN,
"firstTime", r.firstTime.String(),
"startTime", r.startTime.String(),
"highestTime", r.highestTime.String(),
"extReceivedRRSN", extReceivedRRSN,
"packetsInInterval", extReceivedRRSN-s.extLastRRSN,
"intervalStats", is.ToString(),
+6 -1
View File
@@ -32,7 +32,12 @@ func LayerPresenceFromTrackInfo(trackInfo *livekit.TrackInfo) *[livekit.VideoQua
var layerPresence [livekit.VideoQuality_HIGH + 1]bool
for _, layer := range trackInfo.Layers {
layerPresence[layer.Quality] = true
// WARNING: comparing protobuf enum
if layer.Quality <= livekit.VideoQuality_HIGH {
layerPresence[layer.Quality] = true
} else {
logger.Warnw("unexpected quality in track info", nil, "trackInfo", logger.Proto(trackInfo))
}
}
return &layerPresence
@@ -259,7 +259,7 @@ func TestConnectionQuality(t *testing.T) {
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// next update with no packets should knock quality down
// next update with no packets should knock quality down to LOST
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
@@ -273,12 +273,36 @@ func TestConnectionQuality(t *testing.T) {
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
require.Equal(t, livekit.ConnectionQuality_LOST, quality)
// mute/unmute to bring quality back up
// mute when LOST should not bump up score/quality
now = now.Add(duration)
cs.UpdateMuteAt(true, now.Add(1*time.Second))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_LOST, quality)
// unmute and send packets to bring quality back up
now = now.Add(duration)
cs.UpdateMuteAt(false, now.Add(2*time.Second))
for i := 0; i < 3; i++ {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
PacketsLost: 0,
},
},
})
cs.updateScoreAt(now.Add(duration))
now = now.Add(duration)
}
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// with lesser number of packet (simulating DTX).
// even higher loss (like 10%) should not knock down quality due to quadratic weighting of packet loss ratio
+40 -24
View File
@@ -29,9 +29,9 @@ const (
MaxMOS = float32(4.5)
MinMOS = float32(1.0)
maxScore = float64(100.0)
poorScore = float64(30.0)
minScore = float64(20.0)
cMaxScore = float64(100.0)
cMinScore = float64(20.0)
cPausedPoorScore = float64(30.0)
increaseFactor = float64(0.4) // slower increase, i. e. when score is recovering move up slower -> conservative
decreaseFactor = float64(0.7) // faster decrease, i. e. when score is dropping move down faster -> aggressive to be responsive to quality drops
@@ -41,6 +41,14 @@ const (
unmuteTimeThreshold = float64(0.5)
)
var (
qualityTransitionScore = map[livekit.ConnectionQuality]float64{
livekit.ConnectionQuality_GOOD: 80,
livekit.ConnectionQuality_POOR: 40,
livekit.ConnectionQuality_LOST: 20,
}
)
// ------------------------------------------
type windowStat struct {
@@ -107,7 +115,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ
}
lossEffect *= plw
score := maxScore - delayEffect - lossEffect
score := cMaxScore - delayEffect - lossEffect
if score < 0.0 {
score = 0.0
}
@@ -118,7 +126,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ
func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 {
if expectedBitrate == 0 {
// unsupported mode OR all layers stopped
return maxScore
return cMaxScore
}
var score float64
@@ -126,9 +134,9 @@ func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 {
// using the ratio of expectedBitrate / actualBitrate
// the quality inflection points are approximately
// GOOD at ~2.7x, POOR at ~20.1x
score = maxScore - 20*math.Log(float64(expectedBitrate)/float64(w.bytes*8))
if score > maxScore {
score = maxScore
score = cMaxScore - 20*math.Log(float64(expectedBitrate)/float64(w.bytes*8))
if score > cMaxScore {
score = cMaxScore
}
if score < 0.0 {
score = 0.0
@@ -188,7 +196,7 @@ type qualityScorer struct {
func newQualityScorer(params qualityScorerParams) *qualityScorer {
return &qualityScorer{
params: params,
score: maxScore,
score: cMaxScore,
aggregateBitrate: utils.NewTimedAggregator[int64](utils.TimedAggregatorParams{
CapNegativeValues: true,
}),
@@ -219,7 +227,10 @@ func (q *qualityScorer) Start() {
func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) {
if isMuted {
q.mutedAt = at
q.score = maxScore
// muting when LOST should not push quality to EXCELLENT
if q.score != qualityTransitionScore[livekit.ConnectionQuality_LOST] {
q.score = cMaxScore
}
} else {
q.unmutedAt = at
}
@@ -264,7 +275,7 @@ func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) {
q.layerDistance.Reset()
q.layerMutedAt = at
q.score = maxScore
q.score = cMaxScore
}
} else {
if q.isLayerMuted() {
@@ -294,7 +305,7 @@ func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) {
q.layerDistance.Reset()
q.pausedAt = at
q.score = poorScore
q.score = cPausedPoorScore
}
} else {
if q.isPaused() {
@@ -353,7 +364,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
// considered (as long as enough time has passed since unmute).
//
// Similarly, when paused (possibly due to congestion), score is immediately
// set to poorScore for responsiveness. The layer transision is reest.
// set to cPausedPoorScore for responsiveness. The layer transision is reest.
// On a resume, quality climbs back up using normal operation.
if q.isMuted() || !q.isUnmutedEnough(at) || q.isLayerMuted() || q.isPaused() {
q.lastUpdateAt = at
@@ -365,11 +376,11 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
var score float64
if stat.packetsExpected == 0 {
reason = "dry"
score = poorScore
score = qualityTransitionScore[livekit.ConnectionQuality_LOST]
} else {
packetScore := stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter)
bitrateScore := stat.calculateBitrateScore(expectedBitrate)
layerScore := math.Max(math.Min(maxScore, maxScore-(expectedDistance*distanceWeight)), 0.0)
layerScore := math.Max(math.Min(cMaxScore, cMaxScore-(expectedDistance*distanceWeight)), 0.0)
minScore := math.Min(packetScore, bitrateScore)
minScore = math.Min(minScore, layerScore)
@@ -394,22 +405,23 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
}
score = factor*score + (1.0-factor)*q.score
}
if score < minScore {
if score < cMinScore {
// lower bound to prevent score from becoming very small values due to extreme conditions.
// Without a lower bound, it can get so low that it takes a long time to climb back to
// better quality even under excellent conditions.
score = minScore
score = cMinScore
}
// WARNING NOTE: comparing protobuf enum values directly (livekit.ConnectionQuality)
if scoreToConnectionQuality(q.score) > scoreToConnectionQuality(score) {
prevCQ := scoreToConnectionQuality(q.score)
currCQ := scoreToConnectionQuality(score)
if utils.IsConnectionQualityLower(prevCQ, currCQ) {
q.params.Logger.Infow(
"quality drop",
"reason", reason,
"prevScore", q.score,
"prevQuality", scoreToConnectionQuality(q.score),
"prevQuality", prevCQ,
"prevStat", &q.stat,
"score", score,
"quality", scoreToConnectionQuality(score),
"quality", currCQ,
"stat", stat,
"packetLossWeight", plw,
"maxPPS", q.maxPPS,
@@ -523,15 +535,19 @@ func scoreToConnectionQuality(score float64) livekit.ConnectionQuality {
// that a score of 60 does not correspond to `POOR` quality. Repair
// mechanisms and use of algorithms like de-jittering makes the experience
// better even under harsh conditions.
if score > 80.0 {
if score > qualityTransitionScore[livekit.ConnectionQuality_GOOD] {
return livekit.ConnectionQuality_EXCELLENT
}
if score > 40.0 {
if score > qualityTransitionScore[livekit.ConnectionQuality_POOR] {
return livekit.ConnectionQuality_GOOD
}
return livekit.ConnectionQuality_POOR
if score > qualityTransitionScore[livekit.ConnectionQuality_LOST] {
return livekit.ConnectionQuality_POOR
}
return livekit.ConnectionQuality_LOST
}
// ------------------------------------------
+3 -1
View File
@@ -82,6 +82,8 @@ type TrackReceiver interface {
GetCalculatedClockRate(layer int32) uint32
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
GetTrackStats() *livekit.RTPStats
}
// WebRTCReceiver receives a media track
@@ -558,7 +560,7 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
var stats []*livekit.RTPStats
stats := make([]*livekit.RTPStats, 0, len(w.buffers))
for _, buff := range w.buffers {
if buff == nil {
continue
@@ -111,8 +111,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
}
if ddwdt.StructureUpdated {
// TODO-REMOVE: remove this log after stable
d.logger.Infow("update dependency structure",
d.logger.Debugw("update dependency structure",
"structureID", dd.AttachedStructure.StructureId,
"structure", dd.AttachedStructure,
"decodeTargets", ddwdt.DecodeTargets,
@@ -127,8 +126,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
if ddwdt.ExtKeyFrameNum != d.extKeyFrameNum {
// keyframe mismatch, drop and reset chains
// TODO-REMOVE: remove this log after stable
d.logger.Infow("drop packet for keyframe mismatch", "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, "requiredKeyFrame", ddwdt.ExtKeyFrameNum, "structureKeyFrame", d.extKeyFrameNum)
d.logger.Debugw("drop packet for keyframe mismatch", "incoming", incomingLayer, "efn", extFrameNum, "sn", extPkt.Packet.SequenceNumber, "requiredKeyFrame", ddwdt.ExtKeyFrameNum, "structureKeyFrame", d.extKeyFrameNum)
d.decisions.AddDropped(extFrameNum)
d.invalidateKeyFrame()
return
@@ -138,9 +136,8 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
d.updateActiveDecodeTargets(*dd.ActiveDecodeTargetsBitmask)
}
// TODO-REMOVE: remove this log after stable
if len(fd.ChainDiffs) != len(d.chains) {
d.logger.Warnw("frame chain diff length mismatch", nil,
d.logger.Debugw("frame chain diff length mismatch", nil,
"incoming", incomingLayer,
"efn", extFrameNum,
"sn", extPkt.Packet.SequenceNumber,
@@ -255,7 +252,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
result.IsSwitching = true
if !d.currentLayer.IsValid() {
result.IsResuming = true
d.logger.Infow(
d.logger.Debugw(
"resuming at layer",
"current", incomingLayer,
"target", d.targetLayer,
+33 -5
View File
@@ -32,13 +32,27 @@ const (
BytesTrackTypeSignal BytesTrackType = "SG"
)
// -------------------------------
type TrafficTotals struct {
At time.Time
SendBytes uint64
SendMessages uint32
RecvBytes uint64
RecvMessages uint32
}
// --------------------------------
// stats for signal and data channel
type BytesTrackStats struct {
trackID livekit.TrackID
pID livekit.ParticipantID
send, recv atomic.Uint64
telemetry TelemetryService
isStopped atomic.Bool
trackID livekit.TrackID
pID livekit.ParticipantID
send, recv atomic.Uint64
totalSendBytes, totalRecvBytes atomic.Uint64
totalSendMessages, totalRecvMessages atomic.Uint32
telemetry TelemetryService
isStopped atomic.Bool
}
func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats {
@@ -54,8 +68,22 @@ func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, tele
func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) {
if isSend {
s.send.Add(bytes)
s.totalSendBytes.Add(bytes)
s.totalSendMessages.Inc()
} else {
s.recv.Add(bytes)
s.totalRecvBytes.Add(bytes)
s.totalRecvMessages.Inc()
}
}
func (s *BytesTrackStats) GetTrafficTotals() *TrafficTotals {
return &TrafficTotals{
At: time.Now(),
SendBytes: s.totalSendBytes.Load(),
SendMessages: s.totalSendMessages.Load(),
RecvBytes: s.totalRecvBytes.Load(),
RecvMessages: s.totalRecvMessages.Load(),
}
}