From 94aec3b98d2c2bbba55393c011675d3811cc0fa2 Mon Sep 17 00:00:00 2001 From: Mathew Kamkar <578302+matkam@users.noreply.github.com> Date: Tue, 9 Nov 2021 17:19:46 -0800 Subject: [PATCH] Node updates stats with KeepAlive message to self (#177) * node sends KeepAlive message to self * use WriteRTCNodeMessage instead of participants[0] --- go.mod | 11 +++- go.sum | 29 +++++---- pkg/routing/interfaces.go | 1 + pkg/routing/localrouter.go | 15 ++++- pkg/routing/redisrouter.go | 34 ++++++++--- pkg/routing/routingfakes/fake_router.go | 78 +++++++++++++++++++++++++ pkg/service/roomservice.go | 55 ++++++++--------- 7 files changed, 168 insertions(+), 55 deletions(-) diff --git a/go.mod b/go.mod index 634910061..d40ce392e 100644 --- a/go.mod +++ b/go.mod @@ -5,16 +5,18 @@ go 1.15 require ( github.com/bep/debounce v1.2.0 github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 + github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/gammazero/deque v0.1.0 github.com/gammazero/workerpool v1.1.2 github.com/go-logr/logr v1.2.0 github.com/go-logr/zapr v1.1.0 github.com/go-redis/redis/v8 v8.11.3 + github.com/google/subcommands v1.2.0 // indirect github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff + github.com/livekit/protocol v0.10.1-0.20211110010924-91c31dabb096 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 @@ -31,13 +33,18 @@ require ( github.com/pion/webrtc/v3 v3.1.5 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 - github.com/rs/zerolog v1.25.0 + github.com/rs/zerolog v1.26.0 + github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stretchr/testify v1.7.0 github.com/thoas/go-funk v0.8.0 github.com/twitchtv/twirp v8.1.0+incompatible github.com/urfave/cli/v2 v2.3.0 github.com/urfave/negroni v1.0.0 go.uber.org/zap v1.19.1 + golang.org/x/mod v0.5.1 // indirect + golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) + +replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.16 diff --git a/go.sum b/go.sum index ea57bb8e7..3ef3e6008 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,9 @@ github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8/go.mod h1:uEyr4WpAH github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= +github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -75,8 +76,9 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k= github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= +github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE= +github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -104,8 +106,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff h1:21SZ2sh5e7ELCVdXT01hlpdSZyNlwDv6KTOlcplBrQ8= -github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM= +github.com/livekit/protocol v0.10.1-0.20211110010924-91c31dabb096 h1:/WVPy3AjPO5xf/pKWaoChcuEK3x2JwHbCujXnt/1oqc= +github.com/livekit/protocol v0.10.1-0.20211110010924-91c31dabb096/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= @@ -205,14 +207,14 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.25.0 h1:Rj7XygbUHKUlDPcVdoLyR91fJBsduXj5fRxyqIQj/II= -github.com/rs/zerolog v1.25.0/go.mod h1:7KHcEGe0QZPOm2IE4Kpb5rTh6n1h2hIgS5OOnu1rUaI= -github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= +github.com/rs/zerolog v1.26.0 h1:ORM4ibhEZeTeQlCojCK2kPz1ogAY4bGs4tD+SaAdGaE= +github.com/rs/zerolog v1.26.0/go.mod h1:yBiM87lvSqX8h0Ww4sdzNSkVYZ8dL2xjZJG1lAuGZEo= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/sclevine/spec v1.4.0 h1:z/Q9idDcay5m5irkZ28M7PtQM4aOISzOpj4bUPkDee8= github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM= -github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -236,6 +238,7 @@ github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= @@ -257,8 +260,9 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= +golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -312,8 +316,10 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -327,8 +333,9 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201023174141-c8cfbd0f21e6/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ= +golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 2fe71929a..550384e7f 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -60,6 +60,7 @@ type Router interface { // WriteRTCMessage sends a message to the RTC node WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error + WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error // OnNewParticipantRTC is called to start a new participant's RTC connection OnNewParticipantRTC(callback NewParticipantCallback) diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 53972c417..20de1e2c5 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -115,12 +115,21 @@ func (r *LocalRouter) WriteRTCMessage(ctx context.Context, roomName, identity st // create a new one r.rtcMessageChan = NewMessageChannel() } - return r.writeRTCMessage(roomName, identity, msg, r.rtcMessageChan) + msg.ParticipantKey = participantKey(roomName, identity) + return r.writeRTCMessage(r.rtcMessageChan, msg) } -func (r *LocalRouter) writeRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage, sink MessageSink) error { +func (r *LocalRouter) WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error { + if r.rtcMessageChan.IsClosed() { + // create a new one + r.rtcMessageChan = NewMessageChannel() + } + return r.writeRTCMessage(r.rtcMessageChan, msg) +} + +func (r *LocalRouter) writeRTCMessage(sink MessageSink, msg *livekit.RTCNodeMessage) error { defer sink.Close() - msg.ParticipantKey = participantKey(roomName, identity) + msg.SenderTime = time.Now().Unix() return sink.WriteMessage(msg) } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 3ad0e1816..e8ef88450 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -176,7 +176,13 @@ func (r *RedisRouter) WriteRTCMessage(ctx context.Context, roomName, identity st } rtcSink := NewRTCNodeSink(r.rc, rtcNode, pkey) - return r.writeRTCMessage(roomName, identity, msg, rtcSink) + msg.ParticipantKey = participantKey(roomName, identity) + return r.writeRTCMessage(rtcSink, msg) +} + +func (r *RedisRouter) WriteRTCNodeMessage(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { + rtcSink := NewRTCNodeSink(r.rc, rtcNodeID, "") + return r.writeRTCMessage(rtcSink, msg) } func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantKey string) error { @@ -189,7 +195,8 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK if rtcNode.Id != r.currentNode.Id { err = ErrIncorrectRTCNode logger.Errorw("called participant on incorrect node", err, - "rtcNode", rtcNode, "nodeID", r.currentNode.Id) + "rtcNode", rtcNode, + ) return err } @@ -312,16 +319,14 @@ func (r *RedisRouter) statsWorker() { // update periodically seconds select { case <-time.After(statsUpdateInterval): - if err := prometheus.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil { - logger.Errorw("could not update node stats", err, "nodeID", r.currentNode.Id) - } - if err := r.RegisterNode(); err != nil { - logger.Errorw("could not update node", err, "nodeID", r.currentNode.Id) - } + r.WriteRTCNodeMessage(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{ + Message: &livekit.RTCNodeMessage_KeepAlive{}, + }) case <-r.ctx.Done(): return } } + } // worker that consumes redis messages intended for this node @@ -421,6 +426,19 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error { return err } + case *livekit.RTCNodeMessage_KeepAlive: + if time.Unix(rm.SenderTime, 0).Add(statsUpdateInterval).Before(time.Now()) { + logger.Infow("keep alive too old, skipping", "senderTime", rm.SenderTime) + break + } + + if err := prometheus.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil { + logger.Errorw("could not update node stats", err) + } + if err := r.RegisterNode(); err != nil { + logger.Errorw("could not update node", err) + } + default: // route it to handler if r.onRTCMessage != nil { diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index e6183c087..699e2d907 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -165,6 +165,19 @@ type FakeRouter struct { writeRTCMessageReturnsOnCall map[int]struct { result1 error } + WriteRTCNodeMessageStub func(context.Context, string, *livekit.RTCNodeMessage) error + writeRTCNodeMessageMutex sync.RWMutex + writeRTCNodeMessageArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 *livekit.RTCNodeMessage + } + writeRTCNodeMessageReturns struct { + result1 error + } + writeRTCNodeMessageReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -939,6 +952,69 @@ func (fake *FakeRouter) WriteRTCMessageReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeRouter) WriteRTCNodeMessage(arg1 context.Context, arg2 string, arg3 *livekit.RTCNodeMessage) error { + fake.writeRTCNodeMessageMutex.Lock() + ret, specificReturn := fake.writeRTCNodeMessageReturnsOnCall[len(fake.writeRTCNodeMessageArgsForCall)] + fake.writeRTCNodeMessageArgsForCall = append(fake.writeRTCNodeMessageArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 *livekit.RTCNodeMessage + }{arg1, arg2, arg3}) + stub := fake.WriteRTCNodeMessageStub + fakeReturns := fake.writeRTCNodeMessageReturns + fake.recordInvocation("WriteRTCNodeMessage", []interface{}{arg1, arg2, arg3}) + fake.writeRTCNodeMessageMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRouter) WriteRTCNodeMessageCallCount() int { + fake.writeRTCNodeMessageMutex.RLock() + defer fake.writeRTCNodeMessageMutex.RUnlock() + return len(fake.writeRTCNodeMessageArgsForCall) +} + +func (fake *FakeRouter) WriteRTCNodeMessageCalls(stub func(context.Context, string, *livekit.RTCNodeMessage) error) { + fake.writeRTCNodeMessageMutex.Lock() + defer fake.writeRTCNodeMessageMutex.Unlock() + fake.WriteRTCNodeMessageStub = stub +} + +func (fake *FakeRouter) WriteRTCNodeMessageArgsForCall(i int) (context.Context, string, *livekit.RTCNodeMessage) { + fake.writeRTCNodeMessageMutex.RLock() + defer fake.writeRTCNodeMessageMutex.RUnlock() + argsForCall := fake.writeRTCNodeMessageArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeRouter) WriteRTCNodeMessageReturns(result1 error) { + fake.writeRTCNodeMessageMutex.Lock() + defer fake.writeRTCNodeMessageMutex.Unlock() + fake.WriteRTCNodeMessageStub = nil + fake.writeRTCNodeMessageReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRouter) WriteRTCNodeMessageReturnsOnCall(i int, result1 error) { + fake.writeRTCNodeMessageMutex.Lock() + defer fake.writeRTCNodeMessageMutex.Unlock() + fake.WriteRTCNodeMessageStub = nil + if fake.writeRTCNodeMessageReturnsOnCall == nil { + fake.writeRTCNodeMessageReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeRTCNodeMessageReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeRouter) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -972,6 +1048,8 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} { defer fake.unregisterNodeMutex.RUnlock() fake.writeRTCMessageMutex.RLock() defer fake.writeRTCMessageMutex.RUnlock() + fake.writeRTCNodeMessageMutex.RLock() + defer fake.writeRTCNodeMessageMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index c7781b22d..02b80a91f 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -63,21 +63,18 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq return nil, twirpAuthError(err) } // if the room is currently active, RTC node needs to disconnect clients - // here we are using any user's identity, due to how it works with routing - participants, err := s.roomStore.ListParticipants(ctx, req.Room) + node, err := s.router.GetNodeForRoom(ctx, req.Room) if err != nil { return nil, err } - if len(participants) > 0 { - err = s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_DeleteRoom{ - DeleteRoom: req, - }, - }) - if err != nil { - return nil, err - } + err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ + Message: &livekit.RTCNodeMessage_DeleteRoom{ + DeleteRoom: req, + }, + }) + if err != nil { + return nil, err } return &livekit.DeleteRoomResponse{}, nil @@ -195,20 +192,18 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { // here we are using any user's identity, due to how it works with routing - participants, err := s.roomStore.ListParticipants(ctx, req.Room) + node, err := s.router.GetNodeForRoom(ctx, req.Room) if err != nil { return nil, err } - if len(participants) > 0 { - err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_SendData{ - SendData: req, - }, - }) - if err != nil { - return nil, err - } + err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ + Message: &livekit.RTCNodeMessage_SendData{ + SendData: req, + }, + }) + if err != nil { + return nil, err } return &livekit.SendDataResponse{}, nil @@ -226,20 +221,18 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat room.Metadata = req.Metadata - participants, err := s.roomStore.ListParticipants(ctx, req.Room) + node, err := s.router.GetNodeForRoom(ctx, req.Room) if err != nil { return nil, err } - if len(participants) > 0 { - err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{ - Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ - UpdateRoomMetadata: req, - }, - }) - if err != nil { - return nil, err - } + err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ + Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ + UpdateRoomMetadata: req, + }, + }) + if err != nil { + return nil, err } return room, nil