diff --git a/go.mod b/go.mod index b2d5ba870..0cefc83e6 100644 --- a/go.mod +++ b/go.mod @@ -26,14 +26,14 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/pion/dtls/v2 v2.2.7 - github.com/pion/ice/v2 v2.3.9 + github.com/pion/ice/v2 v2.3.10 github.com/pion/interceptor v0.1.17 github.com/pion/rtcp v1.2.10 - github.com/pion/rtp v1.8.0 + github.com/pion/rtp v1.8.1 github.com/pion/sdp/v3 v3.0.6 github.com/pion/transport/v2 v2.2.1 - github.com/pion/turn/v2 v2.1.2 - github.com/pion/webrtc/v3 v3.2.13 + github.com/pion/turn/v2 v2.1.3 + github.com/pion/webrtc/v3 v3.2.14 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/redis/go-redis/v9 v9.0.5 @@ -45,7 +45,7 @@ require ( github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b + golang.org/x/exp v0.0.0-20230807204917-050eac23e9de golang.org/x/sync v0.3.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 @@ -83,7 +83,7 @@ require ( github.com/pion/mdns v0.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/sctp v1.8.7 // indirect - github.com/pion/srtp/v2 v2.0.15 // indirect + github.com/pion/srtp/v2 v2.0.16 // indirect github.com/pion/stun v0.6.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect @@ -94,11 +94,11 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/crypto v0.10.0 // indirect + golang.org/x/crypto v0.11.0 // indirect golang.org/x/mod v0.11.0 // indirect - golang.org/x/net v0.11.0 // indirect - golang.org/x/sys v0.9.0 // indirect - golang.org/x/text v0.10.0 // indirect + golang.org/x/net v0.13.0 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect golang.org/x/tools v0.9.3 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/grpc v1.57.0 // indirect diff --git a/go.sum b/go.sum index 5dfd6e6be..3988853e9 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,9 @@ github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= -github.com/pion/ice/v2 v2.3.9 h1:7yZpHf3PhPxJGT4JkMj1Y8Rl5cQ6fB709iz99aeMd/U= github.com/pion/ice/v2 v2.3.9/go.mod h1:lT3kv5uUIlHfXHU/ZRD7uKD/ufM202+eTa3C/umgGf4= +github.com/pion/ice/v2 v2.3.10 h1:T3bUJKqh7pGEdMyTngUcTeQd6io9X8JjgsVWZDannnY= +github.com/pion/ice/v2 v2.3.10/go.mod h1:hHGCibDfmXGqukayQw979xEctASp2Pe5Oe0iDU8pRus= github.com/pion/interceptor v0.1.17 h1:prJtgwFh/gB8zMqGZoOgJPHivOwVAp61i2aG61Du/1w= github.com/pion/interceptor v0.1.17/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -197,15 +198,16 @@ github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TB github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= -github.com/pion/rtp v1.8.0 h1:SYD7040IR+NqrGBOc2GDU5iDjAR+0m5rnX/EWCUMNhw= github.com/pion/rtp v1.8.0/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.1 h1:26OxTc6lKg/qLSGir5agLyj0QKaOv8OP5wps2SFnVNQ= +github.com/pion/rtp v1.8.1/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0= github.com/pion/sctp v1.8.7 h1:JnABvFakZueGAn4KU/4PSKg+GWbF6QWbKTWZOSGJjXw= github.com/pion/sctp v1.8.7/go.mod h1:g1Ul+ARqZq5JEmoFy87Q/4CePtKnTJ1QCL9dBBdN6AU= github.com/pion/sdp/v3 v3.0.6 h1:WuDLhtuFUUVpTfus9ILC4HRyHsW6TdugjEX/QY9OiUw= github.com/pion/sdp/v3 v3.0.6/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= -github.com/pion/srtp/v2 v2.0.15 h1:+tqRtXGsGwHC0G0IUIAzRmdkHvriF79IHVfZGfHrQoA= -github.com/pion/srtp/v2 v2.0.15/go.mod h1:b/pQOlDrbB0HEH5EUAQXzSYxikFbNcNuKmF8tM0hCtw= +github.com/pion/srtp/v2 v2.0.16 h1:impT2XBrHKsDpXr1x5hHIRydwssrSWKpmw3KvSfXbso= +github.com/pion/srtp/v2 v2.0.16/go.mod h1:NCLCV+U+NpxQ+vXhfOETet4OgKioIgrFjZmIM3ldJYE= github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8= github.com/pion/transport v0.14.1 h1:XSM6olwW+o8J4SCmOBb/BpwZypkHeyM0PGFCxNQBr40= @@ -215,10 +217,11 @@ github.com/pion/transport/v2 v2.1.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlD github.com/pion/transport/v2 v2.2.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlDuQdctTThQ= github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= -github.com/pion/turn/v2 v2.1.2 h1:wj0cAoGKltaZ790XEGW9HwoUewqjliwmhtxCuB2ApyM= github.com/pion/turn/v2 v2.1.2/go.mod h1:1kjnPkBcex3dhCU2Am+AAmxDcGhLX3WnMfmkNpvSTQU= -github.com/pion/webrtc/v3 v3.2.13 h1://ltbnahZewBWHvQYunlyLVWrHrsoyxYDfi3Ux6V4Gk= -github.com/pion/webrtc/v3 v3.2.13/go.mod h1:KS57v8u+fNMYAVM6gNsceIHtciyHlnfPNXU/7klJMFU= +github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA= +github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/webrtc/v3 v3.2.14 h1:GlqnBnnLlcYYA/LOwqLLU1plZYwx0Y/e/57bZ2tzQcU= +github.com/pion/webrtc/v3 v3.2.14/go.mod h1:r1mtixc2MH847mmQTPwlEvGge7D18C2T5qp8jI9Lm44= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -287,10 +290,11 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= -golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= -golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= -golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/exp v0.0.0-20230807204917-050eac23e9de h1:l5Za6utMv/HsBWWqzt4S8X17j+kt1uVETUX5UFhn2rE= +golang.org/x/exp v0.0.0-20230807204917-050eac23e9de/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -322,8 +326,10 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY= +golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -372,8 +378,9 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -383,6 +390,7 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= +golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -392,8 +400,9 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= diff --git a/pkg/service/errors.go b/pkg/service/errors.go index a1473c3a2..92093d682 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -24,6 +24,7 @@ var ( ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty") ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)") ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist") + ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified") ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits") ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed") ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist") diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index 45c0bb789..73de7663c 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -16,6 +16,8 @@ package service import ( "context" + "fmt" + "net/url" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/telemetry" @@ -27,6 +29,10 @@ import ( "github.com/livekit/psrpc" ) +type IngressLauncher interface { + LaunchPullIngress(ctx context.Context, info *livekit.IngressInfo) (*livekit.IngressInfo, error) +} + type IngressService struct { conf *config.IngressConfig nodeID livekit.NodeID @@ -35,6 +41,30 @@ type IngressService struct { store IngressStore roomService livekit.RoomService telemetry telemetry.TelemetryService + launcher IngressLauncher +} + +func NewIngressServiceWithIngressLauncher( + conf *config.IngressConfig, + nodeID livekit.NodeID, + bus psrpc.MessageBus, + psrpcClient rpc.IngressClient, + store IngressStore, + rs livekit.RoomService, + ts telemetry.TelemetryService, + launcher IngressLauncher, +) *IngressService { + + return &IngressService{ + conf: conf, + nodeID: nodeID, + bus: bus, + psrpcClient: psrpcClient, + store: store, + roomService: rs, + telemetry: ts, + launcher: launcher, + } } func NewIngressService( @@ -46,16 +76,11 @@ func NewIngressService( rs livekit.RoomService, ts telemetry.TelemetryService, ) *IngressService { + s := NewIngressServiceWithIngressLauncher(conf, nodeID, bus, psrpcClient, store, rs, ts, nil) - return &IngressService{ - conf: conf, - nodeID: nodeID, - bus: bus, - psrpcClient: psrpcClient, - store: store, - roomService: rs, - telemetry: ts, - } + s.launcher = s + + return s } func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) { @@ -70,17 +95,18 @@ func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateI AppendLogFields(ctx, fields...) }() - var urlPrefix string + var url string switch req.InputType { case livekit.IngressInput_RTMP_INPUT: - urlPrefix = s.conf.RTMPBaseURL + url = s.conf.RTMPBaseURL case livekit.IngressInput_WHIP_INPUT: - urlPrefix = s.conf.WHIPBaseURL + url = s.conf.WHIPBaseURL + case livekit.IngressInput_URL_INPUT: default: return nil, ingress.ErrInvalidIngressType } - ig, err := s.CreateIngressWithUrlPrefix(ctx, urlPrefix, req) + ig, err := s.CreateIngressWithUrl(ctx, url, req) if err != nil { return nil, err } @@ -89,7 +115,7 @@ func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateI return ig, nil } -func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPrefix string, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) { +func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) { err := EnsureIngressAdminPermission(ctx) if err != nil { return nil, twirpAuthError(err) @@ -98,13 +124,28 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref return nil, ErrIngressNotConnected } + if req.InputType == livekit.IngressInput_URL_INPUT { + if req.Url == "" { + return nil, ingress.ErrInvalidIngress("missing URL parameter") + } + urlObj, err := url.Parse(req.Url) + if err != nil { + return nil, psrpc.NewError(psrpc.InvalidArgument, err) + } + if urlObj.Scheme != "http" && urlObj.Scheme != "https" { + return nil, ingress.ErrInvalidIngress(fmt.Sprintf("invalid url scheme %s", urlObj.Scheme)) + } + // Marshall the URL again for sanitization + urlStr = urlObj.String() + } + sk := utils.NewGuid("") info := &livekit.IngressInfo{ IngressId: utils.NewGuid(utils.IngressPrefix), Name: req.Name, StreamKey: sk, - Url: urlPrefix, + Url: urlStr, InputType: req.InputType, Audio: req.Audio, Video: req.Video, @@ -112,14 +153,41 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref RoomName: req.RoomName, ParticipantIdentity: req.ParticipantIdentity, ParticipantName: req.ParticipantName, - Reusable: req.InputType == livekit.IngressInput_RTMP_INPUT, State: &livekit.IngressState{}, } + switch req.InputType { + case livekit.IngressInput_RTMP_INPUT, + livekit.IngressInput_WHIP_INPUT: + info.Reusable = true + if err := ingress.ValidateForSerialization(info); err != nil { + return nil, err + } + case livekit.IngressInput_URL_INPUT: + if err := ingress.Validate(info); err != nil { + return nil, err + } + default: + return nil, ingress.ErrInvalidIngressType + } + if err := ingress.ValidateForSerialization(info); err != nil { return nil, err } + if req.InputType == livekit.IngressInput_URL_INPUT { + retInfo, err := s.launcher.LaunchPullIngress(ctx, info) + if retInfo != nil { + info = retInfo + } else { + info.State.Status = livekit.IngressState_ENDPOINT_ERROR + info.State.Error = err.Error() + } + if err != nil { + return info, err + } + } + if err = s.store.StoreIngress(ctx, info); err != nil { logger.Errorw("could not write ingress info", err) return nil, err @@ -129,6 +197,14 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref return info, nil } +func (s *IngressService) LaunchPullIngress(ctx context.Context, info *livekit.IngressInfo) (*livekit.IngressInfo, error) { + req := &rpc.StartIngressRequest{ + Info: info, + } + + return s.psrpcClient.StartIngress(ctx, req) +} + func updateInfoUsingRequest(req *livekit.UpdateIngressRequest, info *livekit.IngressInfo) error { if req.Name != "" { info.Name = req.Name @@ -183,6 +259,11 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI return nil, err } + if !info.Reusable { + logger.Infow("ingress update attempted on non reusable ingress", "ingressID", info.IngressId) + return info, ErrIngressNonReusable + } + switch info.State.Status { case livekit.IngressState_ENDPOINT_ERROR: info.State.Status = livekit.IngressState_ENDPOINT_INACTIVE diff --git a/pkg/sfu/dependencydescriptor/dependencydescriptorextension.go b/pkg/sfu/dependencydescriptor/dependencydescriptorextension.go index 3e7ed2510..293482396 100644 --- a/pkg/sfu/dependencydescriptor/dependencydescriptorextension.go +++ b/pkg/sfu/dependencydescriptor/dependencydescriptorextension.go @@ -97,8 +97,15 @@ func (d *DependencyDescriptor) MarshalSizeWithActiveChains(activeChains uint32) } func (d *DependencyDescriptor) String() string { - return fmt.Sprintf("DependencyDescriptor{FirstPacketInFrame: %v, LastPacketInFrame: %v, FrameNumber: %v, FrameDependencies: %+v, Resolution: %+v, ActiveDecodeTargetsBitmask: %v, AttachedStructure: %v}", - d.FirstPacketInFrame, d.LastPacketInFrame, d.FrameNumber, *d.FrameDependencies, *d.Resolution, formatBitmask(d.ActiveDecodeTargetsBitmask), d.AttachedStructure) + resolution, dependencies := "-", "-" + if d.Resolution != nil { + resolution = fmt.Sprintf("%+v", *d.Resolution) + } + if d.FrameDependencies != nil { + dependencies = fmt.Sprintf("%+v", *d.FrameDependencies) + } + return fmt.Sprintf("DependencyDescriptor{FirstPacketInFrame: %v, LastPacketInFrame: %v, FrameNumber: %v, FrameDependencies: %s, Resolution: %s, ActiveDecodeTargetsBitmask: %v, AttachedStructure: %v}", + d.FirstPacketInFrame, d.LastPacketInFrame, d.FrameNumber, dependencies, resolution, formatBitmask(d.ActiveDecodeTargetsBitmask), d.AttachedStructure) } // ------------------------------------------------------------------------------ diff --git a/pkg/sfu/streamallocator/trenddetector.go b/pkg/sfu/streamallocator/trenddetector.go index 94609c091..360cf595d 100644 --- a/pkg/sfu/streamallocator/trenddetector.go +++ b/pkg/sfu/streamallocator/trenddetector.go @@ -51,6 +51,23 @@ type trendDetectorSample struct { at time.Time } +func trendDetectorSampleListToString(samples []trendDetectorSample) string { + samplesStr := "" + if len(samples) > 0 { + firstTime := samples[0].at + samplesStr += "[" + for i, sample := range samples { + suffix := ", " + if i == len(samples)-1 { + suffix = "" + } + samplesStr += fmt.Sprintf("%d(%d)%s", sample.value, sample.at.Sub(firstTime).Milliseconds(), suffix) + } + samplesStr += "]" + } + return samplesStr +} + // ------------------------------------------------ type TrendDetectorParams struct { @@ -147,23 +164,10 @@ func (t *TrendDetector) HasEnoughSamples() bool { func (t *TrendDetector) ToString() string { now := time.Now() elapsed := now.Sub(t.startTime).Seconds() - samplesStr := "" - if len(t.samples) > 0 { - firstTime := t.samples[0].at - samplesStr += "[" - for i, sample := range t.samples { - suffix := ", " - if i == len(t.samples)-1 { - suffix = "" - } - samplesStr += fmt.Sprintf("%d(%d)%s", sample.value, sample.at.Sub(firstTime).Milliseconds(), suffix) - } - samplesStr += "]" - } return fmt.Sprintf("n: %s, t: %+v|%+v|%.2fs, v: %d|%d|%d|%s|%.2f", t.params.Name, t.startTime.Format(time.UnixDate), now.Format(time.UnixDate), elapsed, - t.numSamples, t.lowestValue, t.highestValue, samplesStr, kendallsTau(t.samples), + t.numSamples, t.lowestValue, t.highestValue, trendDetectorSampleListToString(t.samples), kendallsTau(t.samples), ) }