From 30e99e8c6bae368070da9338dbb93e2c78c49df9 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 17 Jan 2021 00:12:05 -0800 Subject: [PATCH] redis based router & roomstore --- .gitignore | 1 - cmd/server/main.go | 60 +- go.mod | 2 + go.sum | 19 + magefile.go | 4 +- pkg/config/config.go | 13 +- pkg/routing/errors.go | 6 +- pkg/routing/interfaces.go | 18 +- pkg/routing/localrouter.go | 25 +- pkg/routing/messagechannel.go | 18 +- pkg/routing/node.go | 34 +- pkg/routing/redis.go | 88 +++ pkg/routing/redisrouter.go | 250 ++++++++ pkg/routing/routingfakes/fake_message_sink.go | 52 +- .../routingfakes/fake_message_source.go | 21 +- pkg/routing/routingfakes/fake_router.go | 228 +++++--- pkg/service/localroomstore.go | 3 - pkg/service/redisroomstore.go | 100 ++++ pkg/service/rtcrunner.go | 2 +- pkg/service/rtcservice.go | 15 +- pkg/service/server.go | 33 +- pkg/service/wire_gen.go | 2 +- pkg/utils/cachedredis.go | 63 ++ proto/internal.proto | 38 ++ proto/livekit/internal.pb.go | 539 ++++++++++++++++++ proto/livekit/model.pb.go | 314 +++------- proto/livekit/room.twirp.go | 1 - proto/model.proto | 14 - 28 files changed, 1559 insertions(+), 404 deletions(-) create mode 100644 pkg/routing/redis.go create mode 100644 pkg/routing/redisrouter.go create mode 100644 pkg/utils/cachedredis.go create mode 100644 proto/internal.proto create mode 100644 proto/livekit/internal.pb.go diff --git a/.gitignore b/.gitignore index c0a0548a2..6db1dab94 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,6 @@ # checksums of file tree .checksumgo -proto/.checksumproto # Output of the go coverage tool, specifically when used with LiteIDE *.out diff --git a/cmd/server/main.go b/cmd/server/main.go index e5f1d3e17..e53548ad3 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,8 +2,10 @@ package main import ( "bytes" + "context" "errors" "fmt" + "io/ioutil" "math/rand" "os" "os/signal" @@ -12,6 +14,7 @@ import ( "syscall" "time" + "github.com/go-redis/redis/v8" "github.com/urfave/cli/v2" "github.com/livekit/livekit-server/pkg/auth" @@ -27,8 +30,12 @@ func main() { Name: "livekit-server", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "config", - Usage: "path to LiveKit config", + Name: "config", + Usage: "path to LiveKit config file", + }, + &cli.StringFlag{ + Name: "config-body", + Usage: "LiveKit config in YAML, usually used via environment var", EnvVars: []string{"LIVEKIT_CONFIG"}, }, &cli.StringFlag{ @@ -74,7 +81,13 @@ func startServer(c *cli.Context) error { cpuProfile := c.String("cpuprofile") memProfile := c.String("memprofile") - conf, err := config.NewConfig(c.String("config")) + + confString, err := getConfigString(c) + if err != nil { + return err + } + + conf, err := config.NewConfig(confString) if err != nil { return err } @@ -125,8 +138,10 @@ func startServer(c *cli.Context) error { } // local routing and store - router := routing.NewLocalRouter(currentNode) - roomStore := service.NewLocalRoomStore() + router, roomStore, err := createRouterAndStore(conf, currentNode) + if err != nil { + return err + } server, err := service.InitializeServer(conf, keyProvider, roomStore, router, currentNode) @@ -146,6 +161,26 @@ func startServer(c *cli.Context) error { return server.Start() } +func createRouterAndStore(config *config.Config, node routing.LocalNode) (router routing.Router, store service.RoomStore, err error) { + if config.MultiNode { + rc := redis.NewClient(&redis.Options{ + Addr: config.Redis.Address, + Password: config.Redis.Password, + }) + if err = rc.Ping(context.Background()).Err(); err != nil { + return + } + + router = routing.NewRedisRouter(node, rc, false) + store = service.NewRedisRoomStore(rc) + } else { + // local routing and store + router = routing.NewLocalRouter(node) + store = service.NewLocalRoomStore() + } + return +} + func createKeyProvider(keyFile, keys string) (auth.KeyProvider, error) { // prefer keyfile if set if keyFile != "" { @@ -170,6 +205,21 @@ func createKeyProvider(keyFile, keys string) (auth.KeyProvider, error) { return nil, errors.New("one of key-file or keys must be provided in order to support a secure installation") } +func getConfigString(c *cli.Context) (string, error) { + configFile := c.String("config") + configBody := c.String("config-body") + if configBody == "" { + if configFile != "" { + content, err := ioutil.ReadFile(configFile) + if err != nil { + return "", err + } + configBody = string(content) + } + } + return configBody, nil +} + func generateKeys(c *cli.Context) error { apiKey := utils.NewGuid(utils.APIKeyPrefix) secret := utils.RandomSecret() diff --git a/go.mod b/go.mod index 9b6f9b278..b5dab816c 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,11 @@ go 1.15 require ( github.com/bep/debounce v1.2.0 github.com/gammazero/workerpool v1.1.1 + github.com/go-redis/redis/v8 v8.4.8 github.com/golang/protobuf v1.4.3 github.com/google/wire v0.4.0 github.com/gorilla/websocket v1.4.2 + github.com/karlseguin/ccache/v2 v2.0.7 github.com/lithammer/shortuuid/v3 v3.0.4 github.com/lunixbochs/vtclean v1.0.0 // indirect github.com/lytics/base62 v0.0.0-20180808010106-0ee4de5a5d6d diff --git a/go.sum b/go.sum index b8c51d063..8cd728c83 100644 --- a/go.sum +++ b/go.sum @@ -46,7 +46,9 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -76,6 +78,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= @@ -106,6 +110,8 @@ github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgO github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-redis/redis/v8 v8.4.8 h1:sEG4g6Jq4hvQzbrNsVDNTDdxFCUnFC0jxuOp6tgALlA= +github.com/go-redis/redis/v8 v8.4.8/go.mod h1:/cTZsrSn1DPqRuOnSDuyH2OSvd9iX0iUGT0s7hYGIAg= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -143,6 +149,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -213,6 +221,11 @@ github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/karlseguin/ccache v1.0.1 h1:0gpC6z1qtv0cKmsi5Su5tTB6bJ2vm9bfOLACpDEB/Ro= +github.com/karlseguin/ccache v2.0.3+incompatible h1:j68C9tWOROiOLWTS/kCGg9IcJG+ACqn5+0+t8Oh83UU= +github.com/karlseguin/ccache/v2 v2.0.7 h1:y5Pfi4eiyYCOD6LS/Kj+o6Nb4M5Ngpw9qFQs+v44ZYM= +github.com/karlseguin/ccache/v2 v2.0.7/go.mod h1:2BDThcfQMf/c0jnZowt16eW405XIqZPavt+HoYEtcxQ= +github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -289,6 +302,8 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= +github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U= +github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -445,6 +460,7 @@ github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4= github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= +github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -455,6 +471,8 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw= +go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -534,6 +552,7 @@ golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTi golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7 h1:3uJsdck53FDIpWwLeAXlia9p4C8j0BO2xZrqzKpL0D8= golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= diff --git a/magefile.go b/magefile.go index cedbba578..94703a938 100644 --- a/magefile.go +++ b/magefile.go @@ -45,6 +45,7 @@ func Proto() error { "proto/model.proto", "proto/room.proto", "proto/rtc.proto", + "proto/internal.proto", ) if err != nil { return err @@ -79,7 +80,6 @@ func Proto() error { "--plugin=twirp="+twirp_path, "-I=proto", "proto/room.proto", - "proto/model.proto", ) connectStd(cmd) if err := cmd.Run(); err != nil { @@ -93,6 +93,8 @@ func Proto() error { "--plugin=go="+protoc_go_path, "-I=proto", "proto/rtc.proto", + "proto/internal.proto", + "proto/model.proto", ) connectStd(cmd) if err := cmd.Run(); err != nil { diff --git a/pkg/config/config.go b/pkg/config/config.go index 548cc64bf..87bdf4ee7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -6,8 +6,9 @@ import ( ) type Config struct { - Port uint32 `yaml:"port"` - RTC RTCConfig `yaml:"rtc"` + Port uint32 `yaml:"port"` + RTC RTCConfig `yaml:"rtc"` + Redis RedisConfig `yaml:"redis"` // multi-node configuration, MultiNode bool `yaml:"multi_node"` @@ -24,6 +25,11 @@ type RTCConfig struct { MaxBufferTime int `yaml:"max_buffer_time"` } +type RedisConfig struct { + Address string `yaml:"address"` + Password string `yaml:"password"` +} + func NewConfig(confString string) (*Config, error) { // start with defaults conf := &Config{ @@ -35,6 +41,9 @@ func NewConfig(confString string) (*Config, error) { "stun.l.google.com:19302", }, }, + Redis: RedisConfig{ + Address: "localhost:6379", + }, } if confString != "" { yaml.Unmarshal([]byte(confString), conf) diff --git a/pkg/routing/errors.go b/pkg/routing/errors.go index ded1fc2a9..1f1efebe0 100644 --- a/pkg/routing/errors.go +++ b/pkg/routing/errors.go @@ -3,6 +3,8 @@ package routing import "errors" var ( - ErrNodeNotFound = errors.New("could not find node") - ErrHandlerNotDefined = errors.New("handler not defined") + ErrNotFound = errors.New("could not find object") + ErrHandlerNotDefined = errors.New("handler not defined") + ErrIncorrectNodeForRoom = errors.New("incorrect node for the current room") + errInvalidRouterMessage = errors.New("invalid router message") ) diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 287abfe62..882995d48 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -1,6 +1,8 @@ package routing import ( + "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/proto/livekit" ) @@ -9,13 +11,14 @@ import ( // routes signaling message //counterfeiter:generate . MessageSink type MessageSink interface { - WriteMessage(msg interface{}) error + WriteMessage(msg proto.Message) error Close() + OnClose(f func()) } //counterfeiter:generate . MessageSource type MessageSource interface { - ReadMessage() (interface{}, error) + ReadMessage() (proto.Message, error) } type ParticipantCallback func(roomId, participantId, participantName string, requestSource MessageSource, responseSink MessageSink) @@ -23,14 +26,15 @@ type ParticipantCallback func(roomId, participantId, participantName string, req //counterfeiter:generate . Router type Router interface { GetNodeIdForRoom(roomName string) (string, error) - RegisterNode(node *livekit.Node) error + RegisterNode() error + UnregisterNode() error GetNode(nodeId string) (*livekit.Node, error) - StartParticipant(roomName, participantId, participantName, nodeId string) error - SetRTCNode(participantId, nodeId string) error + SetParticipantRTCNode(participantId, nodeId string) error // functions for websocket handler - GetRequestSink(participantId string) MessageSink - GetResponseSource(participantId string) MessageSource + GetRequestSink(participantId string) (MessageSink, error) + GetResponseSource(participantId string) (MessageSource, error) + StartParticipant(roomName, participantId, participantName string) error OnNewParticipant(callback ParticipantCallback) Start() error diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 0d6f0dc6b..72872736c 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -29,7 +29,11 @@ func (r *LocalRouter) GetNodeIdForRoom(roomName string) (string, error) { return r.currentNode.Id, nil } -func (r *LocalRouter) RegisterNode(node *livekit.Node) error { +func (r *LocalRouter) RegisterNode() error { + return nil +} + +func (r *LocalRouter) UnregisterNode() error { return nil } @@ -37,10 +41,10 @@ func (r *LocalRouter) GetNode(nodeId string) (*livekit.Node, error) { if nodeId == r.currentNode.Id { return r.currentNode, nil } - return nil, ErrNodeNotFound + return nil, ErrNotFound } -func (r *LocalRouter) StartParticipant(roomName, participantId, participantName, nodeId string) error { +func (r *LocalRouter) StartParticipant(roomName, participantId, participantName string) error { // treat it as a new participant connecting if r.onNewParticipant == nil { return ErrHandlerNotDefined @@ -55,18 +59,18 @@ func (r *LocalRouter) StartParticipant(roomName, participantId, participantName, return nil } -func (r *LocalRouter) SetRTCNode(participantId, nodeId string) error { +func (r *LocalRouter) SetParticipantRTCNode(participantId, nodeId string) error { // nothing to be done return nil } // for a local router, sink and source are pointing to the same spot -func (r *LocalRouter) GetRequestSink(participantId string) MessageSink { - return r.getOrCreateMessageChannel(r.requestChannels, participantId) +func (r *LocalRouter) GetRequestSink(participantId string) (MessageSink, error) { + return r.getOrCreateMessageChannel(r.requestChannels, participantId), nil } -func (r *LocalRouter) GetResponseSource(participantId string) MessageSource { - return r.getOrCreateMessageChannel(r.responseChannels, participantId) +func (r *LocalRouter) GetResponseSource(participantId string) (MessageSource, error) { + return r.getOrCreateMessageChannel(r.responseChannels, participantId), nil } func (r *LocalRouter) OnNewParticipant(callback ParticipantCallback) { @@ -91,6 +95,11 @@ func (r *LocalRouter) getOrCreateMessageChannel(target map[string]*MessageChanne } mc = NewMessageChannel() + mc.onClose = func() { + r.lock.Lock() + delete(target, participantId) + r.lock.Unlock() + } r.lock.Lock() target[participantId] = mc r.lock.Unlock() diff --git a/pkg/routing/messagechannel.go b/pkg/routing/messagechannel.go index f0fe34f22..285dacfdb 100644 --- a/pkg/routing/messagechannel.go +++ b/pkg/routing/messagechannel.go @@ -2,24 +2,31 @@ package routing import ( "io" + + "google.golang.org/protobuf/proto" ) type MessageChannel struct { - msgChan chan interface{} + msgChan chan proto.Message + onClose func() } func NewMessageChannel() *MessageChannel { return &MessageChannel{ - msgChan: make(chan interface{}, 1), + msgChan: make(chan proto.Message, 1), } } -func (m *MessageChannel) WriteMessage(msg interface{}) error { +func (m *MessageChannel) OnClose(f func()) { + m.onClose = f +} + +func (m *MessageChannel) WriteMessage(msg proto.Message) error { m.msgChan <- msg return nil } -func (m *MessageChannel) ReadMessage() (interface{}, error) { +func (m *MessageChannel) ReadMessage() (proto.Message, error) { msg := <-m.msgChan // channel closed if msg == nil { @@ -30,4 +37,7 @@ func (m *MessageChannel) ReadMessage() (interface{}, error) { func (m *MessageChannel) Close() { close(m.msgChan) + if m.onClose != nil { + m.onClose() + } } diff --git a/pkg/routing/node.go b/pkg/routing/node.go index d066ba7f5..d36d21a8b 100644 --- a/pkg/routing/node.go +++ b/pkg/routing/node.go @@ -1,8 +1,10 @@ package routing import ( + "bytes" "context" "fmt" + "net" "runtime" "time" @@ -30,7 +32,7 @@ func NewLocalNode(conf *config.Config) (LocalNode, error) { return nil, err } return &livekit.Node{ - Id: utils.NewGuid(utils.NodePrefix), + Id: fmt.Sprintf("%s%16.16X", utils.NodePrefix, macUint64()), Ip: ip, NumCpus: uint32(runtime.NumCPU()), }, nil @@ -91,3 +93,33 @@ func GetLocalIP(stunServers []string) (string, error) { return nodeIp, nil } + +func macUint64() uint64 { + interfaces, err := net.Interfaces() + if err != nil { + return 0 + } + + for _, i := range interfaces { + if i.Flags&net.FlagUp != 0 && bytes.Compare(i.HardwareAddr, nil) != 0 { + + // Skip locally administered addresses + if i.HardwareAddr[0]&2 == 2 { + continue + } + + var mac uint64 + for j, b := range i.HardwareAddr { + if j >= 8 { + break + } + mac <<= 8 + mac += uint64(b) + } + + return mac + } + } + + return 0 +} diff --git a/pkg/routing/redis.go b/pkg/routing/redis.go new file mode 100644 index 000000000..e8441975a --- /dev/null +++ b/pkg/routing/redis.go @@ -0,0 +1,88 @@ +package routing + +import ( + "context" + + "github.com/go-redis/redis/v8" + "google.golang.org/protobuf/proto" + + "github.com/livekit/livekit-server/proto/livekit" +) + +const ( + // hash of node_id => Node proto + NodesKey = "nodes" + + // hash of room_name => node_id + NodeRoomKey = "room_node_map" +) + +var redisCtx = context.Background() + +// location of the participant's RTC connection, hash +func participantRTCKey(participantId string) string { + return "participant_rtc:" + participantId +} + +// location of the participant's Signal connection, hash +func participantSignalKey(participantId string) string { + return "participant_signal:" + participantId +} + +func nodeChannel(nodeId string) string { + return "node_channel:" + nodeId +} + +func publishRouterMessage(rc *redis.Client, nodeId string, participantId string, msg proto.Message) error { + rm := &livekit.RouterMessage{ + ParticipantId: participantId, + } + switch o := msg.(type) { + case *livekit.StartSession: + rm.Message = &livekit.RouterMessage_StartSession{ + StartSession: o, + } + case *livekit.SignalRequest: + rm.Message = &livekit.RouterMessage_Request{ + Request: o, + } + case *livekit.SignalResponse: + rm.Message = &livekit.RouterMessage_Response{ + Response: o, + } + case *livekit.EndSession: + rm.Message = &livekit.RouterMessage_EndSession{ + EndSession: o, + } + default: + return errInvalidRouterMessage + } + data, err := proto.Marshal(rm) + if err != nil { + return err + } + return rc.Publish(redisCtx, nodeChannel(nodeId), data).Err() +} + +type RedisSink struct { + rc *redis.Client + nodeId string + participantId string + channel string + onClose func() +} + +func (s *RedisSink) WriteMessage(msg proto.Message) error { + return publishRouterMessage(s.rc, s.nodeId, s.participantId, msg) +} + +func (s *RedisSink) Close() { + publishRouterMessage(s.rc, s.nodeId, s.participantId, &livekit.EndSession{}) + if s.onClose != nil { + s.onClose() + } +} + +func (s *RedisSink) OnClose(f func()) { + s.onClose = f +} diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go new file mode 100644 index 000000000..36dc0b073 --- /dev/null +++ b/pkg/routing/redisrouter.go @@ -0,0 +1,250 @@ +package routing + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "google.golang.org/protobuf/proto" + + "github.com/livekit/livekit-server/pkg/logger" + "github.com/livekit/livekit-server/pkg/utils" + "github.com/livekit/livekit-server/proto/livekit" +) + +const ( + defaultCacheTTL = time.Minute +) + +// TODO: need to implement redis key cleanup, for when clients disconnect and such + +type RedisRouter struct { + LocalRouter + useLocal bool + rc *redis.Client + cr *utils.CachedRedis + ctx context.Context + + redisSinks map[string]*RedisSink + cancel func() +} + +func NewRedisRouter(currentNode LocalNode, rc *redis.Client, useLocal bool) *RedisRouter { + rr := &RedisRouter{ + LocalRouter: *NewLocalRouter(currentNode), + useLocal: useLocal, + rc: rc, + } + rr.ctx, rr.cancel = context.WithCancel(context.Background()) + rr.cr = utils.NewCachedRedis(rr.ctx, rr.rc) + return rr +} + +func (r *RedisRouter) RegisterNode() error { + data, err := proto.Marshal((*livekit.Node)(r.currentNode)) + if err != nil { + return err + } + r.cr.ExpireHash(NodesKey, r.currentNode.Id) + return r.rc.HSet(r.ctx, NodesKey, data).Err() +} + +func (r *RedisRouter) UnregisterNode() error { + r.rc.HDel(r.ctx, NodesKey, r.currentNode.Id) + return nil +} + +func (r *RedisRouter) GetNodeIdForRoom(roomName string) (string, error) { + return r.cr.CachedHGet(NodeRoomKey, roomName) +} + +func (r *RedisRouter) GetNode(nodeId string) (*livekit.Node, error) { + data, err := r.cr.CachedHGet(NodesKey, nodeId) + if err != nil { + return nil, err + } + n := livekit.Node{} + if err = proto.Unmarshal([]byte(data), &n); err != nil { + return nil, err + } + return &n, nil +} + +func (r *RedisRouter) SetParticipantRTCNode(participantId, nodeId string) error { + r.cr.Expire(participantRTCKey(participantId)) + return r.rc.Set(r.ctx, participantRTCKey(participantId), nodeId, 0).Err() +} + +// for a local router, sink and source are pointing to the same spot +func (r *RedisRouter) GetRequestSink(participantId string) (MessageSink, error) { + // request should go to RTC node + rtcNode, err := r.getParticipantRTCNode(participantId) + if err != nil { + return nil, err + } + + if rtcNode == r.currentNode.Id && r.useLocal { + return r.LocalRouter.GetRequestSink(participantId) + } + + sink := r.getOrCreateRedisSink(rtcNode, participantId) + return sink, nil +} + +func (r *RedisRouter) GetResponseSource(participantId string) (MessageSource, error) { + // request should go to RTC node + rtcNode, err := r.getParticipantRTCNode(participantId) + if err != nil { + return nil, err + } + + if rtcNode == r.currentNode.Id && r.useLocal { + return r.LocalRouter.GetResponseSource(participantId) + } + + // a message channel that we'll send data into + source := r.getOrCreateMessageChannel(r.responseChannels, participantId) + return source, nil +} + +func (r *RedisRouter) StartParticipant(roomName, participantId, participantName string) error { + // find the node where the room is hosted at + rtcNode, err := r.GetNodeIdForRoom(roomName) + if err != nil { + return err + } + + // StartParticipant should only be called on the current node + if rtcNode != r.currentNode.Id { + return ErrIncorrectNodeForRoom + } + + if r.useLocal { + return r.LocalRouter.StartParticipant(roomName, participantId, participantId) + } + + // find signal node to send responses back + signalNode, err := r.getParticipantSignalNode(participantId) + if err != nil { + return err + } + + // treat it as a new participant connecting + if r.onNewParticipant == nil { + return ErrHandlerNotDefined + } + r.onNewParticipant( + roomName, + participantId, + participantName, + r.getOrCreateMessageChannel(r.requestChannels, participantId), + r.getOrCreateRedisSink(signalNode, participantId), + ) + return nil +} + +func (r *RedisRouter) Start() error { + go r.subscribeWorker() + return nil +} + +func (r *RedisRouter) Stop() { + r.cancel() +} + +func (r *RedisRouter) getOrCreateRedisSink(nodeId string, participantId string) *RedisSink { + r.lock.RLock() + sink := r.redisSinks[participantId] + r.lock.RUnlock() + + if sink != nil { + return sink + } + + sink = &RedisSink{ + rc: r.rc, + nodeId: nodeId, + participantId: participantId, + channel: nodeChannel(nodeId), + } + sink.OnClose(func() { + r.lock.Lock() + delete(r.redisSinks, participantId) + r.lock.Unlock() + }) + r.lock.Lock() + r.redisSinks[participantId] = sink + r.lock.Unlock() + return nil +} + +func (r *RedisRouter) getParticipantRTCNode(participantId string) (string, error) { + return r.cr.CachedGet(participantRTCKey(participantId)) +} + +func (r *RedisRouter) getParticipantSignalNode(participantId string) (nodeId string, err error) { + return r.cr.CachedGet(participantSignalKey(participantId)) +} + +func (r *RedisRouter) subscribeWorker() { + sub := r.rc.Subscribe(redisCtx, nodeChannel(r.currentNode.Id)) + + for r.ctx.Err() == nil { + obj, err := sub.Receive(r.ctx) + if err != nil { + // TODO: retry? ignore? at a minimum need to sleep here to retry + time.Sleep(100 * time.Millisecond) + continue + } + msg, ok := obj.(*redis.Message) + if !ok { + continue + } + + rm := livekit.RouterMessage{} + err = proto.Unmarshal([]byte(msg.Payload), &rm) + pId := rm.ParticipantId + + switch rmb := rm.Message.(type) { + case *livekit.RouterMessage_StartSession: + // RTC session should start on this node + err = r.StartParticipant(rmb.StartSession.RoomName, pId, rmb.StartSession.ParticipantName) + if err != nil { + logger.Errorw("could not start participant", "error", err) + } + + case *livekit.RouterMessage_Request: + // in the event the current node is an RTC node, push to request channels + reqSink := r.getOrCreateMessageChannel(r.requestChannels, pId) + err = reqSink.WriteMessage(rmb.Request) + if err != nil { + logger.Errorw("could not write to request channel", + "participant", pId, + "error", err) + } + + case *livekit.RouterMessage_Response: + // in the event the current node is an Signal node, push to response channels + resSink := r.getOrCreateMessageChannel(r.responseChannels, pId) + err = resSink.WriteMessage(rmb.Response) + if err != nil { + logger.Errorw("could not write to response channel", + "participant", pId, + "error", err) + } + + case *livekit.RouterMessage_EndSession: + signalNode, err := r.getParticipantRTCNode(pId) + if err == nil { + logger.Errorw("could not get participant RTC node", + "error", err) + continue + } + // EndSession can only be initiated on an RTC node, is handled on the signal node + if signalNode == r.currentNode.Id { + resSink := r.getOrCreateMessageChannel(r.responseChannels, pId) + resSink.Close() + } + } + } +} diff --git a/pkg/routing/routingfakes/fake_message_sink.go b/pkg/routing/routingfakes/fake_message_sink.go index f53407344..457bc6ffa 100644 --- a/pkg/routing/routingfakes/fake_message_sink.go +++ b/pkg/routing/routingfakes/fake_message_sink.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/livekit/livekit-server/pkg/routing" + "google.golang.org/protobuf/reflect/protoreflect" ) type FakeMessageSink struct { @@ -12,10 +13,15 @@ type FakeMessageSink struct { closeMutex sync.RWMutex closeArgsForCall []struct { } - WriteMessageStub func(interface{}) error + OnCloseStub func(func()) + onCloseMutex sync.RWMutex + onCloseArgsForCall []struct { + arg1 func() + } + WriteMessageStub func(protoreflect.ProtoMessage) error writeMessageMutex sync.RWMutex writeMessageArgsForCall []struct { - arg1 interface{} + arg1 protoreflect.ProtoMessage } writeMessageReturns struct { result1 error @@ -51,11 +57,43 @@ func (fake *FakeMessageSink) CloseCalls(stub func()) { fake.CloseStub = stub } -func (fake *FakeMessageSink) WriteMessage(arg1 interface{}) error { +func (fake *FakeMessageSink) OnClose(arg1 func()) { + fake.onCloseMutex.Lock() + fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct { + arg1 func() + }{arg1}) + stub := fake.OnCloseStub + fake.recordInvocation("OnClose", []interface{}{arg1}) + fake.onCloseMutex.Unlock() + if stub != nil { + fake.OnCloseStub(arg1) + } +} + +func (fake *FakeMessageSink) OnCloseCallCount() int { + fake.onCloseMutex.RLock() + defer fake.onCloseMutex.RUnlock() + return len(fake.onCloseArgsForCall) +} + +func (fake *FakeMessageSink) OnCloseCalls(stub func(func())) { + fake.onCloseMutex.Lock() + defer fake.onCloseMutex.Unlock() + fake.OnCloseStub = stub +} + +func (fake *FakeMessageSink) OnCloseArgsForCall(i int) func() { + fake.onCloseMutex.RLock() + defer fake.onCloseMutex.RUnlock() + argsForCall := fake.onCloseArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeMessageSink) WriteMessage(arg1 protoreflect.ProtoMessage) error { fake.writeMessageMutex.Lock() ret, specificReturn := fake.writeMessageReturnsOnCall[len(fake.writeMessageArgsForCall)] fake.writeMessageArgsForCall = append(fake.writeMessageArgsForCall, struct { - arg1 interface{} + arg1 protoreflect.ProtoMessage }{arg1}) stub := fake.WriteMessageStub fakeReturns := fake.writeMessageReturns @@ -76,13 +114,13 @@ func (fake *FakeMessageSink) WriteMessageCallCount() int { return len(fake.writeMessageArgsForCall) } -func (fake *FakeMessageSink) WriteMessageCalls(stub func(interface{}) error) { +func (fake *FakeMessageSink) WriteMessageCalls(stub func(protoreflect.ProtoMessage) error) { fake.writeMessageMutex.Lock() defer fake.writeMessageMutex.Unlock() fake.WriteMessageStub = stub } -func (fake *FakeMessageSink) WriteMessageArgsForCall(i int) interface{} { +func (fake *FakeMessageSink) WriteMessageArgsForCall(i int) protoreflect.ProtoMessage { fake.writeMessageMutex.RLock() defer fake.writeMessageMutex.RUnlock() argsForCall := fake.writeMessageArgsForCall[i] @@ -117,6 +155,8 @@ func (fake *FakeMessageSink) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() + fake.onCloseMutex.RLock() + defer fake.onCloseMutex.RUnlock() fake.writeMessageMutex.RLock() defer fake.writeMessageMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/routing/routingfakes/fake_message_source.go b/pkg/routing/routingfakes/fake_message_source.go index cce7abec9..4f2cd28a1 100644 --- a/pkg/routing/routingfakes/fake_message_source.go +++ b/pkg/routing/routingfakes/fake_message_source.go @@ -5,26 +5,27 @@ import ( "sync" "github.com/livekit/livekit-server/pkg/routing" + "google.golang.org/protobuf/reflect/protoreflect" ) type FakeMessageSource struct { - ReadMessageStub func() (interface{}, error) + ReadMessageStub func() (protoreflect.ProtoMessage, error) readMessageMutex sync.RWMutex readMessageArgsForCall []struct { } readMessageReturns struct { - result1 interface{} + result1 protoreflect.ProtoMessage result2 error } readMessageReturnsOnCall map[int]struct { - result1 interface{} + result1 protoreflect.ProtoMessage result2 error } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeMessageSource) ReadMessage() (interface{}, error) { +func (fake *FakeMessageSource) ReadMessage() (protoreflect.ProtoMessage, error) { fake.readMessageMutex.Lock() ret, specificReturn := fake.readMessageReturnsOnCall[len(fake.readMessageArgsForCall)] fake.readMessageArgsForCall = append(fake.readMessageArgsForCall, struct { @@ -48,34 +49,34 @@ func (fake *FakeMessageSource) ReadMessageCallCount() int { return len(fake.readMessageArgsForCall) } -func (fake *FakeMessageSource) ReadMessageCalls(stub func() (interface{}, error)) { +func (fake *FakeMessageSource) ReadMessageCalls(stub func() (protoreflect.ProtoMessage, error)) { fake.readMessageMutex.Lock() defer fake.readMessageMutex.Unlock() fake.ReadMessageStub = stub } -func (fake *FakeMessageSource) ReadMessageReturns(result1 interface{}, result2 error) { +func (fake *FakeMessageSource) ReadMessageReturns(result1 protoreflect.ProtoMessage, result2 error) { fake.readMessageMutex.Lock() defer fake.readMessageMutex.Unlock() fake.ReadMessageStub = nil fake.readMessageReturns = struct { - result1 interface{} + result1 protoreflect.ProtoMessage result2 error }{result1, result2} } -func (fake *FakeMessageSource) ReadMessageReturnsOnCall(i int, result1 interface{}, result2 error) { +func (fake *FakeMessageSource) ReadMessageReturnsOnCall(i int, result1 protoreflect.ProtoMessage, result2 error) { fake.readMessageMutex.Lock() defer fake.readMessageMutex.Unlock() fake.ReadMessageStub = nil if fake.readMessageReturnsOnCall == nil { fake.readMessageReturnsOnCall = make(map[int]struct { - result1 interface{} + result1 protoreflect.ProtoMessage result2 error }) } fake.readMessageReturnsOnCall[i] = struct { - result1 interface{} + result1 protoreflect.ProtoMessage result2 error }{result1, result2} } diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index d5f07be3b..b23e8b8d1 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -35,37 +35,40 @@ type FakeRouter struct { result1 string result2 error } - GetRequestSinkStub func(string) routing.MessageSink + GetRequestSinkStub func(string) (routing.MessageSink, error) getRequestSinkMutex sync.RWMutex getRequestSinkArgsForCall []struct { arg1 string } getRequestSinkReturns struct { result1 routing.MessageSink + result2 error } getRequestSinkReturnsOnCall map[int]struct { result1 routing.MessageSink + result2 error } - GetResponseSourceStub func(string) routing.MessageSource + GetResponseSourceStub func(string) (routing.MessageSource, error) getResponseSourceMutex sync.RWMutex getResponseSourceArgsForCall []struct { arg1 string } getResponseSourceReturns struct { result1 routing.MessageSource + result2 error } getResponseSourceReturnsOnCall map[int]struct { result1 routing.MessageSource + result2 error } OnNewParticipantStub func(routing.ParticipantCallback) onNewParticipantMutex sync.RWMutex onNewParticipantArgsForCall []struct { arg1 routing.ParticipantCallback } - RegisterNodeStub func(*livekit.Node) error + RegisterNodeStub func() error registerNodeMutex sync.RWMutex registerNodeArgsForCall []struct { - arg1 *livekit.Node } registerNodeReturns struct { result1 error @@ -73,16 +76,16 @@ type FakeRouter struct { registerNodeReturnsOnCall map[int]struct { result1 error } - SetRTCNodeStub func(string, string) error - setRTCNodeMutex sync.RWMutex - setRTCNodeArgsForCall []struct { + SetParticipantRTCNodeStub func(string, string) error + setParticipantRTCNodeMutex sync.RWMutex + setParticipantRTCNodeArgsForCall []struct { arg1 string arg2 string } - setRTCNodeReturns struct { + setParticipantRTCNodeReturns struct { result1 error } - setRTCNodeReturnsOnCall map[int]struct { + setParticipantRTCNodeReturnsOnCall map[int]struct { result1 error } StartStub func() error @@ -95,13 +98,12 @@ type FakeRouter struct { startReturnsOnCall map[int]struct { result1 error } - StartParticipantStub func(string, string, string, string) error + StartParticipantStub func(string, string, string) error startParticipantMutex sync.RWMutex startParticipantArgsForCall []struct { arg1 string arg2 string arg3 string - arg4 string } startParticipantReturns struct { result1 error @@ -113,6 +115,16 @@ type FakeRouter struct { stopMutex sync.RWMutex stopArgsForCall []struct { } + UnregisterNodeStub func() error + unregisterNodeMutex sync.RWMutex + unregisterNodeArgsForCall []struct { + } + unregisterNodeReturns struct { + result1 error + } + unregisterNodeReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -245,7 +257,7 @@ func (fake *FakeRouter) GetNodeIdForRoomReturnsOnCall(i int, result1 string, res }{result1, result2} } -func (fake *FakeRouter) GetRequestSink(arg1 string) routing.MessageSink { +func (fake *FakeRouter) GetRequestSink(arg1 string) (routing.MessageSink, error) { fake.getRequestSinkMutex.Lock() ret, specificReturn := fake.getRequestSinkReturnsOnCall[len(fake.getRequestSinkArgsForCall)] fake.getRequestSinkArgsForCall = append(fake.getRequestSinkArgsForCall, struct { @@ -259,9 +271,9 @@ func (fake *FakeRouter) GetRequestSink(arg1 string) routing.MessageSink { return stub(arg1) } if specificReturn { - return ret.result1 + return ret.result1, ret.result2 } - return fakeReturns.result1 + return fakeReturns.result1, fakeReturns.result2 } func (fake *FakeRouter) GetRequestSinkCallCount() int { @@ -270,7 +282,7 @@ func (fake *FakeRouter) GetRequestSinkCallCount() int { return len(fake.getRequestSinkArgsForCall) } -func (fake *FakeRouter) GetRequestSinkCalls(stub func(string) routing.MessageSink) { +func (fake *FakeRouter) GetRequestSinkCalls(stub func(string) (routing.MessageSink, error)) { fake.getRequestSinkMutex.Lock() defer fake.getRequestSinkMutex.Unlock() fake.GetRequestSinkStub = stub @@ -283,30 +295,33 @@ func (fake *FakeRouter) GetRequestSinkArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeRouter) GetRequestSinkReturns(result1 routing.MessageSink) { +func (fake *FakeRouter) GetRequestSinkReturns(result1 routing.MessageSink, result2 error) { fake.getRequestSinkMutex.Lock() defer fake.getRequestSinkMutex.Unlock() fake.GetRequestSinkStub = nil fake.getRequestSinkReturns = struct { result1 routing.MessageSink - }{result1} + result2 error + }{result1, result2} } -func (fake *FakeRouter) GetRequestSinkReturnsOnCall(i int, result1 routing.MessageSink) { +func (fake *FakeRouter) GetRequestSinkReturnsOnCall(i int, result1 routing.MessageSink, result2 error) { fake.getRequestSinkMutex.Lock() defer fake.getRequestSinkMutex.Unlock() fake.GetRequestSinkStub = nil if fake.getRequestSinkReturnsOnCall == nil { fake.getRequestSinkReturnsOnCall = make(map[int]struct { result1 routing.MessageSink + result2 error }) } fake.getRequestSinkReturnsOnCall[i] = struct { result1 routing.MessageSink - }{result1} + result2 error + }{result1, result2} } -func (fake *FakeRouter) GetResponseSource(arg1 string) routing.MessageSource { +func (fake *FakeRouter) GetResponseSource(arg1 string) (routing.MessageSource, error) { fake.getResponseSourceMutex.Lock() ret, specificReturn := fake.getResponseSourceReturnsOnCall[len(fake.getResponseSourceArgsForCall)] fake.getResponseSourceArgsForCall = append(fake.getResponseSourceArgsForCall, struct { @@ -320,9 +335,9 @@ func (fake *FakeRouter) GetResponseSource(arg1 string) routing.MessageSource { return stub(arg1) } if specificReturn { - return ret.result1 + return ret.result1, ret.result2 } - return fakeReturns.result1 + return fakeReturns.result1, fakeReturns.result2 } func (fake *FakeRouter) GetResponseSourceCallCount() int { @@ -331,7 +346,7 @@ func (fake *FakeRouter) GetResponseSourceCallCount() int { return len(fake.getResponseSourceArgsForCall) } -func (fake *FakeRouter) GetResponseSourceCalls(stub func(string) routing.MessageSource) { +func (fake *FakeRouter) GetResponseSourceCalls(stub func(string) (routing.MessageSource, error)) { fake.getResponseSourceMutex.Lock() defer fake.getResponseSourceMutex.Unlock() fake.GetResponseSourceStub = stub @@ -344,27 +359,30 @@ func (fake *FakeRouter) GetResponseSourceArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeRouter) GetResponseSourceReturns(result1 routing.MessageSource) { +func (fake *FakeRouter) GetResponseSourceReturns(result1 routing.MessageSource, result2 error) { fake.getResponseSourceMutex.Lock() defer fake.getResponseSourceMutex.Unlock() fake.GetResponseSourceStub = nil fake.getResponseSourceReturns = struct { result1 routing.MessageSource - }{result1} + result2 error + }{result1, result2} } -func (fake *FakeRouter) GetResponseSourceReturnsOnCall(i int, result1 routing.MessageSource) { +func (fake *FakeRouter) GetResponseSourceReturnsOnCall(i int, result1 routing.MessageSource, result2 error) { fake.getResponseSourceMutex.Lock() defer fake.getResponseSourceMutex.Unlock() fake.GetResponseSourceStub = nil if fake.getResponseSourceReturnsOnCall == nil { fake.getResponseSourceReturnsOnCall = make(map[int]struct { result1 routing.MessageSource + result2 error }) } fake.getResponseSourceReturnsOnCall[i] = struct { result1 routing.MessageSource - }{result1} + result2 error + }{result1, result2} } func (fake *FakeRouter) OnNewParticipant(arg1 routing.ParticipantCallback) { @@ -399,18 +417,17 @@ func (fake *FakeRouter) OnNewParticipantArgsForCall(i int) routing.ParticipantCa return argsForCall.arg1 } -func (fake *FakeRouter) RegisterNode(arg1 *livekit.Node) error { +func (fake *FakeRouter) RegisterNode() error { fake.registerNodeMutex.Lock() ret, specificReturn := fake.registerNodeReturnsOnCall[len(fake.registerNodeArgsForCall)] fake.registerNodeArgsForCall = append(fake.registerNodeArgsForCall, struct { - arg1 *livekit.Node - }{arg1}) + }{}) stub := fake.RegisterNodeStub fakeReturns := fake.registerNodeReturns - fake.recordInvocation("RegisterNode", []interface{}{arg1}) + fake.recordInvocation("RegisterNode", []interface{}{}) fake.registerNodeMutex.Unlock() if stub != nil { - return stub(arg1) + return stub() } if specificReturn { return ret.result1 @@ -424,19 +441,12 @@ func (fake *FakeRouter) RegisterNodeCallCount() int { return len(fake.registerNodeArgsForCall) } -func (fake *FakeRouter) RegisterNodeCalls(stub func(*livekit.Node) error) { +func (fake *FakeRouter) RegisterNodeCalls(stub func() error) { fake.registerNodeMutex.Lock() defer fake.registerNodeMutex.Unlock() fake.RegisterNodeStub = stub } -func (fake *FakeRouter) RegisterNodeArgsForCall(i int) *livekit.Node { - fake.registerNodeMutex.RLock() - defer fake.registerNodeMutex.RUnlock() - argsForCall := fake.registerNodeArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeRouter) RegisterNodeReturns(result1 error) { fake.registerNodeMutex.Lock() defer fake.registerNodeMutex.Unlock() @@ -460,17 +470,17 @@ func (fake *FakeRouter) RegisterNodeReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) SetRTCNode(arg1 string, arg2 string) error { - fake.setRTCNodeMutex.Lock() - ret, specificReturn := fake.setRTCNodeReturnsOnCall[len(fake.setRTCNodeArgsForCall)] - fake.setRTCNodeArgsForCall = append(fake.setRTCNodeArgsForCall, struct { +func (fake *FakeRouter) SetParticipantRTCNode(arg1 string, arg2 string) error { + fake.setParticipantRTCNodeMutex.Lock() + ret, specificReturn := fake.setParticipantRTCNodeReturnsOnCall[len(fake.setParticipantRTCNodeArgsForCall)] + fake.setParticipantRTCNodeArgsForCall = append(fake.setParticipantRTCNodeArgsForCall, struct { arg1 string arg2 string }{arg1, arg2}) - stub := fake.SetRTCNodeStub - fakeReturns := fake.setRTCNodeReturns - fake.recordInvocation("SetRTCNode", []interface{}{arg1, arg2}) - fake.setRTCNodeMutex.Unlock() + stub := fake.SetParticipantRTCNodeStub + fakeReturns := fake.setParticipantRTCNodeReturns + fake.recordInvocation("SetParticipantRTCNode", []interface{}{arg1, arg2}) + fake.setParticipantRTCNodeMutex.Unlock() if stub != nil { return stub(arg1, arg2) } @@ -480,44 +490,44 @@ func (fake *FakeRouter) SetRTCNode(arg1 string, arg2 string) error { return fakeReturns.result1 } -func (fake *FakeRouter) SetRTCNodeCallCount() int { - fake.setRTCNodeMutex.RLock() - defer fake.setRTCNodeMutex.RUnlock() - return len(fake.setRTCNodeArgsForCall) +func (fake *FakeRouter) SetParticipantRTCNodeCallCount() int { + fake.setParticipantRTCNodeMutex.RLock() + defer fake.setParticipantRTCNodeMutex.RUnlock() + return len(fake.setParticipantRTCNodeArgsForCall) } -func (fake *FakeRouter) SetRTCNodeCalls(stub func(string, string) error) { - fake.setRTCNodeMutex.Lock() - defer fake.setRTCNodeMutex.Unlock() - fake.SetRTCNodeStub = stub +func (fake *FakeRouter) SetParticipantRTCNodeCalls(stub func(string, string) error) { + fake.setParticipantRTCNodeMutex.Lock() + defer fake.setParticipantRTCNodeMutex.Unlock() + fake.SetParticipantRTCNodeStub = stub } -func (fake *FakeRouter) SetRTCNodeArgsForCall(i int) (string, string) { - fake.setRTCNodeMutex.RLock() - defer fake.setRTCNodeMutex.RUnlock() - argsForCall := fake.setRTCNodeArgsForCall[i] +func (fake *FakeRouter) SetParticipantRTCNodeArgsForCall(i int) (string, string) { + fake.setParticipantRTCNodeMutex.RLock() + defer fake.setParticipantRTCNodeMutex.RUnlock() + argsForCall := fake.setParticipantRTCNodeArgsForCall[i] return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeRouter) SetRTCNodeReturns(result1 error) { - fake.setRTCNodeMutex.Lock() - defer fake.setRTCNodeMutex.Unlock() - fake.SetRTCNodeStub = nil - fake.setRTCNodeReturns = struct { +func (fake *FakeRouter) SetParticipantRTCNodeReturns(result1 error) { + fake.setParticipantRTCNodeMutex.Lock() + defer fake.setParticipantRTCNodeMutex.Unlock() + fake.SetParticipantRTCNodeStub = nil + fake.setParticipantRTCNodeReturns = struct { result1 error }{result1} } -func (fake *FakeRouter) SetRTCNodeReturnsOnCall(i int, result1 error) { - fake.setRTCNodeMutex.Lock() - defer fake.setRTCNodeMutex.Unlock() - fake.SetRTCNodeStub = nil - if fake.setRTCNodeReturnsOnCall == nil { - fake.setRTCNodeReturnsOnCall = make(map[int]struct { +func (fake *FakeRouter) SetParticipantRTCNodeReturnsOnCall(i int, result1 error) { + fake.setParticipantRTCNodeMutex.Lock() + defer fake.setParticipantRTCNodeMutex.Unlock() + fake.SetParticipantRTCNodeStub = nil + if fake.setParticipantRTCNodeReturnsOnCall == nil { + fake.setParticipantRTCNodeReturnsOnCall = make(map[int]struct { result1 error }) } - fake.setRTCNodeReturnsOnCall[i] = struct { + fake.setParticipantRTCNodeReturnsOnCall[i] = struct { result1 error }{result1} } @@ -575,21 +585,20 @@ func (fake *FakeRouter) StartReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) StartParticipant(arg1 string, arg2 string, arg3 string, arg4 string) error { +func (fake *FakeRouter) StartParticipant(arg1 string, arg2 string, arg3 string) error { fake.startParticipantMutex.Lock() ret, specificReturn := fake.startParticipantReturnsOnCall[len(fake.startParticipantArgsForCall)] fake.startParticipantArgsForCall = append(fake.startParticipantArgsForCall, struct { arg1 string arg2 string arg3 string - arg4 string - }{arg1, arg2, arg3, arg4}) + }{arg1, arg2, arg3}) stub := fake.StartParticipantStub fakeReturns := fake.startParticipantReturns - fake.recordInvocation("StartParticipant", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("StartParticipant", []interface{}{arg1, arg2, arg3}) fake.startParticipantMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -603,17 +612,17 @@ func (fake *FakeRouter) StartParticipantCallCount() int { return len(fake.startParticipantArgsForCall) } -func (fake *FakeRouter) StartParticipantCalls(stub func(string, string, string, string) error) { +func (fake *FakeRouter) StartParticipantCalls(stub func(string, string, string) error) { fake.startParticipantMutex.Lock() defer fake.startParticipantMutex.Unlock() fake.StartParticipantStub = stub } -func (fake *FakeRouter) StartParticipantArgsForCall(i int) (string, string, string, string) { +func (fake *FakeRouter) StartParticipantArgsForCall(i int) (string, string, string) { fake.startParticipantMutex.RLock() defer fake.startParticipantMutex.RUnlock() argsForCall := fake.startParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRouter) StartParticipantReturns(result1 error) { @@ -663,6 +672,59 @@ func (fake *FakeRouter) StopCalls(stub func()) { fake.StopStub = stub } +func (fake *FakeRouter) UnregisterNode() error { + fake.unregisterNodeMutex.Lock() + ret, specificReturn := fake.unregisterNodeReturnsOnCall[len(fake.unregisterNodeArgsForCall)] + fake.unregisterNodeArgsForCall = append(fake.unregisterNodeArgsForCall, struct { + }{}) + stub := fake.UnregisterNodeStub + fakeReturns := fake.unregisterNodeReturns + fake.recordInvocation("UnregisterNode", []interface{}{}) + fake.unregisterNodeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRouter) UnregisterNodeCallCount() int { + fake.unregisterNodeMutex.RLock() + defer fake.unregisterNodeMutex.RUnlock() + return len(fake.unregisterNodeArgsForCall) +} + +func (fake *FakeRouter) UnregisterNodeCalls(stub func() error) { + fake.unregisterNodeMutex.Lock() + defer fake.unregisterNodeMutex.Unlock() + fake.UnregisterNodeStub = stub +} + +func (fake *FakeRouter) UnregisterNodeReturns(result1 error) { + fake.unregisterNodeMutex.Lock() + defer fake.unregisterNodeMutex.Unlock() + fake.UnregisterNodeStub = nil + fake.unregisterNodeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRouter) UnregisterNodeReturnsOnCall(i int, result1 error) { + fake.unregisterNodeMutex.Lock() + defer fake.unregisterNodeMutex.Unlock() + fake.UnregisterNodeStub = nil + if fake.unregisterNodeReturnsOnCall == nil { + fake.unregisterNodeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.unregisterNodeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeRouter) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -678,14 +740,16 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} { defer fake.onNewParticipantMutex.RUnlock() fake.registerNodeMutex.RLock() defer fake.registerNodeMutex.RUnlock() - fake.setRTCNodeMutex.RLock() - defer fake.setRTCNodeMutex.RUnlock() + fake.setParticipantRTCNodeMutex.RLock() + defer fake.setParticipantRTCNodeMutex.RUnlock() fake.startMutex.RLock() defer fake.startMutex.RUnlock() fake.startParticipantMutex.RLock() defer fake.startParticipantMutex.RUnlock() fake.stopMutex.RLock() defer fake.stopMutex.RUnlock() + fake.unregisterNodeMutex.RLock() + defer fake.unregisterNodeMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/localroomstore.go b/pkg/service/localroomstore.go index 12b23b5a9..0da693a4b 100644 --- a/pkg/service/localroomstore.go +++ b/pkg/service/localroomstore.go @@ -15,9 +15,6 @@ type LocalRoomStore struct { // map of roomName => roomId roomIds map[string]string lock sync.RWMutex - //CreateRoom(room *livekit.Room) error - //GetRoom(idOrName string) (*livekit.Room, error) - //DeleteRoom(idOrName string) error } func NewLocalRoomStore() *LocalRoomStore { diff --git a/pkg/service/redisroomstore.go b/pkg/service/redisroomstore.go index 6d43c3366..15fcf712f 100644 --- a/pkg/service/redisroomstore.go +++ b/pkg/service/redisroomstore.go @@ -1 +1,101 @@ package service + +import ( + "context" + + "github.com/go-redis/redis/v8" + "google.golang.org/protobuf/proto" + + "github.com/livekit/livekit-server/proto/livekit" +) + +const ( + // hash of room_name => Room proto + RoomsKey = "rooms" + + // hash of room_id => room name + RoomIdMap = "room_id_map" +) + +type RedisRoomStore struct { + rc *redis.Client + ctx context.Context +} + +func NewRedisRoomStore(rc *redis.Client) *RedisRoomStore { + return &RedisRoomStore{ + rc: rc, + } +} + +func (p *RedisRoomStore) CreateRoom(room *livekit.Room) error { + err := p.rc.HSet(p.ctx, RoomIdMap, room.Sid, room.Name).Err() + if err != nil { + return err + } + + data, err := proto.Marshal(room) + if err != nil { + return err + } + return p.rc.HSet(p.ctx, RoomsKey, room.Name, data).Err() +} + +func (p *RedisRoomStore) GetRoom(idOrName string) (*livekit.Room, error) { + // see if matches any ids + name, err := p.rc.HGet(p.ctx, RoomIdMap, idOrName).Result() + if err != nil { + name = idOrName + } + + data, err := p.rc.HGet(p.ctx, RoomsKey, name).Result() + if err != nil { + if err == redis.Nil { + err = ErrRoomNotFound + } + return nil, err + } + + room := livekit.Room{} + err = proto.Unmarshal([]byte(data), &room) + if err != nil { + return nil, err + } + + return &room, nil +} + +func (p *RedisRoomStore) ListRooms() ([]*livekit.Room, error) { + items, err := p.rc.HVals(p.ctx, RoomsKey).Result() + if err != nil && err != redis.Nil { + return nil, err + } + + rooms := make([]*livekit.Room, 0, len(items)) + + for _, item := range items { + room := livekit.Room{} + err := proto.Unmarshal([]byte(item), &room) + if err != nil { + return nil, err + } + rooms = append(rooms, &room) + } + return rooms, nil +} + +func (p *RedisRoomStore) DeleteRoom(idOrName string) error { + room, err := p.GetRoom(idOrName) + if err == ErrRoomNotFound { + return nil + } else if err != nil { + return err + } + + err = p.rc.HDel(p.ctx, RoomIdMap, room.Sid).Err() + err2 := p.rc.HDel(p.ctx, RoomsKey, room.Name).Err() + if err == nil { + err = err2 + } + return err +} diff --git a/pkg/service/rtcrunner.go b/pkg/service/rtcrunner.go index 0bb09ad73..a92220901 100644 --- a/pkg/service/rtcrunner.go +++ b/pkg/service/rtcrunner.go @@ -61,7 +61,7 @@ func (r *RTCRunner) StartSession(roomName, participantId, participantName string } // register participant to be on this server - if err = r.router.SetRTCNode(participantId, r.currentNode.Id); err != nil { + if err = r.router.SetParticipantRTCNode(participantId, r.currentNode.Id); err != nil { logger.Errorw("could not set RTC node", "error", err) return } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 61590b9d8..48f6b3ab8 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -79,9 +79,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { sigConn := NewWSSignalConnection(conn) participantId := utils.NewGuid(utils.ParticipantPrefix) - err = s.router.StartParticipant(roomName, participantId, pName, s.currentNode.Id) + err = s.router.StartParticipant(roomName, participantId, pName) if err != nil { handleError(w, http.StatusInternalServerError, "could not set signal node: "+err.Error()) + return } logger.Infow("new client connected", @@ -90,8 +91,16 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { "name", pName, ) - reqSink := s.router.GetRequestSink(participantId) - resSource := s.router.GetResponseSource(participantId) + reqSink, err := s.router.GetRequestSink(participantId) + if err != nil { + handleError(w, http.StatusInternalServerError, "could not get request sink"+err.Error()) + return + } + resSource, err := s.router.GetResponseSource(participantId) + if err != nil { + handleError(w, http.StatusInternalServerError, "could not get response source"+err.Error()) + return + } go func() { for { diff --git a/pkg/service/server.go b/pkg/service/server.go index fec55d311..65770fd5e 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -18,13 +18,14 @@ import ( ) type LivekitServer struct { - config *config.Config - roomServer livekit.TwirpServer - rtcService *RTCService - httpServer *http.Server - router routing.Router - running bool - doneChan chan bool + config *config.Config + roomServer livekit.TwirpServer + rtcService *RTCService + httpServer *http.Server + router routing.Router + currentNode routing.LocalNode + running bool + doneChan chan bool } func NewLivekitServer(conf *config.Config, @@ -33,12 +34,14 @@ func NewLivekitServer(conf *config.Config, keyProvider auth.KeyProvider, router routing.Router, runner *RTCRunner, + currentNode routing.LocalNode, ) (s *LivekitServer, err error) { s = &LivekitServer{ - config: conf, - roomServer: livekit.NewRoomServiceServer(roomService), - rtcService: rtcService, - router: router, + config: conf, + roomServer: livekit.NewRoomServiceServer(roomService), + rtcService: rtcService, + router: router, + currentNode: currentNode, } middlewares := []negroni.Handler{ @@ -73,6 +76,11 @@ func (s *LivekitServer) Start() error { return errors.New("already running") } + if err := s.router.RegisterNode(); err != nil { + return err + } + defer s.router.UnregisterNode() + if err := s.router.Start(); err != nil { return err } @@ -86,7 +94,8 @@ func (s *LivekitServer) Start() error { } go func() { - logger.Infow("starting LiveKit server", "address", s.httpServer.Addr) + logger.Infow("starting LiveKit server", "address", s.httpServer.Addr, + "nodeId", s.currentNode.Id) s.httpServer.Serve(ln) }() diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 575fd093c..c2e13f383 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -27,7 +27,7 @@ func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, roomSto return nil, err } rtcRunner := NewRTCRunner(roomStore, router, currentNode, webRTCConfig) - livekitServer, err := NewLivekitServer(conf, roomService, rtcService, keyProvider, router, rtcRunner) + livekitServer, err := NewLivekitServer(conf, roomService, rtcService, keyProvider, router, rtcRunner, currentNode) if err != nil { return nil, err } diff --git a/pkg/utils/cachedredis.go b/pkg/utils/cachedredis.go new file mode 100644 index 000000000..30b386b99 --- /dev/null +++ b/pkg/utils/cachedredis.go @@ -0,0 +1,63 @@ +package utils + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "github.com/karlseguin/ccache/v2" +) + +const ( + defaultCacheTTL = time.Minute +) + +type CachedRedis struct { + ctx context.Context + rc *redis.Client + cache *ccache.Cache +} + +func NewCachedRedis(ctx context.Context, rc *redis.Client) *CachedRedis { + return &CachedRedis{ + ctx: ctx, + rc: rc, + cache: ccache.New(ccache.Configure()), + } +} + +func (r *CachedRedis) CachedHGet(key, hashKey string) (string, error) { + cacheKey := key + ":" + hashKey + item := r.cache.Get(cacheKey) + if item != nil && !item.Expired() { + return item.Value().(string), nil + } + val, err := r.rc.HGet(r.ctx, key, hashKey).Result() + if err != nil { + return "", err + } + r.cache.Set(key, val, defaultCacheTTL) + return val, nil +} + +func (r *CachedRedis) CachedGet(key string) (string, error) { + item := r.cache.Get(key) + if item != nil && !item.Expired() { + return item.Value().(string), nil + } + val, err := r.rc.Get(r.ctx, key).Result() + if err != nil { + return "", err + } + + r.cache.Set(key, val, defaultCacheTTL) + return val, nil +} + +func (r *CachedRedis) ExpireHash(key, hashKey string) { + r.cache.Delete(key + ":" + hashKey) +} + +func (r *CachedRedis) Expire(key string) { + r.cache.Delete(key) +} diff --git a/proto/internal.proto b/proto/internal.proto new file mode 100644 index 000000000..46f587604 --- /dev/null +++ b/proto/internal.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package livekit; +option go_package = "github.com/livekit/livekit-server/proto/livekit"; + +// internal protos, not exposed to clients +import "rtc.proto"; + +message Node { + string id = 1; + string ip = 2; + uint32 num_cpus = 3; + NodeStats stats = 4; +} + +message NodeStats { + int32 num_rooms = 1; + int32 num_clients = 2; +} + +// message for a node through the router +message RouterMessage { + oneof message { + StartSession start_session = 1; + SignalRequest request = 2; + SignalResponse response = 3; + EndSession end_session = 4; + } + string participant_id = 5; +} + +message StartSession { + string room_name = 1; + string participant_name = 2; +} + +message EndSession { +} \ No newline at end of file diff --git a/proto/livekit/internal.pb.go b/proto/livekit/internal.pb.go new file mode 100644 index 000000000..dc6f1b800 --- /dev/null +++ b/proto/livekit/internal.pb.go @@ -0,0 +1,539 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.14.0 +// source: internal.proto + +package livekit + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type Node struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"` + NumCpus uint32 `protobuf:"varint,3,opt,name=num_cpus,json=numCpus,proto3" json:"num_cpus,omitempty"` + Stats *NodeStats `protobuf:"bytes,4,opt,name=stats,proto3" json:"stats,omitempty"` +} + +func (x *Node) Reset() { + *x = Node{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Node) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Node) ProtoMessage() {} + +func (x *Node) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Node.ProtoReflect.Descriptor instead. +func (*Node) Descriptor() ([]byte, []int) { + return file_internal_proto_rawDescGZIP(), []int{0} +} + +func (x *Node) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Node) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + +func (x *Node) GetNumCpus() uint32 { + if x != nil { + return x.NumCpus + } + return 0 +} + +func (x *Node) GetStats() *NodeStats { + if x != nil { + return x.Stats + } + return nil +} + +type NodeStats struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + NumRooms int32 `protobuf:"varint,1,opt,name=num_rooms,json=numRooms,proto3" json:"num_rooms,omitempty"` + NumClients int32 `protobuf:"varint,2,opt,name=num_clients,json=numClients,proto3" json:"num_clients,omitempty"` +} + +func (x *NodeStats) Reset() { + *x = NodeStats{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NodeStats) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NodeStats) ProtoMessage() {} + +func (x *NodeStats) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NodeStats.ProtoReflect.Descriptor instead. +func (*NodeStats) Descriptor() ([]byte, []int) { + return file_internal_proto_rawDescGZIP(), []int{1} +} + +func (x *NodeStats) GetNumRooms() int32 { + if x != nil { + return x.NumRooms + } + return 0 +} + +func (x *NodeStats) GetNumClients() int32 { + if x != nil { + return x.NumClients + } + return 0 +} + +// message for a node through the router +type RouterMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Message: + // *RouterMessage_StartSession + // *RouterMessage_Request + // *RouterMessage_Response + // *RouterMessage_EndSession + Message isRouterMessage_Message `protobuf_oneof:"message"` + ParticipantId string `protobuf:"bytes,5,opt,name=participant_id,json=participantId,proto3" json:"participant_id,omitempty"` +} + +func (x *RouterMessage) Reset() { + *x = RouterMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RouterMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RouterMessage) ProtoMessage() {} + +func (x *RouterMessage) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RouterMessage.ProtoReflect.Descriptor instead. +func (*RouterMessage) Descriptor() ([]byte, []int) { + return file_internal_proto_rawDescGZIP(), []int{2} +} + +func (m *RouterMessage) GetMessage() isRouterMessage_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *RouterMessage) GetStartSession() *StartSession { + if x, ok := x.GetMessage().(*RouterMessage_StartSession); ok { + return x.StartSession + } + return nil +} + +func (x *RouterMessage) GetRequest() *SignalRequest { + if x, ok := x.GetMessage().(*RouterMessage_Request); ok { + return x.Request + } + return nil +} + +func (x *RouterMessage) GetResponse() *SignalResponse { + if x, ok := x.GetMessage().(*RouterMessage_Response); ok { + return x.Response + } + return nil +} + +func (x *RouterMessage) GetEndSession() *EndSession { + if x, ok := x.GetMessage().(*RouterMessage_EndSession); ok { + return x.EndSession + } + return nil +} + +func (x *RouterMessage) GetParticipantId() string { + if x != nil { + return x.ParticipantId + } + return "" +} + +type isRouterMessage_Message interface { + isRouterMessage_Message() +} + +type RouterMessage_StartSession struct { + StartSession *StartSession `protobuf:"bytes,1,opt,name=start_session,json=startSession,proto3,oneof"` +} + +type RouterMessage_Request struct { + Request *SignalRequest `protobuf:"bytes,2,opt,name=request,proto3,oneof"` +} + +type RouterMessage_Response struct { + Response *SignalResponse `protobuf:"bytes,3,opt,name=response,proto3,oneof"` +} + +type RouterMessage_EndSession struct { + EndSession *EndSession `protobuf:"bytes,4,opt,name=end_session,json=endSession,proto3,oneof"` +} + +func (*RouterMessage_StartSession) isRouterMessage_Message() {} + +func (*RouterMessage_Request) isRouterMessage_Message() {} + +func (*RouterMessage_Response) isRouterMessage_Message() {} + +func (*RouterMessage_EndSession) isRouterMessage_Message() {} + +type StartSession struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RoomName string `protobuf:"bytes,1,opt,name=room_name,json=roomName,proto3" json:"room_name,omitempty"` + ParticipantName string `protobuf:"bytes,2,opt,name=participant_name,json=participantName,proto3" json:"participant_name,omitempty"` +} + +func (x *StartSession) Reset() { + *x = StartSession{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StartSession) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StartSession) ProtoMessage() {} + +func (x *StartSession) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StartSession.ProtoReflect.Descriptor instead. +func (*StartSession) Descriptor() ([]byte, []int) { + return file_internal_proto_rawDescGZIP(), []int{3} +} + +func (x *StartSession) GetRoomName() string { + if x != nil { + return x.RoomName + } + return "" +} + +func (x *StartSession) GetParticipantName() string { + if x != nil { + return x.ParticipantName + } + return "" +} + +type EndSession struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *EndSession) Reset() { + *x = EndSession{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EndSession) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EndSession) ProtoMessage() {} + +func (x *EndSession) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EndSession.ProtoReflect.Descriptor instead. +func (*EndSession) Descriptor() ([]byte, []int) { + return file_internal_proto_rawDescGZIP(), []int{4} +} + +var File_internal_proto protoreflect.FileDescriptor + +var file_internal_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x07, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x1a, 0x09, 0x72, 0x74, 0x63, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x6b, 0x0a, 0x04, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x12, 0x19, 0x0a, 0x08, + 0x6e, 0x75, 0x6d, 0x5f, 0x63, 0x70, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, + 0x6e, 0x75, 0x6d, 0x43, 0x70, 0x75, 0x73, 0x12, 0x28, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, + 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, + 0x73, 0x22, 0x49, 0x0a, 0x09, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x1b, + 0x0a, 0x09, 0x6e, 0x75, 0x6d, 0x5f, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x08, 0x6e, 0x75, 0x6d, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, + 0x75, 0x6d, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x22, 0xa2, 0x02, 0x0a, + 0x0d, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3c, + 0x0a, 0x0d, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, + 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x0c, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x07, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, + 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x35, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x53, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x08, 0x72, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x0b, 0x65, 0x6e, 0x64, 0x5f, 0x73, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6c, + 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x48, 0x00, 0x52, 0x0a, 0x65, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, + 0x25, 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x5f, 0x69, + 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, + 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x22, 0x56, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x6f, 0x6f, 0x6d, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x6d, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x29, + 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, + 0x69, 0x70, 0x61, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x0c, 0x0a, 0x0a, 0x45, 0x6e, 0x64, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, + 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_internal_proto_rawDescOnce sync.Once + file_internal_proto_rawDescData = file_internal_proto_rawDesc +) + +func file_internal_proto_rawDescGZIP() []byte { + file_internal_proto_rawDescOnce.Do(func() { + file_internal_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_proto_rawDescData) + }) + return file_internal_proto_rawDescData +} + +var file_internal_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_internal_proto_goTypes = []interface{}{ + (*Node)(nil), // 0: livekit.Node + (*NodeStats)(nil), // 1: livekit.NodeStats + (*RouterMessage)(nil), // 2: livekit.RouterMessage + (*StartSession)(nil), // 3: livekit.StartSession + (*EndSession)(nil), // 4: livekit.EndSession + (*SignalRequest)(nil), // 5: livekit.SignalRequest + (*SignalResponse)(nil), // 6: livekit.SignalResponse +} +var file_internal_proto_depIdxs = []int32{ + 1, // 0: livekit.Node.stats:type_name -> livekit.NodeStats + 3, // 1: livekit.RouterMessage.start_session:type_name -> livekit.StartSession + 5, // 2: livekit.RouterMessage.request:type_name -> livekit.SignalRequest + 6, // 3: livekit.RouterMessage.response:type_name -> livekit.SignalResponse + 4, // 4: livekit.RouterMessage.end_session:type_name -> livekit.EndSession + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_internal_proto_init() } +func file_internal_proto_init() { + if File_internal_proto != nil { + return + } + file_rtc_proto_init() + if !protoimpl.UnsafeEnabled { + file_internal_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Node); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NodeStats); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RouterMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StartSession); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EndSession); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_internal_proto_msgTypes[2].OneofWrappers = []interface{}{ + (*RouterMessage_StartSession)(nil), + (*RouterMessage_Request)(nil), + (*RouterMessage_Response)(nil), + (*RouterMessage_EndSession)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_internal_proto_goTypes, + DependencyIndexes: file_internal_proto_depIdxs, + MessageInfos: file_internal_proto_msgTypes, + }.Build() + File_internal_proto = out.File + file_internal_proto_rawDesc = nil + file_internal_proto_goTypes = nil + file_internal_proto_depIdxs = nil +} diff --git a/proto/livekit/model.pb.go b/proto/livekit/model.pb.go index 6ad1e2a3a..d25da135d 100644 --- a/proto/livekit/model.pb.go +++ b/proto/livekit/model.pb.go @@ -127,136 +127,9 @@ func (x ParticipantInfo_State) Number() protoreflect.EnumNumber { // Deprecated: Use ParticipantInfo_State.Descriptor instead. func (ParticipantInfo_State) EnumDescriptor() ([]byte, []int) { - return file_model_proto_rawDescGZIP(), []int{3, 0} + return file_model_proto_rawDescGZIP(), []int{1, 0} } -type Node struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"` - NumCpus uint32 `protobuf:"varint,3,opt,name=num_cpus,json=numCpus,proto3" json:"num_cpus,omitempty"` - Stats *NodeStats `protobuf:"bytes,4,opt,name=stats,proto3" json:"stats,omitempty"` -} - -func (x *Node) Reset() { - *x = Node{} - if protoimpl.UnsafeEnabled { - mi := &file_model_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Node) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Node) ProtoMessage() {} - -func (x *Node) ProtoReflect() protoreflect.Message { - mi := &file_model_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Node.ProtoReflect.Descriptor instead. -func (*Node) Descriptor() ([]byte, []int) { - return file_model_proto_rawDescGZIP(), []int{0} -} - -func (x *Node) GetId() string { - if x != nil { - return x.Id - } - return "" -} - -func (x *Node) GetIp() string { - if x != nil { - return x.Ip - } - return "" -} - -func (x *Node) GetNumCpus() uint32 { - if x != nil { - return x.NumCpus - } - return 0 -} - -func (x *Node) GetStats() *NodeStats { - if x != nil { - return x.Stats - } - return nil -} - -type NodeStats struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - NumRooms int32 `protobuf:"varint,1,opt,name=num_rooms,json=numRooms,proto3" json:"num_rooms,omitempty"` - NumClients int32 `protobuf:"varint,2,opt,name=num_clients,json=numClients,proto3" json:"num_clients,omitempty"` -} - -func (x *NodeStats) Reset() { - *x = NodeStats{} - if protoimpl.UnsafeEnabled { - mi := &file_model_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *NodeStats) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*NodeStats) ProtoMessage() {} - -func (x *NodeStats) ProtoReflect() protoreflect.Message { - mi := &file_model_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use NodeStats.ProtoReflect.Descriptor instead. -func (*NodeStats) Descriptor() ([]byte, []int) { - return file_model_proto_rawDescGZIP(), []int{1} -} - -func (x *NodeStats) GetNumRooms() int32 { - if x != nil { - return x.NumRooms - } - return 0 -} - -func (x *NodeStats) GetNumClients() int32 { - if x != nil { - return x.NumClients - } - return 0 -} - -// internal type, for serialization with proto type Room struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -272,7 +145,7 @@ type Room struct { func (x *Room) Reset() { *x = Room{} if protoimpl.UnsafeEnabled { - mi := &file_model_proto_msgTypes[2] + mi := &file_model_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -285,7 +158,7 @@ func (x *Room) String() string { func (*Room) ProtoMessage() {} func (x *Room) ProtoReflect() protoreflect.Message { - mi := &file_model_proto_msgTypes[2] + mi := &file_model_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -298,7 +171,7 @@ func (x *Room) ProtoReflect() protoreflect.Message { // Deprecated: Use Room.ProtoReflect.Descriptor instead. func (*Room) Descriptor() ([]byte, []int) { - return file_model_proto_rawDescGZIP(), []int{2} + return file_model_proto_rawDescGZIP(), []int{0} } func (x *Room) GetSid() string { @@ -350,7 +223,7 @@ type ParticipantInfo struct { func (x *ParticipantInfo) Reset() { *x = ParticipantInfo{} if protoimpl.UnsafeEnabled { - mi := &file_model_proto_msgTypes[3] + mi := &file_model_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -363,7 +236,7 @@ func (x *ParticipantInfo) String() string { func (*ParticipantInfo) ProtoMessage() {} func (x *ParticipantInfo) ProtoReflect() protoreflect.Message { - mi := &file_model_proto_msgTypes[3] + mi := &file_model_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -376,7 +249,7 @@ func (x *ParticipantInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use ParticipantInfo.ProtoReflect.Descriptor instead. func (*ParticipantInfo) Descriptor() ([]byte, []int) { - return file_model_proto_rawDescGZIP(), []int{3} + return file_model_proto_rawDescGZIP(), []int{1} } func (x *ParticipantInfo) GetSid() string { @@ -407,7 +280,6 @@ func (x *ParticipantInfo) GetTracks() []*TrackInfo { return nil } -// describing type TrackInfo struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -422,7 +294,7 @@ type TrackInfo struct { func (x *TrackInfo) Reset() { *x = TrackInfo{} if protoimpl.UnsafeEnabled { - mi := &file_model_proto_msgTypes[4] + mi := &file_model_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -435,7 +307,7 @@ func (x *TrackInfo) String() string { func (*TrackInfo) ProtoMessage() {} func (x *TrackInfo) ProtoReflect() protoreflect.Message { - mi := &file_model_proto_msgTypes[4] + mi := &file_model_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -448,7 +320,7 @@ func (x *TrackInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use TrackInfo.ProtoReflect.Descriptor instead. func (*TrackInfo) Descriptor() ([]byte, []int) { - return file_model_proto_rawDescGZIP(), []int{4} + return file_model_proto_rawDescGZIP(), []int{2} } func (x *TrackInfo) GetSid() string { @@ -493,7 +365,7 @@ type DataMessage struct { func (x *DataMessage) Reset() { *x = DataMessage{} if protoimpl.UnsafeEnabled { - mi := &file_model_proto_msgTypes[5] + mi := &file_model_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -506,7 +378,7 @@ func (x *DataMessage) String() string { func (*DataMessage) ProtoMessage() {} func (x *DataMessage) ProtoReflect() protoreflect.Message { - mi := &file_model_proto_msgTypes[5] + mi := &file_model_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -519,7 +391,7 @@ func (x *DataMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use DataMessage.ProtoReflect.Descriptor instead. func (*DataMessage) Descriptor() ([]byte, []int) { - return file_model_proto_rawDescGZIP(), []int{5} + return file_model_proto_rawDescGZIP(), []int{3} } func (m *DataMessage) GetValue() isDataMessage_Value { @@ -563,60 +435,49 @@ var File_model_proto protoreflect.FileDescriptor var file_model_proto_rawDesc = []byte{ 0x0a, 0x0b, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, - 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x22, 0x6b, 0x0a, 0x04, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x0e, - 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x0e, - 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x12, 0x19, - 0x0a, 0x08, 0x6e, 0x75, 0x6d, 0x5f, 0x63, 0x70, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, - 0x52, 0x07, 0x6e, 0x75, 0x6d, 0x43, 0x70, 0x75, 0x73, 0x12, 0x28, 0x0a, 0x05, 0x73, 0x74, 0x61, - 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, - 0x69, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, - 0x61, 0x74, 0x73, 0x22, 0x49, 0x0a, 0x09, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, - 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x75, 0x6d, 0x5f, 0x72, 0x6f, 0x6f, 0x6d, 0x73, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x08, 0x6e, 0x75, 0x6d, 0x52, 0x6f, 0x6f, 0x6d, 0x73, 0x12, 0x1f, 0x0a, - 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x22, 0xa1, - 0x01, 0x0a, 0x04, 0x52, 0x6f, 0x6f, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, - 0x0d, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x12, 0x29, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, - 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6d, 0x61, - 0x78, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x12, 0x23, 0x0a, - 0x0d, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, - 0x6d, 0x65, 0x22, 0xd9, 0x01, 0x0a, 0x0f, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, - 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x34, 0x0a, 0x05, - 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x6c, 0x69, - 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, - 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, - 0x74, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x73, 0x18, 0x04, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x72, 0x61, - 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x73, 0x22, 0x3e, - 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x49, 0x4e, 0x49, - 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4a, 0x4f, 0x49, 0x4e, 0x45, 0x44, 0x10, 0x01, - 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, - 0x44, 0x49, 0x53, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x03, 0x22, 0x6f, - 0x0a, 0x09, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x26, 0x0a, - 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x6c, 0x69, - 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x52, - 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x75, 0x74, - 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x6d, 0x75, 0x74, 0x65, 0x64, 0x22, - 0x46, 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, - 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, - 0x74, 0x65, 0x78, 0x74, 0x12, 0x18, 0x0a, 0x06, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x42, 0x07, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x2a, 0x2b, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x63, 0x6b, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x55, 0x44, 0x49, 0x4f, 0x10, 0x00, 0x12, - 0x09, 0x0a, 0x05, 0x56, 0x49, 0x44, 0x45, 0x4f, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x41, - 0x54, 0x41, 0x10, 0x02, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, - 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x22, 0xa1, 0x01, 0x0a, 0x04, 0x52, 0x6f, 0x6f, 0x6d, 0x12, + 0x10, 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x65, 0x6d, + 0x70, 0x74, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x29, 0x0a, 0x10, 0x6d, 0x61, + 0x78, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6d, 0x61, 0x78, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, + 0x70, 0x61, 0x6e, 0x74, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x72, + 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xd9, 0x01, 0x0a, 0x0f, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, + 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, 0x64, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x34, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x50, 0x61, + 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x74, + 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x74, 0x72, + 0x61, 0x63, 0x6b, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x69, 0x76, + 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, + 0x74, 0x72, 0x61, 0x63, 0x6b, 0x73, 0x22, 0x3e, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, + 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x49, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, + 0x4a, 0x4f, 0x49, 0x4e, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, + 0x56, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x44, 0x49, 0x53, 0x43, 0x4f, 0x4e, 0x4e, 0x45, + 0x43, 0x54, 0x45, 0x44, 0x10, 0x03, 0x22, 0x6f, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x72, + 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x75, 0x74, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x05, 0x6d, 0x75, 0x74, 0x65, 0x64, 0x22, 0x46, 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x18, 0x0a, 0x06, + 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, + 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x42, 0x07, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x2a, + 0x2b, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, + 0x41, 0x55, 0x44, 0x49, 0x4f, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x49, 0x44, 0x45, 0x4f, + 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x41, 0x54, 0x41, 0x10, 0x02, 0x42, 0x31, 0x5a, 0x2f, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, + 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -632,27 +493,24 @@ func file_model_proto_rawDescGZIP() []byte { } var file_model_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_model_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_model_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_model_proto_goTypes = []interface{}{ (TrackType)(0), // 0: livekit.TrackType (ParticipantInfo_State)(0), // 1: livekit.ParticipantInfo.State - (*Node)(nil), // 2: livekit.Node - (*NodeStats)(nil), // 3: livekit.NodeStats - (*Room)(nil), // 4: livekit.Room - (*ParticipantInfo)(nil), // 5: livekit.ParticipantInfo - (*TrackInfo)(nil), // 6: livekit.TrackInfo - (*DataMessage)(nil), // 7: livekit.DataMessage + (*Room)(nil), // 2: livekit.Room + (*ParticipantInfo)(nil), // 3: livekit.ParticipantInfo + (*TrackInfo)(nil), // 4: livekit.TrackInfo + (*DataMessage)(nil), // 5: livekit.DataMessage } var file_model_proto_depIdxs = []int32{ - 3, // 0: livekit.Node.stats:type_name -> livekit.NodeStats - 1, // 1: livekit.ParticipantInfo.state:type_name -> livekit.ParticipantInfo.State - 6, // 2: livekit.ParticipantInfo.tracks:type_name -> livekit.TrackInfo - 0, // 3: livekit.TrackInfo.type:type_name -> livekit.TrackType - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 1, // 0: livekit.ParticipantInfo.state:type_name -> livekit.ParticipantInfo.State + 4, // 1: livekit.ParticipantInfo.tracks:type_name -> livekit.TrackInfo + 0, // 2: livekit.TrackInfo.type:type_name -> livekit.TrackType + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_model_proto_init() } @@ -662,30 +520,6 @@ func file_model_proto_init() { } if !protoimpl.UnsafeEnabled { file_model_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Node); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_model_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NodeStats); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_model_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Room); i { case 0: return &v.state @@ -697,7 +531,7 @@ func file_model_proto_init() { return nil } } - file_model_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_model_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ParticipantInfo); i { case 0: return &v.state @@ -709,7 +543,7 @@ func file_model_proto_init() { return nil } } - file_model_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + file_model_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TrackInfo); i { case 0: return &v.state @@ -721,7 +555,7 @@ func file_model_proto_init() { return nil } } - file_model_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + file_model_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DataMessage); i { case 0: return &v.state @@ -734,7 +568,7 @@ func file_model_proto_init() { } } } - file_model_proto_msgTypes[5].OneofWrappers = []interface{}{ + file_model_proto_msgTypes[3].OneofWrappers = []interface{}{ (*DataMessage_Text)(nil), (*DataMessage_Binary)(nil), } @@ -744,7 +578,7 @@ func file_model_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_model_proto_rawDesc, NumEnums: 2, - NumMessages: 6, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/livekit/room.twirp.go b/proto/livekit/room.twirp.go index 9f44a1be7..f9dc4b360 100644 --- a/proto/livekit/room.twirp.go +++ b/proto/livekit/room.twirp.go @@ -7,7 +7,6 @@ This code was generated with github.com/twitchtv/twirp/protoc-gen-twirp v7.1.0. It is generated from these files: room.proto - model.proto */ package livekit diff --git a/proto/model.proto b/proto/model.proto index 0e07c563d..0252092cd 100644 --- a/proto/model.proto +++ b/proto/model.proto @@ -3,19 +3,6 @@ syntax = "proto3"; package livekit; option go_package = "github.com/livekit/livekit-server/proto/livekit"; -message Node { - string id = 1; - string ip = 2; - uint32 num_cpus = 3; - NodeStats stats = 4; -} - -message NodeStats { - int32 num_rooms = 1; - int32 num_clients = 2; -} - -// internal type, for serialization with proto message Room { string sid = 1; string name = 2; @@ -47,7 +34,6 @@ enum TrackType { DATA = 2; } -// describing message TrackInfo { string sid = 1; TrackType type = 2;