mirror of
https://github.com/livekit/livekit.git
synced 2026-05-11 21:14:58 +00:00
Merge remote-tracking branch 'origin/master' into raja_min_packets
This commit is contained in:
@@ -25,7 +25,7 @@ jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- uses: shogo82148/actions-setup-redis@v1
|
||||
with:
|
||||
redis-version: "6.x"
|
||||
|
||||
@@ -25,7 +25,7 @@ jobs:
|
||||
docker:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Docker meta
|
||||
id: meta
|
||||
uses: docker/metadata-action@v4
|
||||
|
||||
@@ -28,7 +28,7 @@ jobs:
|
||||
release:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Fetch all tags
|
||||
run: git fetch --force --tags
|
||||
|
||||
|
||||
@@ -55,8 +55,9 @@ func printPorts(c *cli.Context) error {
|
||||
if conf.RTC.TCPPort != 0 {
|
||||
tcpPorts = append(tcpPorts, fmt.Sprintf("%d - ICE/TCP", conf.RTC.TCPPort))
|
||||
}
|
||||
if conf.RTC.UDPPort != 0 {
|
||||
udpPorts = append(udpPorts, fmt.Sprintf("%d - ICE/UDP", conf.RTC.UDPPort))
|
||||
if conf.RTC.UDPPort.Valid() {
|
||||
portStr, _ := conf.RTC.UDPPort.MarshalYAML()
|
||||
udpPorts = append(udpPorts, fmt.Sprintf("%s - ICE/UDP", portStr))
|
||||
} else {
|
||||
udpPorts = append(udpPorts, fmt.Sprintf("%d-%d - ICE/UDP range", conf.RTC.ICEPortRangeStart, conf.RTC.ICEPortRangeEnd))
|
||||
}
|
||||
|
||||
+4
-3
@@ -29,6 +29,7 @@ import (
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
|
||||
"github.com/livekit/protocol/logger"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
@@ -70,9 +71,9 @@ var baseFlags = []cli.Flag{
|
||||
Usage: "IP address of the current node, used to advertise to clients. Automatically determined by default",
|
||||
EnvVars: []string{"NODE_IP"},
|
||||
},
|
||||
&cli.IntFlag{
|
||||
&cli.StringFlag{
|
||||
Name: "udp-port",
|
||||
Usage: "Single UDP port to use for WebRTC traffic",
|
||||
Usage: "UDP port(s) to use for WebRTC traffic",
|
||||
EnvVars: []string{"UDP_PORT"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
@@ -206,7 +207,7 @@ func getConfig(c *cli.Context) (*config.Config, error) {
|
||||
|
||||
if c.String("config") == "" && c.String("config-body") == "" && conf.Development {
|
||||
// use single port UDP when no config is provided
|
||||
conf.RTC.UDPPort = 7882
|
||||
conf.RTC.UDPPort = rtcconfig.PortRange{Start: 7882}
|
||||
conf.RTC.ICEPortRangeStart = 0
|
||||
conf.RTC.ICEPortRangeEnd = 0
|
||||
logger.Infow("starting in development mode")
|
||||
|
||||
+8
-3
@@ -43,10 +43,10 @@ rtc:
|
||||
# that maps to an external one
|
||||
use_external_ip: true
|
||||
# # when set, LiveKit will attempt to use a UDP mux so all UDP traffic goes through
|
||||
# # a single port. This simplifies deployment, but mux will become an overhead for
|
||||
# # highly trafficked deployments.
|
||||
# # listed port(s). To maximize system performance, we recommend using a range of ports
|
||||
# # greater or equal to the number of vCPUs on the machine.
|
||||
# # port_range_start & end must not be set for this config to take effect
|
||||
# udp_port: 7882
|
||||
# udp_port: 7882-7892
|
||||
# # when set to true, server will use a lite ice agent, that will speed up ice connection, but
|
||||
# # might cause connect issue if server running behind NAT.
|
||||
# use_ice_lite: true
|
||||
@@ -107,6 +107,11 @@ rtc:
|
||||
# # are disabled and clients don't ACK opened peer connections, only reliable, ordered delivery
|
||||
# # will be available.
|
||||
# strict_acks: true
|
||||
# # enable batch write to merge network write system calls to reduce cpu usage. Outgoing packets
|
||||
# # will be queued until length of queue equal to `batch_size` or time elapsed since last write exceeds `max_flush_interval`.
|
||||
# batch_io:
|
||||
# batch_size: 128
|
||||
# max_flush_interval: 2ms
|
||||
|
||||
# when enabled, LiveKit will expose prometheus metrics on :6789/metrics
|
||||
# prometheus_port: 6789
|
||||
|
||||
@@ -14,10 +14,10 @@ require (
|
||||
github.com/google/wire v0.5.0
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/hashicorp/go-version v1.6.0
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.6
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f
|
||||
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86
|
||||
github.com/livekit/psrpc v0.3.3
|
||||
github.com/mackerelio/go-osstat v0.2.4
|
||||
@@ -27,7 +27,7 @@ require (
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/pion/dtls/v2 v2.2.7
|
||||
github.com/pion/ice/v2 v2.3.11
|
||||
github.com/pion/interceptor v0.1.17
|
||||
github.com/pion/interceptor v0.1.18
|
||||
github.com/pion/rtcp v1.2.10
|
||||
github.com/pion/rtp v1.8.1
|
||||
github.com/pion/sctp v1.8.7
|
||||
@@ -38,15 +38,15 @@ require (
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v1.16.0
|
||||
github.com/redis/go-redis/v9 v9.1.0
|
||||
github.com/rs/cors v1.9.0
|
||||
github.com/rs/cors v1.10.0
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/thoas/go-funk v0.9.3
|
||||
github.com/twitchtv/twirp v8.1.3+incompatible
|
||||
github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3
|
||||
github.com/ua-parser/uap-go v0.0.0-20230823213814-f77b3e91e9dc
|
||||
github.com/urfave/cli/v2 v2.25.7
|
||||
github.com/urfave/negroni/v3 v3.0.0
|
||||
go.uber.org/atomic v1.11.0
|
||||
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
|
||||
golang.org/x/sync v0.3.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
@@ -68,6 +68,7 @@ require (
|
||||
github.com/google/uuid v1.3.1 // indirect
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
||||
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/josharian/native v1.1.0 // indirect
|
||||
github.com/klauspost/compress v1.16.7 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
|
||||
@@ -94,12 +95,12 @@ require (
|
||||
github.com/zeebo/xxh3 v1.0.2 // indirect
|
||||
go.uber.org/multierr v1.10.0 // indirect
|
||||
go.uber.org/zap v1.25.0 // indirect
|
||||
golang.org/x/crypto v0.12.0 // indirect
|
||||
golang.org/x/crypto v0.13.0 // indirect
|
||||
golang.org/x/mod v0.12.0 // indirect
|
||||
golang.org/x/net v0.14.0 // indirect
|
||||
golang.org/x/sys v0.11.0 // indirect
|
||||
golang.org/x/text v0.12.0 // indirect
|
||||
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
|
||||
golang.org/x/net v0.15.0 // indirect
|
||||
golang.org/x/sys v0.12.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
golang.org/x/tools v0.13.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect
|
||||
google.golang.org/grpc v1.57.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
|
||||
@@ -88,8 +88,10 @@ github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZn
|
||||
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
|
||||
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
|
||||
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
|
||||
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
|
||||
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
|
||||
@@ -123,8 +125,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
|
||||
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216 h1:gwGhhhx+vUXR1dZqpKBautkx7qJAXvgCdQxgluBiUqc=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
|
||||
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ=
|
||||
github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
|
||||
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
|
||||
@@ -188,8 +192,9 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ
|
||||
github.com/pion/ice/v2 v2.3.10/go.mod h1:hHGCibDfmXGqukayQw979xEctASp2Pe5Oe0iDU8pRus=
|
||||
github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw=
|
||||
github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E=
|
||||
github.com/pion/interceptor v0.1.17 h1:prJtgwFh/gB8zMqGZoOgJPHivOwVAp61i2aG61Du/1w=
|
||||
github.com/pion/interceptor v0.1.17/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI=
|
||||
github.com/pion/interceptor v0.1.18 h1:Hk26334NUQeUcJNR27YHYKT+sWNhhegQ9KFz5Nn6yMQ=
|
||||
github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I=
|
||||
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
|
||||
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
|
||||
github.com/pion/mdns v0.0.7/go.mod h1:4iP2UbeFhLI/vWju/bw6ZfwjJzk0z8DNValjGxR/dD8=
|
||||
@@ -241,8 +246,8 @@ github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0Niuqvtf
|
||||
github.com/redis/go-redis/v9 v9.1.0/go.mod h1:urWj3He21Dj5k4TK1y59xH8Uj6ATueP8AH1cY3lZl4c=
|
||||
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||
github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE=
|
||||
github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
|
||||
github.com/rs/cors v1.10.0 h1:62NOS1h+r8p1mW6FM0FSB0exioXLhd/sh15KpjWBZ+8=
|
||||
github.com/rs/cors v1.10.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
|
||||
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
|
||||
@@ -265,8 +270,8 @@ github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
|
||||
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
|
||||
github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU=
|
||||
github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
|
||||
github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 h1:YsXCA7ZdgFMgwDpNpYj4y2WPRVrOVVDAkQlFc477T54=
|
||||
github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E=
|
||||
github.com/ua-parser/uap-go v0.0.0-20230823213814-f77b3e91e9dc h1:iT5lwxf894PiMq7cnMMQg/7VOD1pxmu//gQuHWAFy4s=
|
||||
github.com/ua-parser/uap-go v0.0.0-20230823213814-f77b3e91e9dc/go.mod h1:BUbeWZiieNxAuuADTBNb3/aeje6on3DhU3rpWsQSB1E=
|
||||
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
|
||||
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
|
||||
github.com/urfave/negroni/v3 v3.0.0 h1:Vo8CeZfu1lFR9gW8GnAb6dOGCJyijfil9j/jKKc/JhU=
|
||||
@@ -293,10 +298,11 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
|
||||
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
|
||||
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
|
||||
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
|
||||
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
|
||||
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
|
||||
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
|
||||
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
|
||||
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
|
||||
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
@@ -330,8 +336,9 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
|
||||
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
|
||||
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
|
||||
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
|
||||
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
|
||||
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
|
||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -382,8 +389,9 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
|
||||
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
@@ -406,8 +414,9 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
|
||||
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
@@ -415,8 +424,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 h1:Vve/L0v7CXXuxUmaMGIEK/dEeq7uiqb5qBgQrZzIE7E=
|
||||
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
|
||||
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
|
||||
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
||||
@@ -308,7 +308,6 @@ var DefaultConfig = Config{
|
||||
RTCConfig: rtcconfig.RTCConfig{
|
||||
UseExternalIP: false,
|
||||
TCPPort: 7881,
|
||||
UDPPort: 0,
|
||||
ICEPortRangeStart: 0,
|
||||
ICEPortRangeEnd: 0,
|
||||
STUNServers: []string{},
|
||||
@@ -802,7 +801,7 @@ func (conf *Config) updateFromCLI(c *cli.Context, baseFlags []cli.Flag) error {
|
||||
conf.RTC.NodeIP = c.String("node-ip")
|
||||
}
|
||||
if c.IsSet("udp-port") {
|
||||
conf.RTC.UDPPort = uint32(c.Int("udp-port"))
|
||||
conf.RTC.UDPPort.UnmarshalString(c.String("udp-port"))
|
||||
}
|
||||
if c.IsSet("bind") {
|
||||
conf.BindAddresses = c.StringSlice("bind")
|
||||
|
||||
@@ -1713,8 +1713,8 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
|
||||
ssrc := uint32(track.SSRC())
|
||||
if p.twcc == nil {
|
||||
p.twcc = twcc.NewTransportWideCCResponder(ssrc)
|
||||
p.twcc.OnFeedback(func(pkt rtcp.RawPacket) {
|
||||
p.postRtcp([]rtcp.Packet{&pkt})
|
||||
p.twcc.OnFeedback(func(pkts []rtcp.Packet) {
|
||||
p.postRtcp(pkts)
|
||||
})
|
||||
}
|
||||
p.pendingTracksLock.Unlock()
|
||||
|
||||
@@ -661,7 +661,6 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p
|
||||
}
|
||||
conf, _ := config.NewConfig("", true, nil, nil)
|
||||
// disable mux, it doesn't play too well with unit test
|
||||
conf.RTC.UDPPort = 0
|
||||
conf.RTC.TCPPort = 0
|
||||
rtcConf, err := NewWebRTCConfig(conf)
|
||||
if err != nil {
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"github.com/pion/interceptor/pkg/gcc"
|
||||
"github.com/pion/interceptor/pkg/twcc"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/sctp"
|
||||
"github.com/pion/sdp/v3"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"github.com/pkg/errors"
|
||||
@@ -831,7 +832,9 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni
|
||||
}
|
||||
|
||||
dcErrorHandler := func(err error) {
|
||||
t.params.Logger.Errorw(dc.Label()+" data channel error", err)
|
||||
if !errors.Is(err, sctp.ErrResetPacketInStateNotExist) {
|
||||
t.params.Logger.Errorw(dc.Label()+" data channel error", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.lock.Lock()
|
||||
|
||||
@@ -556,7 +556,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
|
||||
_ = r.refreshToken(participant)
|
||||
tokenTicker := time.NewTicker(tokenRefreshInterval)
|
||||
defer tokenTicker.Stop()
|
||||
stateCheckTicker := time.NewTicker(time.Millisecond * 50)
|
||||
stateCheckTicker := time.NewTicker(time.Millisecond * 500)
|
||||
defer stateCheckTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
|
||||
@@ -220,7 +220,7 @@ func (s *LivekitServer) Start() error {
|
||||
if s.config.RTC.TCPPort != 0 {
|
||||
values = append(values, "rtc.portTCP", s.config.RTC.TCPPort)
|
||||
}
|
||||
if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort != 0 {
|
||||
if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort.Valid() {
|
||||
values = append(values, "rtc.portUDP", s.config.RTC.UDPPort)
|
||||
} else {
|
||||
values = append(values,
|
||||
|
||||
@@ -644,8 +644,6 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
|
||||
if extHighestSNOverridden < r.sequenceNumber.GetExtendedStart() {
|
||||
// it is possible that the `LastSequenceNumber` in the receiver report is before the starting
|
||||
// sequence number when dummy packets are used to trigger Pion's OnTrack path.
|
||||
r.lastRRTime = time.Now()
|
||||
r.lastRR = rr
|
||||
return
|
||||
}
|
||||
|
||||
@@ -707,7 +705,7 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
|
||||
return
|
||||
}
|
||||
|
||||
func (r *RTPStats) LastReceiverReport() time.Time {
|
||||
func (r *RTPStats) LastReceiverReportTime() time.Time {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
|
||||
@@ -1211,7 +1209,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
|
||||
}
|
||||
|
||||
func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo {
|
||||
if !r.params.IsReceiverReportDriven {
|
||||
if !r.params.IsReceiverReportDriven || r.lastRRTime.IsZero() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -42,6 +42,7 @@ type ConnectionStatsParams struct {
|
||||
GetDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers
|
||||
GetDeltaStatsOverridden func() map[uint32]*buffer.StreamStatsWithLayers
|
||||
GetLastReceiverReportTime func() time.Time
|
||||
GetTotalPacketsSent func() uint64
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
@@ -54,6 +55,7 @@ type ConnectionStats struct {
|
||||
onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat)
|
||||
|
||||
lock sync.RWMutex
|
||||
packetsSent uint64
|
||||
streamingStartedAt time.Time
|
||||
|
||||
scorer *qualityScorer
|
||||
@@ -213,13 +215,11 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at
|
||||
}
|
||||
|
||||
func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
|
||||
if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil {
|
||||
if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil || cs.params.GetTotalPacketsSent == nil {
|
||||
return MinMOS, nil
|
||||
}
|
||||
|
||||
cs.lock.RLock()
|
||||
streamingStartedAt := cs.streamingStartedAt
|
||||
cs.lock.RUnlock()
|
||||
streamingStartedAt := cs.updateStreamingStart(at)
|
||||
if streamingStartedAt.IsZero() {
|
||||
// not streaming, just return current score
|
||||
mos, _ := cs.scorer.GetMOSAndQuality()
|
||||
@@ -260,6 +260,11 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32,
|
||||
}
|
||||
|
||||
func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
|
||||
if cs.params.GetDeltaStatsOverridden != nil {
|
||||
// receiver report based quality scoring, use stats from receiver report for scoring
|
||||
return cs.updateScoreFromReceiverReport(at)
|
||||
}
|
||||
|
||||
if cs.params.GetDeltaStats == nil {
|
||||
return MinMOS, nil
|
||||
}
|
||||
@@ -275,33 +280,25 @@ func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buf
|
||||
deltaInfoList = append(deltaInfoList, s.RTPStats)
|
||||
}
|
||||
agg := buffer.AggregateRTPDeltaInfo(deltaInfoList)
|
||||
if agg != nil && agg.Packets > 0 {
|
||||
// not very accurate as streaming could have started part way in the window, but don't need accurate time
|
||||
cs.maybeSetStreamingStart(agg.StartTime)
|
||||
} else {
|
||||
cs.clearStreamingStart()
|
||||
}
|
||||
|
||||
if cs.params.GetDeltaStatsOverridden != nil {
|
||||
// receiver report based quality scoring, use stats from receiver report for scoring
|
||||
return cs.updateScoreFromReceiverReport(at)
|
||||
}
|
||||
|
||||
return cs.updateScoreWithAggregate(agg, at), streams
|
||||
}
|
||||
|
||||
func (cs *ConnectionStats) maybeSetStreamingStart(at time.Time) {
|
||||
func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time {
|
||||
cs.lock.Lock()
|
||||
if cs.streamingStartedAt.IsZero() {
|
||||
cs.streamingStartedAt = at
|
||||
}
|
||||
cs.lock.Unlock()
|
||||
}
|
||||
defer cs.lock.Unlock()
|
||||
|
||||
func (cs *ConnectionStats) clearStreamingStart() {
|
||||
cs.lock.Lock()
|
||||
cs.streamingStartedAt = time.Time{}
|
||||
cs.lock.Unlock()
|
||||
packetsSent := cs.params.GetTotalPacketsSent()
|
||||
if packetsSent > cs.packetsSent {
|
||||
if cs.streamingStartedAt.IsZero() {
|
||||
// the start could be anywhere after last update, but using `at` as this is not required to be accurate
|
||||
cs.streamingStartedAt = at
|
||||
}
|
||||
} else {
|
||||
cs.streamingStartedAt = time.Time{}
|
||||
}
|
||||
cs.packetsSent = packetsSent
|
||||
|
||||
return cs.streamingStartedAt
|
||||
}
|
||||
|
||||
func (cs *ConnectionStats) getStat() {
|
||||
|
||||
+35
-36
@@ -129,14 +129,13 @@ var (
|
||||
|
||||
type DownTrackState struct {
|
||||
RTPStats *buffer.RTPStats
|
||||
DeltaStatsSnapshotId uint32
|
||||
DeltaStatsOverriddenSnapshotId uint32
|
||||
ForwarderState ForwarderState
|
||||
}
|
||||
|
||||
func (d DownTrackState) String() string {
|
||||
return fmt.Sprintf("DownTrackState{rtpStats: %s, delta: %d, deltaOverridden: %d, forwarder: %s}",
|
||||
d.RTPStats.ToString(), d.DeltaStatsSnapshotId, d.DeltaStatsOverriddenSnapshotId, d.ForwarderState.String())
|
||||
return fmt.Sprintf("DownTrackState{rtpStats: %s, deltaOverridden: %d, forwarder: %s}",
|
||||
d.RTPStats.ToString(), d.DeltaStatsOverriddenSnapshotId, d.ForwarderState.String())
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
@@ -237,6 +236,7 @@ type DownTrack struct {
|
||||
isClosed atomic.Bool
|
||||
connected atomic.Bool
|
||||
bindAndConnectedOnce atomic.Bool
|
||||
writable atomic.Bool
|
||||
|
||||
rtpStats *buffer.RTPStats
|
||||
|
||||
@@ -247,7 +247,6 @@ type DownTrack struct {
|
||||
blankFramesGeneration atomic.Uint32
|
||||
|
||||
connectionStats *connectionquality.ConnectionStats
|
||||
deltaStatsSnapshotId uint32
|
||||
deltaStatsOverriddenSnapshotId uint32
|
||||
|
||||
isNACKThrottled atomic.Bool
|
||||
@@ -309,15 +308,14 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
|
||||
IsReceiverReportDriven: true,
|
||||
Logger: params.Logger,
|
||||
})
|
||||
d.deltaStatsSnapshotId = d.rtpStats.NewSnapshotId()
|
||||
d.deltaStatsOverriddenSnapshotId = d.rtpStats.NewSnapshotId()
|
||||
|
||||
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
|
||||
MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change
|
||||
IsFECEnabled: strings.EqualFold(codecs[0].MimeType, webrtc.MimeTypeOpus) && strings.Contains(strings.ToLower(codecs[0].SDPFmtpLine), "fec"),
|
||||
GetDeltaStats: d.getDeltaStats,
|
||||
GetDeltaStatsOverridden: d.getDeltaStatsOverridden,
|
||||
GetLastReceiverReportTime: func() time.Time { return d.rtpStats.LastReceiverReport() },
|
||||
GetLastReceiverReportTime: func() time.Time { return d.rtpStats.LastReceiverReportTime() },
|
||||
GetTotalPacketsSent: func() uint64 { return d.rtpStats.GetTotalPacketsPrimary() },
|
||||
Logger: params.Logger.WithValues("direction", "down"),
|
||||
})
|
||||
d.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) {
|
||||
@@ -327,7 +325,6 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
|
||||
})
|
||||
|
||||
// set initial playout delay to minimum value
|
||||
|
||||
if d.params.PlayoutDelayLimit.GetEnabled() && d.params.PlayoutDelayLimit.GetMin() > 0 {
|
||||
delay := rtpextension.PlayoutDelayFromValue(
|
||||
uint16(d.params.PlayoutDelayLimit.GetMin()),
|
||||
@@ -420,7 +417,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
|
||||
d.forwarder.DetermineCodec(d.codec, d.params.Receiver.HeaderExtensions())
|
||||
|
||||
d.params.Logger.Debugw("downtrack bound")
|
||||
d.onBindAndConnected()
|
||||
d.onBindAndConnectedChange()
|
||||
|
||||
return codec, nil
|
||||
}
|
||||
@@ -429,6 +426,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
|
||||
// because a track has been stopped.
|
||||
func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error {
|
||||
d.bound.Store(false)
|
||||
d.onBindAndConnectedChange()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -600,7 +598,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
|
||||
return
|
||||
}
|
||||
|
||||
if d.connected.Load() {
|
||||
if d.writable.Load() {
|
||||
d.params.Logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer)
|
||||
d.params.Receiver.SendPLI(layer, false)
|
||||
d.rtpStats.UpdateLayerLockPliAndTime(1)
|
||||
@@ -608,7 +606,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
|
||||
|
||||
<-ticker.C
|
||||
|
||||
if generation != d.keyFrameRequestGeneration.Load() || !d.bound.Load() {
|
||||
if generation != d.keyFrameRequestGeneration.Load() || !d.writable.Load() {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -643,7 +641,7 @@ func (d *DownTrack) maxLayerNotifierWorker() {
|
||||
|
||||
// WriteRTP writes an RTP Packet to the DownTrack
|
||||
func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
|
||||
if !d.bound.Load() || !d.connected.Load() {
|
||||
if !d.writable.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -677,7 +675,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
|
||||
return err
|
||||
}
|
||||
|
||||
extensions := []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: tp.ddBytes}}
|
||||
var extensions []pacer.ExtensionData
|
||||
if tp.ddBytes != nil {
|
||||
extensions = []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: tp.ddBytes}}
|
||||
}
|
||||
if d.playoutDelayExtID != 0 && !d.playoudDelayAcked.Load() {
|
||||
if val := d.playoutDelayBytes.Load(); val != nil {
|
||||
extensions = append(extensions, pacer.ExtensionData{ID: uint8(d.playoutDelayExtID), Payload: val.([]byte)})
|
||||
@@ -717,17 +718,19 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
|
||||
// WritePaddingRTP tries to write as many padding only RTP packets as necessary
|
||||
// to satisfy given size to the DownTrack
|
||||
func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMarker bool) int {
|
||||
if !d.writable.Load() {
|
||||
return 0
|
||||
}
|
||||
|
||||
if !d.rtpStats.IsActive() && !paddingOnMute {
|
||||
return 0
|
||||
}
|
||||
|
||||
// LK-TODO-START
|
||||
// Ideally should look at header extensions negotiated for
|
||||
// track and decide if padding can be sent. But, browsers behave
|
||||
// in unexpected ways when using audio for bandwidth estimation and
|
||||
// padding is mainly used to probe for excess available bandwidth.
|
||||
// So, to be safe, limit to video tracks
|
||||
// LK-TODO-END
|
||||
if d.kind == webrtc.RTPCodecTypeAudio {
|
||||
return 0
|
||||
}
|
||||
@@ -741,6 +744,12 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
|
||||
return 0
|
||||
}
|
||||
|
||||
// Hold sending padding packets till first RTCP-RR is received for this RTP stream.
|
||||
// That is definitive proof that the remote side knows about this RTP stream.
|
||||
if d.rtpStats.LastReceiverReportTime().IsZero() && !paddingOnMute {
|
||||
return 0
|
||||
}
|
||||
|
||||
// RTP padding maximum is 255 bytes. Break it up.
|
||||
// Use 20 byte as estimate of RTP header size (12 byte header + 8 byte extension)
|
||||
num := (bytesToSend + RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize - 1) / (RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize)
|
||||
@@ -753,16 +762,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
|
||||
return 0
|
||||
}
|
||||
|
||||
// LK-TODO Look at load balancing a la sfu.Receiver to spread across available CPUs
|
||||
bytesSent := 0
|
||||
for i := 0; i < len(snts); i++ {
|
||||
// LK-TODO-START
|
||||
// Hold sending padding packets till first RTCP-RR is received for this RTP stream.
|
||||
// That is definitive proof that the remote side knows about this RTP stream.
|
||||
// The packet count check at the beginning of this function gates sending padding
|
||||
// on as yet unstarted streams which is a reasonable check.
|
||||
// LK-TODO-END
|
||||
|
||||
hdr := rtp.Header{
|
||||
Version: 2,
|
||||
Padding: true,
|
||||
@@ -803,7 +804,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
|
||||
bytesSent += hdr.MarshalSize() + len(payload)
|
||||
}
|
||||
|
||||
// STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be update in pacer callback
|
||||
// STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be updated in pacer callback
|
||||
return bytesSent
|
||||
}
|
||||
|
||||
@@ -970,7 +971,6 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer {
|
||||
func (d *DownTrack) GetState() DownTrackState {
|
||||
dts := DownTrackState{
|
||||
RTPStats: d.rtpStats,
|
||||
DeltaStatsSnapshotId: d.deltaStatsSnapshotId,
|
||||
DeltaStatsOverriddenSnapshotId: d.deltaStatsOverriddenSnapshotId,
|
||||
ForwarderState: d.forwarder.GetState(),
|
||||
}
|
||||
@@ -979,7 +979,6 @@ func (d *DownTrack) GetState() DownTrackState {
|
||||
|
||||
func (d *DownTrack) SeedState(state DownTrackState) {
|
||||
d.rtpStats.Seed(state.RTPStats)
|
||||
d.deltaStatsSnapshotId = state.DeltaStatsSnapshotId
|
||||
d.deltaStatsOverriddenSnapshotId = state.DeltaStatsOverriddenSnapshotId
|
||||
d.forwarder.SeedState(state.ForwarderState)
|
||||
}
|
||||
@@ -1221,8 +1220,8 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
|
||||
func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
// don't send if nothing has been sent
|
||||
if !d.rtpStats.IsActive() {
|
||||
// don't send if not writable OR nothing has been sent
|
||||
if !d.writable.Load() || !d.rtpStats.IsActive() {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
@@ -1489,7 +1488,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
|
||||
|
||||
func (d *DownTrack) SetConnected() {
|
||||
if !d.connected.Swap(true) {
|
||||
d.onBindAndConnected()
|
||||
d.onBindAndConnectedChange()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1689,10 +1688,6 @@ func (d *DownTrack) deltaStats(ds *buffer.RTPDeltaInfo) map[uint32]*buffer.Strea
|
||||
return streamStats
|
||||
}
|
||||
|
||||
func (d *DownTrack) getDeltaStats() map[uint32]*buffer.StreamStatsWithLayers {
|
||||
return d.deltaStats(d.rtpStats.DeltaInfo(d.deltaStatsSnapshotId))
|
||||
}
|
||||
|
||||
func (d *DownTrack) getDeltaStatsOverridden() map[uint32]*buffer.StreamStatsWithLayers {
|
||||
return d.deltaStats(d.rtpStats.DeltaInfoOverridden(d.deltaStatsOverriddenSnapshotId))
|
||||
}
|
||||
@@ -1707,7 +1702,7 @@ func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) {
|
||||
return d.bytesSent.Swap(0), d.bytesRetransmitted.Swap(0)
|
||||
}
|
||||
|
||||
func (d *DownTrack) onBindAndConnected() {
|
||||
func (d *DownTrack) onBindAndConnectedChange() {
|
||||
if d.connected.Load() && d.bound.Load() && !d.bindAndConnectedOnce.Swap(true) {
|
||||
if d.kind == webrtc.RTPCodecTypeVideo {
|
||||
_, layer := d.forwarder.CheckSync()
|
||||
@@ -1720,6 +1715,7 @@ func (d *DownTrack) onBindAndConnected() {
|
||||
go d.sendPaddingOnMute()
|
||||
}
|
||||
}
|
||||
d.writable.Store(d.connected.Load() && d.bound.Load())
|
||||
}
|
||||
|
||||
func (d *DownTrack) sendPaddingOnMute() {
|
||||
@@ -1783,8 +1779,11 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
|
||||
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
|
||||
TransportWideExtID: uint8(d.transportWideExtID),
|
||||
WriteStream: d.writeStream,
|
||||
Metadata: sendPacketMetadata{},
|
||||
OnSent: d.packetSent,
|
||||
Metadata: sendPacketMetadata{
|
||||
// although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only
|
||||
isPadding: true,
|
||||
},
|
||||
OnSent: d.packetSent,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
+43
-20
@@ -273,6 +273,15 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
|
||||
}
|
||||
f.codec = codec
|
||||
|
||||
ddAvailable := func(exts []webrtc.RTPHeaderExtensionParameter) bool {
|
||||
for _, ext := range exts {
|
||||
if ext.URI == dd.ExtensionURI {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
switch strings.ToLower(codec.MimeType) {
|
||||
case "video/vp8":
|
||||
f.codecMunger = codecmunger.NewVP8FromNull(f.codecMunger, f.logger)
|
||||
@@ -289,15 +298,8 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
|
||||
f.vls = videolayerselector.NewSimulcast(f.logger)
|
||||
}
|
||||
case "video/vp9":
|
||||
isDDAvailable := false
|
||||
searchDone:
|
||||
for _, ext := range extensions {
|
||||
switch ext.URI {
|
||||
case dd.ExtensionURI:
|
||||
isDDAvailable = true
|
||||
break searchDone
|
||||
}
|
||||
}
|
||||
isDDAvailable := ddAvailable(extensions)
|
||||
|
||||
if isDDAvailable {
|
||||
if f.vls != nil {
|
||||
f.vls = videolayerselector.NewDependencyDescriptorFromNull(f.vls)
|
||||
@@ -314,12 +316,22 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
|
||||
// SVC-TODO: Support for VP9 simulcast. When DD is not available, have to pick selector based on VP9 SVC or Simulcast
|
||||
case "video/av1":
|
||||
// DD-TODO : we only enable dd layer selector for av1/vp9 now, in the future we can enable it for vp8 too
|
||||
if f.vls != nil {
|
||||
f.vls = videolayerselector.NewDependencyDescriptorFromNull(f.vls)
|
||||
|
||||
isDDAvailable := ddAvailable(extensions)
|
||||
if isDDAvailable {
|
||||
if f.vls != nil {
|
||||
f.vls = videolayerselector.NewDependencyDescriptorFromNull(f.vls)
|
||||
} else {
|
||||
f.vls = videolayerselector.NewDependencyDescriptor(f.logger)
|
||||
}
|
||||
} else {
|
||||
f.vls = videolayerselector.NewDependencyDescriptor(f.logger)
|
||||
if f.vls != nil {
|
||||
f.vls = videolayerselector.NewSimulcastFromNull(f.vls)
|
||||
} else {
|
||||
f.vls = videolayerselector.NewSimulcast(f.logger)
|
||||
}
|
||||
}
|
||||
// SVC-TODO: Support for AV1 Simulcast or just single spatial layer - won't have DD in that case
|
||||
// SVC-TODO: Support for AV1 Simulcast
|
||||
}
|
||||
}
|
||||
|
||||
@@ -799,7 +811,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
if f.provisional.muted || f.provisional.pubMuted {
|
||||
f.provisional.allocatedLayer = buffer.InvalidLayer
|
||||
return VideoTransition{
|
||||
From: f.vls.GetTarget(),
|
||||
From: existingTargetLayer,
|
||||
To: f.provisional.allocatedLayer,
|
||||
BandwidthDelta: -getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}
|
||||
@@ -1288,12 +1300,18 @@ func (f *Forwarder) Pause(availableLayers []int32, brs Bitrates) VideoAllocation
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
existingTargetLayer := f.vls.GetTarget()
|
||||
if !existingTargetLayer.IsValid() {
|
||||
// already paused
|
||||
return f.lastAllocation
|
||||
}
|
||||
|
||||
maxLayer := f.vls.GetMax()
|
||||
maxSeenLayer := f.vls.GetMaxSeen()
|
||||
optimalBandwidthNeeded := getOptimalBandwidthNeeded(f.muted, f.pubMuted, maxSeenLayer.Spatial, brs, maxLayer)
|
||||
alloc := VideoAllocation{
|
||||
BandwidthRequested: 0,
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(brs, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested),
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(brs, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
Bitrates: brs,
|
||||
BandwidthNeeded: optimalBandwidthNeeded,
|
||||
TargetLayer: buffer.InvalidLayer,
|
||||
@@ -1442,6 +1460,15 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
"referenceLayerSpatial", f.referenceLayerSpatial,
|
||||
)
|
||||
return nil
|
||||
} else if f.referenceLayerSpatial == buffer.InvalidLayerSpatial {
|
||||
f.referenceLayerSpatial = layer
|
||||
f.logger.Debugw(
|
||||
"catch up forwarding",
|
||||
"sequenceNumber", extPkt.Packet.SequenceNumber,
|
||||
"timestamp", extPkt.Packet.Timestamp,
|
||||
"layer", layer,
|
||||
"referenceLayerSpatial", f.referenceLayerSpatial,
|
||||
)
|
||||
}
|
||||
|
||||
logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) {
|
||||
@@ -1595,9 +1622,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
|
||||
// should be called with lock held
|
||||
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) (*TranslationParams, error) {
|
||||
if tp == nil {
|
||||
tp = &TranslationParams{}
|
||||
}
|
||||
if f.lastSSRC != extPkt.Packet.SSRC {
|
||||
if err := f.processSourceSwitch(extPkt, layer); err != nil {
|
||||
tp.shouldDrop = true
|
||||
@@ -1622,7 +1646,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
|
||||
|
||||
// should be called with lock held
|
||||
func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
|
||||
return f.getTranslationParamsCommon(extPkt, layer, nil)
|
||||
return f.getTranslationParamsCommon(extPkt, layer, &TranslationParams{})
|
||||
}
|
||||
|
||||
// should be called with lock held
|
||||
@@ -1634,7 +1658,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
|
||||
}
|
||||
|
||||
tp := &TranslationParams{}
|
||||
|
||||
if !f.vls.GetTarget().IsValid() {
|
||||
// stream is paused by streamallocator
|
||||
tp.shouldDrop = true
|
||||
|
||||
+62
-47
@@ -72,9 +72,12 @@ type RTPMunger struct {
|
||||
|
||||
extLastSN uint64
|
||||
extSecondLastSN uint64
|
||||
extLastTS uint64
|
||||
tsOffset uint64
|
||||
lastMarker bool
|
||||
snOffset uint64
|
||||
|
||||
extLastTS uint64
|
||||
tsOffset uint64
|
||||
|
||||
lastMarker bool
|
||||
|
||||
extRtxGateSn uint64
|
||||
isInRtxGateRegion bool
|
||||
@@ -88,12 +91,11 @@ func NewRTPMunger(logger logger.Logger) *RTPMunger {
|
||||
}
|
||||
|
||||
func (r *RTPMunger) DebugInfo() map[string]interface{} {
|
||||
snOffset, _ := r.snRangeMap.GetValue(r.extHighestIncomingSN + 1)
|
||||
return map[string]interface{}{
|
||||
"ExtHighestIncomingSN": r.extHighestIncomingSN,
|
||||
"ExtLastSN": r.extLastSN,
|
||||
"ExtSecondLastSN": r.extSecondLastSN,
|
||||
"SNOffset": snOffset,
|
||||
"SNOffset": r.snOffset,
|
||||
"ExtLastTS": r.extLastTS,
|
||||
"TSOffset": r.tsOffset,
|
||||
"LastMarker": r.lastMarker,
|
||||
@@ -116,14 +118,20 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) {
|
||||
|
||||
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
|
||||
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
|
||||
|
||||
r.extLastSN = extPkt.ExtSequenceNumber
|
||||
r.extSecondLastSN = r.extLastSN - 1
|
||||
r.updateSnOffset()
|
||||
|
||||
r.extLastTS = extPkt.ExtTimestamp
|
||||
}
|
||||
|
||||
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) {
|
||||
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
|
||||
|
||||
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust)
|
||||
r.updateSnOffset()
|
||||
|
||||
r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust
|
||||
}
|
||||
|
||||
@@ -148,16 +156,42 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
|
||||
}
|
||||
|
||||
r.extLastSN = r.extSecondLastSN
|
||||
r.updateSnOffset()
|
||||
}
|
||||
|
||||
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
|
||||
diff := int64(extPkt.ExtSequenceNumber - r.extHighestIncomingSN)
|
||||
if (diff == 1 && len(extPkt.Packet.Payload) != 0) || diff > 1 {
|
||||
// in-order - either contiguous packet with payload OR packet following a gap, may or may not have payload
|
||||
r.extHighestIncomingSN = extPkt.ExtSequenceNumber
|
||||
|
||||
ordering := SequenceNumberOrderingContiguous
|
||||
if diff > 1 {
|
||||
ordering = SequenceNumberOrderingGap
|
||||
}
|
||||
|
||||
extMungedSN := extPkt.ExtSequenceNumber - r.snOffset
|
||||
extMungedTS := extPkt.ExtTimestamp - r.tsOffset
|
||||
|
||||
r.extSecondLastSN = r.extLastSN
|
||||
r.extLastSN = extMungedSN
|
||||
r.extLastTS = extMungedTS
|
||||
r.lastMarker = extPkt.Packet.Marker
|
||||
|
||||
if extPkt.KeyFrame {
|
||||
r.extRtxGateSn = extMungedSN
|
||||
r.isInRtxGateRegion = true
|
||||
}
|
||||
|
||||
if r.isInRtxGateRegion && (extMungedSN-r.extRtxGateSn) > RtxGateWindow {
|
||||
r.isInRtxGateRegion = false
|
||||
}
|
||||
|
||||
// can get duplicate packet due to FEC
|
||||
if diff == 0 {
|
||||
return &TranslationParamsRTP{
|
||||
snOrdering: SequenceNumberOrderingDuplicate,
|
||||
}, ErrDuplicatePacket
|
||||
snOrdering: ordering,
|
||||
sequenceNumber: uint16(extMungedSN),
|
||||
timestamp: uint32(extMungedTS),
|
||||
}, nil
|
||||
}
|
||||
|
||||
if diff < 0 {
|
||||
@@ -176,53 +210,25 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
|
||||
}, nil
|
||||
}
|
||||
|
||||
ordering := SequenceNumberOrderingContiguous
|
||||
if diff > 1 {
|
||||
ordering = SequenceNumberOrderingGap
|
||||
}
|
||||
|
||||
r.extHighestIncomingSN = extPkt.ExtSequenceNumber
|
||||
|
||||
// if padding only packet, can be dropped and sequence number adjusted, if contiguous
|
||||
if diff == 1 && len(extPkt.Packet.Payload) == 0 {
|
||||
if diff == 1 {
|
||||
r.extHighestIncomingSN = extPkt.ExtSequenceNumber
|
||||
|
||||
if err := r.snRangeMap.ExcludeRange(r.extHighestIncomingSN, r.extHighestIncomingSN+1); err != nil {
|
||||
r.logger.Errorw("could not exclude range", err, "sn", r.extHighestIncomingSN)
|
||||
}
|
||||
|
||||
r.updateSnOffset()
|
||||
|
||||
return &TranslationParamsRTP{
|
||||
snOrdering: ordering,
|
||||
snOrdering: SequenceNumberOrderingContiguous,
|
||||
}, ErrPaddingOnlyPacket
|
||||
}
|
||||
|
||||
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
|
||||
if err != nil {
|
||||
r.logger.Errorw("could not get sequence number adjustment", err, "sn", extPkt.ExtSequenceNumber, "payloadSize", len(extPkt.Packet.Payload))
|
||||
return &TranslationParamsRTP{
|
||||
snOrdering: ordering,
|
||||
}, ErrSequenceNumberOffsetNotFound
|
||||
}
|
||||
|
||||
extMungedSN := extPkt.ExtSequenceNumber - snOffset
|
||||
extMungedTS := extPkt.ExtTimestamp - r.tsOffset
|
||||
|
||||
r.extSecondLastSN = r.extLastSN
|
||||
r.extLastSN = extMungedSN
|
||||
r.extLastTS = extMungedTS
|
||||
r.lastMarker = extPkt.Packet.Marker
|
||||
|
||||
if extPkt.KeyFrame {
|
||||
r.extRtxGateSn = extMungedSN
|
||||
r.isInRtxGateRegion = true
|
||||
}
|
||||
|
||||
if r.isInRtxGateRegion && (extMungedSN-r.extRtxGateSn) > RtxGateWindow {
|
||||
r.isInRtxGateRegion = false
|
||||
}
|
||||
|
||||
// can get duplicate packet due to FEC
|
||||
return &TranslationParamsRTP{
|
||||
snOrdering: ordering,
|
||||
sequenceNumber: uint16(extMungedSN),
|
||||
timestamp: uint32(extMungedTS),
|
||||
}, nil
|
||||
snOrdering: SequenceNumberOrderingDuplicate,
|
||||
}, ErrDuplicatePacket
|
||||
}
|
||||
|
||||
func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 {
|
||||
@@ -283,6 +289,7 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
|
||||
r.extSecondLastSN = extLastSN - 1
|
||||
r.extLastSN = extLastSN
|
||||
r.snRangeMap.DecValue(uint64(num))
|
||||
r.updateSnOffset()
|
||||
|
||||
r.tsOffset -= extLastTS - r.extLastTS
|
||||
r.extLastTS = extLastTS
|
||||
@@ -297,3 +304,11 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
|
||||
func (r *RTPMunger) IsOnFrameBoundary() bool {
|
||||
return r.lastMarker
|
||||
}
|
||||
|
||||
func (r *RTPMunger) updateSnOffset() {
|
||||
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN + 1)
|
||||
if err != nil {
|
||||
r.logger.Errorw("could not get SN offset", err)
|
||||
}
|
||||
r.snOffset = snOffset
|
||||
}
|
||||
|
||||
@@ -883,8 +883,8 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
return
|
||||
}
|
||||
|
||||
// already streaming at some layer and transition is not requesting any change, i. e. BandwidthDelta == 0
|
||||
if transition.From.IsValid() && transition.BandwidthDelta == 0 {
|
||||
// a no-op transition
|
||||
if transition.From == transition.To {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
+25
-12
@@ -29,10 +29,11 @@ type extendedNumber interface {
|
||||
type WrapAround[T number, ET extendedNumber] struct {
|
||||
fullRange ET
|
||||
|
||||
initialized bool
|
||||
start T
|
||||
highest T
|
||||
cycles ET
|
||||
initialized bool
|
||||
start T
|
||||
highest T
|
||||
cycles ET
|
||||
extendedHighest ET
|
||||
}
|
||||
|
||||
func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] {
|
||||
@@ -47,6 +48,7 @@ func (w *WrapAround[T, ET]) Seed(from *WrapAround[T, ET]) {
|
||||
w.start = from.start
|
||||
w.highest = from.highest
|
||||
w.cycles = from.cycles
|
||||
w.updateExtendedHighest()
|
||||
}
|
||||
|
||||
type WrapAroundUpdateResult[ET extendedNumber] struct {
|
||||
@@ -63,15 +65,16 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
|
||||
|
||||
w.start = val
|
||||
w.highest = val
|
||||
w.updateExtendedHighest()
|
||||
w.initialized = true
|
||||
return
|
||||
}
|
||||
|
||||
result.PreExtendedHighest = w.GetExtendedHighest()
|
||||
result.PreExtendedHighest = w.extendedHighest
|
||||
|
||||
gap := val - w.highest
|
||||
if gap == 0 || gap > T(w.fullRange>>1) {
|
||||
// duplicate OR out-of-order
|
||||
if gap > T(w.fullRange>>1) {
|
||||
// out-of-order
|
||||
result.IsRestart, result.PreExtendedStart, result.ExtendedVal = w.maybeAdjustStart(val)
|
||||
return
|
||||
}
|
||||
@@ -82,13 +85,15 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
|
||||
}
|
||||
w.highest = val
|
||||
|
||||
result.ExtendedVal = w.getExtendedHighest(w.cycles, val)
|
||||
w.updateExtendedHighest()
|
||||
result.ExtendedVal = w.extendedHighest
|
||||
return
|
||||
}
|
||||
|
||||
func (w *WrapAround[T, ET]) RollbackRestart(ev ET) {
|
||||
if w.isWrapBack(w.start, T(ev)) {
|
||||
w.cycles -= w.fullRange
|
||||
w.updateExtendedHighest()
|
||||
}
|
||||
w.start = T(ev)
|
||||
}
|
||||
@@ -96,6 +101,7 @@ func (w *WrapAround[T, ET]) RollbackRestart(ev ET) {
|
||||
func (w *WrapAround[T, ET]) ResetHighest(ev ET) {
|
||||
w.highest = T(ev)
|
||||
w.cycles = ev & ^(w.fullRange - 1)
|
||||
w.updateExtendedHighest()
|
||||
}
|
||||
|
||||
func (w *WrapAround[T, ET]) GetStart() T {
|
||||
@@ -111,7 +117,11 @@ func (w *WrapAround[T, ET]) GetHighest() T {
|
||||
}
|
||||
|
||||
func (w *WrapAround[T, ET]) GetExtendedHighest() ET {
|
||||
return w.getExtendedHighest(w.cycles, w.highest)
|
||||
return w.extendedHighest
|
||||
}
|
||||
|
||||
func (w *WrapAround[T, ET]) updateExtendedHighest() {
|
||||
w.extendedHighest = getExtendedHighest(w.cycles, w.highest)
|
||||
}
|
||||
|
||||
func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtendedStart ET, extendedVal ET) {
|
||||
@@ -125,7 +135,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
|
||||
if w.isWrapBack(val, w.highest) {
|
||||
cycles -= w.fullRange
|
||||
}
|
||||
extendedVal = w.getExtendedHighest(cycles, val)
|
||||
extendedVal = getExtendedHighest(cycles, val)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -136,6 +146,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
|
||||
|
||||
if w.isWrapBack(val, w.highest) {
|
||||
w.cycles = w.fullRange
|
||||
w.updateExtendedHighest()
|
||||
cycles = 0
|
||||
}
|
||||
w.start = val
|
||||
@@ -144,7 +155,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
|
||||
cycles -= w.fullRange
|
||||
}
|
||||
}
|
||||
extendedVal = w.getExtendedHighest(cycles, val)
|
||||
extendedVal = getExtendedHighest(cycles, val)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -152,6 +163,8 @@ func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool {
|
||||
return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1)
|
||||
}
|
||||
|
||||
func (w *WrapAround[T, ET]) getExtendedHighest(cycles ET, val T) ET {
|
||||
// ------------------------------------
|
||||
|
||||
func getExtendedHighest[T number, ET extendedNumber](cycles ET, val T) ET {
|
||||
return cycles + ET(val)
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
"github.com/livekit/livekit-server/pkg/testutils"
|
||||
testclient "github.com/livekit/livekit-server/test/client"
|
||||
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
|
||||
"github.com/livekit/protocol/auth"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
@@ -178,7 +179,7 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer {
|
||||
panic(fmt.Sprintf("could not create config: %v", err))
|
||||
}
|
||||
conf.Port = port
|
||||
conf.RTC.UDPPort = port + 1
|
||||
conf.RTC.UDPPort = rtcconfig.PortRange{Start: int(port) + 1}
|
||||
conf.RTC.TCPPort = port + 2
|
||||
conf.Redis.Address = "localhost:6379"
|
||||
conf.Keys = map[string]string{testApiKey: testApiSecret}
|
||||
|
||||
Reference in New Issue
Block a user