diff --git a/.github/workflows/buildtest.yaml b/.github/workflows/buildtest.yaml index b21641e86..8f8b59f04 100644 --- a/.github/workflows/buildtest.yaml +++ b/.github/workflows/buildtest.yaml @@ -43,9 +43,11 @@ jobs: args: build - name: Static Check - uses: dominikh/staticcheck-action@v1 + uses: dominikh/staticcheck-action@v1.2.0 with: - version: "2022.1" + min-go-version: 1.18 + version: 2022.1.3 + install-go: false - name: Test run: | diff --git a/go.mod b/go.mod index 9330904b8..9839310ee 100644 --- a/go.mod +++ b/go.mod @@ -16,9 +16,10 @@ require ( github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.6.0 github.com/jxskiss/base62 v1.1.0 - github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc + github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290 github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b github.com/livekit/protocol v1.3.1-0.20221219041553-fc943512b0fb + github.com/livekit/psrpc v0.1.0 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 github.com/magefile/mage v1.14.0 @@ -67,11 +68,17 @@ require ( github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/josharian/native v1.0.0 // indirect + github.com/klauspost/compress v1.15.13 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mdlayher/netlink v1.6.0 // indirect github.com/mdlayher/socket v0.1.1 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.3.0 // indirect + github.com/nats-io/nats.go v1.21.0 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.1.5 // indirect github.com/pion/mdns v0.0.5 // indirect diff --git a/go.sum b/go.sum index 28381fb11..12acc9cae 100644 --- a/go.sum +++ b/go.sum @@ -211,6 +211,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0= +github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -224,12 +226,14 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc h1:e3GIA9AL6h4a38MLHCwTTKzW/JCIGTtpwavPj82Tcfo= -github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= +github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290 h1:ZVsQUuUOM9G7O3qfDSSUd1d+KlE5EVzHKylMkMkRhYg= +github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b h1:RBNV8TckETSkIkKxcD12d8nZKVkB9GSY/sQlMoaruP4= github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= github.com/livekit/protocol v1.3.1-0.20221219041553-fc943512b0fb h1:Kt92FupD3q4GzAlabx7bxUZAz+MOPgBnsUnTDU3EuWo= github.com/livekit/protocol v1.3.1-0.20221219041553-fc943512b0fb/go.mod h1:lTX4zmLbKoeDWHjBJEzo5wYNcFb8MaJFlc5gUnFbHTc= +github.com/livekit/psrpc v0.1.0 h1:0G52IyHBficaCKIqFXEgWBD5Gxo88EdHGczGro0gwRM= +github.com/livekit/psrpc v0.1.0/go.mod h1:n3Dn9KLCeaKXoaPb8e0ALSVQwHZnJwO9LWP1qC33jss= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= @@ -259,6 +263,8 @@ github.com/mdlayher/netlink v1.6.0/go.mod h1:0o3PlBmGst1xve7wQ7j/hwpNaFaH4qCRyWC github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod h1:GAFlyu4/XV68LkQKYzKhIo/WW7j3Zi0YRAz/BOoanUc= github.com/mdlayher/socket v0.1.1 h1:q3uOGirUPfAV2MUoaC7BavjQ154J7+JOkTWyiV+intI= github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -268,6 +274,15 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o= +github.com/nats-io/nats.go v1.21.0 h1:kQiWyQMMMIPjDR7NanrLhTnRUxWgU04yrzmYdq9JxCU= +github.com/nats-io/nats.go v1.21.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -418,6 +433,7 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -533,6 +549,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -611,6 +628,7 @@ golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/magefile.go b/magefile.go index 5d9198b48..e1b84da1b 100644 --- a/magefile.go +++ b/magefile.go @@ -15,6 +15,7 @@ import ( "github.com/livekit/livekit-server/version" "github.com/livekit/mageutil" + _ "github.com/livekit/psrpc" ) const ( @@ -144,6 +145,57 @@ func PublishDocker() error { return nil } +// regenerate psrpc service definitions +func Psrpc() error { + psrpcProtoFiles := []string{ + "pkg/service/rpc/egress.proto", + } + + fmt.Println("generating psrpc") + + protocolDir, err := mageutil.GetPkgDir("github.com/livekit/protocol") + if err != nil { + return err + } + + psrpcDir, err := mageutil.GetPkgDir("github.com/livekit/psrpc") + if err != nil { + return err + } + + protoc, err := mageutil.GetToolPath("protoc") + if err != nil { + return err + } + protocGoPath, err := mageutil.GetToolPath("protoc-gen-go") + if err != nil { + return err + } + psrpcPath, err := mageutil.GetToolPath("protoc-gen-psrpc") + if err != nil { + return err + } + + fmt.Println("generating psrpc protobuf") + args := append([]string{ + "--go_out", ".", + "--psrpc_out", ".", + "--go_opt=paths=source_relative", + "--psrpc_opt=paths=source_relative", + "--plugin=go=" + protocGoPath, + "--plugin=psrpc=" + psrpcPath, + "-I" + protocolDir, + "-I" + psrpcDir + "/protoc-gen-psrpc/options", + "-I=.", + }, psrpcProtoFiles...) + cmd := exec.Command(protoc, args...) + mageutil.ConnectStd(cmd) + if err := cmd.Run(); err != nil { + return err + } + return nil +} + // run unit tests, skipping integration func Test() error { mg.Deps(generateWire, setULimit) diff --git a/pkg/service/egress.go b/pkg/service/egress.go index e2a657a37..7a7bc8729 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -4,12 +4,15 @@ import ( "context" "encoding/json" "errors" + "fmt" "reflect" "time" + goversion "github.com/hashicorp/go-version" "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -18,35 +21,44 @@ import ( ) type EgressService struct { - rpcClient egress.RPCClient - store ServiceStore - es EgressStore - roomService livekit.RoomService - telemetry telemetry.TelemetryService - launcher rtc.EgressLauncher - shutdown chan struct{} + psrpcClient rpc.EgressClient + clientDeprecated egress.RPCClient + store ServiceStore + es EgressStore + roomService livekit.RoomService + telemetry telemetry.TelemetryService + launcher rtc.EgressLauncher + shutdown chan struct{} } type egressLauncher struct { - rpcClient egress.RPCClient - es EgressStore - telemetry telemetry.TelemetryService + psrpcClient rpc.EgressClient + clientDeprecated egress.RPCClient + usePSRPC bool + es EgressStore + telemetry telemetry.TelemetryService } -func NewEgressLauncher(rpcClient egress.RPCClient, es EgressStore, ts telemetry.TelemetryService) rtc.EgressLauncher { - if rpcClient == nil { +func NewEgressLauncher( + psrpcClient rpc.EgressClient, + clientDeprecated egress.RPCClient, + es EgressStore, + ts telemetry.TelemetryService) rtc.EgressLauncher { + if psrpcClient == nil && clientDeprecated == nil { return nil } return &egressLauncher{ - rpcClient: rpcClient, - es: es, - telemetry: ts, + psrpcClient: psrpcClient, + clientDeprecated: clientDeprecated, + es: es, + telemetry: ts, } } func NewEgressService( - rpcClient egress.RPCClient, + psrpcClient rpc.EgressClient, + clientDeprecated egress.RPCClient, store ServiceStore, es EgressStore, rs livekit.RoomService, @@ -54,12 +66,13 @@ func NewEgressService( launcher rtc.EgressLauncher, ) *EgressService { return &EgressService{ - rpcClient: rpcClient, - store: store, - es: es, - roomService: rs, - telemetry: ts, - launcher: launcher, + psrpcClient: psrpcClient, + clientDeprecated: clientDeprecated, + store: store, + es: es, + roomService: rs, + telemetry: ts, + launcher: launcher, } } @@ -69,7 +82,7 @@ func (s *EgressService) Start() error { } s.shutdown = make(chan struct{}) - if s.rpcClient != nil && s.es != nil { + if s.psrpcClient != nil && s.es != nil { return s.startWorker() } @@ -169,7 +182,19 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa } func (s *egressLauncher) StartEgress(ctx context.Context, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) { - info, err := s.rpcClient.SendRequest(ctx, req) + var info *livekit.EgressInfo + var err error + + if !s.usePSRPC { + s.usePSRPC = usePSRPC(s.es) + } + + if s.usePSRPC { + info, err = s.psrpcClient.StartEgress(ctx, req) + } else { + logger.Warnw("Using deprecated egress client. Please upgrade egress to v >=1.5.4", nil) + info, err = s.clientDeprecated.SendRequest(ctx, req) + } if err != nil { return nil, err } @@ -193,7 +218,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.rpcClient == nil { + if s.psrpcClient == nil { return nil, ErrEgressNotConnected } @@ -228,16 +253,24 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.rpcClient == nil { + + if s.psrpcClient == nil { return nil, ErrEgressNotConnected } - info, err := s.rpcClient.SendRequest(ctx, &livekit.EgressRequest{ - EgressId: req.EgressId, - Request: &livekit.EgressRequest_UpdateStream{ - UpdateStream: req, - }, - }) + f0 := func() (*livekit.EgressInfo, error) { + return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{ + EgressId: req.EgressId, + Request: &livekit.EgressRequest_UpdateStream{ + UpdateStream: req, + }, + }) + } + f1 := func() (*livekit.EgressInfo, error) { + return s.psrpcClient.UpdateStream(ctx, req.EgressId, req) + } + + info, err := s.getFirst(f0, f1) if err != nil { return nil, err } @@ -258,7 +291,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.rpcClient == nil { + if s.psrpcClient == nil { return nil, ErrEgressNotConnected } @@ -275,16 +308,34 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.rpcClient == nil { + + if s.psrpcClient == nil { return nil, ErrEgressNotConnected } - info, err := s.rpcClient.SendRequest(ctx, &livekit.EgressRequest{ - EgressId: req.EgressId, - Request: &livekit.EgressRequest_Stop{ - Stop: req, - }, - }) + info, err := s.es.LoadEgress(ctx, req.EgressId) + if err != nil { + return nil, err + } else { + if info.Status != livekit.EgressStatus_EGRESS_STARTING && + info.Status != livekit.EgressStatus_EGRESS_ACTIVE { + return nil, fmt.Errorf("egress with status %s cannot be stopped", info.Status.String()) + } + } + + f0 := func() (*livekit.EgressInfo, error) { + return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{ + EgressId: req.EgressId, + Request: &livekit.EgressRequest_Stop{ + Stop: req, + }, + }) + } + f1 := func() (*livekit.EgressInfo, error) { + return s.psrpcClient.StopEgress(ctx, req.EgressId, req) + } + + info, err = s.getFirst(f0, f1) if err != nil { return nil, err } @@ -300,61 +351,22 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR func (s *EgressService) startWorker() error { rs := s.es.(*RedisStore) - if err := rs.Start(); err != nil { + err := rs.Start() + if err != nil { logger.Errorw("failed to start redis egress worker", err) return err } - sub, err := s.rpcClient.GetUpdateChannel(context.Background()) - if err != nil { - logger.Errorw("failed to subscribe to results channel", err) - return err - } - go func() { - resChan := sub.Channel() + sub, err := s.psrpcClient.SubscribeInfoUpdate(context.Background()) + if err != nil { + logger.Errorw("failed to subscribe", err) + } + for { select { - case msg := <-resChan: - b := sub.Payload(msg) - - res := &livekit.EgressInfo{} - if err = proto.Unmarshal(b, res); err != nil { - logger.Errorw("failed to read results", err) - continue - } - - switch res.Status { - case livekit.EgressStatus_EGRESS_COMPLETE, - livekit.EgressStatus_EGRESS_FAILED, - livekit.EgressStatus_EGRESS_ABORTED: - - // make sure endedAt is set so it eventually gets deleted - if res.EndedAt == 0 { - res.EndedAt = time.Now().UnixNano() - } - - err = s.es.UpdateEgress(context.Background(), res) - if err != nil { - logger.Errorw("could not update egress", err) - } - - // log results - if res.Error != "" { - logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId) - } else { - logger.Infow("egress ended", "egressID", res.EgressId) - } - - s.telemetry.EgressEnded(context.Background(), res) - - default: - err = s.es.UpdateEgress(context.Background(), res) - if err != nil { - logger.Errorw("could not update egress", err) - } - } - + case info := <-sub.Channel(): + s.handleUpdate(info) case <-s.shutdown: _ = sub.Close() rs.Stop() @@ -363,5 +375,114 @@ func (s *EgressService) startWorker() error { } }() + if s.clientDeprecated != nil { + go func() { + sub, err := s.clientDeprecated.GetUpdateChannel(context.Background()) + if err != nil { + logger.Errorw("failed to subscribe to results channel", err) + } + + resChan := sub.Channel() + for { + select { + case msg := <-resChan: + b := sub.Payload(msg) + info := &livekit.EgressInfo{} + if err = proto.Unmarshal(b, info); err != nil { + logger.Errorw("failed to read results", err) + continue + } + s.handleUpdate(info) + case <-s.shutdown: + _ = sub.Close() + rs.Stop() + return + } + } + }() + } + return nil } + +func (s *EgressService) handleUpdate(info *livekit.EgressInfo) { + switch info.Status { + case livekit.EgressStatus_EGRESS_COMPLETE, + livekit.EgressStatus_EGRESS_FAILED, + livekit.EgressStatus_EGRESS_ABORTED: + + // make sure endedAt is set so it eventually gets deleted + if info.EndedAt == 0 { + info.EndedAt = time.Now().UnixNano() + } + + if err := s.es.UpdateEgress(context.Background(), info); err != nil { + logger.Errorw("could not update egress", err) + } + + // log results + if info.Error != "" { + logger.Errorw("egress failed", errors.New(info.Error), "egressID", info.EgressId) + } else { + logger.Infow("egress ended", "egressID", info.EgressId) + } + + s.telemetry.EgressEnded(context.Background(), info) + + default: + if err := s.es.UpdateEgress(context.Background(), info); err != nil { + logger.Errorw("could not update egress", err) + } + } +} + +// TODO: remove in future version +func (s *EgressService) getFirst(f0, f1 func() (*livekit.EgressInfo, error)) (*livekit.EgressInfo, error) { + if s.clientDeprecated == nil { + return f1() + } + + type res struct { + info *livekit.EgressInfo + err error + } + v0 := make(chan *res, 1) + v1 := make(chan *res, 1) + + go func() { + info, err := f0() + v0 <- &res{ + info: info, + err: err, + } + }() + + go func() { + info, err := f1() + v1 <- &res{ + info: info, + err: err, + } + }() + + select { + case r := <-v0: + return r.info, r.err + case r := <-v1: + return r.info, r.err + } +} + +var minVersion *goversion.Version + +func usePSRPC(es EgressStore) bool { + if minVersion == nil { + minVersion, _ = goversion.NewVersion("1.5.4") + } + v, err := es.GetEgressVersion(context.Background()) + if err != nil { + return false + } + + return v.GreaterThanOrEqual(minVersion) +} diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index c2b9876e6..58ba57716 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -4,6 +4,8 @@ import ( "context" "time" + goversion "github.com/hashicorp/go-version" + "github.com/livekit/protocol/livekit" ) @@ -44,6 +46,7 @@ type EgressStore interface { LoadEgress(ctx context.Context, egressID string) (*livekit.EgressInfo, error) ListEgress(ctx context.Context, roomName livekit.RoomName) ([]*livekit.EgressInfo, error) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error + GetEgressVersion(ctx context.Context) (*goversion.Version, error) } //counterfeiter:generate . IngressStore diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index f7a2fbda4..21fc37671 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -20,7 +20,8 @@ import ( ) const ( - VersionKey = "livekit_version" + VersionKey = "livekit_version" + EgressVersionKey = "egress_version" // RoomsKey is hash of room_name => Room proto RoomsKey = "rooms" @@ -73,21 +74,17 @@ func (s *RedisStore) Start() error { } s.done = make(chan struct{}, 1) - current, err := s.rc.Get(s.ctx, VersionKey).Result() + + v, err := s.rc.Get(s.ctx, VersionKey).Result() if err != nil && err != redis.Nil { return err } - if current == "" { - current = "0.0.0" + if v == "" { + v = "0.0.0" } - - v, _ := goversion.NewVersion(current) - migrateEgress, _ := goversion.NewVersion("1.1.3") - if v.LessThan(migrateEgress) { - if _, err = s.MigrateEgressInfo(); err != nil { - return err - } - + existing, _ := goversion.NewVersion(v) + current, _ := goversion.NewVersion(version.Version) + if current.GreaterThan(existing) { if err = s.rc.Set(s.ctx, VersionKey, version.Version, 0).Err(); err != nil { return err } @@ -422,6 +419,17 @@ func (s *RedisStore) UpdateEgress(_ context.Context, info *livekit.EgressInfo) e return nil } +func (s *RedisStore) GetEgressVersion(_ context.Context) (*goversion.Version, error) { + egressVersion, err := s.rc.Get(s.ctx, EgressVersionKey).Result() + if err != nil && err != redis.Nil { + return nil, err + } + if egressVersion == "" { + egressVersion = "0.0.0" + } + return goversion.NewVersion(egressVersion) +} + // Deletes egress info 24h after the egress has ended func (s *RedisStore) egressWorker() { ticker := time.NewTicker(time.Minute * 30) @@ -440,7 +448,7 @@ func (s *RedisStore) egressWorker() { } } -func (s RedisStore) CleanEndedEgress() error { +func (s *RedisStore) CleanEndedEgress() error { values, err := s.rc.HGetAll(s.ctx, EndedEgressKey).Result() if err != nil && err != redis.Nil { return err @@ -797,52 +805,3 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) return nil } - -// Migration to LiveKit >= v1.1.3 -func (s *RedisStore) MigrateEgressInfo() (int, error) { - locked, err := s.rc.SetNX(s.ctx, "egress-migration", utils.NewGuid("LOCK"), time.Minute).Result() - if err != nil { - return 0, err - } else if !locked { - return 0, nil - } - - it := s.rc.Scan(s.ctx, 0, DeprecatedRoomEgressPrefix+"*", 0).Iterator() - migrated := 0 - for it.Next(s.ctx) { - migrated++ - key := it.Val() - egressIDs, err := s.rc.SMembers(s.ctx, key).Result() - if err != nil && err != redis.Nil { - return migrated, err - } - - for _, egressID := range egressIDs { - info, err := s.LoadEgress(s.ctx, egressID) - if err != nil { - return migrated, err - } - - var roomName string - switch req := info.Request.(type) { - case *livekit.EgressInfo_RoomComposite: - roomName = req.RoomComposite.RoomName - case *livekit.EgressInfo_TrackComposite: - roomName = req.TrackComposite.RoomName - case *livekit.EgressInfo_Track: - roomName = req.Track.RoomName - } - - tx := s.rc.TxPipeline() - tx.SAdd(s.ctx, RoomEgressPrefix+roomName, egressID) - tx.SRem(s.ctx, key, egressID) - if _, err = tx.Exec(s.ctx); err != nil { - return migrated, err - } - } - - s.rc.Del(s.ctx, key) - } - - return migrated, nil -} diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index b14cce2e2..17e8e5021 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" - "google.golang.org/protobuf/proto" "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" @@ -149,7 +148,7 @@ func TestEgressStore(t *testing.T) { roomName := "egress-test" - // test migration + // store egress info info := &livekit.EgressInfo{ EgressId: utils.NewGuid(utils.EgressPrefix), RoomId: utils.NewGuid(utils.RoomPrefix), @@ -162,30 +161,7 @@ func TestEgressStore(t *testing.T) { }, }, } - - data, err := proto.Marshal(info) - require.NoError(t, err) - - // store egress info the old way - tx := rc.TxPipeline() - tx.HSet(ctx, service.EgressKey, info.EgressId, data) - tx.SAdd(ctx, service.DeprecatedRoomEgressPrefix+info.RoomId, info.EgressId) - _, err = tx.Exec(ctx) - require.NoError(t, err) - - // run migration - migrated, err := rs.MigrateEgressInfo() - require.NoError(t, err) - require.Equal(t, 1, migrated) - - // check that it was migrated - exists, err := rc.Exists(ctx, service.DeprecatedRoomEgressPrefix+info.RoomId).Result() - require.NoError(t, err) - require.Equal(t, int64(0), exists) - - exists, err = rc.Exists(ctx, service.RoomEgressPrefix+info.RoomName).Result() - require.NoError(t, err) - require.Equal(t, int64(1), exists) + require.NoError(t, rs.StoreEgress(ctx, info)) // load res, err := rs.LoadEgress(ctx, info.EgressId) diff --git a/pkg/service/rpc/egress.pb.go b/pkg/service/rpc/egress.pb.go new file mode 100644 index 000000000..880219869 --- /dev/null +++ b/pkg/service/rpc/egress.pb.go @@ -0,0 +1,295 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.21.5 +// source: pkg/service/rpc/egress.proto + +package rpc + +import ( + livekit "github.com/livekit/protocol/livekit" + _ "github.com/livekit/psrpc/protoc-gen-psrpc/options" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Empty struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Empty) Reset() { + *x = Empty{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_service_rpc_egress_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Empty) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Empty) ProtoMessage() {} + +func (x *Empty) ProtoReflect() protoreflect.Message { + mi := &file_pkg_service_rpc_egress_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Empty.ProtoReflect.Descriptor instead. +func (*Empty) Descriptor() ([]byte, []int) { + return file_pkg_service_rpc_egress_proto_rawDescGZIP(), []int{0} +} + +type ListActiveEgressRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ListActiveEgressRequest) Reset() { + *x = ListActiveEgressRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_service_rpc_egress_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListActiveEgressRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListActiveEgressRequest) ProtoMessage() {} + +func (x *ListActiveEgressRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_service_rpc_egress_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListActiveEgressRequest.ProtoReflect.Descriptor instead. +func (*ListActiveEgressRequest) Descriptor() ([]byte, []int) { + return file_pkg_service_rpc_egress_proto_rawDescGZIP(), []int{1} +} + +type ListActiveEgressResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + EgressIds []string `protobuf:"bytes,1,rep,name=egress_ids,json=egressIds,proto3" json:"egress_ids,omitempty"` +} + +func (x *ListActiveEgressResponse) Reset() { + *x = ListActiveEgressResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_service_rpc_egress_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListActiveEgressResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListActiveEgressResponse) ProtoMessage() {} + +func (x *ListActiveEgressResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_service_rpc_egress_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListActiveEgressResponse.ProtoReflect.Descriptor instead. +func (*ListActiveEgressResponse) Descriptor() ([]byte, []int) { + return file_pkg_service_rpc_egress_proto_rawDescGZIP(), []int{2} +} + +func (x *ListActiveEgressResponse) GetEgressIds() []string { + if x != nil { + return x.EgressIds + } + return nil +} + +var File_pkg_service_rpc_egress_proto protoreflect.FileDescriptor + +var file_pkg_service_rpc_egress_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, + 0x63, 0x2f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, + 0x72, 0x70, 0x63, 0x1a, 0x0d, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x1a, 0x1a, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x72, 0x70, 0x63, 0x5f, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, + 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x19, 0x0a, + 0x17, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x39, 0x0a, 0x18, 0x4c, 0x69, 0x73, 0x74, + 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, + 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, + 0x49, 0x64, 0x73, 0x32, 0xb2, 0x01, 0x0a, 0x0e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x47, 0x0a, 0x0b, 0x53, 0x74, 0x61, 0x72, 0x74, 0x45, + 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1b, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, + 0x53, 0x74, 0x61, 0x72, 0x74, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x06, 0xb2, 0x89, 0x01, 0x02, 0x20, 0x01, 0x12, + 0x57, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x12, 0x1c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, + 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, + 0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x06, 0xb2, 0x89, 0x01, 0x02, 0x08, 0x01, 0x32, 0xd8, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x49, 0x0a, 0x0c, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x6c, 0x69, 0x76, + 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, + 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x06, 0xb2, + 0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x45, 0x0a, 0x0a, 0x53, 0x74, 0x6f, 0x70, 0x45, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x53, 0x74, + 0x6f, 0x70, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, + 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x06, 0xb2, 0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x35, 0x0a, 0x0a, + 0x49, 0x6e, 0x66, 0x6f, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x0a, 0x2e, 0x72, 0x70, 0x63, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, + 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x06, 0xb2, 0x89, 0x01, + 0x02, 0x10, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, + 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, + 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_service_rpc_egress_proto_rawDescOnce sync.Once + file_pkg_service_rpc_egress_proto_rawDescData = file_pkg_service_rpc_egress_proto_rawDesc +) + +func file_pkg_service_rpc_egress_proto_rawDescGZIP() []byte { + file_pkg_service_rpc_egress_proto_rawDescOnce.Do(func() { + file_pkg_service_rpc_egress_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_egress_proto_rawDescData) + }) + return file_pkg_service_rpc_egress_proto_rawDescData +} + +var file_pkg_service_rpc_egress_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_pkg_service_rpc_egress_proto_goTypes = []interface{}{ + (*Empty)(nil), // 0: rpc.Empty + (*ListActiveEgressRequest)(nil), // 1: rpc.ListActiveEgressRequest + (*ListActiveEgressResponse)(nil), // 2: rpc.ListActiveEgressResponse + (*livekit.StartEgressRequest)(nil), // 3: livekit.StartEgressRequest + (*livekit.UpdateStreamRequest)(nil), // 4: livekit.UpdateStreamRequest + (*livekit.StopEgressRequest)(nil), // 5: livekit.StopEgressRequest + (*livekit.EgressInfo)(nil), // 6: livekit.EgressInfo +} +var file_pkg_service_rpc_egress_proto_depIdxs = []int32{ + 3, // 0: rpc.EgressInternal.StartEgress:input_type -> livekit.StartEgressRequest + 1, // 1: rpc.EgressInternal.ListActiveEgress:input_type -> rpc.ListActiveEgressRequest + 4, // 2: rpc.EgressHandler.UpdateStream:input_type -> livekit.UpdateStreamRequest + 5, // 3: rpc.EgressHandler.StopEgress:input_type -> livekit.StopEgressRequest + 0, // 4: rpc.EgressHandler.InfoUpdate:input_type -> rpc.Empty + 6, // 5: rpc.EgressInternal.StartEgress:output_type -> livekit.EgressInfo + 2, // 6: rpc.EgressInternal.ListActiveEgress:output_type -> rpc.ListActiveEgressResponse + 6, // 7: rpc.EgressHandler.UpdateStream:output_type -> livekit.EgressInfo + 6, // 8: rpc.EgressHandler.StopEgress:output_type -> livekit.EgressInfo + 6, // 9: rpc.EgressHandler.InfoUpdate:output_type -> livekit.EgressInfo + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pkg_service_rpc_egress_proto_init() } +func file_pkg_service_rpc_egress_proto_init() { + if File_pkg_service_rpc_egress_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_service_rpc_egress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Empty); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_service_rpc_egress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListActiveEgressRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_service_rpc_egress_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListActiveEgressResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_service_rpc_egress_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 2, + }, + GoTypes: file_pkg_service_rpc_egress_proto_goTypes, + DependencyIndexes: file_pkg_service_rpc_egress_proto_depIdxs, + MessageInfos: file_pkg_service_rpc_egress_proto_msgTypes, + }.Build() + File_pkg_service_rpc_egress_proto = out.File + file_pkg_service_rpc_egress_proto_rawDesc = nil + file_pkg_service_rpc_egress_proto_goTypes = nil + file_pkg_service_rpc_egress_proto_depIdxs = nil +} diff --git a/pkg/service/rpc/egress.proto b/pkg/service/rpc/egress.proto new file mode 100644 index 000000000..4c343b412 --- /dev/null +++ b/pkg/service/rpc/egress.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package rpc; + +option go_package = "github.com/livekit/livekit/pkg/service/rpc"; + +import "options.proto"; +import "livekit_rpc_internal.proto"; +import "livekit_egress.proto"; + +service EgressInternal { + rpc StartEgress(livekit.StartEgressRequest) returns (livekit.EgressInfo) { + option (psrpc.options).affinity_func = true; + }; + rpc ListActiveEgress(ListActiveEgressRequest) returns (ListActiveEgressResponse) { + option (psrpc.options).multi = true; + } +} + +service EgressHandler { + rpc UpdateStream(livekit.UpdateStreamRequest) returns (livekit.EgressInfo) { + option (psrpc.options).topics = true; + } + rpc StopEgress(livekit.StopEgressRequest) returns (livekit.EgressInfo) { + option (psrpc.options).topics = true; + } + rpc InfoUpdate(Empty) returns (livekit.EgressInfo) { + option (psrpc.options).subscription = true; + } +} + +message Empty {} + +message ListActiveEgressRequest {} + +message ListActiveEgressResponse { + repeated string egress_ids = 1; +} diff --git a/pkg/service/rpc/egress.psrpc.go b/pkg/service/rpc/egress.psrpc.go new file mode 100644 index 000000000..1340790e5 --- /dev/null +++ b/pkg/service/rpc/egress.psrpc.go @@ -0,0 +1,232 @@ +// Code generated by protoc-gen-psrpc v0.1.0, DO NOT EDIT. +// source: pkg/service/rpc/egress.proto + +package rpc + +import context "context" + +import psrpc1 "github.com/livekit/psrpc" +import livekit "github.com/livekit/protocol/livekit" +import livekit3 "github.com/livekit/protocol/livekit" + +// =============================== +// EgressInternal Client Interface +// =============================== + +type EgressInternalClient interface { + StartEgress(context.Context, *livekit3.StartEgressRequest, ...psrpc1.RequestOpt) (*livekit.EgressInfo, error) + + ListActiveEgress(context.Context, *ListActiveEgressRequest, ...psrpc1.RequestOpt) (<-chan *psrpc1.Response[*ListActiveEgressResponse], error) +} + +// =================================== +// EgressInternal ServerImpl Interface +// =================================== + +type EgressInternalServerImpl interface { + StartEgress(context.Context, *livekit3.StartEgressRequest) (*livekit.EgressInfo, error) + StartEgressAffinity(*livekit3.StartEgressRequest) float32 + + ListActiveEgress(context.Context, *ListActiveEgressRequest) (*ListActiveEgressResponse, error) +} + +// =============================== +// EgressInternal Server Interface +// =============================== + +type EgressInternalServer interface { +} + +// ===================== +// EgressInternal Client +// ===================== + +type egressInternalClient struct { + client psrpc1.RPCClient +} + +// NewEgressInternalClient creates a psrpc client that implements the EgressInternalClient interface. +func NewEgressInternalClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOpt) (EgressInternalClient, error) { + rpcClient, err := psrpc1.NewRPCClient("EgressInternal", clientID, bus, opts...) + if err != nil { + return nil, err + } + + return &egressInternalClient{ + client: rpcClient, + }, nil +} + +func (c *egressInternalClient) StartEgress(ctx context.Context, req *livekit3.StartEgressRequest, opts ...psrpc1.RequestOpt) (*livekit.EgressInfo, error) { + return psrpc1.RequestTopicSingle[*livekit.EgressInfo](ctx, c.client, "StartEgress", "", req, opts...) +} + +func (c *egressInternalClient) ListActiveEgress(ctx context.Context, req *ListActiveEgressRequest, opts ...psrpc1.RequestOpt) (<-chan *psrpc1.Response[*ListActiveEgressResponse], error) { + return psrpc1.RequestTopicAll[*ListActiveEgressResponse](ctx, c.client, "ListActiveEgress", "", req, opts...) +} + +// ===================== +// EgressInternal Server +// ===================== + +type egressInternalServer struct { + svc EgressInternalServerImpl + rpc psrpc1.RPCServer +} + +// NewEgressInternalServer builds a RPCServer that can be used to handle +// requests that are routed to the right method in the provided svc implementation. +func NewEgressInternalServer(serverID string, svc EgressInternalServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOpt) (EgressInternalServer, error) { + rpcServer := psrpc1.NewRPCServer("EgressInternal", serverID, bus, opts...) + + var err error + err = rpcServer.RegisterHandler(psrpc1.NewHandlerWithAffinity("StartEgress", svc.StartEgress, svc.StartEgressAffinity)) + if err != nil { + rpcServer.Close() + return nil, err + } + + err = rpcServer.RegisterHandler(psrpc1.NewHandler("ListActiveEgress", svc.ListActiveEgress)) + if err != nil { + rpcServer.Close() + return nil, err + } + + return &egressInternalServer{ + svc: svc, + rpc: rpcServer, + }, nil +} + +// ============================== +// EgressHandler Client Interface +// ============================== + +type EgressHandlerClient interface { + UpdateStream(context.Context, string, *livekit.UpdateStreamRequest, ...psrpc1.RequestOpt) (*livekit.EgressInfo, error) + + StopEgress(context.Context, string, *livekit.StopEgressRequest, ...psrpc1.RequestOpt) (*livekit.EgressInfo, error) + + SubscribeInfoUpdate(context.Context) (psrpc1.Subscription[*livekit.EgressInfo], error) +} + +// ================================== +// EgressHandler ServerImpl Interface +// ================================== + +type EgressHandlerServerImpl interface { + UpdateStream(context.Context, *livekit.UpdateStreamRequest) (*livekit.EgressInfo, error) + + StopEgress(context.Context, *livekit.StopEgressRequest) (*livekit.EgressInfo, error) +} + +// ============================== +// EgressHandler Server Interface +// ============================== + +type EgressHandlerServer interface { + RegisterUpdateStreamTopic(string) error + DeregisterUpdateStreamTopic(string) error + + RegisterStopEgressTopic(string) error + DeregisterStopEgressTopic(string) error + + PublishInfoUpdate(context.Context, *livekit.EgressInfo) error +} + +// ==================== +// EgressHandler Client +// ==================== + +type egressHandlerClient struct { + client psrpc1.RPCClient +} + +// NewEgressHandlerClient creates a psrpc client that implements the EgressHandlerClient interface. +func NewEgressHandlerClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOpt) (EgressHandlerClient, error) { + rpcClient, err := psrpc1.NewRPCClient("EgressHandler", clientID, bus, opts...) + if err != nil { + return nil, err + } + + return &egressHandlerClient{ + client: rpcClient, + }, nil +} + +func (c *egressHandlerClient) UpdateStream(ctx context.Context, topic string, req *livekit.UpdateStreamRequest, opts ...psrpc1.RequestOpt) (*livekit.EgressInfo, error) { + return psrpc1.RequestTopicSingle[*livekit.EgressInfo](ctx, c.client, "UpdateStream", topic, req, opts...) +} + +func (c *egressHandlerClient) StopEgress(ctx context.Context, topic string, req *livekit.StopEgressRequest, opts ...psrpc1.RequestOpt) (*livekit.EgressInfo, error) { + return psrpc1.RequestTopicSingle[*livekit.EgressInfo](ctx, c.client, "StopEgress", topic, req, opts...) +} + +func (c *egressHandlerClient) SubscribeInfoUpdate(ctx context.Context) (psrpc1.Subscription[*livekit.EgressInfo], error) { + return psrpc1.SubscribeTopicQueue[*livekit.EgressInfo](ctx, c.client, "InfoUpdate", "") +} + +// ==================== +// EgressHandler Server +// ==================== + +type egressHandlerServer struct { + svc EgressHandlerServerImpl + rpc psrpc1.RPCServer +} + +// NewEgressHandlerServer builds a RPCServer that can be used to handle +// requests that are routed to the right method in the provided svc implementation. +func NewEgressHandlerServer(serverID string, svc EgressHandlerServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOpt) (EgressHandlerServer, error) { + rpcServer := psrpc1.NewRPCServer("EgressHandler", serverID, bus, opts...) + + return &egressHandlerServer{ + svc: svc, + rpc: rpcServer, + }, nil +} + +func (s *egressHandlerServer) RegisterUpdateStreamTopic(topic string) error { + return s.rpc.RegisterHandler(psrpc1.NewTopicHandler("UpdateStream", topic, s.svc.UpdateStream)) +} + +func (s *egressHandlerServer) DeregisterUpdateStreamTopic(topic string) error { + return s.rpc.DeregisterTopic("UpdateStream", topic) +} + +func (s *egressHandlerServer) RegisterStopEgressTopic(topic string) error { + return s.rpc.RegisterHandler(psrpc1.NewTopicHandler("StopEgress", topic, s.svc.StopEgress)) +} + +func (s *egressHandlerServer) DeregisterStopEgressTopic(topic string) error { + return s.rpc.DeregisterTopic("StopEgress", topic) +} + +func (s *egressHandlerServer) PublishInfoUpdate(ctx context.Context, msg *livekit.EgressInfo) error { + return s.rpc.PublishTopic(ctx, "InfoUpdate", "", msg) +} + +var psrpcFileDescriptor0 = []byte{ + // 329 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xd1, 0x4a, 0x02, 0x41, + 0x14, 0x65, 0x92, 0x2c, 0x6f, 0x19, 0x32, 0x05, 0xd9, 0xa6, 0x20, 0xfb, 0x14, 0x11, 0xbb, 0x60, + 0xf4, 0xd0, 0x63, 0x81, 0x94, 0xd0, 0x93, 0x12, 0x41, 0x2f, 0xb2, 0xce, 0xde, 0x6c, 0x50, 0x77, + 0xa6, 0x99, 0xab, 0xd0, 0x27, 0xf4, 0x3b, 0x7e, 0x4d, 0x9f, 0x13, 0xed, 0x8c, 0xb2, 0x18, 0x56, + 0x4f, 0xc3, 0x9c, 0x73, 0xef, 0xb9, 0xe7, 0x70, 0x2f, 0x34, 0xf4, 0x78, 0x14, 0x5b, 0x34, 0x73, + 0x29, 0x30, 0x36, 0x5a, 0xc4, 0x38, 0x32, 0x68, 0x6d, 0xa4, 0x8d, 0x22, 0xc5, 0x4b, 0x46, 0x8b, + 0xa0, 0xaa, 0x34, 0x49, 0x95, 0x79, 0x2c, 0x08, 0x26, 0x72, 0x8e, 0x63, 0x49, 0x03, 0xa3, 0xc5, + 0x40, 0x66, 0x84, 0x26, 0x4b, 0x26, 0x9e, 0x3b, 0x5a, 0x72, 0x45, 0x95, 0x70, 0x07, 0xb6, 0x3b, + 0x53, 0x4d, 0xef, 0xe1, 0x09, 0x1c, 0x3f, 0x48, 0x4b, 0x37, 0x82, 0xe4, 0x1c, 0x3b, 0x79, 0x49, + 0x0f, 0xdf, 0x66, 0x68, 0x29, 0xbc, 0x86, 0xfa, 0x4f, 0xca, 0x6a, 0x95, 0x59, 0xe4, 0x4d, 0x00, + 0xa7, 0x37, 0x90, 0xa9, 0xad, 0xb3, 0x56, 0xe9, 0xac, 0xd2, 0xab, 0x38, 0xa4, 0x9b, 0xda, 0xf6, + 0x82, 0xc1, 0x81, 0xeb, 0xe8, 0x7a, 0x37, 0xfc, 0x0e, 0xf6, 0xfa, 0x94, 0x18, 0x72, 0x30, 0x3f, + 0x8d, 0xbc, 0xaf, 0xa8, 0x80, 0xfa, 0xc9, 0xc1, 0xe1, 0x8a, 0x5c, 0x8a, 0xbc, 0xa8, 0xb0, 0xbc, + 0xf8, 0x60, 0x5b, 0x2d, 0xc6, 0x9f, 0xa0, 0xb6, 0x6e, 0x8b, 0x37, 0x22, 0xa3, 0x45, 0xb4, 0x21, + 0x48, 0xd0, 0xdc, 0xc0, 0xba, 0x2c, 0x4e, 0x78, 0x97, 0xb5, 0x3f, 0x19, 0x54, 0x1d, 0x75, 0x9f, + 0x64, 0xe9, 0x04, 0x0d, 0xef, 0xc2, 0xfe, 0xa3, 0x4e, 0x13, 0xc2, 0x3e, 0x19, 0x4c, 0xa6, 0xbc, + 0xb1, 0xf2, 0x55, 0x84, 0xff, 0x76, 0x5d, 0x67, 0xbc, 0x03, 0xd0, 0x27, 0xa5, 0xbd, 0xdf, 0xa0, + 0x90, 0x7e, 0x09, 0xfe, 0x4b, 0xe6, 0x0a, 0xe0, 0xfb, 0xef, 0xc6, 0x73, 0xc8, 0x83, 0xe5, 0x8b, + 0xfc, 0xa5, 0xad, 0xc6, 0x6e, 0x2f, 0x9e, 0xcf, 0x47, 0x92, 0x5e, 0x67, 0xc3, 0x48, 0xa8, 0x69, + 0xec, 0x0b, 0x57, 0xef, 0xda, 0xbd, 0x0d, 0xcb, 0xf9, 0x8d, 0x5c, 0x7e, 0x05, 0x00, 0x00, 0xff, + 0xff, 0x26, 0x11, 0xba, 0xff, 0x89, 0x02, 0x00, 0x00, +} diff --git a/pkg/service/rpc/egress_client.go b/pkg/service/rpc/egress_client.go new file mode 100644 index 000000000..eba43b4c4 --- /dev/null +++ b/pkg/service/rpc/egress_client.go @@ -0,0 +1,36 @@ +package rpc + +import ( + "github.com/livekit/protocol/livekit" + "github.com/livekit/psrpc" +) + +type EgressClient interface { + EgressInternalClient + EgressHandlerClient +} + +type egressClient struct { + EgressInternalClient + EgressHandlerClient +} + +func NewEgressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (EgressClient, error) { + if bus == nil { + return nil, nil + } + + clientID := string(nodeID) + internalClient, err := NewEgressInternalClient(clientID, bus) + if err != nil { + return nil, err + } + handlerClient, err := NewEgressHandlerClient(clientID, bus) + if err != nil { + return nil, err + } + return &egressClient{ + EgressInternalClient: internalClient, + EgressHandlerClient: handlerClient, + }, nil +} diff --git a/pkg/service/servicefakes/fake_egress_store.go b/pkg/service/servicefakes/fake_egress_store.go index 06ee8d9cc..3f85bb130 100644 --- a/pkg/service/servicefakes/fake_egress_store.go +++ b/pkg/service/servicefakes/fake_egress_store.go @@ -5,11 +5,25 @@ import ( "context" "sync" + version "github.com/hashicorp/go-version" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/protocol/livekit" ) type FakeEgressStore struct { + GetEgressVersionStub func(context.Context) (*version.Version, error) + getEgressVersionMutex sync.RWMutex + getEgressVersionArgsForCall []struct { + arg1 context.Context + } + getEgressVersionReturns struct { + result1 *version.Version + result2 error + } + getEgressVersionReturnsOnCall map[int]struct { + result1 *version.Version + result2 error + } ListEgressStub func(context.Context, livekit.RoomName) ([]*livekit.EgressInfo, error) listEgressMutex sync.RWMutex listEgressArgsForCall []struct { @@ -66,6 +80,70 @@ type FakeEgressStore struct { invocationsMutex sync.RWMutex } +func (fake *FakeEgressStore) GetEgressVersion(arg1 context.Context) (*version.Version, error) { + fake.getEgressVersionMutex.Lock() + ret, specificReturn := fake.getEgressVersionReturnsOnCall[len(fake.getEgressVersionArgsForCall)] + fake.getEgressVersionArgsForCall = append(fake.getEgressVersionArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.GetEgressVersionStub + fakeReturns := fake.getEgressVersionReturns + fake.recordInvocation("GetEgressVersion", []interface{}{arg1}) + fake.getEgressVersionMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeEgressStore) GetEgressVersionCallCount() int { + fake.getEgressVersionMutex.RLock() + defer fake.getEgressVersionMutex.RUnlock() + return len(fake.getEgressVersionArgsForCall) +} + +func (fake *FakeEgressStore) GetEgressVersionCalls(stub func(context.Context) (*version.Version, error)) { + fake.getEgressVersionMutex.Lock() + defer fake.getEgressVersionMutex.Unlock() + fake.GetEgressVersionStub = stub +} + +func (fake *FakeEgressStore) GetEgressVersionArgsForCall(i int) context.Context { + fake.getEgressVersionMutex.RLock() + defer fake.getEgressVersionMutex.RUnlock() + argsForCall := fake.getEgressVersionArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeEgressStore) GetEgressVersionReturns(result1 *version.Version, result2 error) { + fake.getEgressVersionMutex.Lock() + defer fake.getEgressVersionMutex.Unlock() + fake.GetEgressVersionStub = nil + fake.getEgressVersionReturns = struct { + result1 *version.Version + result2 error + }{result1, result2} +} + +func (fake *FakeEgressStore) GetEgressVersionReturnsOnCall(i int, result1 *version.Version, result2 error) { + fake.getEgressVersionMutex.Lock() + defer fake.getEgressVersionMutex.Unlock() + fake.GetEgressVersionStub = nil + if fake.getEgressVersionReturnsOnCall == nil { + fake.getEgressVersionReturnsOnCall = make(map[int]struct { + result1 *version.Version + result2 error + }) + } + fake.getEgressVersionReturnsOnCall[i] = struct { + result1 *version.Version + result2 error + }{result1, result2} +} + func (fake *FakeEgressStore) ListEgress(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.EgressInfo, error) { fake.listEgressMutex.Lock() ret, specificReturn := fake.listEgressReturnsOnCall[len(fake.listEgressArgsForCall)] @@ -323,6 +401,8 @@ func (fake *FakeEgressStore) UpdateEgressReturnsOnCall(i int, result1 error) { func (fake *FakeEgressStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.getEgressVersionMutex.RLock() + defer fake.getEgressVersionMutex.RUnlock() fake.listEgressMutex.RLock() defer fake.listEgressMutex.RUnlock() fake.loadEgressMutex.RLock() diff --git a/pkg/service/wire.go b/pkg/service/wire.go index bd58ed952..e2306d2a6 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -13,12 +13,14 @@ import ( "github.com/pkg/errors" "gopkg.in/yaml.v3" + "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" redisLiveKit "github.com/livekit/protocol/redis" "github.com/livekit/protocol/webhook" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" @@ -42,6 +44,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live wire.Bind(new(livekit.RoomService), new(*RoomService)), telemetry.NewAnalyticsService, telemetry.NewTelemetryService, + getMessageBus, + rpc.NewEgressClient, egress.NewRedisRPCClient, getEgressStore, NewEgressLauncher, @@ -130,6 +134,13 @@ func createStore(rc redis.UniversalClient) ObjectStore { return NewLocalStore() } +func getMessageBus(rc redis.UniversalClient) psrpc.MessageBus { + if rc == nil { + return nil + } + return psrpc.NewRedisMessageBus(rc) +} + func getEgressStore(s ObjectStore) EgressStore { switch store := s.(type) { case *RedisStore: diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 619911a7d..fd5af95cc 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/egress" @@ -19,6 +20,7 @@ import ( "github.com/livekit/protocol/livekit" redis2 "github.com/livekit/protocol/redis" "github.com/livekit/protocol/webhook" + "github.com/livekit/psrpc" "github.com/pion/turn/v2" "github.com/pkg/errors" "gopkg.in/yaml.v3" @@ -45,6 +47,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } nodeID := getNodeID(currentNode) + messageBus := getMessageBus(universalClient) + egressClient, err := rpc.NewEgressClient(nodeID, messageBus) + if err != nil { + return nil, err + } rpcClient := egress.NewRedisRPCClient(nodeID, universalClient) egressStore := getEgressStore(objectStore) keyProvider, err := createKeyProvider(conf) @@ -57,15 +64,15 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(notifier, analyticsService) - rtcEgressLauncher := NewEgressLauncher(rpcClient, egressStore, telemetryService) + rtcEgressLauncher := NewEgressLauncher(egressClient, rpcClient, egressStore, telemetryService) roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher) if err != nil { return nil, err } - egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) + egressService := NewEgressService(egressClient, rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) - rpc := ingress.NewRedisRPC(nodeID, universalClient) - ingressRPCClient := getIngressRPCClient(rpc) + ingressRPC := ingress.NewRedisRPC(nodeID, universalClient) + ingressRPCClient := getIngressRPCClient(ingressRPC) ingressStore := getIngressStore(objectStore) ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) @@ -156,6 +163,13 @@ func createStore(rc redis.UniversalClient) ObjectStore { return NewLocalStore() } +func getMessageBus(rc redis.UniversalClient) psrpc.MessageBus { + if rc == nil { + return nil + } + return psrpc.NewRedisMessageBus(rc) +} + func getEgressStore(s ObjectStore) EgressStore { switch store := s.(type) { case *RedisStore: @@ -178,8 +192,8 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } -func getIngressRPCClient(rpc ingress.RPC) ingress.RPCClient { - return rpc +func getIngressRPCClient(rpc2 ingress.RPC) ingress.RPCClient { + return rpc2 } func createClientConfiguration() clientconfiguration.ClientConfigurationManager {