diff --git a/README.md b/README.md
index 896cf9d4c..313e8fe21 100644
--- a/README.md
+++ b/README.md
@@ -305,11 +305,11 @@ LiveKit server is licensed under Apache License v2.0.
diff --git a/go.mod b/go.mod
index add999cac..3977bd586 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@ require (
github.com/d5/tengo/v2 v2.17.0
github.com/dustin/go-humanize v1.0.1
github.com/elliotchance/orderedmap/v2 v2.2.0
- github.com/florianl/go-tc v0.4.3
+ github.com/florianl/go-tc v0.4.4
github.com/frostbyte73/core v0.0.10
github.com/gammazero/deque v0.2.1
github.com/gammazero/workerpool v1.1.3
@@ -19,8 +19,8 @@ require (
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
- github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7
- github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b
+ github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
+ github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -40,19 +40,19 @@ require (
github.com/pion/webrtc/v3 v3.2.47
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
- github.com/redis/go-redis/v9 v9.5.4
+ github.com/redis/go-redis/v9 v9.6.1
github.com/rs/cors v1.11.0
github.com/stretchr/testify v1.9.0
github.com/thoas/go-funk v0.9.3
github.com/twitchtv/twirp v8.1.3+incompatible
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6
- github.com/urfave/cli/v2 v2.27.2
+ github.com/urfave/cli/v2 v2.27.3
github.com/urfave/negroni/v3 v3.1.1
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
- golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8
- golang.org/x/sync v0.7.0
+ golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
+ golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)
@@ -125,17 +125,17 @@ require (
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
- github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
+ github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
- golang.org/x/mod v0.19.0 // indirect
+ golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.27.0 // indirect
- golang.org/x/sys v0.22.0 // indirect
+ golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
- google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect
+ google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
google.golang.org/grpc v1.65.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
diff --git a/go.sum b/go.sum
index 6b708f506..f78e4d63f 100644
--- a/go.sum
+++ b/go.sum
@@ -68,8 +68,8 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
-github.com/florianl/go-tc v0.4.3 h1:xpobG2gFNvEqbclU07zjddALSjqTQTWJkxg5/kRYDpw=
-github.com/florianl/go-tc v0.4.3/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc=
+github.com/florianl/go-tc v0.4.4 h1:q6lhEWEfyhGffRzdl3eIcNqX/yVIw0IJwXqa9Rdcctw=
+github.com/florianl/go-tc v0.4.4/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJORsU=
@@ -165,10 +165,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
-github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY=
-github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
-github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b h1:Wn6D+B5YbMe1tH7WCazLJz+msBQzR69dK2wTdgJsF5k=
-github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
+github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
+github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
+github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1 h1:GP4QtOjYE6zDdtIi8AyM6ukse55HXr0174uOYXxb/H8=
+github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1/go.mod h1:oU5XbEaQlywdgXcSQDzrI5CPnwuGn/HuRXuQaDxVryQ=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -288,8 +288,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4=
github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
-github.com/redis/go-redis/v9 v9.5.4 h1:vOFYDKKVgrI5u++QvnMT7DksSMYg7Aw/Np4vLJLKLwY=
-github.com/redis/go-redis/v9 v9.5.4/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
+github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
@@ -323,8 +323,8 @@ github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJX
github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 h1:SIKIoA4e/5Y9ZOl0DCe3eVMLPOQzJxgZpfdHHeauNTM=
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6/go.mod h1:BUbeWZiieNxAuuADTBNb3/aeje6on3DhU3rpWsQSB1E=
-github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI=
-github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM=
+github.com/urfave/cli/v2 v2.27.3 h1:/POWahRmdh7uztQ3CYnaDddk0Rm90PyOgIxgW2rr41M=
+github.com/urfave/cli/v2 v2.27.3/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ=
github.com/urfave/negroni/v3 v3.1.1 h1:6MS4nG9Jk/UuCACaUlNXCbiKa0ywF9LXz5dGu09v8hw=
github.com/urfave/negroni/v3 v3.1.1/go.mod h1:jWvnX03kcSjDBl/ShB0iHvx5uOs7mAzZXW+JvJ5XYAs=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@@ -334,8 +334,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
-github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw=
-github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk=
+github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
+github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
@@ -365,16 +365,16 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
-golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8 h1:Z+vTUQyBb738QmIhbJx3z4htsxDeI+rd0EHvNm8jHkg=
-golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
+golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
+golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
-golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
-golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
+golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
+golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -411,8 +411,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
-golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
-golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
+golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -452,8 +452,8 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
-golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
+golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -494,8 +494,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d h1:JU0iKnSg02Gmb5ZdV8nYsKEKsP6o/FGVWTrw4i1DA9A=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
diff --git a/pkg/agent/client.go b/pkg/agent/client.go
index 13d4aba9b..df1ff7dfe 100644
--- a/pkg/agent/client.go
+++ b/pkg/agent/client.go
@@ -122,7 +122,11 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut
dispatcher := c.getDispatcher(desc.AgentName, desc.JobType)
if dispatcher == nil {
- logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType)
+ logger.Infow("not dispatching agent job since no worker is available",
+ "agentName", desc.AgentName,
+ "jobType", desc.JobType,
+ "room", desc.Room.Name,
+ "roomID", desc.Room.Sid)
return ret
}
@@ -183,6 +187,12 @@ func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *ser
}
c.mu.Unlock()
+ if agName == "" {
+ // if no agent name is given, we would need to dispatch backwards compatible mode
+ // which means dispatching to each of the namespaces
+ return target
+ }
+
done := make(chan *serverutils.IncrementalDispatcher[string], 1)
c.workers.Submit(func() {
agentNames.ForEach(func(ag string) {
diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go
index dc0828c66..853936955 100644
--- a/pkg/rtc/participant.go
+++ b/pkg/rtc/participant.go
@@ -129,6 +129,7 @@ type ParticipantParams struct {
TURNSEnabled bool
GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo
GetRegionSettings func(ip string) *livekit.RegionSettings
+ GetSubscriberForwarderState func(p types.LocalParticipant) (map[livekit.TrackID]*livekit.RTPForwarderState, error)
DisableSupervisor bool
ReconnectOnPublicationError bool
ReconnectOnSubscriptionError bool
@@ -231,6 +232,7 @@ type ParticipantImpl struct {
onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)
cachedDownTracks map[livekit.TrackID]*downTrackState
+ forwarderState map[livekit.TrackID]*livekit.RTPForwarderState
supervisor *supervisor.ParticipantSupervisor
@@ -1051,6 +1053,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {
case types.MigrateStateComplete:
p.TransportManager.ProcessPendingPublisherDataChannels()
+ p.cacheForwarderState()
}
if onMigrateStateChange := p.getOnMigrateStateChange(); onMigrateStateChange != nil {
@@ -1209,7 +1212,9 @@ func (p *ParticipantImpl) onTrackSubscribed(subTrack types.SubscribedTrack) {
return
}
if p.TransportManager.HasSubscriberEverConnected() {
- subTrack.DownTrack().SetConnected()
+ dt := subTrack.DownTrack()
+ dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())})
+ dt.SetConnected()
}
p.TransportManager.AddSubscribedTrack(subTrack)
})
@@ -1636,7 +1641,7 @@ func (p *ParticipantImpl) onPublisherInitialConnected() {
func (p *ParticipantImpl) onSubscriberInitialConnected() {
go p.subscriberRTCPWorker()
- p.setDowntracksConnected()
+ p.setDownTracksConnected()
}
func (p *ParticipantImpl) onPrimaryTransportInitialConnected() {
@@ -2448,18 +2453,39 @@ func (p *ParticipantImpl) postRtcp(pkts []rtcp.Packet) {
}, postRtcpOp{p, pkts})
}
-func (p *ParticipantImpl) setDowntracksConnected() {
+func (p *ParticipantImpl) setDownTracksConnected() {
for _, t := range p.SubscriptionManager.GetSubscribedTracks() {
if dt := t.DownTrack(); dt != nil {
+ dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(t.ID())})
dt.SetConnected()
}
}
}
+func (p *ParticipantImpl) cacheForwarderState() {
+ // if migrating in, get forwarder state from migrating out node to facilitate resume
+ if f := p.params.GetSubscriberForwarderState; f != nil {
+ if fs, err := f(p); err == nil {
+ p.lock.Lock()
+ p.forwarderState = fs
+ p.lock.Unlock()
+ }
+ }
+}
+
+func (p *ParticipantImpl) getAndDeleteForwarderState(trackID livekit.TrackID) *livekit.RTPForwarderState {
+ p.lock.Lock()
+ fs := p.forwarderState[trackID]
+ delete(p.forwarderState, trackID)
+ p.lock.Unlock()
+
+ return fs
+}
+
func (p *ParticipantImpl) CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, downTrack sfu.DownTrackState) {
p.lock.Lock()
if existing := p.cachedDownTracks[trackID]; existing != nil && existing.transceiver != rtpTransceiver {
- p.subLogger.Infow("cached transceiver changed", "trackID", trackID)
+ p.subLogger.Warnw("cached transceiver changed", nil, "trackID", trackID)
}
p.cachedDownTracks[trackID] = &downTrackState{transceiver: rtpTransceiver, downTrack: downTrack}
p.subLogger.Debugw("caching downtrack", "trackID", trackID)
diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go
index b0c515df0..74ddbe81d 100644
--- a/pkg/rtc/room.go
+++ b/pkg/rtc/room.go
@@ -791,7 +791,7 @@ func (r *Room) CloseIfEmpty() {
r.lock.Unlock()
if elapsed >= int64(timeout) {
- r.Close(types.ParticipantCloseReasonNone)
+ r.Close(types.ParticipantCloseReasonRoomClosed)
}
}
@@ -1488,7 +1488,15 @@ func (r *Room) createAgentDispatchesFromRoomAgent() {
return
}
- for _, ag := range r.internal.AgentDispatches {
+ roomDisp := r.internal.AgentDispatches
+ if len(roomDisp) == 0 {
+ // Backward compatibility: by default, start any agent in the empty JobName
+ roomDisp = []*livekit.RoomAgentDispatch{
+ &livekit.RoomAgentDispatch{},
+ }
+ }
+
+ for _, ag := range roomDisp {
ad := &livekit.AgentDispatch{
Id: guid.New(guid.AgentDispatchPrefix),
AgentName: ag.AgentName,
diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go
index 4a49b11db..37ca19561 100644
--- a/pkg/rtc/subscriptionmanager.go
+++ b/pkg/rtc/subscriptionmanager.go
@@ -180,6 +180,26 @@ func (m *SubscriptionManager) GetSubscribedTracks() []types.SubscribedTrack {
return tracks
}
+func (m *SubscriptionManager) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+
+ states := make(map[livekit.TrackID]*livekit.RTPForwarderState, len(m.subscriptions))
+ for trackID, t := range m.subscriptions {
+ st := t.getSubscribedTrack()
+ if st != nil {
+ dt := st.DownTrack()
+ if dt != nil {
+ state := dt.StopWriteAndGetState()
+ if state.ForwarderState != nil {
+ states[trackID] = state.ForwarderState
+ }
+ }
+ }
+ }
+ return states
+}
+
func (m *SubscriptionManager) HasSubscriptions() bool {
m.lock.RLock()
defer m.lock.RUnlock()
diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go
index 146861937..4d775698c 100644
--- a/pkg/rtc/transport.go
+++ b/pkg/rtc/transport.go
@@ -1409,7 +1409,7 @@ func (t *PCTransport) handleRemoteICECandidate(e event) error {
t.params.Logger.Warnw("failed to add cached ICE candidate", err, "candidate", c)
return errors.Wrap(err, "add ice candidate failed")
} else {
- t.params.Logger.Debugw("added cached ICE candidate", "candidate", c)
+ t.params.Logger.Debugw("added ICE candidate", "candidate", c)
}
return nil
diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go
index afe74ec91..e3d25a59d 100644
--- a/pkg/rtc/types/interfaces.go
+++ b/pkg/rtc/types/interfaces.go
@@ -106,6 +106,7 @@ const (
ParticipantCloseReasonDataChannelError
ParticipantCloseReasonMigrateCodecMismatch
ParticipantCloseReasonSignalSourceClose
+ ParticipantCloseReasonRoomClosed
)
func (p ParticipantCloseReason) String() string {
@@ -158,6 +159,8 @@ func (p ParticipantCloseReason) String() string {
return "MIGRATE_CODEC_MISMATCH"
case ParticipantCloseReasonSignalSourceClose:
return "SIGNAL_SOURCE_CLOSE"
+ case ParticipantCloseReasonRoomClosed:
+ return "ROOM_CLOSED"
default:
return fmt.Sprintf("%d", int(p))
}
@@ -188,6 +191,8 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason {
return livekit.DisconnectReason_STATE_MISMATCH
case ParticipantCloseReasonSignalSourceClose:
return livekit.DisconnectReason_SIGNAL_CLOSE
+ case ParticipantCloseReasonRoomClosed:
+ return livekit.DisconnectReason_ROOM_CLOSED
default:
// the other types will map to unknown reason
return livekit.DisconnectReason_UNKNOWN_REASON
@@ -364,6 +369,7 @@ type LocalParticipant interface {
// WaitUntilSubscribed waits until all subscriptions have been settled, or if the timeout
// has been reached. If the timeout expires, it will return an error.
WaitUntilSubscribed(timeout time.Duration) error
+ StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState
// returns list of participant identities that the current participant is subscribed to
GetSubscribedParticipants() []livekit.ParticipantID
@@ -407,7 +413,11 @@ type LocalParticipant interface {
NotifyMigration()
SetMigrateState(s MigrateState)
MigrateState() MigrateState
- SetMigrateInfo(previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo)
+ SetMigrateInfo(
+ previousOffer, previousAnswer *webrtc.SessionDescription,
+ mediaTracks []*livekit.TrackPublishedResponse,
+ dataChannels []*livekit.DataChannelInfo,
+ )
UpdateMediaRTT(rtt uint32)
UpdateSignalingRTT(rtt uint32)
diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go
index a21c7c04d..79f22b445 100644
--- a/pkg/rtc/types/typesfakes/fake_local_participant.go
+++ b/pkg/rtc/types/typesfakes/fake_local_participant.go
@@ -854,6 +854,16 @@ type FakeLocalParticipant struct {
stateReturnsOnCall map[int]struct {
result1 livekit.ParticipantInfo_State
}
+ StopAndGetSubscribedTracksForwarderStateStub func() map[livekit.TrackID]*livekit.RTPForwarderState
+ stopAndGetSubscribedTracksForwarderStateMutex sync.RWMutex
+ stopAndGetSubscribedTracksForwarderStateArgsForCall []struct {
+ }
+ stopAndGetSubscribedTracksForwarderStateReturns struct {
+ result1 map[livekit.TrackID]*livekit.RTPForwarderState
+ }
+ stopAndGetSubscribedTracksForwarderStateReturnsOnCall map[int]struct {
+ result1 map[livekit.TrackID]*livekit.RTPForwarderState
+ }
SubscribeToTrackStub func(livekit.TrackID)
subscribeToTrackMutex sync.RWMutex
subscribeToTrackArgsForCall []struct {
@@ -5607,6 +5617,59 @@ func (fake *FakeLocalParticipant) StateReturnsOnCall(i int, result1 livekit.Part
}{result1}
}
+func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState {
+ fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
+ ret, specificReturn := fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall[len(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall)]
+ fake.stopAndGetSubscribedTracksForwarderStateArgsForCall = append(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall, struct {
+ }{})
+ stub := fake.StopAndGetSubscribedTracksForwarderStateStub
+ fakeReturns := fake.stopAndGetSubscribedTracksForwarderStateReturns
+ fake.recordInvocation("StopAndGetSubscribedTracksForwarderState", []interface{}{})
+ fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
+ if stub != nil {
+ return stub()
+ }
+ if specificReturn {
+ return ret.result1
+ }
+ return fakeReturns.result1
+}
+
+func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateCallCount() int {
+ fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock()
+ defer fake.stopAndGetSubscribedTracksForwarderStateMutex.RUnlock()
+ return len(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall)
+}
+
+func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateCalls(stub func() map[livekit.TrackID]*livekit.RTPForwarderState) {
+ fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
+ defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
+ fake.StopAndGetSubscribedTracksForwarderStateStub = stub
+}
+
+func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateReturns(result1 map[livekit.TrackID]*livekit.RTPForwarderState) {
+ fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
+ defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
+ fake.StopAndGetSubscribedTracksForwarderStateStub = nil
+ fake.stopAndGetSubscribedTracksForwarderStateReturns = struct {
+ result1 map[livekit.TrackID]*livekit.RTPForwarderState
+ }{result1}
+}
+
+func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateReturnsOnCall(i int, result1 map[livekit.TrackID]*livekit.RTPForwarderState) {
+ fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock()
+ defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock()
+ fake.StopAndGetSubscribedTracksForwarderStateStub = nil
+ if fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall == nil {
+ fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall = make(map[int]struct {
+ result1 map[livekit.TrackID]*livekit.RTPForwarderState
+ })
+ }
+ fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall[i] = struct {
+ result1 map[livekit.TrackID]*livekit.RTPForwarderState
+ }{result1}
+}
+
func (fake *FakeLocalParticipant) SubscribeToTrack(arg1 livekit.TrackID) {
fake.subscribeToTrackMutex.Lock()
fake.subscribeToTrackArgsForCall = append(fake.subscribeToTrackArgsForCall, struct {
@@ -6851,6 +6914,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.setTrackMutedMutex.RUnlock()
fake.stateMutex.RLock()
defer fake.stateMutex.RUnlock()
+ fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock()
+ defer fake.stopAndGetSubscribedTracksForwarderStateMutex.RUnlock()
fake.subscribeToTrackMutex.RLock()
defer fake.subscribeToTrackMutex.RUnlock()
fake.subscriberAsPrimaryMutex.RLock()
diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go
index a09201036..5c6d1c0dd 100644
--- a/pkg/rtc/wrappedreceiver.go
+++ b/pkg/rtc/wrappedreceiver.go
@@ -201,9 +201,9 @@ func (d *DummyReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
return d.headerExtensions
}
-func (d *DummyReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
+func (d *DummyReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
- return r.ReadRTP(buf, layer, sn)
+ return r.ReadRTP(buf, layer, esn)
}
return 0, errors.New("no receiver")
}
diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go
index 131e77c41..2670985e5 100644
--- a/pkg/service/roomallocator.go
+++ b/pkg/service/roomallocator.go
@@ -106,15 +106,9 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
internal.TrackEgress = req.Egress.Tracks
}
}
- if req.Agent == nil {
- // Backward compatibility: by default, start any agent in the empty JobName
- req.Agent = &livekit.RoomAgent{
- Dispatches: []*livekit.RoomAgentDispatch{
- &livekit.RoomAgentDispatch{},
- },
- }
+ if req.Agent != nil {
+ internal.AgentDispatches = req.Agent.Dispatches
}
- internal.AgentDispatches = req.Agent.Dispatches
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
internal.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: true,
diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go
index 92520433b..1482035d4 100644
--- a/pkg/sfu/buffer/buffer.go
+++ b/pkg/sfu/buffer/buffer.go
@@ -17,7 +17,6 @@ package buffer
import (
"encoding/binary"
"errors"
- "fmt"
"io"
"strings"
"sync"
@@ -73,7 +72,7 @@ type ExtPacket struct {
type Buffer struct {
sync.RWMutex
readCond *sync.Cond
- bucket *bucket.Bucket
+ bucket *bucket.Bucket[uint64]
nacker *nack.NackQueue
maxVideoPkts int
maxAudioPkts int
@@ -252,10 +251,11 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
switch {
case strings.HasPrefix(b.mime, "audio/"):
b.codecType = webrtc.RTPCodecTypeAudio
- b.bucket = bucket.NewBucket(InitPacketBufferSizeAudio)
+ b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeAudio)
+
case strings.HasPrefix(b.mime, "video/"):
b.codecType = webrtc.RTPCodecTypeVideo
- b.bucket = bucket.NewBucket(InitPacketBufferSizeVideo)
+ b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeVideo)
if b.frameRateCalculator[0] == nil {
if strings.EqualFold(codec.MimeType, webrtc.MimeTypeVP8) {
b.frameRateCalculator[0] = NewFrameRateCalculatorVP8(b.clockRate, b.logger)
@@ -320,28 +320,6 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) {
return
}
- if err = utils.ValidateRTPPacket(&rtpPacket, b.payloadType, b.mediaSSRC); err != nil {
- invalidPacketCount := b.invalidPacketCount.Inc()
- if (invalidPacketCount-1)%100 == 0 {
- b.logger.Warnw(
- "validating RTP packet failed", err,
- "version", rtpPacket.Version,
- "padding", rtpPacket.Padding,
- "marker", rtpPacket.Marker,
- "expectedPayloadType", b.payloadType,
- "payloadType", rtpPacket.PayloadType,
- "sequenceNumber", rtpPacket.SequenceNumber,
- "timestamp", rtpPacket.Timestamp,
- "expectedSSRC", b.mediaSSRC,
- "ssrc", rtpPacket.SSRC,
- "numExtensions", len(rtpPacket.Extensions),
- "payloadSize", len(rtpPacket.Payload),
- "rtpStats", b.rtpStats,
- "snRangeMap", b.snRangeMap,
- )
- }
- }
-
now := time.Now().UnixNano()
if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() {
if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil {
@@ -349,6 +327,13 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) {
}
}
+ // libwebrtc will use 0 ssrc for probing, don't push the packet to pending queue to avoid memory increasing since
+ // the Bind will not be called to consume the pending packets. More details in https://github.com/pion/webrtc/pull/2816
+ if rtpPacket.SSRC == 0 {
+ b.Unlock()
+ return
+ }
+
// handle RTX packet
if pb := b.primaryBufferForRTX; pb != nil {
b.Unlock()
@@ -618,7 +603,7 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, i
}
flowState.ExtSequenceNumber -= snAdjustment
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber)
- _, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, rtpPacket.Header.SequenceNumber)
+ _, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, flowState.ExtSequenceNumber)
if err != nil {
if !flowState.IsDuplicate {
if errors.Is(err, bucket.ErrPacketTooOld) {
@@ -664,7 +649,7 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, i
}
func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket {
- n, err := b.getPacket(buf, ep.Packet.SequenceNumber)
+ n, err := b.getPacket(buf, ep.ExtSequenceNumber)
if err != nil {
packetNotFoundCount := b.packetNotFoundCount.Inc()
if (packetNotFoundCount-1)%20 == 0 {
@@ -689,32 +674,6 @@ func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket {
b.logger.Warnw("unexpected marshal size", nil, "max", n, "need", payloadEnd)
return nil
}
- // TODO-REMOVE-AFTER-DEBUG START
- if payloadEnd != n {
- paddingEnd := payloadStart + int(ep.Packet.PaddingSize)
- if paddingEnd != n {
- b.logger.Warnw("unexpected marshal size", nil, "max", n, "payloadEnd", payloadEnd, "paddingEnd", paddingEnd)
- }
- }
- // check a few fields for validity
- checkVersion := (buf[0] & 0xc0) >> 6
- checkPayloadType := buf[1] & 0x7f
- checkSequenceNumber := binary.BigEndian.Uint16(buf[2:])
- checkSSRC := binary.BigEndian.Uint32(buf[8:])
- if checkVersion != pkt.Version || checkPayloadType != pkt.PayloadType || checkSequenceNumber != pkt.SequenceNumber || checkSSRC != pkt.SSRC {
- b.logger.Warnw(
- "rtp packet mismatch", nil,
- "version", fmt.Sprintf("%d != %d", checkVersion, pkt.Version),
- "payloadType", fmt.Sprintf("%d != %d", checkPayloadType, pkt.PayloadType),
- "sequenceNumber", fmt.Sprintf("%d != %d", checkSequenceNumber, pkt.SequenceNumber),
- "SSRC", fmt.Sprintf("%d != %d", checkSSRC, pkt.SSRC),
- "bytes", buf[0:16],
- "len", n,
- "headerSize", payloadStart,
- "payloadSize", payloadEnd-payloadStart,
- )
- }
- // TODO-REMOVE-AFTER-DEBUG END
pkt.Payload = buf[payloadStart:payloadEnd]
ep.Packet = &pkt
@@ -1027,18 +986,18 @@ func (b *Buffer) getRTCP() []rtcp.Packet {
return pkts
}
-func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) {
+func (b *Buffer) GetPacket(buff []byte, esn uint64) (int, error) {
b.Lock()
defer b.Unlock()
- return b.getPacket(buff, sn)
+ return b.getPacket(buff, esn)
}
-func (b *Buffer) getPacket(buff []byte, sn uint16) (int, error) {
+func (b *Buffer) getPacket(buff []byte, esn uint64) (int, error) {
if b.closed.Load() {
return 0, io.EOF
}
- return b.bucket.GetPacket(buff, sn)
+ return b.bucket.GetPacket(buff, esn)
}
func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go
index 7b2daa8d5..8944b96ba 100644
--- a/pkg/sfu/buffer/rtpstats_receiver.go
+++ b/pkg/sfu/buffer/rtpstats_receiver.go
@@ -137,14 +137,21 @@ func (r *RTPStatsReceiver) NewSnapshotId() uint32 {
return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest())
}
-func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64) int {
+func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64, ts uint32) int {
if diffNano < r.tsRolloverThreshold {
// time not more than rollover threshold
- return 0
+ return -1
}
- excess := int((diffNano - r.tsRolloverThreshold) * int64(r.params.ClockRate) / 1e9)
- return excess/(1<<32) + 1
+ excess := int((diffNano - r.tsRolloverThreshold*2) * int64(r.params.ClockRate) / 1e9)
+ roc := excess / (1 << 32)
+ if roc < 0 {
+ roc = 0
+ }
+ if r.timestamp.GetHighest() > ts {
+ roc++
+ }
+ return roc
}
func (r *RTPStatsReceiver) Update(
@@ -167,6 +174,8 @@ func (r *RTPStatsReceiver) Update(
var resSN utils.WrapAroundUpdateResult[uint64]
var gapSN int64
var resTS utils.WrapAroundUpdateResult[uint64]
+ var timeSinceHighest int64
+ var tsRolloverCount int
getLoggingFields := func() []interface{} {
return []interface{}{
@@ -174,6 +183,8 @@ func (r *RTPStatsReceiver) Update(
"gapSN", gapSN,
"resTS", resTS,
"gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest),
+ "timeSinceHighest", time.Duration(timeSinceHighest),
+ "tsRolloverCount", tsRolloverCount,
"packetTime", time.Unix(0, packetTime).String(),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,
@@ -219,30 +230,57 @@ func (r *RTPStatsReceiver) Update(
}
gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
- resTS = r.timestamp.Rollover(timestamp, r.getTSRolloverCount(packetTime-r.highestTime))
+ timeSinceHighest = packetTime - r.highestTime
+ tsRolloverCount = r.getTSRolloverCount(timeSinceHighest, timestamp)
+ if tsRolloverCount >= 0 {
+ r.logger.Warnw(
+ "potential time stamp roll over", nil,
+ getLoggingFields()...,
+ )
+ }
+ resTS = r.timestamp.Rollover(timestamp, tsRolloverCount)
if resTS.IsUnhandled {
flowState.IsNotHandled = true
return
}
gapTS := int64(resTS.ExtendedVal - resTS.PreExtendedHighest)
- // it is possible that sequence number has rolled over too
- if gapSN < 0 && gapTS > 0 && payloadSize > 0 {
- // not possible to know how many cycles of sequence number roll over could have happened,
- // use 1 to ensure that it at least does not go backwards
- resSN = r.sequenceNumber.Rollover(sequenceNumber, 1)
- if resSN.IsUnhandled {
- flowState.IsNotHandled = true
- return
- }
-
+ // it is possible to reecive old packets,
+ // as it is not possible to detect how far to roll back sequence number, ignore old packets
+ if gapTS < 0 && gapSN > 0 {
+ r.sequenceNumber.UndoUpdate(resSN)
r.logger.Warnw(
- "forcing sequence number rollover", nil,
+ "dropping old packet", nil,
getLoggingFields()...,
)
- gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
+ flowState.IsNotHandled = true
+ return
+ }
+
+ // it is possible that sequence number has rolled over too
+ if gapSN < 0 && gapTS > 0 && payloadSize > 0 {
+ if tsRolloverCount >= 0 {
+ // not possible to know how many cycles of sequence number roll over could have happened,
+ // use 1 to ensure that it at least does not go backwards
+ resSN = r.sequenceNumber.Rollover(sequenceNumber, 1)
+ if resSN.IsUnhandled {
+ flowState.IsNotHandled = true
+ return
+ }
+
+ r.logger.Warnw(
+ "forcing sequence number rollover", nil,
+ getLoggingFields()...,
+ )
+ } else {
+ r.logger.Warnw(
+ "forcing sequence number rollover skipped", nil,
+ getLoggingFields()...,
+ )
+ }
}
}
+ gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
pktSize := uint64(hdrSize + payloadSize + paddingSize)
if gapSN <= 0 { // duplicate OR out-of-order
@@ -263,8 +301,6 @@ func (r *RTPStatsReceiver) Update(
}
flowState.IsOutOfOrder = true
- flowState.ExtSequenceNumber = resSN.ExtendedVal
- flowState.ExtTimestamp = resTS.ExtendedVal
if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpNegativeCount++
@@ -316,9 +352,9 @@ func (r *RTPStatsReceiver) Update(
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
- flowState.ExtSequenceNumber = resSN.ExtendedVal
- flowState.ExtTimestamp = resTS.ExtendedVal
}
+ flowState.ExtSequenceNumber = resSN.ExtendedVal
+ flowState.ExtTimestamp = resTS.ExtendedVal
if !flowState.IsDuplicate {
if payloadSize == 0 {
@@ -585,8 +621,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
return false
}
- r.updatePropagationDelayAndRecordSenderReport(srDataExt)
r.checkRTPClockSkewForSenderReport(srDataExt)
+ r.updatePropagationDelayAndRecordSenderReport(srDataExt)
r.checkRTPClockSkewAgainstMediaPathForSenderReport(srDataExt)
if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil {
@@ -731,6 +767,13 @@ func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool {
return diff >= 0 && diff < cHistorySize
}
+func (r *RTPStatsReceiver) HighestTimestamp() uint32 {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.timestamp.GetHighest()
+}
+
// ----------------------------------
type lockedRTPStatsReceiverLogEncoder struct {
diff --git a/pkg/sfu/buffer/rtpstats_receiver_test.go b/pkg/sfu/buffer/rtpstats_receiver_test.go
index eb9ed72bf..b9005faf7 100644
--- a/pkg/sfu/buffer/rtpstats_receiver_test.go
+++ b/pkg/sfu/buffer/rtpstats_receiver_test.go
@@ -249,6 +249,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
// padding only
sequenceNumber += 2
+ timestamp += 3000
packet = getPacket(sequenceNumber, timestamp, 0)
flowState = r.Update(
time.Now().UnixNano(),
@@ -266,5 +267,22 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
require.True(t, r.history.IsSet(uint64(sequenceNumber)-1))
require.True(t, r.history.IsSet(uint64(sequenceNumber)-2))
+ // old packet, but simulating increasing sequence number after roll over
+ packet = getPacket(sequenceNumber+400, timestamp-6000, 300)
+ flowState = r.Update(
+ time.Now().UnixNano(),
+ packet.Header.SequenceNumber,
+ packet.Header.Timestamp,
+ packet.Header.Marker,
+ packet.Header.MarshalSize(),
+ len(packet.Payload),
+ 0,
+ )
+ require.True(t, flowState.IsNotHandled)
+ require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
+ require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
+ require.Equal(t, timestamp, r.timestamp.GetHighest())
+ require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
+
r.Stop()
}
diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go
index 6494013c6..43ac5eb4a 100644
--- a/pkg/sfu/buffer/rtpstats_sender.go
+++ b/pkg/sfu/buffer/rtpstats_sender.go
@@ -990,6 +990,13 @@ func (r *RTPStatsSender) getIntervalStats(
return
}
+func (r *RTPStatsSender) ExtHighestSequenceNumber() uint64 {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.extHighestSN
+}
+
// -------------------------------------------------------------------
type lockedRTPStatsSenderLogEncoder struct {
diff --git a/pkg/sfu/codecmunger/vp8.go b/pkg/sfu/codecmunger/vp8.go
index 97c37bc4c..3c89d2e1a 100644
--- a/pkg/sfu/codecmunger/vp8.go
+++ b/pkg/sfu/codecmunger/vp8.go
@@ -15,10 +15,9 @@
package codecmunger
import (
- "fmt"
-
"github.com/elliotchance/orderedmap/v2"
+ "github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -32,23 +31,6 @@ const (
// -----------------------------------------------------------
-type VP8State struct {
- ExtLastPictureId int32
- PictureIdUsed bool
- LastTl0PicIdx uint8
- Tl0PicIdxUsed bool
- TidUsed bool
- LastKeyIdx uint8
- KeyIdxUsed bool
-}
-
-func (v VP8State) String() string {
- return fmt.Sprintf("VP8State{extLastPictureId: %d, pictureIdUsed: %+v, lastTl0PicIdx: %d, tl0PicIdxUsed: %+v, tidUsed: %+v, lastKeyIdx: %d, keyIdxUsed: %+v)",
- v.ExtLastPictureId, v.PictureIdUsed, v.LastTl0PicIdx, v.Tl0PicIdxUsed, v.TidUsed, v.LastKeyIdx, v.KeyIdxUsed)
-}
-
-// -----------------------------------------------------------
-
type VP8 struct {
logger logger.Logger
@@ -85,25 +67,27 @@ func NewVP8FromNull(cm CodecMunger, logger logger.Logger) *VP8 {
}
func (v *VP8) GetState() interface{} {
- return VP8State{
+ return &livekit.VP8MungerState{
ExtLastPictureId: v.extLastPictureId,
PictureIdUsed: v.pictureIdUsed,
- LastTl0PicIdx: v.lastTl0PicIdx,
+ LastTl0PicIdx: uint32(v.lastTl0PicIdx),
Tl0PicIdxUsed: v.tl0PicIdxUsed,
TidUsed: v.tidUsed,
- LastKeyIdx: v.lastKeyIdx,
+ LastKeyIdx: uint32(v.lastKeyIdx),
KeyIdxUsed: v.keyIdxUsed,
}
}
func (v *VP8) SeedState(seed interface{}) {
- if state, ok := seed.(VP8State); ok {
+ switch cm := seed.(type) {
+ case *livekit.RTPForwarderState_Vp8Munger:
+ state := cm.Vp8Munger
v.extLastPictureId = state.ExtLastPictureId
v.pictureIdUsed = state.PictureIdUsed
- v.lastTl0PicIdx = state.LastTl0PicIdx
+ v.lastTl0PicIdx = uint8(state.LastTl0PicIdx)
v.tl0PicIdxUsed = state.Tl0PicIdxUsed
v.tidUsed = state.TidUsed
- v.lastKeyIdx = state.LastKeyIdx
+ v.lastKeyIdx = uint8(state.LastKeyIdx)
v.keyIdxUsed = state.KeyIdxUsed
}
}
diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go
index c13760e21..fff33d8aa 100644
--- a/pkg/sfu/downtrack.go
+++ b/pkg/sfu/downtrack.go
@@ -138,7 +138,7 @@ var (
type DownTrackState struct {
RTPStats *buffer.RTPStatsSender
DeltaStatsSenderSnapshotId uint32
- ForwarderState ForwarderState
+ ForwarderState *livekit.RTPForwarderState
}
func (d DownTrackState) String() string {
@@ -265,6 +265,7 @@ type DownTrack struct {
connected atomic.Bool
bindAndConnectedOnce atomic.Bool
writable atomic.Bool
+ writeStopped atomic.Bool
rtpStats *buffer.RTPStatsSender
@@ -1147,20 +1148,30 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer {
}
func (d *DownTrack) GetState() DownTrackState {
- dts := DownTrackState{
+ return DownTrackState{
RTPStats: d.rtpStats,
DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId,
ForwarderState: d.forwarder.GetState(),
}
- return dts
}
func (d *DownTrack) SeedState(state DownTrackState) {
- d.rtpStats.Seed(state.RTPStats)
- d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId
+ if state.RTPStats != nil {
+ d.rtpStats.Seed(state.RTPStats)
+ d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId
+ }
d.forwarder.SeedState(state.ForwarderState)
}
+func (d *DownTrack) StopWriteAndGetState() DownTrackState {
+ d.bindLock.Lock()
+ d.writable.Store(false)
+ d.writeStopped.Store(true)
+ d.bindLock.Unlock()
+
+ return d.GetState()
+}
+
func (d *DownTrack) UpTrackLayersChange() {
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnAvailableLayersChanged(d)
@@ -1985,6 +1996,9 @@ func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) {
*/
func (d *DownTrack) onBindAndConnectedChange() {
+ if d.writeStopped.Load() {
+ return
+ }
d.writable.Store(d.connected.Load() && d.bound.Load())
if d.connected.Load() && d.bound.Load() && !d.bindAndConnectedOnce.Swap(true) {
if d.activePaddingOnMuteUpTrack.Load() {
diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go
index 8f0cb1af9..7f7f06d0f 100644
--- a/pkg/sfu/forwarder.go
+++ b/pkg/sfu/forwarder.go
@@ -27,6 +27,7 @@ import (
"github.com/pion/webrtc/v3"
"go.uber.org/zap/zapcore"
+ "github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -186,35 +187,6 @@ type TranslationParams struct {
// -------------------------------------------------------------------
-type ForwarderState struct {
- Started bool
- ReferenceLayerSpatial int32
- PreStartTime time.Time
- ExtFirstTS uint64
- DummyStartTSOffset uint64
- RTP RTPMungerState
- Codec interface{}
-}
-
-func (f ForwarderState) String() string {
- codecString := ""
- switch codecState := f.Codec.(type) {
- case codecmunger.VP8State:
- codecString = codecState.String()
- }
- return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, dummyStartTSOffset: %d, rtp: %s, codec: %s}",
- f.Started,
- f.ReferenceLayerSpatial,
- f.PreStartTime.String(),
- f.ExtFirstTS,
- f.DummyStartTSOffset,
- f.RTP.String(),
- codecString,
- )
-}
-
-// -------------------------------------------------------------------
-
type refInfo struct {
senderReport *buffer.RTCPSenderReportData
tsOffset uint64
@@ -402,41 +374,52 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
}
}
-func (f *Forwarder) GetState() ForwarderState {
+func (f *Forwarder) GetState() *livekit.RTPForwarderState {
f.lock.RLock()
defer f.lock.RUnlock()
if !f.started {
- return ForwarderState{}
+ return nil
}
- return ForwarderState{
- Started: f.started,
- ReferenceLayerSpatial: f.referenceLayerSpatial,
- PreStartTime: f.preStartTime,
- ExtFirstTS: f.extFirstTS,
- DummyStartTSOffset: f.dummyStartTSOffset,
- RTP: f.rtpMunger.GetLast(),
- Codec: f.codecMunger.GetState(),
+ state := &livekit.RTPForwarderState{
+ Started: f.started,
+ ReferenceLayerSpatial: f.referenceLayerSpatial,
+ ExtFirstTimestamp: f.extFirstTS,
+ DummyStartTimestampOffset: f.dummyStartTSOffset,
+ RtpMunger: f.rtpMunger.GetState(),
}
+ if !f.preStartTime.IsZero() {
+ state.PreStartTime = f.preStartTime.UnixNano()
+ }
+
+ codecMungerState := f.codecMunger.GetState()
+ if vp8MungerState, ok := codecMungerState.(*livekit.VP8MungerState); ok {
+ state.CodecMunger = &livekit.RTPForwarderState_Vp8Munger{
+ Vp8Munger: vp8MungerState,
+ }
+ }
+ return state
}
-func (f *Forwarder) SeedState(state ForwarderState) {
- if !state.Started {
+func (f *Forwarder) SeedState(state *livekit.RTPForwarderState) {
+ if state == nil || !state.Started {
return
}
f.lock.Lock()
defer f.lock.Unlock()
- f.rtpMunger.SeedLast(state.RTP)
- f.codecMunger.SeedState(state.Codec)
+ f.rtpMunger.SeedState(state.RtpMunger)
+ f.codecMunger.SeedState(state.CodecMunger)
f.started = true
f.referenceLayerSpatial = state.ReferenceLayerSpatial
- f.preStartTime = state.PreStartTime
- f.extFirstTS = state.ExtFirstTS
- f.dummyStartTSOffset = state.DummyStartTSOffset
+ if state.PreStartTime != 0 {
+ f.preStartTime = time.Unix(0, state.PreStartTime)
+ }
+ f.extFirstTS = state.ExtFirstTimestamp
+ f.dummyStartTSOffset = state.DummyStartTimestampOffset
}
func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool {
@@ -1654,12 +1637,14 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
f.logger.Debugw(
message,
"layer", layer,
+ "referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
+ "refInfos", wrappedRefInfoLogger{f},
)
}
// TODO-REMOVE-AFTER-DATA-COLLECTION
@@ -1667,12 +1652,14 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
f.logger.Infow(
message,
"layer", layer,
+ "referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
+ "refInfos", wrappedRefInfoLogger{f},
)
}
@@ -1686,8 +1673,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// 3. extExpectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet
// Ideally, extRefTS and extExpectedTS should be very close and extLastTS should be before both of those.
// But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always.
- rtpMungerState := f.rtpMunger.GetLast()
- extLastTS := rtpMungerState.ExtLastTS
+ rtpMungerState := f.rtpMunger.GetState()
+ extLastTS := rtpMungerState.ExtLastTimestamp
extExpectedTS := extLastTS
extRefTS := extLastTS
refTS := uint32(extRefTS)
@@ -1838,7 +1825,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
- "nextSN", rtpMungerState.ExtLastSN+1,
+ "nextSN", rtpMungerState.ExtLastSequenceNumber+1,
"extIncomingSN", extPkt.ExtSequenceNumber,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
@@ -1856,7 +1843,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
- "nextSN", rtpMungerState.ExtLastSN+1,
+ "nextSN", rtpMungerState.ExtLastSequenceNumber+1,
"extIncomingSN", extPkt.ExtSequenceNumber,
"extIncomingTS", extPkt.ExtTimestamp,
)
@@ -2052,7 +2039,7 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S
numPackets++
}
- extLastTS := f.rtpMunger.GetLast().ExtLastTS
+ extLastTS := f.rtpMunger.GetState().ExtLastTimestamp
extExpectedTS := extLastTS
if f.getExpectedRTPTimestamp != nil {
tsExt, err := f.getExpectedRTPTimestamp(time.Now())
diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go
index 3b0f5c6ad..326712771 100644
--- a/pkg/sfu/receiver.go
+++ b/pkg/sfu/receiver.go
@@ -56,7 +56,7 @@ type TrackReceiver interface {
HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
IsClosed() bool
- ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
+ ReadRTP(buf []byte, layer uint8, esn uint64) (int, error)
GetLayeredBitrate() ([]int32, Bitrates)
GetAudioLevel() (float64, bool)
@@ -576,13 +576,13 @@ func (w *WebRTCReceiver) getBufferLocked(layer int32) *buffer.Buffer {
return w.buffers[layer]
}
-func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
+func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
b := w.getBuffer(int32(layer))
if b == nil {
return 0, ErrBufferNotFound
}
- return b.GetPacket(buf, sn)
+ return b.GetPacket(buf, esn)
}
func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
@@ -713,22 +713,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
spatialTracker = w.streamTrackerManager.AddTracker(pkt.Spatial)
}
}
- if spatialLayer > buffer.DefaultMaxLayerSpatial { // TODO-REMOVE-AFTER-DEBUG
- w.logger.Warnw(
- "invalid spatial layer", nil,
- "mime", w.codec.MimeType,
- "layer", layer,
- "spatialLayer", spatialLayer,
- "sn", pkt.Packet.SequenceNumber,
- "esn", pkt.ExtSequenceNumber,
- "timestamp", pkt.Packet.Timestamp,
- "ets", pkt.ExtTimestamp,
- "payloadSize", len(pkt.Packet.Payload),
- "rtpVersion", pkt.Packet.Version,
- "payloadType", pkt.Packet.PayloadType,
- "ssrc", pkt.Packet.SSRC,
- )
- }
writeCount := w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(pkt, spatialLayer)
diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go
index 956ee7251..e04d56f82 100644
--- a/pkg/sfu/redprimaryreceiver.go
+++ b/pkg/sfu/redprimaryreceiver.go
@@ -136,8 +136,8 @@ func (r *RedPrimaryReceiver) Close() {
closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks())
}
-func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
- n, err := r.TrackReceiver.ReadRTP(buf, layer, sn)
+func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
+ n, err := r.TrackReceiver.ReadRTP(buf, layer, esn)
if err != nil {
return n, err
}
diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go
index 9d24b997b..4af4fb5f5 100644
--- a/pkg/sfu/redreceiver.go
+++ b/pkg/sfu/redreceiver.go
@@ -127,7 +127,7 @@ func (r *RedReceiver) Close() {
closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks())
}
-func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
+func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) {
// red encoding doesn't support nack
return 0, bucket.ErrPacketMismatch
}
diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go
index 0e8fa78ab..7eb41d776 100644
--- a/pkg/sfu/rtpmunger.go
+++ b/pkg/sfu/rtpmunger.go
@@ -15,8 +15,7 @@
package sfu
import (
- "fmt"
-
+ "github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -50,26 +49,6 @@ type SnTs struct {
// ----------------------------------------------------------------------
-type RTPMungerState struct {
- ExtLastSN uint64
- ExtSecondLastSN uint64
- ExtLastTS uint64
- ExtSecondLastTS uint64
- LastMarker bool
- SecondLastMarker bool
-}
-
-func (r RTPMungerState) String() string {
- return fmt.Sprintf(
- "RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d, extSecondLastTS: %d, lastMarker: %v, secondLastMarker: %v)",
- r.ExtLastSN, r.ExtSecondLastSN,
- r.ExtLastTS, r.ExtSecondLastTS,
- r.LastMarker, r.SecondLastMarker,
- )
-}
-
-// ----------------------------------------------------------------------
-
type RTPMunger struct {
logger logger.Logger
@@ -112,14 +91,14 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} {
}
}
-func (r *RTPMunger) GetLast() RTPMungerState {
- return RTPMungerState{
- ExtLastSN: r.extLastSN,
- ExtSecondLastSN: r.extSecondLastSN,
- ExtLastTS: r.extLastTS,
- ExtSecondLastTS: r.extSecondLastTS,
- LastMarker: r.lastMarker,
- SecondLastMarker: r.secondLastMarker,
+func (r *RTPMunger) GetState() *livekit.RTPMungerState {
+ return &livekit.RTPMungerState{
+ ExtLastSequenceNumber: r.extLastSN,
+ ExtSecondLastSequenceNumber: r.extSecondLastSN,
+ ExtLastTimestamp: r.extLastTS,
+ ExtSecondLastTimestamp: r.extSecondLastTS,
+ LastMarker: r.lastMarker,
+ SecondLastMarker: r.secondLastMarker,
}
}
@@ -127,11 +106,11 @@ func (r *RTPMunger) GetTSOffset() uint64 {
return r.tsOffset
}
-func (r *RTPMunger) SeedLast(state RTPMungerState) {
- r.extLastSN = state.ExtLastSN
- r.extSecondLastSN = state.ExtSecondLastSN
- r.extLastTS = state.ExtLastTS
- r.extSecondLastTS = state.ExtSecondLastTS
+func (r *RTPMunger) SeedState(state *livekit.RTPMungerState) {
+ r.extLastSN = state.ExtLastSequenceNumber
+ r.extSecondLastSN = state.ExtSecondLastSequenceNumber
+ r.extLastTS = state.ExtLastTimestamp
+ r.extSecondLastTS = state.ExtSecondLastTimestamp
r.lastMarker = state.LastMarker
r.secondLastMarker = state.SecondLastMarker
}
diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go
index 22750a037..294029cc5 100644
--- a/pkg/sfu/sequencer.go
+++ b/pkg/sfu/sequencer.go
@@ -42,10 +42,10 @@ func itob(i int) bool {
}
type packetMeta struct {
- // Original sequence number from stream.
- // The original sequence number is used to find the original
+ // Original extended sequence number from stream.
+ // The original extended sequence number is used to find the original
// packet from publisher
- sourceSeqNo uint16
+ sourceSeqNo uint64
// Modified sequence number after offset.
// This sequence number is used for the associated
// down track, is modified according the offsets, and
@@ -199,7 +199,7 @@ func (s *sequencer) push(
slot := extModifiedSNAdjusted % uint64(s.size)
s.meta[slot] = packetMeta{
- sourceSeqNo: uint16(extIncomingSN),
+ sourceSeqNo: extIncomingSN,
targetSeqNo: uint16(extModifiedSN),
timestamp: uint32(extModifiedTS),
marker: marker,
diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go
index 773c54309..862ac531b 100644
--- a/pkg/sfu/sequencer_test.go
+++ b/pkg/sfu/sequencer_test.go
@@ -45,7 +45,7 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, len(req), len(res))
for i, val := range res {
require.Equal(t, val.targetSeqNo, req[i])
- require.Equal(t, val.sourceSeqNo, req[i]-off)
+ require.Equal(t, val.sourceSeqNo, uint64(req[i]-off))
require.Equal(t, val.layer, int8(2))
require.Equal(t, val.extSequenceNumber, uint64(req[i]))
require.Equal(t, val.extTimestamp, uint64(123))
@@ -57,7 +57,7 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, len(req), len(res))
for i, val := range res {
require.Equal(t, val.targetSeqNo, req[i])
- require.Equal(t, val.sourceSeqNo, req[i]-off)
+ require.Equal(t, val.sourceSeqNo, uint64(req[i]-off))
require.Equal(t, val.layer, int8(2))
require.Equal(t, val.extSequenceNumber, uint64(req[i]))
require.Equal(t, val.extTimestamp, uint64(123))
@@ -204,7 +204,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
g := n.getExtPacketMetas(tt.args.seqNo)
var got []uint16
for _, sn := range g {
- got = append(got, sn.sourceSeqNo)
+ got = append(got, uint16(sn.sourceSeqNo))
if sn.sourceSeqNo%5 == 0 {
require.Equal(t, tt.fields.markerOdd, sn.marker)
require.Equal(t, tt.fields.codecBytesOversized, sn.codecBytesSlice)
@@ -343,7 +343,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
g := n.getExtPacketMetas(tt.args.seqNo)
var got []uint16
for _, sn := range g {
- got = append(got, sn.sourceSeqNo)
+ got = append(got, uint16(sn.sourceSeqNo))
if sn.sourceSeqNo%2 == 0 {
require.Equal(t, tt.fields.markerEven, sn.marker)
require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut])
diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go
index d18d5a4ab..2a30a6325 100644
--- a/pkg/sfu/utils/wraparound.go
+++ b/pkg/sfu/utils/wraparound.go
@@ -111,8 +111,16 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
return
}
+func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) {
+ if !w.initialized || result.PreExtendedHighest >= result.ExtendedVal {
+ return
+ }
+
+ w.ResetHighest(result.PreExtendedHighest)
+}
+
func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) {
- if !w.initialized || numCycles == 0 {
+ if !w.initialized || numCycles < 0 {
return w.Update(val)
}
diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go
index 76cb8fdb3..4f2e505d5 100644
--- a/pkg/sfu/utils/wraparound_test.go
+++ b/pkg/sfu/utils/wraparound_test.go
@@ -476,10 +476,11 @@ func TestWrapAroundUint16Rollover(t *testing.T) {
highest: 10,
extendedHighest: 10,
},
- // zero cycles - should just do an update
+ // negative cycles - should just do an update
{
- name: "zero",
- input: 8,
+ name: "zero",
+ input: 8,
+ numCycles: -1,
updated: WrapAroundUpdateResult[uint32]{
IsUnhandled: true,
// the following fields are not valid when `IsUnhandled = true`, but code fills it in