diff --git a/go.mod b/go.mod index afee33447..220abd88a 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/livekit/protocol v0.13.3-0.20220526091938-2ecd4805e7e2 + github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.1 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 @@ -48,12 +49,15 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect + github.com/d5/tengo/v2 v2.10.1 github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/go-logr/zapr v1.2.3 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b // indirect github.com/lithammer/shortuuid/v3 v3.0.6 // indirect @@ -64,29 +68,23 @@ require ( github.com/pion/mdns v0.0.5 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/sctp v1.8.2 // indirect - github.com/pion/srtp/v2 v2.0.7 // indirect + github.com/pion/srtp/v2 v2.0.9 // indirect github.com/pion/udp v0.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect go.uber.org/multierr v1.6.0 // indirect + golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect + golang.org/x/mod v0.5.1 // indirect + golang.org/x/net v0.0.0-20220526153639-5463443f8c37 // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.7 // indirect + golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect google.golang.org/grpc v1.42.0 // indirect gopkg.in/square/go-jose.v2 v2.5.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) - -require ( - github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect - github.com/d5/tengo/v2 v2.10.1 - github.com/google/subcommands v1.2.0 // indirect - github.com/russross/blackfriday/v2 v2.1.0 // indirect - golang.org/x/crypto v0.0.0-20220516162934-403b01795ae8 // indirect - golang.org/x/mod v0.5.1 // indirect - golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect - golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect - golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect -) diff --git a/go.sum b/go.sum index 4029948a6..979e8c264 100644 --- a/go.sum +++ b/go.sum @@ -133,6 +133,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/protocol v0.13.3-0.20220526091938-2ecd4805e7e2 h1:ufUi4uR5OgfnrweGTWXkplxudLeZw9/gtkgws2oszAo= github.com/livekit/protocol v0.13.3-0.20220526091938-2ecd4805e7e2/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo= +github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= +github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc= github.com/mackerelio/go-osstat v0.2.1/go.mod h1:UzRL8dMCCTqG5WdRtsxbuljMpZt9PCAGXqxPst5QtaY= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= @@ -193,8 +195,9 @@ github.com/pion/sctp v1.8.2 h1:yBBCIrUMJ4yFICL3RIvR4eh/H2BTTvlligmSTy+3kiA= github.com/pion/sctp v1.8.2/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= github.com/pion/sdp/v3 v3.0.5 h1:ouvI7IgGl+V4CrqskVtr3AaTrPvPisEOxwgpdktctkU= github.com/pion/sdp/v3 v3.0.5/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= -github.com/pion/srtp/v2 v2.0.7 h1:1ODEFojQu3gVLmqOrTVVjzsrxfx1UHLk3LnKHjfWjS0= github.com/pion/srtp/v2 v2.0.7/go.mod h1:5TtM9yw6lsH0ppNCehB/EjEUli7VkUgKSPJqWVqbhQ4= +github.com/pion/srtp/v2 v2.0.9 h1:JJq3jClmDFBPX/F5roEb0U19jSU7eUhyDqR/NZ34EKQ= +github.com/pion/srtp/v2 v2.0.9/go.mod h1:5TtM9yw6lsH0ppNCehB/EjEUli7VkUgKSPJqWVqbhQ4= github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg= github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA= github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q= @@ -289,8 +292,9 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220516162934-403b01795ae8 h1:y+mHpWoQJNAHt26Nhh6JP7hvM71IRZureyvZhoVALIs= golang.org/x/crypto v0.0.0-20220516162934-403b01795ae8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -326,8 +330,9 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211201190559-0a0e4e1bb54c/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220401154927-543a649e0bdd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220526153639-5463443f8c37 h1:lUkvobShwKsOesNfWWlCS5q7fnbG1MEliIzwu886fn8= +golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -365,8 +370,8 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60= -golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index ebb3bd7ac..848c3912c 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -155,7 +155,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra receiver, track, t.PublisherID(), - t.params.TrackInfo.Source, + t.params.TrackInfo, LoggerWithCodecMime(t.params.Logger, mime), twcc, sfu.WithPliThrottleConfig(t.params.PLIThrottleConfig), diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index e75972825..e97d12fca 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -19,6 +19,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/utils" ) const ( @@ -107,7 +108,7 @@ func (t *MediaTrackReceiver) Restart() { t.lock.Unlock() for _, receiver := range receivers { - receiver.SetMaxExpectedSpatialLayer(SpatialLayerForQuality(livekit.VideoQuality_HIGH)) + receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(livekit.VideoQuality_HIGH)) } t.MediaTrackSubscriptions.Restart() @@ -592,7 +593,7 @@ func (t *MediaTrackReceiver) OnSubscribedMaxQualityChange(f func(trackID livekit for _, q := range maxSubscribedQualities { receiver := t.Receiver(q.CodecMime) if receiver != nil { - receiver.SetMaxExpectedSpatialLayer(SpatialLayerForQuality(q.Quality)) + receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality)) } } }) @@ -600,21 +601,6 @@ func (t *MediaTrackReceiver) OnSubscribedMaxQualityChange(f func(trackID livekit // --------------------------- -func SpatialLayerForQuality(quality livekit.VideoQuality) int32 { - switch quality { - case livekit.VideoQuality_LOW: - return 0 - case livekit.VideoQuality_MEDIUM: - return 1 - case livekit.VideoQuality_HIGH: - return 2 - case livekit.VideoQuality_OFF: - return -1 - default: - return -1 - } -} - func QualityForSpatialLayer(layer int32) livekit.VideoQuality { switch layer { case 0: diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index bfa10878b..84e1b8bf7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -735,7 +735,10 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo if subTrack.IsMuted() || subTrack.MediaTrack().IsMuted() { continue } - totalScore += subTrack.DownTrack().GetConnectionScore() + score := subTrack.DownTrack().GetConnectionScore() + + totalScore += score + numTracks++ } p.lock.RUnlock() @@ -1447,7 +1450,9 @@ func (p *ParticipantImpl) getPublisherConnectionQuality() (totalScore float32, n if pt.IsMuted() { continue } - totalScore += pt.(types.LocalMediaTrack).GetConnectionScore() + score := pt.(types.LocalMediaTrack).GetConnectionScore() + + totalScore += score numTracks++ } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index e95d1a110..d18577ab3 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" @@ -221,31 +222,28 @@ func TestMuteSetting(t *testing.T) { } func TestConnectionQuality(t *testing.T) { - // loss based score is currently a publisher method. - videoScore := func(loss float32, numPublishing, numRegistered uint32) float32 { - var reducedQuality bool - if numRegistered > 0 && numPublishing != numRegistered { - reducedQuality = true - } - return connectionquality.VideoConnectionScore(loss, reducedQuality) + + videoScore := func(totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams, + codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) float32 { + return connectionquality.VideoConnectionScore(1*time.Second, totalBytes, totalFrames, qualityParam, codec, + expectedHeight, expectedWidth, actualHeight, actualWidth) } - testPublishedVideoTrack := func(loss float32, numPublishing, numRegistered uint32) *typesfakes.FakeLocalMediaTrack { + testPublishedVideoTrack := func(totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams, + codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) *typesfakes.FakeLocalMediaTrack { tr := &typesfakes.FakeLocalMediaTrack{} - score := videoScore(loss, numPublishing, numRegistered) + score := videoScore(totalBytes, totalFrames, qualityParam, codec, expectedHeight, expectedWidth, + actualHeight, actualWidth) t.Log("video score: ", score) tr.GetConnectionScoreReturns(score) return tr } - testPublishedAudioTrack := func(totalPackets, packetsLost uint32) *typesfakes.FakeLocalMediaTrack { + testPublishedAudioTrack := func(totalBytes int64, qualityParam *buffer.ConnectionQualityParams, + dtxDisabled bool) *typesfakes.FakeLocalMediaTrack { tr := &typesfakes.FakeLocalMediaTrack{} - pctLoss := float32(0.0) - if totalPackets > 0 { - pctLoss = (float32(packetsLost) / float32(totalPackets)) * 100.0 - } - score := connectionquality.AudioConnectionScore(pctLoss, 0, 0.0) + score := connectionquality.AudioConnectionScore(1*time.Second, totalBytes, qualityParam, dtxDisabled) t.Log("audio score: ", score) tr.GetConnectionScoreReturns(score) return tr @@ -256,33 +254,78 @@ func TestConnectionQuality(t *testing.T) { t.Run("smooth sailing", func(t *testing.T) { p := newParticipantForTest("test") - p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(2, 3, 3) - p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, 0) + + // >2Mbps, 30fps, expected/actual video size = 1280x720 + p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(290000, 30, &buffer.ConnectionQualityParams{}, + "", 720, 1280, 720, 1280) + + // no packet loss + p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{}, false) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, p.GetConnectionQuality().GetQuality()) }) t.Run("reduced publishing", func(t *testing.T) { p := newParticipantForTest("test") - p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(3, 2, 3) - p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, 100) + + // 1Mbps, 15fps, expected = 1280x720, actual = 640 x 480 + p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(25000, 15, &buffer.ConnectionQualityParams{}, + "", 720, 1280, 480, 640) + + // packet loss of 10% + p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{LossPercentage: 5}, false) require.Equal(t, livekit.ConnectionQuality_GOOD, p.GetConnectionQuality().GetQuality()) }) t.Run("audio smooth publishing", func(t *testing.T) { p := newParticipantForTest("test") - p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, 10) + // no packet loss + p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{}, false) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, p.GetConnectionQuality().GetQuality()) }) t.Run("audio reduced publishing", func(t *testing.T) { p := newParticipantForTest("test") - p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, 100) + p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{LossPercentage: 5}, false) require.Equal(t, livekit.ConnectionQuality_GOOD, p.GetConnectionQuality().GetQuality()) }) + t.Run("audio bad publishing", func(t *testing.T) { + p := newParticipantForTest("test") + p.UpTrackManager.publishedTracks["audio"] = testPublishedAudioTrack(1000, &buffer.ConnectionQualityParams{LossPercentage: 20}, false) + + require.Equal(t, livekit.ConnectionQuality_POOR, p.GetConnectionQuality().GetQuality()) + }) + + t.Run("video smooth publishing", func(t *testing.T) { + p := newParticipantForTest("test") + + // >2Mbps, 30fps, expected/actual video size = 1280x720 + p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(290000, 30, &buffer.ConnectionQualityParams{}, + "", 720, 1280, 720, 1280) + + require.Equal(t, livekit.ConnectionQuality_EXCELLENT, p.GetConnectionQuality().GetQuality()) + }) + t.Run("video reduced publishing", func(t *testing.T) { + p := newParticipantForTest("test") + + // 1Mbps, 15fps, expected = 1280x720, actual = 640 x 480 + p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(100000, 15, &buffer.ConnectionQualityParams{}, + "", 720, 1280, 480, 640) + + require.Equal(t, livekit.ConnectionQuality_GOOD, p.GetConnectionQuality().GetQuality()) + }) + t.Run("video poor publishing", func(t *testing.T) { + p := newParticipantForTest("test") + + // 20kbps, 8fps, expected = 1280x720, actual = 640 x 480 + p.UpTrackManager.publishedTracks["video"] = testPublishedVideoTrack(2500, 8, &buffer.ConnectionQualityParams{}, + "", 720, 1280, 426, 240) + + require.Equal(t, livekit.ConnectionQuality_POOR, p.GetConnectionQuality().GetQuality()) + }) } func TestSubscriberAsPrimary(t *testing.T) { diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 417dcfd19..7f9fc43d0 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -11,6 +11,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/utils" ) const ( @@ -53,7 +54,7 @@ func (t *SubscribedTrack) OnBind(f func()) { func (t *SubscribedTrack) Bound() { if !t.params.AdaptiveStream { - t.params.DownTrack.SetMaxSpatialLayer(SpatialLayerForQuality(livekit.VideoQuality_HIGH)) + t.params.DownTrack.SetMaxSpatialLayer(utils.SpatialLayerForQuality(livekit.VideoQuality_HIGH)) } if t.onBind != nil { t.onBind() @@ -124,7 +125,7 @@ func (t *SubscribedTrack) UpdateVideoLayer() { if settings.Width > 0 { quality = t.MediaTrack().GetQualityForDimension(settings.Width, settings.Height) } - t.DownTrack().SetMaxSpatialLayer(SpatialLayerForQuality(quality)) + t.DownTrack().SetMaxSpatialLayer(utils.SpatialLayerForQuality(quality)) } func (t *SubscribedTrack) updateDownTrackMute() { diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 520440d19..a9f612a54 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -1,6 +1,8 @@ package connectionquality import ( + "math" + "sort" "sync" "time" @@ -11,6 +13,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/utils" ) const ( @@ -20,10 +23,14 @@ const ( type ConnectionStatsParams struct { UpdateInterval time.Duration CodecType webrtc.RTPCodecType + CodecName string + DtxDisabled bool MimeType string GetDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers GetQualityParams func() *buffer.ConnectionQualityParams GetIsReducedQuality func() bool + GetLayerDimension func(int32) (uint32, uint32) + GetMaxExpectedLayer func() livekit.VideoLayer Logger logger.Logger } @@ -42,7 +49,7 @@ type ConnectionStats struct { func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { return &ConnectionStats{ params: params, - score: 4.0, + score: 5.0, done: make(chan struct{}), } } @@ -70,29 +77,67 @@ func (cs *ConnectionStats) GetScore() float32 { return cs.score } -func (cs *ConnectionStats) updateScore() float32 { +func (cs *ConnectionStats) updateScore(streams []*livekit.AnalyticsStream, iteration uint64) float32 { cs.lock.Lock() defer cs.lock.Unlock() - s := cs.params.GetQualityParams() - if s == nil { + // Initial interval will have partial data + if iteration < 2 { + cs.score = 5 return cs.score } - if cs.params.CodecType == webrtc.RTPCodecTypeAudio { - cs.score = AudioConnectionScore(s.LossPercentage, s.Rtt, s.Jitter) + s := cs.params.GetQualityParams() + var qualityParam buffer.ConnectionQualityParams + if s == nil { + return cs.score } else { - isReducedQuality := false - if cs.params.GetIsReducedQuality != nil { - isReducedQuality = cs.params.GetIsReducedQuality() + qualityParam = *s + if math.IsInf(float64(qualityParam.Jitter), 0) { + qualityParam.Jitter = 0 } - cs.score = VideoConnectionScore(s.LossPercentage, isReducedQuality) + if math.IsInf(float64(qualityParam.LossPercentage), 0) { + qualityParam.LossPercentage = 0 + } + } + + interval := cs.params.UpdateInterval + if interval == 0 { + interval = UpdateInterval + } + if cs.params.CodecType == webrtc.RTPCodecTypeAudio { + totalBytes, _, _ := cs.getBytesFramesFromStreams(streams) + cs.score = AudioConnectionScore(interval, int64(totalBytes), s, cs.params.DtxDisabled) + } else { + // get tracks expected max layer and dimensions + expectedLayer := cs.params.GetMaxExpectedLayer() + if utils.SpatialLayerForQuality(expectedLayer.Quality) == buffer.InvalidLayerSpatial { + return cs.score + } + + // get bytes/frames and max later from actual stream stats + totalBytes, totalFrames, maxLayer := cs.getBytesFramesFromStreams(streams) + var actualHeight uint32 + var actualWidth uint32 + // if data present, but maxLayer == -1 no layer info available, set actual to expected, else fetch + if maxLayer == buffer.InvalidLayerSpatial && totalBytes > 0 { + actualHeight = expectedLayer.Height + actualWidth = expectedLayer.Width + } else { + actualWidth, actualHeight = cs.params.GetLayerDimension(maxLayer) + } + + cs.score = VideoConnectionScore(interval, int64(totalBytes), int64(totalFrames), &qualityParam, cs.params.CodecName, + int32(expectedLayer.Height), int32(expectedLayer.Width), int32(actualHeight), int32(actualWidth)) + //logger.Infow("VideoScoreScore", "score", cs.score, "expectedLayer", expectedLayer.Quality, "maxLayer", maxLayer, + // "expectedWidth", expectedLayer.Width, "actualWidth", actualWidth, "expectedHeight", expectedLayer.Height, "actualHeight", actualHeight) + } return cs.score } -func (cs *ConnectionStats) getStat() *livekit.AnalyticsStat { +func (cs *ConnectionStats) getStat(iteration uint64) *livekit.AnalyticsStat { if cs.params.GetDeltaStats == nil { return nil } @@ -120,7 +165,7 @@ func (cs *ConnectionStats) getStat() *livekit.AnalyticsStat { analyticsStreams = append(analyticsStreams, as) } - score := cs.updateScore() + score := cs.updateScore(analyticsStreams, iteration) return &livekit.AnalyticsStat{ Score: score, @@ -138,13 +183,16 @@ func (cs *ConnectionStats) updateStatsWorker() { tk := time.NewTicker(interval) defer tk.Stop() + // Delay sending scores until 2nd cycle, as 1st will be partial. + counter := uint64(0) + for { select { case <-cs.done: return case <-tk.C: - stat := cs.getStat() + stat := cs.getStat(counter) if stat == nil { continue } @@ -152,6 +200,9 @@ func (cs *ConnectionStats) updateStatsWorker() { if cs.onStatsUpdate != nil { cs.onStatsUpdate(cs, stat) } + + counter++ + } } } @@ -183,3 +234,41 @@ func ToAnalyticsVideoLayer(layer int, layerStats *buffer.LayerStats) *livekit.An Frames: layerStats.Frames, } } + +func (cs *ConnectionStats) getBytesFramesFromStreams(streams []*livekit.AnalyticsStream) (totalBytes uint64, totalFrames uint32, maxLayer int32) { + layerStats := make(map[int32]buffer.LayerStats) + hasLayers := false + maxLayer = buffer.InvalidLayerSpatial + for _, stream := range streams { + // get frames/bytes/packets from video layers if available. Store per layer in layerStats map + if len(stream.VideoLayers) > 0 { + hasLayers = true + + layers := stream.VideoLayers + // find max quality 0(LOW), 1(MED), 2(HIGH) . sort on layer.Layer desc + sort.Slice(layers, func(i, j int) bool { + return layers[i].Layer > layers[j].Layer + }) + + layerStats[layers[0].Layer] = buffer.LayerStats{ + Bytes: layers[0].GetBytes(), + Frames: layers[0].GetFrames(), + } + if layers[0].Layer > maxLayer { + maxLayer = layers[0].Layer + } + } else { + totalFrames += stream.GetFrames() + totalBytes += stream.GetPrimaryBytes() + } + } + if hasLayers { + if stats, ok := layerStats[maxLayer]; ok { + return stats.Bytes, stats.Frames, maxLayer + } else { + return 0, 0, buffer.InvalidLayerSpatial + } + } + return totalBytes, totalFrames, maxLayer + +} diff --git a/pkg/sfu/connectionquality/mos.go b/pkg/sfu/connectionquality/mos.go index c2b02ea3f..c4714093a 100644 --- a/pkg/sfu/connectionquality/mos.go +++ b/pkg/sfu/connectionquality/mos.go @@ -1,76 +1,78 @@ package connectionquality import ( + "time" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/protocol/livekit" -) - -// MOS score calculation is based on webrtc-stats -// available @ https://github.com/oanguenot/webrtc-stats - -const ( - defaultRtt = uint32(70) + "github.com/livekit/rtcscore-go/pkg/rtcmos" ) func Score2Rating(score float32) livekit.ConnectionQuality { - if score > 3.9 { + if score > 4.0 { return livekit.ConnectionQuality_EXCELLENT } - if score > 2.5 { + if score > 3.0 { return livekit.ConnectionQuality_GOOD } return livekit.ConnectionQuality_POOR } -func mosAudioEModel(pctLoss float32, rtt uint32, jitter float32) float32 { - rx := 93.2 - pctLoss - ry := 0.18*rx*rx - 27.9*rx + 1126.62 - - if rtt == 0 { - rtt = defaultRtt - } - // Jitter is in Milliseconds - d := float32(rtt) + jitter - h := d - 177.3 - if h < 0 { - h = 0 - } else { - h = 1 - } - id := 0.024*d + 0.11*(d-177.3)*h - r := ry - (id) - if r < 0 { - return 1 - } - if r > 100 { - return 4.5 - } - score := 1 + (0.035 * r) + (7.0/1000000)*r*(r-60)*(100-r) - - return score +func getBitRate(interval float64, totalBytes int64) int32 { + return int32(float64(totalBytes*8) / interval) } -func loss2Score(pctLoss float32, reducedQuality bool) float32 { - // No Loss, excellent - if pctLoss == 0.0 && !reducedQuality { - return 5.0 - } - // default when loss is minimal, but reducedQuality - score := float32(3.5) - // loss is bad - if pctLoss >= 4.0 { - score = 2.0 - } else if pctLoss <= 2.0 && !reducedQuality { - // loss is acceptable and at reduced quality - score = 4.5 - } - return score +func getFrameRate(interval float64, totalFrames int64) int32 { + return int32(float64(totalFrames) / interval) } -func AudioConnectionScore(pctLoss float32, rtt uint32, jitter float32) float32 { - return mosAudioEModel(pctLoss, rtt, jitter) +func int32Ptr(x int32) *int32 { + return &x } -func VideoConnectionScore(pctLoss float32, reducedQuality bool) float32 { - return loss2Score(pctLoss, reducedQuality) +func AudioConnectionScore(interval time.Duration, totalBytes int64, + qualityParam *buffer.ConnectionQualityParams, dtxDisabled bool) float32 { + + stat := rtcmos.Stat{ + Bitrate: getBitRate(interval.Seconds(), totalBytes), + PacketLoss: qualityParam.LossPercentage, + RoundTripTime: int32Ptr(int32(qualityParam.Rtt)), + BufferDelay: int32Ptr(int32(qualityParam.Jitter)), + AudioConfig: &rtcmos.AudioConfig{}, + } + + if dtxDisabled { + flag := false + stat.AudioConfig.Dtx = &flag + } + + scores := rtcmos.Score([]rtcmos.Stat{stat}) + if len(scores) == 1 { + return float32(scores[0].AudioScore) + } + return 0 +} + +func VideoConnectionScore(interval time.Duration, totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams, + codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) float32 { + stat := rtcmos.Stat{ + Bitrate: getBitRate(interval.Seconds(), totalBytes), + PacketLoss: qualityParam.LossPercentage, + RoundTripTime: int32Ptr(int32(qualityParam.Rtt)), + BufferDelay: int32Ptr(int32(qualityParam.Jitter)), + VideoConfig: &rtcmos.VideoConfig{ + FrameRate: int32Ptr(getFrameRate(interval.Seconds(), totalFrames)), + Codec: codec, + ExpectedHeight: &expectedHeight, + ExpectedWidth: &expectedWidth, + Height: &actualHeight, + Width: &actualWidth, + }, + } + scores := rtcmos.Score([]rtcmos.Stat{stat}) + if len(scores) == 1 { + return float32(scores[0].VideoScore) + } + return 0 } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index e4cdee0e3..b89c553ca 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -220,7 +220,19 @@ func NewDownTrack( GetIsReducedQuality: func() bool { return d.GetForwardingStatus() != ForwardingStatusOptimal }, - Logger: d.logger, + GetLayerDimension: func(quality int32) (uint32, uint32) { + if d.receiver != nil { + return d.receiver.GetLayerDimension(quality) + } + return 0, 0 + }, + GetMaxExpectedLayer: func() livekit.VideoLayer { + quality := d.forwarder.MaxLayers().Spatial + width, height := d.receiver.GetLayerDimension(quality) + return livekit.VideoLayer{Quality: livekit.VideoQuality(quality), Width: width, Height: height} + }, + Logger: d.logger, + CodecName: getCodecNameFromMime(codecs[0].MimeType), }) d.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) { if d.onStatsUpdate != nil { diff --git a/pkg/sfu/helpers.go b/pkg/sfu/helpers.go index cc54c824e..d34332ac6 100644 --- a/pkg/sfu/helpers.go +++ b/pkg/sfu/helpers.go @@ -55,3 +55,12 @@ func getRttMs(report *rtcp.ReceptionReport) uint32 { ntpDiff := now - report.LastSenderReport - report.Delay return uint32(math.Ceil(float64(ntpDiff) * 1000.0 / 65536.0)) } + +func getCodecNameFromMime(mime string) string { + codecName := "" + codecParsed := strings.Split(strings.ToLower(mime), "/") + if len(codecParsed) > 1 { + codecName = codecParsed[1] + } + return codecName +} diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 9617203a8..e9dadd1a4 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -12,6 +12,7 @@ import ( "github.com/pion/webrtc/v3" "go.uber.org/atomic" + "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -51,6 +52,8 @@ type TrackReceiver interface { DeleteDownTrack(peerID livekit.ParticipantID) DebugInfo() map[string]interface{} + + GetLayerDimension(quality int32) (uint32, uint32) } // WebRTCReceiver receives a media track @@ -72,6 +75,7 @@ type WebRTCReceiver struct { closeOnce sync.Once closed atomic.Bool useTrackers bool + TrackInfo *livekit.TrackInfo rtcpCh chan []rtcp.Packet @@ -160,7 +164,7 @@ func NewWebRTCReceiver( receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, pid livekit.ParticipantID, - source livekit.TrackSource, + trackInfo *livekit.TrackInfo, logger logger.Logger, twcc *twcc.Responder, opts ...ReceiverOpts, @@ -176,7 +180,8 @@ func NewWebRTCReceiver( // LK-TODO: this should be based on VideoLayers protocol message rather than RID based isSimulcast: len(track.RID()) > 0, twcc: twcc, - streamTrackerManager: NewStreamTrackerManager(logger, source), + streamTrackerManager: NewStreamTrackerManager(logger, trackInfo.Source), + TrackInfo: trackInfo, isSVC: IsSvcCodec(track.Codec().MimeType), } @@ -199,7 +204,36 @@ func NewWebRTCReceiver( GetIsReducedQuality: func() bool { return w.streamTrackerManager.IsReducedQuality() }, - Logger: w.logger, + GetLayerDimension: func(quality int32) (uint32, uint32) { + return w.GetLayerDimension(quality) + }, + GetMaxExpectedLayer: func() livekit.VideoLayer { + var expectedLayer livekit.VideoLayer + var maxPublishedLayer livekit.VideoLayer + // find min of layer + expectedQuality := w.streamTrackerManager.GetMaxExpectedLayer() + maxPublishedQuality := InvalidLayerSpatial + if w.TrackInfo != nil { + for _, layer := range w.TrackInfo.Layers { + if layer.Quality == livekit.VideoQuality_OFF { + continue + } + if expectedQuality == utils.SpatialLayerForQuality(layer.Quality) { + expectedLayer = *layer + } + if utils.SpatialLayerForQuality(layer.Quality) > maxPublishedQuality { + maxPublishedQuality = int32(layer.Quality) + maxPublishedLayer = *layer + } + } + } + if expectedQuality < maxPublishedQuality { + return expectedLayer + } + return maxPublishedLayer + }, + Logger: w.logger, + CodecName: getCodecNameFromMime(w.codec.MimeType), }) w.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) { if w.onStatsUpdate != nil { @@ -211,6 +245,19 @@ func NewWebRTCReceiver( return w } +func (w *WebRTCReceiver) GetLayerDimension(quality int32) (uint32, uint32) { + height := uint32(0) + width := uint32(0) + for _, layer := range w.TrackInfo.Layers { + if layer.Quality == livekit.VideoQuality(quality) { + height = layer.Height + width = layer.Width + break + } + } + return width, height +} + func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)) { w.onStatsUpdate = fn } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 6d22983f2..0a83ec818 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -239,6 +239,12 @@ func (s *StreamTrackerManager) IsReducedQuality() bool { return int32(len(s.availableLayers)) < (s.maxExpectedLayer + 1) } +func (s *StreamTrackerManager) GetMaxExpectedLayer() int32 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.maxExpectedLayer +} + func (s *StreamTrackerManager) GetBitrateTemporalCumulative() Bitrates { s.lock.RLock() defer s.lock.RUnlock() diff --git a/pkg/utils/helpers.go b/pkg/utils/helpers.go new file mode 100644 index 000000000..c8403c5be --- /dev/null +++ b/pkg/utils/helpers.go @@ -0,0 +1,18 @@ +package utils + +import "github.com/livekit/protocol/livekit" + +func SpatialLayerForQuality(quality livekit.VideoQuality) int32 { + switch quality { + case livekit.VideoQuality_LOW: + return 0 + case livekit.VideoQuality_MEDIUM: + return 1 + case livekit.VideoQuality_HIGH: + return 2 + case livekit.VideoQuality_OFF: + return -1 + default: + return -1 + } +}