Support simulcasting of audio (#3920)

* WIP

* WIP

* new files

* clean up

* test

* Deps

* clean up

* clean up

* goimports latest
This commit is contained in:
Raja Subramanian
2025-09-12 10:20:04 +05:30
committed by GitHub
parent f4a06cf025
commit 798fa76110
20 changed files with 1319 additions and 385 deletions
+1 -1
View File
@@ -45,7 +45,7 @@ jobs:
go get github.com/sasha-s/go-deadlock
grep -rl sync.Mutex ./pkg | xargs sed -i 's/sync\.Mutex/deadlock\.Mutex/g'
grep -rl sync.RWMutex ./pkg | xargs sed -i 's/sync\.RWMutex/deadlock\.RWMutex/g'
go install golang.org/x/tools/cmd/goimports
go install golang.org/x/tools/cmd/goimports@latest
grep -rl deadlock.Mutex ./pkg | xargs goimports -w
grep -rl deadlock.RWMutex ./pkg | xargs goimports -w
go mod tidy
+17 -16
View File
@@ -23,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade
github.com/livekit/protocol v1.41.1-0.20250911034601-84953643320b
github.com/livekit/protocol v1.41.1-0.20250911214555-7e8f7f1f9435
github.com/livekit/psrpc v0.7.0
github.com/mackerelio/go-osstat v0.2.6
github.com/magefile/mage v1.15.0
@@ -36,7 +36,7 @@ require (
github.com/pion/ice/v4 v4.0.10
github.com/pion/interceptor v0.1.40
github.com/pion/rtcp v1.2.15
github.com/pion/rtp v1.8.21
github.com/pion/rtp v1.8.22
github.com/pion/sctp v1.8.39
github.com/pion/sdp/v3 v3.0.16
github.com/pion/transport/v3 v3.0.7
@@ -44,7 +44,7 @@ require (
github.com/pion/webrtc/v4 v4.1.5-0.20250828044558-c376d0edf977
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.0
github.com/redis/go-redis/v9 v9.12.1
github.com/redis/go-redis/v9 v9.14.0
github.com/rs/cors v1.11.1
github.com/stretchr/testify v1.11.1
github.com/thoas/go-funk v0.9.3
@@ -55,10 +55,10 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b
golang.org/x/mod v0.27.0
golang.org/x/sync v0.16.0
google.golang.org/protobuf v1.36.8
golang.org/x/exp v0.0.0-20250911091902-df9299821621
golang.org/x/mod v0.28.0
golang.org/x/sync v0.17.0
google.golang.org/protobuf v1.36.9
gopkg.in/yaml.v3 v3.0.1
)
@@ -66,6 +66,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
github.com/moby/sys/user v0.3.0 // indirect
github.com/nyaruka/phonenumbers v1.6.5 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
@@ -73,7 +74,7 @@ require (
)
require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 // indirect
buf.build/go/protovalidate v0.14.0 // indirect
buf.build/go/protoyaml v0.6.0 // indirect
cel.dev/expr v0.24.0 // indirect
@@ -139,13 +140,13 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/tools v0.36.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250826171959-ef028d996bc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 // indirect
google.golang.org/grpc v1.75.0 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/tools v0.37.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 // indirect
google.golang.org/grpc v1.75.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+34 -32
View File
@@ -1,5 +1,5 @@
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 h1:sjY1k5uszbIZfv11HO2keV4SLhNA47SabPO886v7Rvo=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1/go.mod h1:8EQ5GzyGJQ5tEIwMSxCl8RKJYsjCpAwkdcENoioXT6g=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 h1:u98oQG8CHYBrOWrYdqbyNpKz4Pw02ssv03DsTInnXn8=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1/go.mod h1:aY3zbkNan5F+cGm9lITDP6oxJIwu0dn9KjJuJjWaHkg=
buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A=
buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA=
buf.build/go/protoyaml v0.6.0 h1:Nzz1lvcXF8YgNZXk+voPPwdU8FjDPTUV4ndNTXN0n2w=
@@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade h1:lpxPcglwzUWNB4J0S2qZuyMehzmR7vW9whzSwV4IGoI=
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.41.1-0.20250911034601-84953643320b h1:xa2nqUsAnDip5YfzaEhdGqfsyHlJJfhxs2MTw0DJst0=
github.com/livekit/protocol v1.41.1-0.20250911034601-84953643320b/go.mod h1:sZNL/CLRtAEy4AcuUsab/pNSQ/StM0KstU5foHMk1jM=
github.com/livekit/protocol v1.41.1-0.20250911214555-7e8f7f1f9435 h1:Kb7ezfpKQsL9c4xZmME40or6nnbpemmsr/VEviZzbKQ=
github.com/livekit/protocol v1.41.1-0.20250911214555-7e8f7f1f9435/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU=
github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0=
@@ -222,6 +222,8 @@ github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nyaruka/phonenumbers v1.6.5 h1:aBCaUhfpRA7hU6fsXk+p7KF1aNx4nQlq9hGeo2qdFg8=
github.com/nyaruka/phonenumbers v1.6.5/go.mod h1:7gjs+Lchqm49adhAKB5cdcng5ZXgt6x7Jgvi0ZorUtU=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
@@ -250,8 +252,8 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
github.com/pion/rtp v1.8.21 h1:3yrOwmZFyUpcIosNcWRpQaU+UXIJ6yxLuJ8Bx0mw37Y=
github.com/pion/rtp v1.8.21/go.mod h1:bAu2UFKScgzyFqvUKmbvzSdPr+NGbZtv6UB2hesqXBk=
github.com/pion/rtp v1.8.22 h1:8NCVDDF+uSJmMUkjLJVnIr/HX7gPesyMV1xFt5xozXc=
github.com/pion/rtp v1.8.22/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM=
github.com/pion/sctp v1.8.39 h1:PJma40vRHa3UTO3C4MyeJDQ+KIobVYRZQZ0Nt7SjQnE=
github.com/pion/sctp v1.8.39/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
github.com/pion/sdp/v3 v3.0.16 h1:0dKzYO6gTAvuLaAKQkC02eCPjMIi4NuAr/ibAwrGDCo=
@@ -280,8 +282,8 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg=
github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/redis/go-redis/v9 v9.12.1 h1:k5iquqv27aBtnTm2tIkROUDp8JBXhXZIVu1InSgvovg=
github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
@@ -363,18 +365,18 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/exp v0.0.0-20250911091902-df9299821621 h1:2id6c1/gto0kaHYyrixvknJ8tUK/Qs5IsmBtrc+FtgU=
golang.org/x/exp v0.0.0-20250911091902-df9299821621/go.mod h1:TwQYMMnGpvZyc+JpB/UAuTNIsVJifOlSkrZkhcvpVUk=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U=
golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -397,8 +399,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -407,8 +409,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -442,8 +444,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -459,8 +461,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@@ -469,22 +471,22 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE=
golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20250826171959-ef028d996bc1 h1:APHvLLYBhtZvsbnpkfknDZ7NyH4z5+ub/I0u8L3Oz6g=
google.golang.org/genproto/googleapis/api v0.0.0-20250826171959-ef028d996bc1/go.mod h1:xUjFWUnWDpZ/C0Gu0qloASKFb6f8/QXiiXhSPFsD668=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 h1:pmJpJEvT846VzausCQ5d7KreSROcDqmO388w5YbnltA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 h1:d8Nakh1G+ur7+P3GcMjpRDEkoLUcLW2iU92XVqR+XMQ=
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090/go.mod h1:U8EXRNSd8sUYyDfs/It7KVWodQr+Hf9xtxyxWudSwEw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 h1:/OQuEa4YWtDt7uQWHd3q3sUMb+QOLQUg1xa8CEsRv5w=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI=
google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+231 -91
View File
@@ -27,16 +27,37 @@ import (
"github.com/livekit/protocol/livekit"
)
func TestSubscribedMaxQuality(t *testing.T) {
type testDynacastManagerListener struct {
onSubscribedMaxQualityChange func(subscribedQualties []*livekit.SubscribedCodec)
onSubscribedAudioCodecChange func(subscribedCodecs []*livekit.SubscribedAudioCodec)
}
func (t *testDynacastManagerListener) OnDynacastSubscribedMaxQualityChange(
subscribedQualities []*livekit.SubscribedCodec,
_maxSubscribedQualities []types.SubscribedCodecQuality,
) {
t.onSubscribedMaxQualityChange(subscribedQualities)
}
func (t *testDynacastManagerListener) OnDynacastSubscribedAudioCodecChange(
codecs []*livekit.SubscribedAudioCodec,
) {
t.onSubscribedAudioCodecChange(codecs)
}
func TestSubscribedMaxQuality(t *testing.T) {
t.Run("subscribers muted", func(t *testing.T) {
dm := NewDynacastManager(DynacastManagerParams{})
var lock sync.Mutex
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
dm := NewDynacastManagerVideo(DynacastManagerVideoParams{
Listener: &testDynacastManagerListener{
onSubscribedMaxQualityChange: func(subscribedQualities []*livekit.SubscribedCodec) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
},
},
})
dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeVP8, livekit.VideoQuality_HIGH)
@@ -72,21 +93,20 @@ func TestSubscribedMaxQuality(t *testing.T) {
})
t.Run("subscribers max quality", func(t *testing.T) {
dm := NewDynacastManager(DynacastManagerParams{
DynacastPauseDelay: 100 * time.Millisecond,
})
lock := sync.RWMutex{}
lock.Lock()
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
lock.Unlock()
dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
dm := NewDynacastManagerVideo(DynacastManagerVideoParams{
Listener: &testDynacastManagerListener{
onSubscribedMaxQualityChange: func(subscribedQualities []*livekit.SubscribedCodec) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
},
},
})
dm.maxSubscribedQuality = map[mime.MimeType]livekit.VideoQuality{
dm.(*dynacastManagerVideo).maxSubscribedQuality = map[mime.MimeType]livekit.VideoQuality{
mime.MimeTypeVP8: livekit.VideoQuality_LOW,
mime.MimeTypeAV1: livekit.VideoQuality_LOW,
}
@@ -279,91 +299,202 @@ func TestSubscribedMaxQuality(t *testing.T) {
}
func TestCodecRegression(t *testing.T) {
dm := NewDynacastManager(DynacastManagerParams{})
var lock sync.Mutex
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
t.Run("codec regression video", func(t *testing.T) {
var lock sync.Mutex
actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0)
dm := NewDynacastManagerVideo(DynacastManagerVideoParams{
Listener: &testDynacastManagerListener{
onSubscribedMaxQualityChange: func(subscribedQualities []*livekit.SubscribedCodec) {
lock.Lock()
actualSubscribedQualities = subscribedQualities
lock.Unlock()
},
},
})
dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_HIGH)
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: mime.MimeTypeAV1.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
dm.HandleCodecRegression(mime.MimeTypeAV1, mime.MimeTypeVP8)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: mime.MimeTypeAV1.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: mime.MimeTypeVP8.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
// av1 quality change should be forwarded to vp8
// av1 quality change of node should be ignored
dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_MEDIUM)
dm.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{
{CodecMime: mime.MimeTypeAV1, Quality: livekit.VideoQuality_HIGH},
})
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: mime.MimeTypeAV1.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: mime.MimeTypeVP8.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
})
dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_HIGH)
t.Run("codec regression audio", func(t *testing.T) {
var lock sync.Mutex
actualSubscribedCodecs := make([]*livekit.SubscribedAudioCodec, 0)
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: mime.MimeTypeAV1.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
dm := NewDynacastManagerAudio(DynacastManagerAudioParams{
Listener: &testDynacastManagerListener{
onSubscribedAudioCodecChange: func(subscribedCodecs []*livekit.SubscribedAudioCodec) {
lock.Lock()
actualSubscribedCodecs = subscribedCodecs
lock.Unlock()
},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
})
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
dm.NotifySubscription("s1", mime.MimeTypeRED, true)
dm.HandleCodecRegression(mime.MimeTypeAV1, mime.MimeTypeVP8)
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: mime.MimeTypeAV1.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
expectedSubscribedCodecs := []*livekit.SubscribedAudioCodec{
{
Codec: mime.MimeTypeRED.String(),
Enabled: true,
},
},
{
Codec: mime.MimeTypeVP8.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs)
}, 10*time.Second, 100*time.Millisecond)
dm.HandleCodecRegression(mime.MimeTypeRED, mime.MimeTypeOpus)
expectedSubscribedCodecs = []*livekit.SubscribedAudioCodec{
{
Codec: mime.MimeTypeRED.String(),
Enabled: false,
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
{
Codec: mime.MimeTypeOpus.String(),
Enabled: true,
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs)
}, 10*time.Second, 100*time.Millisecond)
// RED disable as subscriber or subscriber node should be ignored as it has been regressed
dm.NotifySubscription("s1", mime.MimeTypeRED, false)
dm.NotifySubscriptionNode("n1", []*livekit.SubscribedAudioCodec{
{Codec: mime.MimeTypeRED.String(), Enabled: false},
})
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs)
}, 10*time.Second, 100*time.Millisecond)
// `s1` unsubscription should turn off `opus`
dm.NotifySubscription("s1", mime.MimeTypeOpus, false)
expectedSubscribedCodecs = []*livekit.SubscribedAudioCodec{
{
Codec: mime.MimeTypeRED.String(),
Enabled: false,
},
{
Codec: mime.MimeTypeOpus.String(),
Enabled: false,
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs)
}, 10*time.Second, 100*time.Millisecond)
// a node subscription should turn `opus` back on
dm.NotifySubscriptionNode("n1", []*livekit.SubscribedAudioCodec{
{
Codec: mime.MimeTypeOpus.String(),
Enabled: true,
},
})
expectedSubscribedCodecs = []*livekit.SubscribedAudioCodec{
{
Codec: mime.MimeTypeRED.String(),
Enabled: false,
},
{
Codec: mime.MimeTypeOpus.String(),
Enabled: true,
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs)
}, 10*time.Second, 100*time.Millisecond)
// av1 quality change should be forwarded to vp8
// av1 quality change of node should be ignored
dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_MEDIUM)
dm.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{
{CodecMime: mime.MimeTypeAV1, Quality: livekit.VideoQuality_HIGH},
})
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: mime.MimeTypeAV1.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
{
Codec: mime.MimeTypeVP8.String(),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
},
},
}
require.Eventually(t, func() bool {
lock.Lock()
defer lock.Unlock()
return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities)
}, 10*time.Second, 100*time.Millisecond)
}
func subscribedCodecsAsString(c1 []*livekit.SubscribedCodec) string {
@@ -374,3 +505,12 @@ func subscribedCodecsAsString(c1 []*livekit.SubscribedCodec) string {
}
return s1
}
func subscribedAudioCodecsAsString(c1 []*livekit.SubscribedAudioCodec) string {
sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec })
var s1 string
for _, c := range c1 {
s1 += c.String()
}
return s1
}
+198
View File
@@ -0,0 +1,198 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynacast
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/mime"
)
var _ DynacastManager = (*dynacastManagerAudio)(nil)
var _ dynacastQualityListener = (*dynacastManagerAudio)(nil)
type DynacastManagerAudioParams struct {
Listener DynacastManagerListener
Logger logger.Logger
}
type dynacastManagerAudio struct {
params DynacastManagerAudioParams
subscribedCodecs map[mime.MimeType]bool
committedSubscribedCodecs map[mime.MimeType]bool
isClosed bool
*dynacastManagerBase
}
func NewDynacastManagerAudio(params DynacastManagerAudioParams) DynacastManager {
if params.Logger == nil {
params.Logger = logger.GetLogger()
}
d := &dynacastManagerAudio{
params: params,
subscribedCodecs: make(map[mime.MimeType]bool),
committedSubscribedCodecs: make(map[mime.MimeType]bool),
}
d.dynacastManagerBase = newDynacastManagerBase(dynacastManagerBaseParams{
Logger: params.Logger,
OpsQueueDepth: 4,
OnRestart: func() {
d.committedSubscribedCodecs = make(map[mime.MimeType]bool)
},
OnDynacastQualityCreate: func(mimeType mime.MimeType) dynacastQuality {
dq := newDynacastQualityAudio(dynacastQualityAudioParams{
MimeType: mimeType,
Listener: d,
Logger: d.params.Logger,
})
return dq
},
OnRegressCodec: func(fromMime, toMime mime.MimeType) {
d.subscribedCodecs[fromMime] = false
// if the new codec is not added, notify the publisher to start publishing
if _, ok := d.subscribedCodecs[toMime]; !ok {
d.subscribedCodecs[toMime] = true
}
},
OnUpdateNeeded: d.update,
})
return d
}
// It is possible for tracks to be in pending close state. When track
// is waiting to be closed, a node is not streaming a track. This can
// be used to force an update announcing that subscribed codec is disabled,
// i.e. indicating not pulling track any more.
func (d *dynacastManagerAudio) ForceEnable(enabled bool) {
d.lock.Lock()
defer d.lock.Unlock()
for mime := range d.committedSubscribedCodecs {
d.committedSubscribedCodecs[mime] = enabled
}
d.enqueueSubscribedChange()
}
func (d *dynacastManagerAudio) NotifySubscription(
subscriberID livekit.ParticipantID,
mime mime.MimeType,
enabled bool,
) {
dq := d.getOrCreateDynacastQuality(mime)
if dq != nil {
dq.NotifySubscription(subscriberID, enabled)
}
}
func (d *dynacastManagerAudio) NotifySubscriptionNode(
nodeID livekit.NodeID,
codecs []*livekit.SubscribedAudioCodec,
) {
for _, codec := range codecs {
dq := d.getOrCreateDynacastQuality(mime.NormalizeMimeType(codec.Codec))
if dq != nil {
dq.NotifySubscriptionNode(nodeID, codec.Enabled)
}
}
}
func (d *dynacastManagerAudio) OnUpdateAudioCodecForMime(mime mime.MimeType, enabled bool) {
d.lock.Lock()
if _, ok := d.regressedCodec[mime]; !ok {
d.subscribedCodecs[mime] = enabled
}
d.lock.Unlock()
d.update(false)
}
func (d *dynacastManagerAudio) update(force bool) {
d.lock.Lock()
d.params.Logger.Debugw(
"processing subscribed codec change",
"force", force,
"committedSubscribedCodecs", d.committedSubscribedCodecs,
"subscribedCodecs", d.subscribedCodecs,
)
if len(d.subscribedCodecs) == 0 {
// no mime has been added, nothing to update
d.lock.Unlock()
return
}
// add or remove of a mime triggers an update
changed := len(d.subscribedCodecs) != len(d.committedSubscribedCodecs)
if !changed {
for mime, enabled := range d.subscribedCodecs {
if ce, ok := d.committedSubscribedCodecs[mime]; ok {
if ce != enabled {
changed = true
break
}
}
}
}
if !force && !changed {
d.lock.Unlock()
return
}
d.params.Logger.Debugw(
"committing subscribed codec change",
"force", force,
"committedSubscribedCoecs", d.committedSubscribedCodecs,
"subscribedcodecs", d.subscribedCodecs,
)
// commit change
d.committedSubscribedCodecs = make(map[mime.MimeType]bool, len(d.subscribedCodecs))
for mime, enabled := range d.subscribedCodecs {
d.committedSubscribedCodecs[mime] = enabled
}
d.enqueueSubscribedChange()
d.lock.Unlock()
}
func (d *dynacastManagerAudio) enqueueSubscribedChange() {
if d.isClosed || d.params.Listener == nil {
return
}
subscribedCodecs := make([]*livekit.SubscribedAudioCodec, 0, len(d.committedSubscribedCodecs))
for mime, enabled := range d.committedSubscribedCodecs {
subscribedCodecs = append(subscribedCodecs, &livekit.SubscribedAudioCodec{
Codec: mime.String(),
Enabled: enabled,
})
}
d.params.Logger.Debugw(
"subscribedAudioCodecChange",
"subscribedCodecs", logger.ProtoSlice(subscribedCodecs),
)
d.notifyOpsQueue.Enqueue(func() {
d.params.Listener.OnDynacastSubscribedAudioCodecChange(subscribedCodecs)
})
}
+165
View File
@@ -0,0 +1,165 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynacast
import (
"sync"
"golang.org/x/exp/maps"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/livekit-server/pkg/utils"
)
type dynacastManagerBaseParams struct {
Logger logger.Logger
OpsQueueDepth uint
OnRestart func()
OnDynacastQualityCreate func(mimeType mime.MimeType) dynacastQuality
OnRegressCodec func(fromMime, toMime mime.MimeType)
OnUpdateNeeded func(force bool)
}
type dynacastManagerBase struct {
params dynacastManagerBaseParams
lock sync.RWMutex
regressedCodec map[mime.MimeType]struct{}
dynacastQuality map[mime.MimeType]dynacastQuality
notifyOpsQueue *utils.OpsQueue
isClosed bool
dynacastManagerNull
dynacastQualityListenerNull
}
func newDynacastManagerBase(params dynacastManagerBaseParams) *dynacastManagerBase {
if params.OpsQueueDepth == 0 {
params.OpsQueueDepth = 4
}
d := &dynacastManagerBase{
params: params,
regressedCodec: make(map[mime.MimeType]struct{}),
dynacastQuality: make(map[mime.MimeType]dynacastQuality),
notifyOpsQueue: utils.NewOpsQueue(utils.OpsQueueParams{
Name: "dynacast-notify",
MinSize: params.OpsQueueDepth,
FlushOnStop: true,
Logger: params.Logger,
}),
}
d.notifyOpsQueue.Start()
return d
}
func (d *dynacastManagerBase) AddCodec(mime mime.MimeType) {
d.getOrCreateDynacastQuality(mime)
}
func (d *dynacastManagerBase) HandleCodecRegression(fromMime, toMime mime.MimeType) {
fromDq := d.getOrCreateDynacastQuality(fromMime)
d.lock.Lock()
if d.isClosed {
d.lock.Unlock()
return
}
if fromDq == nil {
// should not happen as we have added the codec on setup receiver
d.params.Logger.Warnw("regression from codec not found", nil, "mime", fromMime, "toMime", toMime)
d.lock.Unlock()
return
}
d.regressedCodec[fromMime] = struct{}{}
d.params.OnRegressCodec(fromMime, toMime)
d.lock.Unlock()
d.params.OnUpdateNeeded(false)
fromDq.Stop()
fromDq.RegressTo(d.getOrCreateDynacastQuality(toMime))
}
func (d *dynacastManagerBase) Restart() {
d.lock.Lock()
d.params.OnRestart()
dqs := d.getDynacastQualitiesLocked()
d.lock.Unlock()
for _, dq := range dqs {
dq.Restart()
}
}
func (d *dynacastManagerBase) Close() {
d.notifyOpsQueue.Stop()
d.lock.Lock()
dqs := d.getDynacastQualitiesLocked()
d.dynacastQuality = make(map[mime.MimeType]dynacastQuality)
d.isClosed = true
d.lock.Unlock()
for _, dq := range dqs {
dq.Stop()
}
}
// There are situations like track unmute or streaming from a different node
// where subscription changes needs to sent to the provider immediately.
// This bypasses any debouncing and forces a subscription change update
// with immediate effect.
func (d *dynacastManagerBase) ForceUpdate() {
d.params.OnUpdateNeeded(true)
}
func (d *dynacastManagerBase) ClearSubscriberNodes() {
d.lock.Lock()
dqs := d.getDynacastQualitiesLocked()
d.lock.Unlock()
for _, dq := range dqs {
dq.ClearSubscriberNodes()
}
}
func (d *dynacastManagerBase) getOrCreateDynacastQuality(mimeType mime.MimeType) dynacastQuality {
d.lock.Lock()
defer d.lock.Unlock()
if d.isClosed || mimeType == mime.MimeTypeUnknown {
return nil
}
if dq := d.dynacastQuality[mimeType]; dq != nil {
return dq
}
dq := d.params.OnDynacastQualityCreate(mimeType)
dq.Start()
d.dynacastQuality[mimeType] = dq
return dq
}
func (d *dynacastManagerBase) getDynacastQualitiesLocked() []dynacastQuality {
return maps.Values(d.dynacastQuality)
}
@@ -15,148 +15,84 @@
package dynacast
import (
"sync"
"time"
"github.com/bep/debounce"
"golang.org/x/exp/maps"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/livekit-server/pkg/utils"
)
type DynacastManagerParams struct {
var _ DynacastManager = (*dynacastManagerVideo)(nil)
var _ dynacastQualityListener = (*dynacastManagerVideo)(nil)
type DynacastManagerVideoParams struct {
DynacastPauseDelay time.Duration
Listener DynacastManagerListener
Logger logger.Logger
}
type DynacastManager struct {
params DynacastManagerParams
type dynacastManagerVideo struct {
params DynacastManagerVideoParams
lock sync.RWMutex
regressedCodec map[mime.MimeType]struct{}
dynacastQuality map[mime.MimeType]*DynacastQuality
maxSubscribedQuality map[mime.MimeType]livekit.VideoQuality
committedMaxSubscribedQuality map[mime.MimeType]livekit.VideoQuality
maxSubscribedQualityDebounce func(func())
maxSubscribedQualityDebouncePending bool
qualityNotifyOpQueue *utils.OpsQueue
isClosed bool
onSubscribedMaxQualityChange func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)
*dynacastManagerBase
}
func NewDynacastManager(params DynacastManagerParams) *DynacastManager {
func NewDynacastManagerVideo(params DynacastManagerVideoParams) DynacastManager {
if params.Logger == nil {
params.Logger = logger.GetLogger()
}
d := &DynacastManager{
d := &dynacastManagerVideo{
params: params,
regressedCodec: make(map[mime.MimeType]struct{}),
dynacastQuality: make(map[mime.MimeType]*DynacastQuality),
maxSubscribedQuality: make(map[mime.MimeType]livekit.VideoQuality),
committedMaxSubscribedQuality: make(map[mime.MimeType]livekit.VideoQuality),
qualityNotifyOpQueue: utils.NewOpsQueue(utils.OpsQueueParams{
Name: "quality-notify",
MinSize: 64,
FlushOnStop: true,
Logger: params.Logger,
}),
}
if params.DynacastPauseDelay > 0 {
d.maxSubscribedQualityDebounce = debounce.New(params.DynacastPauseDelay)
}
d.qualityNotifyOpQueue.Start()
d.dynacastManagerBase = newDynacastManagerBase(dynacastManagerBaseParams{
Logger: params.Logger,
OpsQueueDepth: 64,
OnRestart: func() {
d.committedMaxSubscribedQuality = make(map[mime.MimeType]livekit.VideoQuality)
},
OnDynacastQualityCreate: func(mimeType mime.MimeType) dynacastQuality {
dq := newDynacastQualityVideo(dynacastQualityVideoParams{
MimeType: mimeType,
Listener: d,
Logger: d.params.Logger,
})
return dq
},
OnRegressCodec: func(fromMime, toMime mime.MimeType) {
d.maxSubscribedQuality[fromMime] = livekit.VideoQuality_OFF
// if the new codec is not added, notify the publisher to start publishing
if _, ok := d.maxSubscribedQuality[toMime]; !ok {
d.maxSubscribedQuality[toMime] = livekit.VideoQuality_HIGH
}
},
OnUpdateNeeded: d.update,
})
return d
}
func (d *DynacastManager) OnSubscribedMaxQualityChange(f func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)) {
d.lock.Lock()
d.onSubscribedMaxQualityChange = f
d.lock.Unlock()
}
func (d *DynacastManager) AddCodec(mime mime.MimeType) {
d.getOrCreateDynacastQuality(mime)
}
func (d *DynacastManager) HandleCodecRegression(fromMime, toMime mime.MimeType) {
fromDq := d.getOrCreateDynacastQuality(fromMime)
d.lock.Lock()
if d.isClosed {
d.lock.Unlock()
return
}
if fromDq == nil {
// should not happen as we have added the codec on setup receiver
d.params.Logger.Warnw("regression from codec not found", nil, "mime", fromMime)
d.lock.Unlock()
return
}
d.regressedCodec[fromMime] = struct{}{}
d.maxSubscribedQuality[fromMime] = livekit.VideoQuality_OFF
// if the new codec is not added, notify the publisher to start publishing
if _, ok := d.maxSubscribedQuality[toMime]; !ok {
d.maxSubscribedQuality[toMime] = livekit.VideoQuality_HIGH
}
d.lock.Unlock()
d.update(false)
fromDq.Stop()
fromDq.RegressTo(d.getOrCreateDynacastQuality(toMime))
}
func (d *DynacastManager) Restart() {
d.lock.Lock()
d.committedMaxSubscribedQuality = make(map[mime.MimeType]livekit.VideoQuality)
dqs := d.getDynacastQualitiesLocked()
d.lock.Unlock()
for _, dq := range dqs {
dq.Restart()
}
}
func (d *DynacastManager) Close() {
d.qualityNotifyOpQueue.Stop()
d.lock.Lock()
dqs := d.getDynacastQualitiesLocked()
d.dynacastQuality = make(map[mime.MimeType]*DynacastQuality)
d.isClosed = true
d.lock.Unlock()
for _, dq := range dqs {
dq.Stop()
}
}
// THere are situations like track unmute or streaming from a different node
// where subscribed quality needs to sent to the provider immediately.
// This bypasses any debouncing and forces a subscribed quality update
// with immediate effect.
func (d *DynacastManager) ForceUpdate() {
d.update(true)
}
// It is possible for tracks to be in pending close state. When track
// is waiting to be closed, a node is not streaming a track. This can
// be used to force an update announcing that subscribed quality is OFF,
// i.e. indicating not pulling track any more.
func (d *DynacastManager) ForceQuality(quality livekit.VideoQuality) {
func (d *dynacastManagerVideo) ForceQuality(quality livekit.VideoQuality) {
d.lock.Lock()
defer d.lock.Unlock()
@@ -167,14 +103,21 @@ func (d *DynacastManager) ForceQuality(quality livekit.VideoQuality) {
d.enqueueSubscribedQualityChange()
}
func (d *DynacastManager) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, mime mime.MimeType, quality livekit.VideoQuality) {
func (d *dynacastManagerVideo) NotifySubscriberMaxQuality(
subscriberID livekit.ParticipantID,
mime mime.MimeType,
quality livekit.VideoQuality,
) {
dq := d.getOrCreateDynacastQuality(mime)
if dq != nil {
dq.NotifySubscriberMaxQuality(subscriberID, quality)
}
}
func (d *DynacastManager) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) {
func (d *dynacastManagerVideo) NotifySubscriberNodeMaxQuality(
nodeID livekit.NodeID,
qualities []types.SubscribedCodecQuality,
) {
for _, quality := range qualities {
dq := d.getOrCreateDynacastQuality(quality.CodecMime)
if dq != nil {
@@ -183,43 +126,10 @@ func (d *DynacastManager) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID,
}
}
func (d *DynacastManager) ClearSubscriberNodesMaxQuality() {
d.lock.Lock()
dqs := d.getDynacastQualitiesLocked()
d.lock.Unlock()
for _, dq := range dqs {
dq.ClearSubscriberNodesMaxQuality()
}
}
func (d *DynacastManager) getOrCreateDynacastQuality(mimeType mime.MimeType) *DynacastQuality {
d.lock.Lock()
defer d.lock.Unlock()
if d.isClosed || mimeType == mime.MimeTypeUnknown {
return nil
}
if dq := d.dynacastQuality[mimeType]; dq != nil {
return dq
}
dq := NewDynacastQuality(DynacastQualityParams{
MimeType: mimeType,
Logger: d.params.Logger,
})
dq.OnSubscribedMaxQualityChange(d.updateMaxQualityForMime)
dq.Start()
d.dynacastQuality[mimeType] = dq
return dq
}
func (d *DynacastManager) getDynacastQualitiesLocked() []*DynacastQuality {
return maps.Values(d.dynacastQuality)
}
func (d *DynacastManager) updateMaxQualityForMime(mime mime.MimeType, maxQuality livekit.VideoQuality) {
func (d *dynacastManagerVideo) OnUpdateMaxQualityForMime(
mime mime.MimeType,
maxQuality livekit.VideoQuality,
) {
d.lock.Lock()
if _, ok := d.regressedCodec[mime]; !ok {
d.maxSubscribedQuality[mime] = maxQuality
@@ -229,10 +139,11 @@ func (d *DynacastManager) updateMaxQualityForMime(mime mime.MimeType, maxQuality
d.update(false)
}
func (d *DynacastManager) update(force bool) {
func (d *dynacastManagerVideo) update(force bool) {
d.lock.Lock()
d.params.Logger.Debugw("processing quality change",
d.params.Logger.Debugw(
"processing quality change",
"force", force,
"committedMaxSubscribedQuality", d.committedMaxSubscribedQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
@@ -269,7 +180,8 @@ func (d *DynacastManager) update(force bool) {
if downgradesOnly && d.maxSubscribedQualityDebounce != nil {
if !d.maxSubscribedQualityDebouncePending {
d.params.Logger.Debugw("debouncing quality downgrade",
d.params.Logger.Debugw(
"debouncing quality downgrade",
"committedMaxSubscribedQuality", d.committedMaxSubscribedQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
)
@@ -278,7 +190,8 @@ func (d *DynacastManager) update(force bool) {
})
d.maxSubscribedQualityDebouncePending = true
} else {
d.params.Logger.Debugw("quality downgrade waiting for debounce",
d.params.Logger.Debugw(
"quality downgrade waiting for debounce",
"committedMaxSubscribedQuality", d.committedMaxSubscribedQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
)
@@ -294,7 +207,8 @@ func (d *DynacastManager) update(force bool) {
d.maxSubscribedQualityDebouncePending = false
}
d.params.Logger.Debugw("committing quality change",
d.params.Logger.Debugw(
"committing quality change",
"force", force,
"committedMaxSubscribedQuality", d.committedMaxSubscribedQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
@@ -310,8 +224,8 @@ func (d *DynacastManager) update(force bool) {
d.lock.Unlock()
}
func (d *DynacastManager) enqueueSubscribedQualityChange() {
if d.isClosed || d.onSubscribedMaxQualityChange == nil {
func (d *dynacastManagerVideo) enqueueSubscribedQualityChange() {
if d.isClosed || d.params.Listener == nil {
return
}
@@ -352,7 +266,7 @@ func (d *DynacastManager) enqueueSubscribedQualityChange() {
"subscribedCodecs", subscribedCodecs,
"maxSubscribedQualities", maxSubscribedQualities,
)
d.qualityNotifyOpQueue.Enqueue(func() {
d.onSubscribedMaxQualityChange(subscribedCodecs, maxSubscribedQualities)
d.notifyOpsQueue.Enqueue(func() {
d.params.Listener.OnDynacastSubscribedMaxQualityChange(subscribedCodecs, maxSubscribedQualities)
})
}
+168
View File
@@ -0,0 +1,168 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynacast
import (
"sync"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
var _ dynacastQuality = (*dynacastQualityAudio)(nil)
type dynacastQualityAudioParams struct {
MimeType mime.MimeType
Listener dynacastQualityListener
Logger logger.Logger
}
// dynacastQualityAudio manages enable a single receiver of a media track
type dynacastQualityAudio struct {
params dynacastQualityAudioParams
// quality level enable/disable
lock sync.RWMutex
initialized bool
subscriberEnables map[livekit.ParticipantID]bool
subscriberNodeEnables map[livekit.NodeID]bool
enabled bool
regressTo dynacastQuality
dynacastQualityNull
}
func newDynacastQualityAudio(params dynacastQualityAudioParams) dynacastQuality {
return &dynacastQualityAudio{
params: params,
subscriberEnables: make(map[livekit.ParticipantID]bool),
subscriberNodeEnables: make(map[livekit.NodeID]bool),
}
}
func (d *dynacastQualityAudio) Start() {
d.reset()
}
func (d *dynacastQualityAudio) Restart() {
d.reset()
}
func (d *dynacastQualityAudio) Stop() {
}
func (d *dynacastQualityAudio) NotifySubscription(subscriberID livekit.ParticipantID, enabled bool) {
d.params.Logger.Debugw(
"setting subscriber codec enable",
"mime", d.params.MimeType,
"subscriberID", subscriberID,
"enabled", enabled,
)
d.lock.Lock()
if r := d.regressTo; r != nil {
d.lock.Unlock()
return
}
if !enabled {
delete(d.subscriberEnables, subscriberID)
} else {
d.subscriberEnables[subscriberID] = true
}
d.lock.Unlock()
d.updateQualityChange(false)
}
func (d *dynacastQualityAudio) NotifySubscriptionNode(nodeID livekit.NodeID, enabled bool) {
d.params.Logger.Debugw(
"setting subscriber node codec enabled",
"mime", d.params.MimeType,
"subscriberNodeID", nodeID,
"enabled", enabled,
)
d.lock.Lock()
if r := d.regressTo; r != nil {
// the downstream node will synthesize correct enable (its dynacast manager has codec regression), just ignore it
d.params.Logger.Debugw(
"ignoring node codec change, regressed to another dynacast quality",
"mime", d.params.MimeType,
"regressedMime", d.regressTo.Mime(),
)
d.lock.Unlock()
return
}
if !enabled {
delete(d.subscriberNodeEnables, nodeID)
} else {
d.subscriberNodeEnables[nodeID] = true
}
d.lock.Unlock()
d.updateQualityChange(false)
}
func (d *dynacastQualityAudio) ClearSubscriberNodes() {
d.lock.Lock()
d.subscriberNodeEnables = make(map[livekit.NodeID]bool)
d.lock.Unlock()
d.updateQualityChange(false)
}
func (d *dynacastQualityAudio) Mime() mime.MimeType {
return d.params.MimeType
}
func (d *dynacastQualityAudio) RegressTo(other dynacastQuality) {
d.lock.Lock()
d.regressTo = other
d.lock.Unlock()
other.Restart()
}
func (d *dynacastQualityAudio) reset() {
d.lock.Lock()
d.initialized = false
d.lock.Unlock()
}
func (d *dynacastQualityAudio) updateQualityChange(force bool) {
d.lock.Lock()
enabled := len(d.subscriberEnables) != 0 || len(d.subscriberNodeEnables) != 0
if enabled == d.enabled && d.initialized && !force {
d.lock.Unlock()
return
}
d.initialized = true
d.enabled = enabled
d.params.Logger.Debugw(
"notifying enabled change",
"mime", d.params.MimeType,
"enabled", d.enabled,
"subscriberNodeEnables", d.subscriberNodeEnables,
"subscribedEnables", d.subscriberEnables,
"force", force,
)
d.lock.Unlock()
d.params.Listener.OnUpdateAudioCodecForMime(d.params.MimeType, enabled)
}
@@ -23,18 +23,21 @@ import (
"github.com/livekit/protocol/logger"
)
var _ dynacastQuality = (*dynacastQualityVideo)(nil)
const (
initialQualityUpdateWait = 10 * time.Second
)
type DynacastQualityParams struct {
type dynacastQualityVideoParams struct {
MimeType mime.MimeType
Listener dynacastQualityListener
Logger logger.Logger
}
// DynacastQuality manages max subscribed quality of a single receiver of a media track
type DynacastQuality struct {
params DynacastQualityParams
// dynacastQualityVideo manages max subscribed quality of a single receiver of a media track
type dynacastQualityVideo struct {
params dynacastQualityVideoParams
// quality level enable/disable
lock sync.RWMutex
@@ -43,38 +46,32 @@ type DynacastQuality struct {
maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality
maxSubscribedQuality livekit.VideoQuality
maxQualityTimer *time.Timer
regressTo *DynacastQuality
regressTo dynacastQuality
onSubscribedMaxQualityChange func(mimeType mime.MimeType, maxSubscribedQuality livekit.VideoQuality)
dynacastQualityNull
}
func NewDynacastQuality(params DynacastQualityParams) *DynacastQuality {
return &DynacastQuality{
func newDynacastQualityVideo(params dynacastQualityVideoParams) dynacastQuality {
return &dynacastQualityVideo{
params: params,
maxSubscriberQuality: make(map[livekit.ParticipantID]livekit.VideoQuality),
maxSubscriberNodeQuality: make(map[livekit.NodeID]livekit.VideoQuality),
}
}
func (d *DynacastQuality) Start() {
func (d *dynacastQualityVideo) Start() {
d.reset()
}
func (d *DynacastQuality) Restart() {
func (d *dynacastQualityVideo) Restart() {
d.reset()
}
func (d *DynacastQuality) Stop() {
func (d *dynacastQualityVideo) Stop() {
d.stopMaxQualityTimer()
}
func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(mimeType mime.MimeType, maxSubscribedQuality livekit.VideoQuality)) {
d.lock.Lock()
defer d.lock.Unlock()
d.onSubscribedMaxQualityChange = f
}
func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) {
func (d *dynacastQualityVideo) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) {
d.params.Logger.Debugw(
"setting subscriber max quality",
"mime", d.params.MimeType,
@@ -99,14 +96,7 @@ func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.Partic
d.updateQualityChange(false)
}
func (d *DynacastQuality) ClearSubscriberNodesMaxQuality() {
d.lock.Lock()
d.maxSubscriberNodeQuality = make(map[livekit.NodeID]livekit.VideoQuality)
d.lock.Unlock()
d.updateQualityChange(false)
}
func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) {
func (d *dynacastQualityVideo) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) {
d.params.Logger.Debugw(
"setting subscriber node max quality",
"mime", d.params.MimeType,
@@ -117,8 +107,12 @@ func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID,
d.lock.Lock()
if r := d.regressTo; r != nil {
// the downstream node will synthesize correct quality notify (its dynacast manager has codec regression), just ignore it
d.params.Logger.Debugw(
"ignoring node quality change, regressed to another dynacast quality",
"mime", d.params.MimeType,
"regressedMime", d.regressTo.Mime(),
)
d.lock.Unlock()
r.params.Logger.Debugw("ignoring node quality change, regressed to another dynacast quality", "mime", d.params.MimeType)
return
}
@@ -132,7 +126,19 @@ func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID,
d.updateQualityChange(false)
}
func (d *DynacastQuality) RegressTo(other *DynacastQuality) {
func (d *dynacastQualityVideo) ClearSubscriberNodes() {
d.lock.Lock()
d.maxSubscriberNodeQuality = make(map[livekit.NodeID]livekit.VideoQuality)
d.lock.Unlock()
d.updateQualityChange(false)
}
func (d *dynacastQualityVideo) Mime() mime.MimeType {
return d.params.MimeType
}
func (d *dynacastQualityVideo) RegressTo(other dynacastQuality) {
d.lock.Lock()
d.regressTo = other
maxSubscriberQuality := d.maxSubscriberQuality
@@ -141,33 +147,41 @@ func (d *DynacastQuality) RegressTo(other *DynacastQuality) {
d.maxSubscriberNodeQuality = make(map[livekit.NodeID]livekit.VideoQuality)
d.lock.Unlock()
other.lock.Lock()
other.Replace(maxSubscriberQuality, maxSubscriberNodeQuality)
}
func (d *dynacastQualityVideo) Replace(
maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality,
maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality,
) {
d.lock.Lock()
for subID, quality := range maxSubscriberQuality {
if otherQuality, ok := other.maxSubscriberQuality[subID]; ok {
if oldQuality, ok := d.maxSubscriberQuality[subID]; ok {
// no QUALITY_OFF in the map
if quality > otherQuality {
other.maxSubscriberQuality[subID] = quality
if quality > oldQuality {
d.maxSubscriberQuality[subID] = quality
}
} else {
other.maxSubscriberQuality[subID] = quality
d.maxSubscriberQuality[subID] = quality
}
}
for nodeID, quality := range maxSubscriberNodeQuality {
if otherQuality, ok := other.maxSubscriberNodeQuality[nodeID]; ok {
if oldQuality, ok := d.maxSubscriberNodeQuality[nodeID]; ok {
// no QUALITY_OFF in the map
if quality > otherQuality {
other.maxSubscriberNodeQuality[nodeID] = quality
if quality > oldQuality {
d.maxSubscriberNodeQuality[nodeID] = quality
}
} else {
other.maxSubscriberNodeQuality[nodeID] = quality
d.maxSubscriberNodeQuality[nodeID] = quality
}
}
other.lock.Unlock()
other.Restart()
d.lock.Unlock()
d.Restart()
}
func (d *DynacastQuality) reset() {
func (d *dynacastQualityVideo) reset() {
d.lock.Lock()
d.initialized = false
d.lock.Unlock()
@@ -175,7 +189,7 @@ func (d *DynacastQuality) reset() {
d.startMaxQualityTimer()
}
func (d *DynacastQuality) updateQualityChange(force bool) {
func (d *dynacastQualityVideo) updateQualityChange(force bool) {
d.lock.Lock()
maxSubscribedQuality := livekit.VideoQuality_OFF
for _, subQuality := range d.maxSubscriberQuality {
@@ -196,22 +210,20 @@ func (d *DynacastQuality) updateQualityChange(force bool) {
d.initialized = true
d.maxSubscribedQuality = maxSubscribedQuality
d.params.Logger.Debugw("notifying quality change",
d.params.Logger.Debugw(
"notifying quality change",
"mime", d.params.MimeType,
"maxSubscriberQuality", d.maxSubscriberQuality,
"maxSubscriberNodeQuality", d.maxSubscriberNodeQuality,
"maxSubscribedQuality", d.maxSubscribedQuality,
"force", force,
)
onSubscribedMaxQualityChange := d.onSubscribedMaxQualityChange
d.lock.Unlock()
if onSubscribedMaxQualityChange != nil {
onSubscribedMaxQualityChange(d.params.MimeType, maxSubscribedQuality)
}
d.params.Listener.OnUpdateMaxQualityForMime(d.params.MimeType, maxSubscribedQuality)
}
func (d *DynacastQuality) startMaxQualityTimer() {
func (d *dynacastQualityVideo) startMaxQualityTimer() {
d.lock.Lock()
defer d.lock.Unlock()
@@ -226,7 +238,7 @@ func (d *DynacastQuality) startMaxQualityTimer() {
})
}
func (d *DynacastQuality) stopMaxQualityTimer() {
func (d *dynacastQualityVideo) stopMaxQualityTimer() {
d.lock.Lock()
defer d.lock.Unlock()
+185
View File
@@ -0,0 +1,185 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynacast
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/mime"
)
type DynacastManagerListener interface {
OnDynacastSubscribedMaxQualityChange(
subscribedQualities []*livekit.SubscribedCodec,
maxSubscribedQualities []types.SubscribedCodecQuality,
)
OnDynacastSubscribedAudioCodecChange(codecs []*livekit.SubscribedAudioCodec)
}
var _ DynacastManagerListener = (*DynacastManagerListenerNull)(nil)
type DynacastManagerListenerNull struct {
}
func (d *DynacastManagerListenerNull) OnDynacastSubscribedMaxQualityChange(
subscribedQualities []*livekit.SubscribedCodec,
maxSubscribedQualities []types.SubscribedCodecQuality,
) {
}
func (d *DynacastManagerListenerNull) OnDynacastSubscribedAudioCodecChange(
codecs []*livekit.SubscribedAudioCodec,
) {
}
// -----------------------------------------
type DynacastManager interface {
AddCodec(mime mime.MimeType)
HandleCodecRegression(fromMime, toMime mime.MimeType)
Restart()
Close()
ForceUpdate()
ForceQuality(quality livekit.VideoQuality)
ForceEnable(enabled bool)
NotifySubscriberMaxQuality(
subscriberID livekit.ParticipantID,
mime mime.MimeType,
quality livekit.VideoQuality,
)
NotifySubscription(
subscriberID livekit.ParticipantID,
mime mime.MimeType,
enabled bool,
)
NotifySubscriberNodeMaxQuality(
nodeID livekit.NodeID,
qualities []types.SubscribedCodecQuality,
)
NotifySubscriptionNode(
nodeID livekit.NodeID,
codecs []*livekit.SubscribedAudioCodec,
)
ClearSubscriberNodes()
}
var _ DynacastManager = (*dynacastManagerNull)(nil)
type dynacastManagerNull struct {
}
func (d *dynacastManagerNull) AddCodec(mime mime.MimeType) {}
func (d *dynacastManagerNull) HandleCodecRegression(fromMime, toMime mime.MimeType) {}
func (d *dynacastManagerNull) Restart() {}
func (d *dynacastManagerNull) Close() {}
func (d *dynacastManagerNull) ForceUpdate() {}
func (d *dynacastManagerNull) ForceQuality(quality livekit.VideoQuality) {}
func (d *dynacastManagerNull) ForceEnable(enabled bool) {}
func (d *dynacastManagerNull) NotifySubscriberMaxQuality(
subscriberID livekit.ParticipantID,
mime mime.MimeType,
quality livekit.VideoQuality,
) {
}
func (d *dynacastManagerNull) NotifySubscription(
subscriberID livekit.ParticipantID,
mime mime.MimeType,
enabled bool,
) {
}
func (d *dynacastManagerNull) NotifySubscriberNodeMaxQuality(
nodeID livekit.NodeID,
qualities []types.SubscribedCodecQuality,
) {
}
func (d *dynacastManagerNull) NotifySubscriptionNode(
nodeID livekit.NodeID,
codecs []*livekit.SubscribedAudioCodec,
) {
}
func (d *dynacastManagerNull) ClearSubscriberNodes() {}
// ------------------------------------------------
type dynacastQualityListener interface {
OnUpdateMaxQualityForMime(mimeType mime.MimeType, maxQuality livekit.VideoQuality)
OnUpdateAudioCodecForMime(mimeType mime.MimeType, enabled bool)
}
var _ dynacastQualityListener = (*dynacastQualityListenerNull)(nil)
type dynacastQualityListenerNull struct {
}
func (d *dynacastQualityListenerNull) OnUpdateMaxQualityForMime(
mimeType mime.MimeType,
maxQuality livekit.VideoQuality,
) {
}
func (d *dynacastQualityListenerNull) OnUpdateAudioCodecForMime(
mimeType mime.MimeType,
enabled bool,
) {
}
// ------------------------------------------------
type dynacastQuality interface {
Start()
Restart()
Stop()
NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality)
NotifySubscription(subscriberID livekit.ParticipantID, enabled bool)
NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality)
NotifySubscriptionNode(nodeID livekit.NodeID, enabled bool)
ClearSubscriberNodes()
Replace(
maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality,
maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality,
)
Mime() mime.MimeType
RegressTo(other dynacastQuality)
}
var _ dynacastQuality = (*dynacastQualityNull)(nil)
type dynacastQualityNull struct {
}
func (d *dynacastQualityNull) Start() {}
func (d *dynacastQualityNull) Restart() {}
func (d *dynacastQualityNull) Stop() {}
func (d *dynacastQualityNull) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) {
}
func (d *dynacastQualityNull) NotifySubscription(subscriberID livekit.ParticipantID, enabled bool) {}
func (d *dynacastQualityNull) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) {
}
func (d *dynacastQualityNull) NotifySubscriptionNode(nodeID livekit.NodeID, enabled bool) {}
func (d *dynacastQualityNull) ClearSubscriberNodes() {}
func (d *dynacastQualityNull) Replace(
maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality,
maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality,
) {
}
func (d *dynacastQualityNull) Mime() mime.MimeType { return mime.MimeTypeUnknown }
func (d *dynacastQualityNull) RegressTo(other dynacastQuality) {}
+113 -34
View File
@@ -51,7 +51,7 @@ type MediaTrack struct {
*MediaTrackReceiver
*MediaLossProxy
dynacastManager *dynacast.DynacastManager
dynacastManager dynacast.DynacastManager
lock sync.RWMutex
@@ -60,6 +60,17 @@ type MediaTrack struct {
backupCodecPolicy livekit.BackupCodecPolicy
regressionTargetCodec mime.MimeType
regressionTargetCodecReceived bool
onSubscribedMaxQualityChange func(
trackID livekit.TrackID,
trackInfo *livekit.TrackInfo,
subscribedQualities []*livekit.SubscribedCodec,
maxSubscribedQualities []types.SubscribedCodecQuality,
) error
onSubscribedAudioCodecChange func(
trackID livekit.TrackID,
codecs []*livekit.SubscribedAudioCodec,
) error
}
type MediaTrackParams struct {
@@ -122,27 +133,57 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack {
t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback)
}
if ti.Type == livekit.TrackType_VIDEO {
t.dynacastManager = dynacast.NewDynacastManager(dynacast.DynacastManagerParams{
switch ti.Type {
case livekit.TrackType_VIDEO:
t.dynacastManager = dynacast.NewDynacastManagerVideo(dynacast.DynacastManagerVideoParams{
DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay,
Listener: t,
Logger: params.Logger,
})
t.MediaTrackReceiver.OnSetupReceiver(func(mime mime.MimeType) {
case livekit.TrackType_AUDIO:
if len(ti.Codecs) > 1 {
t.dynacastManager = dynacast.NewDynacastManagerAudio(dynacast.DynacastManagerAudioParams{
Listener: t,
Logger: params.Logger,
})
}
}
t.MediaTrackReceiver.OnSetupReceiver(func(mime mime.MimeType) {
if t.dynacastManager != nil {
t.dynacastManager.AddCodec(mime)
})
t.MediaTrackReceiver.OnSubscriberMaxQualityChange(
func(subscriberID livekit.ParticipantID, mimeType mime.MimeType, layer int32) {
}
})
t.MediaTrackReceiver.OnSubscriberMaxQualityChange(
func(subscriberID livekit.ParticipantID, mimeType mime.MimeType, layer int32) {
if t.dynacastManager != nil {
t.dynacastManager.NotifySubscriberMaxQuality(
subscriberID,
mimeType,
buffer.GetVideoQualityForSpatialLayer(mimeType, layer, t.MediaTrackReceiver.TrackInfo()),
buffer.GetVideoQualityForSpatialLayer(
mimeType,
layer,
t.MediaTrackReceiver.TrackInfo(),
),
)
},
)
t.MediaTrackReceiver.OnCodecRegression(func(old, new webrtc.RTPCodecParameters) {
t.dynacastManager.HandleCodecRegression(mime.NormalizeMimeType(old.MimeType), mime.NormalizeMimeType(new.MimeType))
})
}
}
},
)
t.MediaTrackReceiver.OnSubscriberAudioCodecChange(
func(subscriberID livekit.ParticipantID, mimeType mime.MimeType, enabled bool) {
if t.dynacastManager != nil {
t.dynacastManager.NotifySubscription(subscriberID, mimeType, enabled)
}
},
)
t.MediaTrackReceiver.OnCodecRegression(func(old, new webrtc.RTPCodecParameters) {
if t.dynacastManager != nil {
t.dynacastManager.HandleCodecRegression(
mime.NormalizeMimeType(old.MimeType),
mime.NormalizeMimeType(new.MimeType),
)
}
})
t.SetMuted(ti.Muted)
return t
@@ -156,26 +197,20 @@ func (t *MediaTrack) OnSubscribedMaxQualityChange(
maxSubscribedQualities []types.SubscribedCodecQuality,
) error,
) {
if t.dynacastManager == nil {
return
}
t.lock.Lock()
t.onSubscribedMaxQualityChange = f
t.lock.Unlock()
}
handler := func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) {
if f != nil && !t.IsMuted() {
_ = f(t.ID(), t.ToProto(), subscribedQualities, maxSubscribedQualities)
}
for _, q := range maxSubscribedQualities {
receiver := t.Receiver(q.CodecMime)
if receiver != nil {
receiver.SetMaxExpectedSpatialLayer(
buffer.GetSpatialLayerForVideoQuality(q.CodecMime, q.Quality, t.MediaTrackReceiver.TrackInfo()),
)
}
}
}
t.dynacastManager.OnSubscribedMaxQualityChange(handler)
func (t *MediaTrack) OnSubscribedAudioCodecChange(
f func(
trackID livekit.TrackID,
codecs []*livekit.SubscribedAudioCodec,
) error,
) {
t.lock.Lock()
t.onSubscribedAudioCodecChange = f
t.lock.Unlock()
}
func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) {
@@ -186,7 +221,7 @@ func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quali
func (t *MediaTrack) ClearSubscriberNodesMaxQuality() {
if t.dynacastManager != nil {
t.dynacastManager.ClearSubscriberNodesMaxQuality()
t.dynacastManager.ClearSubscriberNodes()
}
}
@@ -582,3 +617,47 @@ func (t *MediaTrack) enableRegression() bool {
func (t *MediaTrack) Logger() logger.Logger {
return t.params.Logger
}
// dynacast.DynacastManagerListtener implementation
var _ dynacast.DynacastManagerListener = (*MediaTrack)(nil)
func (t *MediaTrack) OnDynacastSubscribedMaxQualityChange(
subscribedQualities []*livekit.SubscribedCodec,
maxSubscribedQualities []types.SubscribedCodecQuality,
) {
t.lock.RLock()
onSubscribedMaxQualityChange := t.onSubscribedMaxQualityChange
t.lock.RUnlock()
if onSubscribedMaxQualityChange != nil && !t.IsMuted() {
_ = onSubscribedMaxQualityChange(
t.ID(),
t.ToProto(),
subscribedQualities,
maxSubscribedQualities,
)
}
for _, q := range maxSubscribedQualities {
receiver := t.Receiver(q.CodecMime)
if receiver != nil {
receiver.SetMaxExpectedSpatialLayer(
buffer.GetSpatialLayerForVideoQuality(
q.CodecMime,
q.Quality,
t.MediaTrackReceiver.TrackInfo(),
),
)
}
}
}
func (t *MediaTrack) OnDynacastSubscribedAudioCodecChange(codecs []*livekit.SubscribedAudioCodec) {
t.lock.RLock()
onSubscribedAudioCodecChange := t.onSubscribedAudioCodecChange
t.lock.RUnlock()
if onSubscribedAudioCodecChange != nil {
_ = onSubscribedAudioCodecChange(t.ID(), codecs)
}
}
+6 -1
View File
@@ -305,7 +305,11 @@ func (t *MediaTrackReceiver) HandleReceiverCodecChange(r sfu.TrackReceiver, code
return
}
t.params.Logger.Infow("regressing codec", "from", codec.MimeType, "to", backupCodecReceiver.Codec().MimeType)
t.params.Logger.Infow(
"regressing codec",
"from", codec.MimeType,
"to", backupCodecReceiver.Codec().MimeType,
)
// remove old codec from potential codecs
for i, c := range t.potentialCodecs {
@@ -578,6 +582,7 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su
UpstreamCodecs: potentialCodecs,
Logger: tLogger,
DisableRed: t.TrackInfo().GetDisableRed() || !t.params.AudioConfig.ActiveREDEncoding,
IsEncrypted: t.IsEncrypted(),
})
subID := sub.ID()
subTrack, err := t.MediaTrackSubscriptions.AddSubscriber(sub, wr)
+6
View File
@@ -44,6 +44,7 @@ type MediaTrackSubscriptions struct {
onDownTrackCreated func(downTrack *sfu.DownTrack)
onSubscriberMaxQualityChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, layer int32)
onSubscriberAudioCodecChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, enabled bool)
}
type MediaTrackSubscriptionsParams struct {
@@ -73,6 +74,10 @@ func (t *MediaTrackSubscriptions) OnSubscriberMaxQualityChange(f func(subscriber
t.onSubscriberMaxQualityChange = f
}
func (t *MediaTrackSubscriptions) OnSubscriberAudioCodecChange(f func(subscriberID livekit.ParticipantID, mime mime.MimeType, enabled bool)) {
t.onSubscriberAudioCodecChange = f
}
func (t *MediaTrackSubscriptions) SetMuted(muted bool) {
// update mute of all subscribed tracks
for _, st := range t.getAllSubscribedTracks() {
@@ -117,6 +122,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
t.subscribedTracksMu.Unlock()
},
OnSubscriberMaxQualityChange: t.onSubscriberMaxQualityChange,
OnSubscriberAudioCodecChange: t.onSubscriberAudioCodecChange,
})
if err != nil {
return nil, err
+30
View File
@@ -2798,6 +2798,35 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(
return p.sendSubscribedQualityUpdate(subscribedQualityUpdate)
}
func (p *ParticipantImpl) onSubscribedAudioCodecChange(
trackID livekit.TrackID,
codecs []*livekit.SubscribedAudioCodec,
) error {
if p.params.DisableDynacast {
return nil
}
if len(codecs) == 0 {
return nil
}
// normalize the codec name
for _, codec := range codecs {
codec.Codec = strings.ToLower(strings.TrimPrefix(codec.Codec, mime.MimeTypePrefixAudio))
}
subscribedAudioCodecUpdate := &livekit.SubscribedAudioCodecUpdate{
TrackSid: string(trackID),
SubscribedAudioCodecs: codecs,
}
p.pubLogger.Debugw(
"sending subscribed audio codec update",
"trackID", trackID,
"update", logger.Proto(subscribedAudioCodecUpdate),
)
return p.sendSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate)
}
func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *livekit.TrackInfo {
if req.Sid != "" {
track := p.GetPublishedTrack(livekit.TrackID(req.Sid))
@@ -3317,6 +3346,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, ti *livekit.TrackInfo)
}, ti)
mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange)
mt.OnSubscribedAudioCodecChange(p.onSubscribedAudioCodecChange)
// add to published and clean up pending
if p.supervisor != nil {
+4
View File
@@ -311,6 +311,10 @@ func (p *ParticipantImpl) sendSubscribedQualityUpdate(subscribedQualityUpdate *l
return p.signaller.WriteMessage(p.signalling.SignalSubscribedQualityUpdate(subscribedQualityUpdate))
}
func (p *ParticipantImpl) sendSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) error {
return p.signaller.WriteMessage(p.signalling.SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate))
}
func (p *ParticipantImpl) sendSubscriptionResponse(trackID livekit.TrackID, subErr livekit.SubscriptionError) error {
return p.signaller.WriteMessage(p.signalling.SignalSubscriptionResponse(&livekit.SubscriptionResponse{
TrackSid: string(trackID),
+1
View File
@@ -58,4 +58,5 @@ type ParticipantSignalling interface {
SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message
SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message
SignalMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) proto.Message
SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) proto.Message
}
+8
View File
@@ -228,3 +228,11 @@ func (u *signalling) SignalMediaSectionsRequirement(mediaSectionsRequirement *li
},
}
}
func (s *signalling) SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) proto.Message {
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscribedAudioCodecUpdate{
SubscribedAudioCodecUpdate: subscribedAudioCodecUpdate,
},
}
}
@@ -111,3 +111,7 @@ func (u *signallingUnimplemented) SignalSubscriptionPermissionUpdate(subscriptio
func (u *signallingUnimplemented) SignalMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) proto.Message {
return nil
}
+14 -7
View File
@@ -54,6 +54,7 @@ type SubscribedTrackParams struct {
OnDownTrackCreated func(downTrack *sfu.DownTrack)
OnDownTrackClosed func(subscriberID livekit.ParticipantID)
OnSubscriberMaxQualityChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, layer int32)
OnSubscriberAudioCodecChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, enabled bool)
}
type SubscribedTrack struct {
@@ -436,15 +437,21 @@ func (t *SubscribedTrack) OnCodecNegotiated(codec webrtc.RTPCodecCapability) {
return
}
if t.params.OnSubscriberMaxQualityChange != nil {
if t.params.OnSubscriberMaxQualityChange != nil || t.params.OnSubscriberAudioCodecChange != nil {
go func() {
mimeType := mime.NormalizeMimeType(codec.MimeType)
spatial := buffer.GetSpatialLayerForVideoQuality(
mimeType,
livekit.VideoQuality_HIGH,
t.params.MediaTrack.ToProto(),
)
t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), mimeType, spatial)
switch t.params.MediaTrack.Kind() {
case livekit.TrackType_VIDEO:
spatial := buffer.GetSpatialLayerForVideoQuality(
mimeType,
livekit.VideoQuality_HIGH,
t.params.MediaTrack.ToProto(),
)
t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), mimeType, spatial)
case livekit.TrackType_AUDIO:
t.params.OnSubscriberAudioCodecChange(t.downTrack.SubscriberID(), mimeType, true)
}
}()
}
}
+13 -8
View File
@@ -40,6 +40,7 @@ type WrappedReceiverParams struct {
UpstreamCodecs []webrtc.RTPCodecParameters
Logger logger.Logger
DisableRed bool
IsEncrypted bool
}
type WrappedReceiver struct {
@@ -59,7 +60,7 @@ func NewWrappedReceiver(params WrappedReceiverParams) *WrappedReceiver {
}
codecs := params.UpstreamCodecs
if len(codecs) == 1 {
if len(codecs) == 1 && !params.IsEncrypted {
normalizedMimeType := mime.NormalizeMimeType(codecs[0].MimeType)
if normalizedMimeType == mime.MimeTypeRED {
// if upstream is opus/red, then add opus to match clients that don't support red
@@ -98,13 +99,17 @@ func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) boo
if receiverMimeType == codecMimeType {
trackReceiver = receiver
break
} else if receiverMimeType == mime.MimeTypeRED && codecMimeType == mime.MimeTypeOpus {
// audio opus/red can match opus only
trackReceiver = receiver.GetPrimaryReceiverForRed()
break
} else if receiverMimeType == mime.MimeTypeOpus && codecMimeType == mime.MimeTypeRED {
trackReceiver = receiver.GetRedReceiver()
break
}
if !r.params.IsEncrypted {
if receiverMimeType == mime.MimeTypeRED && codecMimeType == mime.MimeTypeOpus {
// audio opus/red can match opus only
trackReceiver = receiver.GetPrimaryReceiverForRed()
break
} else if receiverMimeType == mime.MimeTypeOpus && codecMimeType == mime.MimeTypeRED {
trackReceiver = receiver.GetRedReceiver()
break
}
}
}
if trackReceiver == nil {