Merge remote-tracking branch 'origin/master' into raja_fr

This commit is contained in:
boks1971
2024-08-06 14:35:53 +05:30
27 changed files with 402 additions and 285 deletions
+3 -3
View File
@@ -305,11 +305,11 @@ LiveKit server is licensed under Apache License v2.0.
<br/><table>
<thead><tr><th colspan="2">LiveKit Ecosystem</th></tr></thead>
<tbody>
<tr><td>Real-time SDKs</td><td><a href="https://github.com/livekit/components-js">React Components</a> · <a href="https://github.com/livekit/client-sdk-js">Browser</a> · <a href="https://github.com/livekit/client-sdk-swift">iOS/macOS</a> · <a href="https://github.com/livekit/client-sdk-android">Android</a> · <a href="https://github.com/livekit/client-sdk-flutter">Flutter</a> · <a href="https://github.com/livekit/client-sdk-react-native">React Native</a> · <a href="https://github.com/livekit/rust-sdks">Rust</a> · <a href="https://github.com/livekit/node-sdks">Node.js</a> · <a href="https://github.com/livekit/python-sdks">Python</a> · <a href="https://github.com/livekit/client-sdk-unity-web">Unity (web)</a> · <a href="https://github.com/livekit/client-sdk-unity">Unity (beta)</a></td></tr><tr></tr>
<tr><td>Realtime SDKs</td><td><a href="https://github.com/livekit/components-js">React Components</a> · <a href="https://github.com/livekit/client-sdk-js">Browser</a> · <a href="https://github.com/livekit/components-swift">Swift Components</a> · <a href="https://github.com/livekit/client-sdk-swift">iOS/macOS/visionOS</a> · <a href="https://github.com/livekit/client-sdk-android">Android</a> · <a href="https://github.com/livekit/client-sdk-flutter">Flutter</a> · <a href="https://github.com/livekit/client-sdk-react-native">React Native</a> · <a href="https://github.com/livekit/rust-sdks">Rust</a> · <a href="https://github.com/livekit/node-sdks">Node.js</a> · <a href="https://github.com/livekit/python-sdks">Python</a> · <a href="https://github.com/livekit/client-sdk-unity-web">Unity (web)</a> · <a href="https://github.com/livekit/client-sdk-unity">Unity (beta)</a></td></tr><tr></tr>
<tr><td>Server APIs</td><td><a href="https://github.com/livekit/node-sdks">Node.js</a> · <a href="https://github.com/livekit/server-sdk-go">Golang</a> · <a href="https://github.com/livekit/server-sdk-ruby">Ruby</a> · <a href="https://github.com/livekit/server-sdk-kotlin">Java/Kotlin</a> · <a href="https://github.com/livekit/python-sdks">Python</a> · <a href="https://github.com/livekit/rust-sdks">Rust</a> · <a href="https://github.com/agence104/livekit-server-sdk-php">PHP (community)</a></td></tr><tr></tr>
<tr><td>Agents Frameworks</td><td><a href="https://github.com/livekit/agents">Python</a> · <a href="https://github.com/livekit/agent-playground">Playground</a></td></tr><tr></tr>
<tr><td>Services</td><td><b>Livekit server</b> · <a href="https://github.com/livekit/egress">Egress</a> · <a href="https://github.com/livekit/ingress">Ingress</a> · <a href="https://github.com/livekit/sip">SIP</a></td></tr><tr></tr>
<tr><td>Resources</td><td><a href="https://docs.livekit.io">Docs</a> · <a href="https://github.com/livekit-examples">Example apps</a> · <a href="https://livekit.io/cloud">Cloud</a> · <a href="https://docs.livekit.io/oss/deployment">Self-hosting</a> · <a href="https://github.com/livekit/livekit-cli">CLI</a></td></tr>
<tr><td>Services</td><td><b>LiveKit server</b> · <a href="https://github.com/livekit/egress">Egress</a> · <a href="https://github.com/livekit/ingress">Ingress</a> · <a href="https://github.com/livekit/sip">SIP</a></td></tr><tr></tr>
<tr><td>Resources</td><td><a href="https://docs.livekit.io">Docs</a> · <a href="https://github.com/livekit-examples">Example apps</a> · <a href="https://livekit.io/cloud">Cloud</a> · <a href="https://docs.livekit.io/home/self-hosting/deployment">Self-hosting</a> · <a href="https://github.com/livekit/livekit-cli">CLI</a></td></tr>
</tbody>
</table>
<!--END_REPO_NAV-->
+11 -11
View File
@@ -8,7 +8,7 @@ require (
github.com/d5/tengo/v2 v2.17.0
github.com/dustin/go-humanize v1.0.1
github.com/elliotchance/orderedmap/v2 v2.2.0
github.com/florianl/go-tc v0.4.3
github.com/florianl/go-tc v0.4.4
github.com/frostbyte73/core v0.0.10
github.com/gammazero/deque v0.2.1
github.com/gammazero/workerpool v1.1.3
@@ -19,8 +19,8 @@ require (
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7
github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -40,19 +40,19 @@ require (
github.com/pion/webrtc/v3 v3.2.47
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
github.com/redis/go-redis/v9 v9.5.4
github.com/redis/go-redis/v9 v9.6.1
github.com/rs/cors v1.11.0
github.com/stretchr/testify v1.9.0
github.com/thoas/go-funk v0.9.3
github.com/twitchtv/twirp v8.1.3+incompatible
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6
github.com/urfave/cli/v2 v2.27.2
github.com/urfave/cli/v2 v2.27.3
github.com/urfave/negroni/v3 v3.1.1
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8
golang.org/x/sync v0.7.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)
@@ -125,17 +125,17 @@ require (
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
google.golang.org/grpc v1.65.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+22 -22
View File
@@ -68,8 +68,8 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/florianl/go-tc v0.4.3 h1:xpobG2gFNvEqbclU07zjddALSjqTQTWJkxg5/kRYDpw=
github.com/florianl/go-tc v0.4.3/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc=
github.com/florianl/go-tc v0.4.4 h1:q6lhEWEfyhGffRzdl3eIcNqX/yVIw0IJwXqa9Rdcctw=
github.com/florianl/go-tc v0.4.4/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJORsU=
@@ -165,10 +165,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b h1:Wn6D+B5YbMe1tH7WCazLJz+msBQzR69dK2wTdgJsF5k=
github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1 h1:GP4QtOjYE6zDdtIi8AyM6ukse55HXr0174uOYXxb/H8=
github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1/go.mod h1:oU5XbEaQlywdgXcSQDzrI5CPnwuGn/HuRXuQaDxVryQ=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -288,8 +288,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4=
github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/redis/go-redis/v9 v9.5.4 h1:vOFYDKKVgrI5u++QvnMT7DksSMYg7Aw/Np4vLJLKLwY=
github.com/redis/go-redis/v9 v9.5.4/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
@@ -323,8 +323,8 @@ github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJX
github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 h1:SIKIoA4e/5Y9ZOl0DCe3eVMLPOQzJxgZpfdHHeauNTM=
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6/go.mod h1:BUbeWZiieNxAuuADTBNb3/aeje6on3DhU3rpWsQSB1E=
github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI=
github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM=
github.com/urfave/cli/v2 v2.27.3 h1:/POWahRmdh7uztQ3CYnaDddk0Rm90PyOgIxgW2rr41M=
github.com/urfave/cli/v2 v2.27.3/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ=
github.com/urfave/negroni/v3 v3.1.1 h1:6MS4nG9Jk/UuCACaUlNXCbiKa0ywF9LXz5dGu09v8hw=
github.com/urfave/negroni/v3 v3.1.1/go.mod h1:jWvnX03kcSjDBl/ShB0iHvx5uOs7mAzZXW+JvJ5XYAs=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@@ -334,8 +334,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw=
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
@@ -365,16 +365,16 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8 h1:Z+vTUQyBb738QmIhbJx3z4htsxDeI+rd0EHvNm8jHkg=
golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -411,8 +411,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -452,8 +452,8 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -494,8 +494,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d h1:JU0iKnSg02Gmb5ZdV8nYsKEKsP6o/FGVWTrw4i1DA9A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+11 -1
View File
@@ -122,7 +122,11 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut
dispatcher := c.getDispatcher(desc.AgentName, desc.JobType)
if dispatcher == nil {
logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType)
logger.Infow("not dispatching agent job since no worker is available",
"agentName", desc.AgentName,
"jobType", desc.JobType,
"room", desc.Room.Name,
"roomID", desc.Room.Sid)
return ret
}
@@ -183,6 +187,12 @@ func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *ser
}
c.mu.Unlock()
if agName == "" {
// if no agent name is given, we would need to dispatch backwards compatible mode
// which means dispatching to each of the namespaces
return target
}
done := make(chan *serverutils.IncrementalDispatcher[string], 1)
c.workers.Submit(func() {
agentNames.ForEach(func(ag string) {
+30 -4
View File
@@ -129,6 +129,7 @@ type ParticipantParams struct {
TURNSEnabled bool
GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo
GetRegionSettings func(ip string) *livekit.RegionSettings
GetSubscriberForwarderState func(p types.LocalParticipant) (map[livekit.TrackID]*livekit.RTPForwarderState, error)
DisableSupervisor bool
ReconnectOnPublicationError bool
ReconnectOnSubscriptionError bool
@@ -231,6 +232,7 @@ type ParticipantImpl struct {
onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)
cachedDownTracks map[livekit.TrackID]*downTrackState
forwarderState map[livekit.TrackID]*livekit.RTPForwarderState
supervisor *supervisor.ParticipantSupervisor
@@ -1051,6 +1053,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {
case types.MigrateStateComplete:
p.TransportManager.ProcessPendingPublisherDataChannels()
p.cacheForwarderState()
}
if onMigrateStateChange := p.getOnMigrateStateChange(); onMigrateStateChange != nil {
@@ -1209,7 +1212,9 @@ func (p *ParticipantImpl) onTrackSubscribed(subTrack types.SubscribedTrack) {
return
}
if p.TransportManager.HasSubscriberEverConnected() {
subTrack.DownTrack().SetConnected()
dt := subTrack.DownTrack()
dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())})
dt.SetConnected()
}
p.TransportManager.AddSubscribedTrack(subTrack)
})
@@ -1636,7 +1641,7 @@ func (p *ParticipantImpl) onPublisherInitialConnected() {
func (p *ParticipantImpl) onSubscriberInitialConnected() {
go p.subscriberRTCPWorker()
p.setDowntracksConnected()
p.setDownTracksConnected()
}
func (p *ParticipantImpl) onPrimaryTransportInitialConnected() {
@@ -2448,18 +2453,39 @@ func (p *ParticipantImpl) postRtcp(pkts []rtcp.Packet) {
}, postRtcpOp{p, pkts})
}
func (p *ParticipantImpl) setDowntracksConnected() {
func (p *ParticipantImpl) setDownTracksConnected() {
for _, t := range p.SubscriptionManager.GetSubscribedTracks() {
if dt := t.DownTrack(); dt != nil {
dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(t.ID())})
dt.SetConnected()
}
}
}
func (p *ParticipantImpl) cacheForwarderState() {
// if migrating in, get forwarder state from migrating out node to facilitate resume
if f := p.params.GetSubscriberForwarderState; f != nil {
if fs, err := f(p); err == nil {
p.lock.Lock()
p.forwarderState = fs
p.lock.Unlock()
}
}
}
func (p *ParticipantImpl) getAndDeleteForwarderState(trackID livekit.TrackID) *livekit.RTPForwarderState {
p.lock.Lock()
fs := p.forwarderState[trackID]
delete(p.forwarderState, trackID)
p.lock.Unlock()
return fs
}
func (p *ParticipantImpl) CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, downTrack sfu.DownTrackState) {
p.lock.Lock()
if existing := p.cachedDownTracks[trackID]; existing != nil && existing.transceiver != rtpTransceiver {
p.subLogger.Infow("cached transceiver changed", "trackID", trackID)
p.subLogger.Warnw("cached transceiver changed", nil, "trackID", trackID)
}
p.cachedDownTracks[trackID] = &downTrackState{transceiver: rtpTransceiver, downTrack: downTrack}
p.subLogger.Debugw("caching downtrack", "trackID", trackID)
+10 -2
View File
@@ -791,7 +791,7 @@ func (r *Room) CloseIfEmpty() {
r.lock.Unlock()
if elapsed >= int64(timeout) {
r.Close(types.ParticipantCloseReasonNone)
r.Close(types.ParticipantCloseReasonRoomClosed)
}
}
@@ -1488,7 +1488,15 @@ func (r *Room) createAgentDispatchesFromRoomAgent() {
return
}
for _, ag := range r.internal.AgentDispatches {
roomDisp := r.internal.AgentDispatches
if len(roomDisp) == 0 {
// Backward compatibility: by default, start any agent in the empty JobName
roomDisp = []*livekit.RoomAgentDispatch{
&livekit.RoomAgentDispatch{},
}
}
for _, ag := range roomDisp {
ad := &livekit.AgentDispatch{
Id: guid.New(guid.AgentDispatchPrefix),
AgentName: ag.AgentName,
+20
View File
@@ -180,6 +180,26 @@ func (m *SubscriptionManager) GetSubscribedTracks() []types.SubscribedTrack {
return tracks
}
func (m *SubscriptionManager) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState {
m.lock.RLock()
defer m.lock.RUnlock()
states := make(map[livekit.TrackID]*livekit.RTPForwarderState, len(m.subscriptions))
for trackID, t := range m.subscriptions {
st := t.getSubscribedTrack()
if st != nil {
dt := st.DownTrack()
if dt != nil {
state := dt.StopWriteAndGetState()
if state.ForwarderState != nil {
states[trackID] = state.ForwarderState
}
}
}
}
return states
}
func (m *SubscriptionManager) HasSubscriptions() bool {
m.lock.RLock()
defer m.lock.RUnlock()
+1 -1
View File
@@ -1409,7 +1409,7 @@ func (t *PCTransport) handleRemoteICECandidate(e event) error {
t.params.Logger.Warnw("failed to add cached ICE candidate", err, "candidate", c)
return errors.Wrap(err, "add ice candidate failed")
} else {
t.params.Logger.Debugw("added cached ICE candidate", "candidate", c)
t.params.Logger.Debugw("added ICE candidate", "candidate", c)
}
return nil
+11 -1
View File
@@ -106,6 +106,7 @@ const (
ParticipantCloseReasonDataChannelError
ParticipantCloseReasonMigrateCodecMismatch
ParticipantCloseReasonSignalSourceClose
ParticipantCloseReasonRoomClosed
)
func (p ParticipantCloseReason) String() string {
@@ -158,6 +159,8 @@ func (p ParticipantCloseReason) String() string {
return "MIGRATE_CODEC_MISMATCH"
case ParticipantCloseReasonSignalSourceClose:
return "SIGNAL_SOURCE_CLOSE"
case ParticipantCloseReasonRoomClosed:
return "ROOM_CLOSED"
default:
return fmt.Sprintf("%d", int(p))
}
@@ -188,6 +191,8 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason {
return livekit.DisconnectReason_STATE_MISMATCH
case ParticipantCloseReasonSignalSourceClose:
return livekit.DisconnectReason_SIGNAL_CLOSE
case ParticipantCloseReasonRoomClosed:
return livekit.DisconnectReason_ROOM_CLOSED
default:
// the other types will map to unknown reason
return livekit.DisconnectReason_UNKNOWN_REASON
@@ -364,6 +369,7 @@ type LocalParticipant interface {
// WaitUntilSubscribed waits until all subscriptions have been settled, or if the timeout
// has been reached. If the timeout expires, it will return an error.
WaitUntilSubscribed(timeout time.Duration) error
StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState
// returns list of participant identities that the current participant is subscribed to
GetSubscribedParticipants() []livekit.ParticipantID
@@ -407,7 +413,11 @@ type LocalParticipant interface {
NotifyMigration()
SetMigrateState(s MigrateState)
MigrateState() MigrateState
SetMigrateInfo(previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo)
SetMigrateInfo(
previousOffer, previousAnswer *webrtc.SessionDescription,
mediaTracks []*livekit.TrackPublishedResponse,
dataChannels []*livekit.DataChannelInfo,
)
UpdateMediaRTT(rtt uint32)
UpdateSignalingRTT(rtt uint32)
@@ -854,6 +854,16 @@ type FakeLocalParticipant struct {
stateReturnsOnCall map[int]struct {
result1 livekit.ParticipantInfo_State
}
StopAndGetSubscribedTracksForwarderStateStub func() map[livekit.TrackID]*livekit.RTPForwarderState
stopAndGetSubscribedTracksForwarderStateMutex sync.RWMutex
stopAndGetSubscribedTracksForwarderStateArgsForCall []struct {
}
stopAndGetSubscribedTracksForwarderStateReturns struct {
result1 map[livekit.TrackID]*livekit.RTPForwarderState
}
stopAndGetSubscribedTracksForwarderStateReturnsOnCall map[int]struct {
result1 map[livekit.TrackID]*livekit.RTPForwarderState
}
SubscribeToTrackStub func(livekit.TrackID)
subscribeToTrackMutex sync.RWMutex
subscribeToTrackArgsForCall []struct {
@@ -5607,6 +5617,59 @@ func (fake *FakeLocalParticipant) StateReturnsOnCall(i int, result1 livekit.Part
}{result1}
}
func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState {
fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
ret, specificReturn := fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall[len(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall)]
fake.stopAndGetSubscribedTracksForwarderStateArgsForCall = append(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall, struct {
}{})
stub := fake.StopAndGetSubscribedTracksForwarderStateStub
fakeReturns := fake.stopAndGetSubscribedTracksForwarderStateReturns
fake.recordInvocation("StopAndGetSubscribedTracksForwarderState", []interface{}{})
fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateCallCount() int {
fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock()
defer fake.stopAndGetSubscribedTracksForwarderStateMutex.RUnlock()
return len(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall)
}
func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateCalls(stub func() map[livekit.TrackID]*livekit.RTPForwarderState) {
fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
fake.StopAndGetSubscribedTracksForwarderStateStub = stub
}
func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateReturns(result1 map[livekit.TrackID]*livekit.RTPForwarderState) {
fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
fake.StopAndGetSubscribedTracksForwarderStateStub = nil
fake.stopAndGetSubscribedTracksForwarderStateReturns = struct {
result1 map[livekit.TrackID]*livekit.RTPForwarderState
}{result1}
}
func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateReturnsOnCall(i int, result1 map[livekit.TrackID]*livekit.RTPForwarderState) {
fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
fake.StopAndGetSubscribedTracksForwarderStateStub = nil
if fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall == nil {
fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall = make(map[int]struct {
result1 map[livekit.TrackID]*livekit.RTPForwarderState
})
}
fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall[i] = struct {
result1 map[livekit.TrackID]*livekit.RTPForwarderState
}{result1}
}
func (fake *FakeLocalParticipant) SubscribeToTrack(arg1 livekit.TrackID) {
fake.subscribeToTrackMutex.Lock()
fake.subscribeToTrackArgsForCall = append(fake.subscribeToTrackArgsForCall, struct {
@@ -6851,6 +6914,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.setTrackMutedMutex.RUnlock()
fake.stateMutex.RLock()
defer fake.stateMutex.RUnlock()
fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock()
defer fake.stopAndGetSubscribedTracksForwarderStateMutex.RUnlock()
fake.subscribeToTrackMutex.RLock()
defer fake.subscribeToTrackMutex.RUnlock()
fake.subscriberAsPrimaryMutex.RLock()
+2 -2
View File
@@ -201,9 +201,9 @@ func (d *DummyReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
return d.headerExtensions
}
func (d *DummyReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
func (d *DummyReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.ReadRTP(buf, layer, sn)
return r.ReadRTP(buf, layer, esn)
}
return 0, errors.New("no receiver")
}
+2 -8
View File
@@ -106,15 +106,9 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
internal.TrackEgress = req.Egress.Tracks
}
}
if req.Agent == nil {
// Backward compatibility: by default, start any agent in the empty JobName
req.Agent = &livekit.RoomAgent{
Dispatches: []*livekit.RoomAgentDispatch{
&livekit.RoomAgentDispatch{},
},
}
if req.Agent != nil {
internal.AgentDispatches = req.Agent.Dispatches
}
internal.AgentDispatches = req.Agent.Dispatches
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
internal.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: true,
+17 -58
View File
@@ -17,7 +17,6 @@ package buffer
import (
"encoding/binary"
"errors"
"fmt"
"io"
"strings"
"sync"
@@ -73,7 +72,7 @@ type ExtPacket struct {
type Buffer struct {
sync.RWMutex
readCond *sync.Cond
bucket *bucket.Bucket
bucket *bucket.Bucket[uint64]
nacker *nack.NackQueue
maxVideoPkts int
maxAudioPkts int
@@ -252,10 +251,11 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
switch {
case strings.HasPrefix(b.mime, "audio/"):
b.codecType = webrtc.RTPCodecTypeAudio
b.bucket = bucket.NewBucket(InitPacketBufferSizeAudio)
b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeAudio)
case strings.HasPrefix(b.mime, "video/"):
b.codecType = webrtc.RTPCodecTypeVideo
b.bucket = bucket.NewBucket(InitPacketBufferSizeVideo)
b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeVideo)
if b.frameRateCalculator[0] == nil {
if strings.EqualFold(codec.MimeType, webrtc.MimeTypeVP8) {
b.frameRateCalculator[0] = NewFrameRateCalculatorVP8(b.clockRate, b.logger)
@@ -320,28 +320,6 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) {
return
}
if err = utils.ValidateRTPPacket(&rtpPacket, b.payloadType, b.mediaSSRC); err != nil {
invalidPacketCount := b.invalidPacketCount.Inc()
if (invalidPacketCount-1)%100 == 0 {
b.logger.Warnw(
"validating RTP packet failed", err,
"version", rtpPacket.Version,
"padding", rtpPacket.Padding,
"marker", rtpPacket.Marker,
"expectedPayloadType", b.payloadType,
"payloadType", rtpPacket.PayloadType,
"sequenceNumber", rtpPacket.SequenceNumber,
"timestamp", rtpPacket.Timestamp,
"expectedSSRC", b.mediaSSRC,
"ssrc", rtpPacket.SSRC,
"numExtensions", len(rtpPacket.Extensions),
"payloadSize", len(rtpPacket.Payload),
"rtpStats", b.rtpStats,
"snRangeMap", b.snRangeMap,
)
}
}
now := time.Now().UnixNano()
if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() {
if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil {
@@ -349,6 +327,13 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) {
}
}
// libwebrtc will use 0 ssrc for probing, don't push the packet to pending queue to avoid memory increasing since
// the Bind will not be called to consume the pending packets. More details in https://github.com/pion/webrtc/pull/2816
if rtpPacket.SSRC == 0 {
b.Unlock()
return
}
// handle RTX packet
if pb := b.primaryBufferForRTX; pb != nil {
b.Unlock()
@@ -618,7 +603,7 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, i
}
flowState.ExtSequenceNumber -= snAdjustment
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber)
_, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, rtpPacket.Header.SequenceNumber)
_, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, flowState.ExtSequenceNumber)
if err != nil {
if !flowState.IsDuplicate {
if errors.Is(err, bucket.ErrPacketTooOld) {
@@ -664,7 +649,7 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, i
}
func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket {
n, err := b.getPacket(buf, ep.Packet.SequenceNumber)
n, err := b.getPacket(buf, ep.ExtSequenceNumber)
if err != nil {
packetNotFoundCount := b.packetNotFoundCount.Inc()
if (packetNotFoundCount-1)%20 == 0 {
@@ -689,32 +674,6 @@ func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket {
b.logger.Warnw("unexpected marshal size", nil, "max", n, "need", payloadEnd)
return nil
}
// TODO-REMOVE-AFTER-DEBUG START
if payloadEnd != n {
paddingEnd := payloadStart + int(ep.Packet.PaddingSize)
if paddingEnd != n {
b.logger.Warnw("unexpected marshal size", nil, "max", n, "payloadEnd", payloadEnd, "paddingEnd", paddingEnd)
}
}
// check a few fields for validity
checkVersion := (buf[0] & 0xc0) >> 6
checkPayloadType := buf[1] & 0x7f
checkSequenceNumber := binary.BigEndian.Uint16(buf[2:])
checkSSRC := binary.BigEndian.Uint32(buf[8:])
if checkVersion != pkt.Version || checkPayloadType != pkt.PayloadType || checkSequenceNumber != pkt.SequenceNumber || checkSSRC != pkt.SSRC {
b.logger.Warnw(
"rtp packet mismatch", nil,
"version", fmt.Sprintf("%d != %d", checkVersion, pkt.Version),
"payloadType", fmt.Sprintf("%d != %d", checkPayloadType, pkt.PayloadType),
"sequenceNumber", fmt.Sprintf("%d != %d", checkSequenceNumber, pkt.SequenceNumber),
"SSRC", fmt.Sprintf("%d != %d", checkSSRC, pkt.SSRC),
"bytes", buf[0:16],
"len", n,
"headerSize", payloadStart,
"payloadSize", payloadEnd-payloadStart,
)
}
// TODO-REMOVE-AFTER-DEBUG END
pkt.Payload = buf[payloadStart:payloadEnd]
ep.Packet = &pkt
@@ -1027,18 +986,18 @@ func (b *Buffer) getRTCP() []rtcp.Packet {
return pkts
}
func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) {
func (b *Buffer) GetPacket(buff []byte, esn uint64) (int, error) {
b.Lock()
defer b.Unlock()
return b.getPacket(buff, sn)
return b.getPacket(buff, esn)
}
func (b *Buffer) getPacket(buff []byte, sn uint16) (int, error) {
func (b *Buffer) getPacket(buff []byte, esn uint64) (int, error) {
if b.closed.Load() {
return 0, io.EOF
}
return b.bucket.GetPacket(buff, sn)
return b.bucket.GetPacket(buff, esn)
}
func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
+65 -22
View File
@@ -137,14 +137,21 @@ func (r *RTPStatsReceiver) NewSnapshotId() uint32 {
return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest())
}
func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64) int {
func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64, ts uint32) int {
if diffNano < r.tsRolloverThreshold {
// time not more than rollover threshold
return 0
return -1
}
excess := int((diffNano - r.tsRolloverThreshold) * int64(r.params.ClockRate) / 1e9)
return excess/(1<<32) + 1
excess := int((diffNano - r.tsRolloverThreshold*2) * int64(r.params.ClockRate) / 1e9)
roc := excess / (1 << 32)
if roc < 0 {
roc = 0
}
if r.timestamp.GetHighest() > ts {
roc++
}
return roc
}
func (r *RTPStatsReceiver) Update(
@@ -167,6 +174,8 @@ func (r *RTPStatsReceiver) Update(
var resSN utils.WrapAroundUpdateResult[uint64]
var gapSN int64
var resTS utils.WrapAroundUpdateResult[uint64]
var timeSinceHighest int64
var tsRolloverCount int
getLoggingFields := func() []interface{} {
return []interface{}{
@@ -174,6 +183,8 @@ func (r *RTPStatsReceiver) Update(
"gapSN", gapSN,
"resTS", resTS,
"gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest),
"timeSinceHighest", time.Duration(timeSinceHighest),
"tsRolloverCount", tsRolloverCount,
"packetTime", time.Unix(0, packetTime).String(),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,
@@ -219,30 +230,57 @@ func (r *RTPStatsReceiver) Update(
}
gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
resTS = r.timestamp.Rollover(timestamp, r.getTSRolloverCount(packetTime-r.highestTime))
timeSinceHighest = packetTime - r.highestTime
tsRolloverCount = r.getTSRolloverCount(timeSinceHighest, timestamp)
if tsRolloverCount >= 0 {
r.logger.Warnw(
"potential time stamp roll over", nil,
getLoggingFields()...,
)
}
resTS = r.timestamp.Rollover(timestamp, tsRolloverCount)
if resTS.IsUnhandled {
flowState.IsNotHandled = true
return
}
gapTS := int64(resTS.ExtendedVal - resTS.PreExtendedHighest)
// it is possible that sequence number has rolled over too
if gapSN < 0 && gapTS > 0 && payloadSize > 0 {
// not possible to know how many cycles of sequence number roll over could have happened,
// use 1 to ensure that it at least does not go backwards
resSN = r.sequenceNumber.Rollover(sequenceNumber, 1)
if resSN.IsUnhandled {
flowState.IsNotHandled = true
return
}
// it is possible to reecive old packets,
// as it is not possible to detect how far to roll back sequence number, ignore old packets
if gapTS < 0 && gapSN > 0 {
r.sequenceNumber.UndoUpdate(resSN)
r.logger.Warnw(
"forcing sequence number rollover", nil,
"dropping old packet", nil,
getLoggingFields()...,
)
gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
flowState.IsNotHandled = true
return
}
// it is possible that sequence number has rolled over too
if gapSN < 0 && gapTS > 0 && payloadSize > 0 {
if tsRolloverCount >= 0 {
// not possible to know how many cycles of sequence number roll over could have happened,
// use 1 to ensure that it at least does not go backwards
resSN = r.sequenceNumber.Rollover(sequenceNumber, 1)
if resSN.IsUnhandled {
flowState.IsNotHandled = true
return
}
r.logger.Warnw(
"forcing sequence number rollover", nil,
getLoggingFields()...,
)
} else {
r.logger.Warnw(
"forcing sequence number rollover skipped", nil,
getLoggingFields()...,
)
}
}
}
gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
pktSize := uint64(hdrSize + payloadSize + paddingSize)
if gapSN <= 0 { // duplicate OR out-of-order
@@ -263,8 +301,6 @@ func (r *RTPStatsReceiver) Update(
}
flowState.IsOutOfOrder = true
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpNegativeCount++
@@ -316,9 +352,9 @@ func (r *RTPStatsReceiver) Update(
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
}
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
if !flowState.IsDuplicate {
if payloadSize == 0 {
@@ -585,8 +621,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
return false
}
r.updatePropagationDelayAndRecordSenderReport(srDataExt)
r.checkRTPClockSkewForSenderReport(srDataExt)
r.updatePropagationDelayAndRecordSenderReport(srDataExt)
r.checkRTPClockSkewAgainstMediaPathForSenderReport(srDataExt)
if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil {
@@ -731,6 +767,13 @@ func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool {
return diff >= 0 && diff < cHistorySize
}
func (r *RTPStatsReceiver) HighestTimestamp() uint32 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.timestamp.GetHighest()
}
// ----------------------------------
type lockedRTPStatsReceiverLogEncoder struct {
+18
View File
@@ -249,6 +249,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
// padding only
sequenceNumber += 2
timestamp += 3000
packet = getPacket(sequenceNumber, timestamp, 0)
flowState = r.Update(
time.Now().UnixNano(),
@@ -266,5 +267,22 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
require.True(t, r.history.IsSet(uint64(sequenceNumber)-1))
require.True(t, r.history.IsSet(uint64(sequenceNumber)-2))
// old packet, but simulating increasing sequence number after roll over
packet = getPacket(sequenceNumber+400, timestamp-6000, 300)
flowState = r.Update(
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.True(t, flowState.IsNotHandled)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
r.Stop()
}
+7
View File
@@ -990,6 +990,13 @@ func (r *RTPStatsSender) getIntervalStats(
return
}
func (r *RTPStatsSender) ExtHighestSequenceNumber() uint64 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.extHighestSN
}
// -------------------------------------------------------------------
type lockedRTPStatsSenderLogEncoder struct {
+9 -25
View File
@@ -15,10 +15,9 @@
package codecmunger
import (
"fmt"
"github.com/elliotchance/orderedmap/v2"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -32,23 +31,6 @@ const (
// -----------------------------------------------------------
type VP8State struct {
ExtLastPictureId int32
PictureIdUsed bool
LastTl0PicIdx uint8
Tl0PicIdxUsed bool
TidUsed bool
LastKeyIdx uint8
KeyIdxUsed bool
}
func (v VP8State) String() string {
return fmt.Sprintf("VP8State{extLastPictureId: %d, pictureIdUsed: %+v, lastTl0PicIdx: %d, tl0PicIdxUsed: %+v, tidUsed: %+v, lastKeyIdx: %d, keyIdxUsed: %+v)",
v.ExtLastPictureId, v.PictureIdUsed, v.LastTl0PicIdx, v.Tl0PicIdxUsed, v.TidUsed, v.LastKeyIdx, v.KeyIdxUsed)
}
// -----------------------------------------------------------
type VP8 struct {
logger logger.Logger
@@ -85,25 +67,27 @@ func NewVP8FromNull(cm CodecMunger, logger logger.Logger) *VP8 {
}
func (v *VP8) GetState() interface{} {
return VP8State{
return &livekit.VP8MungerState{
ExtLastPictureId: v.extLastPictureId,
PictureIdUsed: v.pictureIdUsed,
LastTl0PicIdx: v.lastTl0PicIdx,
LastTl0PicIdx: uint32(v.lastTl0PicIdx),
Tl0PicIdxUsed: v.tl0PicIdxUsed,
TidUsed: v.tidUsed,
LastKeyIdx: v.lastKeyIdx,
LastKeyIdx: uint32(v.lastKeyIdx),
KeyIdxUsed: v.keyIdxUsed,
}
}
func (v *VP8) SeedState(seed interface{}) {
if state, ok := seed.(VP8State); ok {
switch cm := seed.(type) {
case *livekit.RTPForwarderState_Vp8Munger:
state := cm.Vp8Munger
v.extLastPictureId = state.ExtLastPictureId
v.pictureIdUsed = state.PictureIdUsed
v.lastTl0PicIdx = state.LastTl0PicIdx
v.lastTl0PicIdx = uint8(state.LastTl0PicIdx)
v.tl0PicIdxUsed = state.Tl0PicIdxUsed
v.tidUsed = state.TidUsed
v.lastKeyIdx = state.LastKeyIdx
v.lastKeyIdx = uint8(state.LastKeyIdx)
v.keyIdxUsed = state.KeyIdxUsed
}
}
+19 -5
View File
@@ -138,7 +138,7 @@ var (
type DownTrackState struct {
RTPStats *buffer.RTPStatsSender
DeltaStatsSenderSnapshotId uint32
ForwarderState ForwarderState
ForwarderState *livekit.RTPForwarderState
}
func (d DownTrackState) String() string {
@@ -265,6 +265,7 @@ type DownTrack struct {
connected atomic.Bool
bindAndConnectedOnce atomic.Bool
writable atomic.Bool
writeStopped atomic.Bool
rtpStats *buffer.RTPStatsSender
@@ -1147,20 +1148,30 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer {
}
func (d *DownTrack) GetState() DownTrackState {
dts := DownTrackState{
return DownTrackState{
RTPStats: d.rtpStats,
DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId,
ForwarderState: d.forwarder.GetState(),
}
return dts
}
func (d *DownTrack) SeedState(state DownTrackState) {
d.rtpStats.Seed(state.RTPStats)
d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId
if state.RTPStats != nil {
d.rtpStats.Seed(state.RTPStats)
d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId
}
d.forwarder.SeedState(state.ForwarderState)
}
func (d *DownTrack) StopWriteAndGetState() DownTrackState {
d.bindLock.Lock()
d.writable.Store(false)
d.writeStopped.Store(true)
d.bindLock.Unlock()
return d.GetState()
}
func (d *DownTrack) UpTrackLayersChange() {
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnAvailableLayersChanged(d)
@@ -1985,6 +1996,9 @@ func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) {
*/
func (d *DownTrack) onBindAndConnectedChange() {
if d.writeStopped.Load() {
return
}
d.writable.Store(d.connected.Load() && d.bound.Load())
if d.connected.Load() && d.bound.Load() && !d.bindAndConnectedOnce.Swap(true) {
if d.activePaddingOnMuteUpTrack.Load() {
+38 -51
View File
@@ -27,6 +27,7 @@ import (
"github.com/pion/webrtc/v3"
"go.uber.org/zap/zapcore"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -186,35 +187,6 @@ type TranslationParams struct {
// -------------------------------------------------------------------
type ForwarderState struct {
Started bool
ReferenceLayerSpatial int32
PreStartTime time.Time
ExtFirstTS uint64
DummyStartTSOffset uint64
RTP RTPMungerState
Codec interface{}
}
func (f ForwarderState) String() string {
codecString := ""
switch codecState := f.Codec.(type) {
case codecmunger.VP8State:
codecString = codecState.String()
}
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, dummyStartTSOffset: %d, rtp: %s, codec: %s}",
f.Started,
f.ReferenceLayerSpatial,
f.PreStartTime.String(),
f.ExtFirstTS,
f.DummyStartTSOffset,
f.RTP.String(),
codecString,
)
}
// -------------------------------------------------------------------
type refInfo struct {
senderReport *buffer.RTCPSenderReportData
tsOffset uint64
@@ -402,41 +374,52 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
}
}
func (f *Forwarder) GetState() ForwarderState {
func (f *Forwarder) GetState() *livekit.RTPForwarderState {
f.lock.RLock()
defer f.lock.RUnlock()
if !f.started {
return ForwarderState{}
return nil
}
return ForwarderState{
Started: f.started,
ReferenceLayerSpatial: f.referenceLayerSpatial,
PreStartTime: f.preStartTime,
ExtFirstTS: f.extFirstTS,
DummyStartTSOffset: f.dummyStartTSOffset,
RTP: f.rtpMunger.GetLast(),
Codec: f.codecMunger.GetState(),
state := &livekit.RTPForwarderState{
Started: f.started,
ReferenceLayerSpatial: f.referenceLayerSpatial,
ExtFirstTimestamp: f.extFirstTS,
DummyStartTimestampOffset: f.dummyStartTSOffset,
RtpMunger: f.rtpMunger.GetState(),
}
if !f.preStartTime.IsZero() {
state.PreStartTime = f.preStartTime.UnixNano()
}
codecMungerState := f.codecMunger.GetState()
if vp8MungerState, ok := codecMungerState.(*livekit.VP8MungerState); ok {
state.CodecMunger = &livekit.RTPForwarderState_Vp8Munger{
Vp8Munger: vp8MungerState,
}
}
return state
}
func (f *Forwarder) SeedState(state ForwarderState) {
if !state.Started {
func (f *Forwarder) SeedState(state *livekit.RTPForwarderState) {
if state == nil || !state.Started {
return
}
f.lock.Lock()
defer f.lock.Unlock()
f.rtpMunger.SeedLast(state.RTP)
f.codecMunger.SeedState(state.Codec)
f.rtpMunger.SeedState(state.RtpMunger)
f.codecMunger.SeedState(state.CodecMunger)
f.started = true
f.referenceLayerSpatial = state.ReferenceLayerSpatial
f.preStartTime = state.PreStartTime
f.extFirstTS = state.ExtFirstTS
f.dummyStartTSOffset = state.DummyStartTSOffset
if state.PreStartTime != 0 {
f.preStartTime = time.Unix(0, state.PreStartTime)
}
f.extFirstTS = state.ExtFirstTimestamp
f.dummyStartTSOffset = state.DummyStartTimestampOffset
}
func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool {
@@ -1654,12 +1637,14 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
f.logger.Debugw(
message,
"layer", layer,
"referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
"refInfos", wrappedRefInfoLogger{f},
)
}
// TODO-REMOVE-AFTER-DATA-COLLECTION
@@ -1667,12 +1652,14 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
f.logger.Infow(
message,
"layer", layer,
"referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
"refInfos", wrappedRefInfoLogger{f},
)
}
@@ -1686,8 +1673,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// 3. extExpectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet
// Ideally, extRefTS and extExpectedTS should be very close and extLastTS should be before both of those.
// But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always.
rtpMungerState := f.rtpMunger.GetLast()
extLastTS := rtpMungerState.ExtLastTS
rtpMungerState := f.rtpMunger.GetState()
extLastTS := rtpMungerState.ExtLastTimestamp
extExpectedTS := extLastTS
extRefTS := extLastTS
refTS := uint32(extRefTS)
@@ -1838,7 +1825,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
"nextSN", rtpMungerState.ExtLastSN+1,
"nextSN", rtpMungerState.ExtLastSequenceNumber+1,
"extIncomingSN", extPkt.ExtSequenceNumber,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
@@ -1856,7 +1843,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
"nextSN", rtpMungerState.ExtLastSN+1,
"nextSN", rtpMungerState.ExtLastSequenceNumber+1,
"extIncomingSN", extPkt.ExtSequenceNumber,
"extIncomingTS", extPkt.ExtTimestamp,
)
@@ -2052,7 +2039,7 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S
numPackets++
}
extLastTS := f.rtpMunger.GetLast().ExtLastTS
extLastTS := f.rtpMunger.GetState().ExtLastTimestamp
extExpectedTS := extLastTS
if f.getExpectedRTPTimestamp != nil {
tsExt, err := f.getExpectedRTPTimestamp(time.Now())
+3 -19
View File
@@ -56,7 +56,7 @@ type TrackReceiver interface {
HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
IsClosed() bool
ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
ReadRTP(buf []byte, layer uint8, esn uint64) (int, error)
GetLayeredBitrate() ([]int32, Bitrates)
GetAudioLevel() (float64, bool)
@@ -576,13 +576,13 @@ func (w *WebRTCReceiver) getBufferLocked(layer int32) *buffer.Buffer {
return w.buffers[layer]
}
func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
b := w.getBuffer(int32(layer))
if b == nil {
return 0, ErrBufferNotFound
}
return b.GetPacket(buf, sn)
return b.GetPacket(buf, esn)
}
func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
@@ -713,22 +713,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
spatialTracker = w.streamTrackerManager.AddTracker(pkt.Spatial)
}
}
if spatialLayer > buffer.DefaultMaxLayerSpatial { // TODO-REMOVE-AFTER-DEBUG
w.logger.Warnw(
"invalid spatial layer", nil,
"mime", w.codec.MimeType,
"layer", layer,
"spatialLayer", spatialLayer,
"sn", pkt.Packet.SequenceNumber,
"esn", pkt.ExtSequenceNumber,
"timestamp", pkt.Packet.Timestamp,
"ets", pkt.ExtTimestamp,
"payloadSize", len(pkt.Packet.Payload),
"rtpVersion", pkt.Packet.Version,
"payloadType", pkt.Packet.PayloadType,
"ssrc", pkt.Packet.SSRC,
)
}
writeCount := w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(pkt, spatialLayer)
+2 -2
View File
@@ -136,8 +136,8 @@ func (r *RedPrimaryReceiver) Close() {
closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks())
}
func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
n, err := r.TrackReceiver.ReadRTP(buf, layer, sn)
func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
n, err := r.TrackReceiver.ReadRTP(buf, layer, esn)
if err != nil {
return n, err
}
+1 -1
View File
@@ -127,7 +127,7 @@ func (r *RedReceiver) Close() {
closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks())
}
func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
// red encoding doesn't support nack
return 0, bucket.ErrPacketMismatch
}
+14 -35
View File
@@ -15,8 +15,7 @@
package sfu
import (
"fmt"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -50,26 +49,6 @@ type SnTs struct {
// ----------------------------------------------------------------------
type RTPMungerState struct {
ExtLastSN uint64
ExtSecondLastSN uint64
ExtLastTS uint64
ExtSecondLastTS uint64
LastMarker bool
SecondLastMarker bool
}
func (r RTPMungerState) String() string {
return fmt.Sprintf(
"RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d, extSecondLastTS: %d, lastMarker: %v, secondLastMarker: %v)",
r.ExtLastSN, r.ExtSecondLastSN,
r.ExtLastTS, r.ExtSecondLastTS,
r.LastMarker, r.SecondLastMarker,
)
}
// ----------------------------------------------------------------------
type RTPMunger struct {
logger logger.Logger
@@ -112,14 +91,14 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} {
}
}
func (r *RTPMunger) GetLast() RTPMungerState {
return RTPMungerState{
ExtLastSN: r.extLastSN,
ExtSecondLastSN: r.extSecondLastSN,
ExtLastTS: r.extLastTS,
ExtSecondLastTS: r.extSecondLastTS,
LastMarker: r.lastMarker,
SecondLastMarker: r.secondLastMarker,
func (r *RTPMunger) GetState() *livekit.RTPMungerState {
return &livekit.RTPMungerState{
ExtLastSequenceNumber: r.extLastSN,
ExtSecondLastSequenceNumber: r.extSecondLastSN,
ExtLastTimestamp: r.extLastTS,
ExtSecondLastTimestamp: r.extSecondLastTS,
LastMarker: r.lastMarker,
SecondLastMarker: r.secondLastMarker,
}
}
@@ -127,11 +106,11 @@ func (r *RTPMunger) GetTSOffset() uint64 {
return r.tsOffset
}
func (r *RTPMunger) SeedLast(state RTPMungerState) {
r.extLastSN = state.ExtLastSN
r.extSecondLastSN = state.ExtSecondLastSN
r.extLastTS = state.ExtLastTS
r.extSecondLastTS = state.ExtSecondLastTS
func (r *RTPMunger) SeedState(state *livekit.RTPMungerState) {
r.extLastSN = state.ExtLastSequenceNumber
r.extSecondLastSN = state.ExtSecondLastSequenceNumber
r.extLastTS = state.ExtLastTimestamp
r.extSecondLastTS = state.ExtSecondLastTimestamp
r.lastMarker = state.LastMarker
r.secondLastMarker = state.SecondLastMarker
}
+4 -4
View File
@@ -42,10 +42,10 @@ func itob(i int) bool {
}
type packetMeta struct {
// Original sequence number from stream.
// The original sequence number is used to find the original
// Original extended sequence number from stream.
// The original extended sequence number is used to find the original
// packet from publisher
sourceSeqNo uint16
sourceSeqNo uint64
// Modified sequence number after offset.
// This sequence number is used for the associated
// down track, is modified according the offsets, and
@@ -199,7 +199,7 @@ func (s *sequencer) push(
slot := extModifiedSNAdjusted % uint64(s.size)
s.meta[slot] = packetMeta{
sourceSeqNo: uint16(extIncomingSN),
sourceSeqNo: extIncomingSN,
targetSeqNo: uint16(extModifiedSN),
timestamp: uint32(extModifiedTS),
marker: marker,
+4 -4
View File
@@ -45,7 +45,7 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, len(req), len(res))
for i, val := range res {
require.Equal(t, val.targetSeqNo, req[i])
require.Equal(t, val.sourceSeqNo, req[i]-off)
require.Equal(t, val.sourceSeqNo, uint64(req[i]-off))
require.Equal(t, val.layer, int8(2))
require.Equal(t, val.extSequenceNumber, uint64(req[i]))
require.Equal(t, val.extTimestamp, uint64(123))
@@ -57,7 +57,7 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, len(req), len(res))
for i, val := range res {
require.Equal(t, val.targetSeqNo, req[i])
require.Equal(t, val.sourceSeqNo, req[i]-off)
require.Equal(t, val.sourceSeqNo, uint64(req[i]-off))
require.Equal(t, val.layer, int8(2))
require.Equal(t, val.extSequenceNumber, uint64(req[i]))
require.Equal(t, val.extTimestamp, uint64(123))
@@ -204,7 +204,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
g := n.getExtPacketMetas(tt.args.seqNo)
var got []uint16
for _, sn := range g {
got = append(got, sn.sourceSeqNo)
got = append(got, uint16(sn.sourceSeqNo))
if sn.sourceSeqNo%5 == 0 {
require.Equal(t, tt.fields.markerOdd, sn.marker)
require.Equal(t, tt.fields.codecBytesOversized, sn.codecBytesSlice)
@@ -343,7 +343,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
g := n.getExtPacketMetas(tt.args.seqNo)
var got []uint16
for _, sn := range g {
got = append(got, sn.sourceSeqNo)
got = append(got, uint16(sn.sourceSeqNo))
if sn.sourceSeqNo%2 == 0 {
require.Equal(t, tt.fields.markerEven, sn.marker)
require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut])
+9 -1
View File
@@ -111,8 +111,16 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
return
}
func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) {
if !w.initialized || result.PreExtendedHighest >= result.ExtendedVal {
return
}
w.ResetHighest(result.PreExtendedHighest)
}
func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) {
if !w.initialized || numCycles == 0 {
if !w.initialized || numCycles < 0 {
return w.Update(val)
}
+4 -3
View File
@@ -476,10 +476,11 @@ func TestWrapAroundUint16Rollover(t *testing.T) {
highest: 10,
extendedHighest: 10,
},
// zero cycles - should just do an update
// negative cycles - should just do an update
{
name: "zero",
input: 8,
name: "zero",
input: 8,
numCycles: -1,
updated: WrapAroundUpdateResult[uint32]{
IsUnhandled: true,
// the following fields are not valid when `IsUnhandled = true`, but code fills it in