Node updates stats with KeepAlive message to self (#177)

* node sends KeepAlive message to self

* use WriteRTCNodeMessage instead of participants[0]
This commit is contained in:
Mathew Kamkar
2021-11-09 17:19:46 -08:00
committed by GitHub
parent 01cf22f2c4
commit 94aec3b98d
7 changed files with 168 additions and 55 deletions
+9 -2
View File
@@ -5,16 +5,18 @@ go 1.15
require (
github.com/bep/debounce v1.2.0
github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/gammazero/deque v0.1.0
github.com/gammazero/workerpool v1.1.2
github.com/go-logr/logr v1.2.0
github.com/go-logr/zapr v1.1.0
github.com/go-redis/redis/v8 v8.11.3
github.com/google/subcommands v1.2.0 // indirect
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff
github.com/livekit/protocol v0.10.1-0.20211110010924-91c31dabb096
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
@@ -31,13 +33,18 @@ require (
github.com/pion/webrtc/v3 v3.1.5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/rs/zerolog v1.25.0
github.com/rs/zerolog v1.26.0
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/thoas/go-funk v0.8.0
github.com/twitchtv/twirp v8.1.0+incompatible
github.com/urfave/cli/v2 v2.3.0
github.com/urfave/negroni v1.0.0
go.uber.org/zap v1.19.1
golang.org/x/mod v0.5.1 // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.16
+18 -11
View File
@@ -18,8 +18,9 @@ github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8/go.mod h1:uEyr4WpAH
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -75,8 +76,9 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k=
github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -104,8 +106,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff h1:21SZ2sh5e7ELCVdXT01hlpdSZyNlwDv6KTOlcplBrQ8=
github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM=
github.com/livekit/protocol v0.10.1-0.20211110010924-91c31dabb096 h1:/WVPy3AjPO5xf/pKWaoChcuEK3x2JwHbCujXnt/1oqc=
github.com/livekit/protocol v0.10.1-0.20211110010924-91c31dabb096/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
@@ -205,14 +207,14 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.25.0 h1:Rj7XygbUHKUlDPcVdoLyR91fJBsduXj5fRxyqIQj/II=
github.com/rs/zerolog v1.25.0/go.mod h1:7KHcEGe0QZPOm2IE4Kpb5rTh6n1h2hIgS5OOnu1rUaI=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/rs/zerolog v1.26.0 h1:ORM4ibhEZeTeQlCojCK2kPz1ogAY4bGs4tD+SaAdGaE=
github.com/rs/zerolog v1.26.0/go.mod h1:yBiM87lvSqX8h0Ww4sdzNSkVYZ8dL2xjZJG1lAuGZEo=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/sclevine/spec v1.4.0 h1:z/Q9idDcay5m5irkZ28M7PtQM4aOISzOpj4bUPkDee8=
github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -236,6 +238,7 @@ github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
@@ -257,8 +260,9 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -312,8 +316,10 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
@@ -327,8 +333,9 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201023174141-c8cfbd0f21e6/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+1
View File
@@ -60,6 +60,7 @@ type Router interface {
// WriteRTCMessage sends a message to the RTC node
WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error
WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error
// OnNewParticipantRTC is called to start a new participant's RTC connection
OnNewParticipantRTC(callback NewParticipantCallback)
+12 -3
View File
@@ -115,12 +115,21 @@ func (r *LocalRouter) WriteRTCMessage(ctx context.Context, roomName, identity st
// create a new one
r.rtcMessageChan = NewMessageChannel()
}
return r.writeRTCMessage(roomName, identity, msg, r.rtcMessageChan)
msg.ParticipantKey = participantKey(roomName, identity)
return r.writeRTCMessage(r.rtcMessageChan, msg)
}
func (r *LocalRouter) writeRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage, sink MessageSink) error {
func (r *LocalRouter) WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error {
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel()
}
return r.writeRTCMessage(r.rtcMessageChan, msg)
}
func (r *LocalRouter) writeRTCMessage(sink MessageSink, msg *livekit.RTCNodeMessage) error {
defer sink.Close()
msg.ParticipantKey = participantKey(roomName, identity)
msg.SenderTime = time.Now().Unix()
return sink.WriteMessage(msg)
}
+26 -8
View File
@@ -176,7 +176,13 @@ func (r *RedisRouter) WriteRTCMessage(ctx context.Context, roomName, identity st
}
rtcSink := NewRTCNodeSink(r.rc, rtcNode, pkey)
return r.writeRTCMessage(roomName, identity, msg, rtcSink)
msg.ParticipantKey = participantKey(roomName, identity)
return r.writeRTCMessage(rtcSink, msg)
}
func (r *RedisRouter) WriteRTCNodeMessage(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error {
rtcSink := NewRTCNodeSink(r.rc, rtcNodeID, "")
return r.writeRTCMessage(rtcSink, msg)
}
func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantKey string) error {
@@ -189,7 +195,8 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
if rtcNode.Id != r.currentNode.Id {
err = ErrIncorrectRTCNode
logger.Errorw("called participant on incorrect node", err,
"rtcNode", rtcNode, "nodeID", r.currentNode.Id)
"rtcNode", rtcNode,
)
return err
}
@@ -312,16 +319,14 @@ func (r *RedisRouter) statsWorker() {
// update periodically seconds
select {
case <-time.After(statsUpdateInterval):
if err := prometheus.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil {
logger.Errorw("could not update node stats", err, "nodeID", r.currentNode.Id)
}
if err := r.RegisterNode(); err != nil {
logger.Errorw("could not update node", err, "nodeID", r.currentNode.Id)
}
r.WriteRTCNodeMessage(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_KeepAlive{},
})
case <-r.ctx.Done():
return
}
}
}
// worker that consumes redis messages intended for this node
@@ -421,6 +426,19 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error {
return err
}
case *livekit.RTCNodeMessage_KeepAlive:
if time.Unix(rm.SenderTime, 0).Add(statsUpdateInterval).Before(time.Now()) {
logger.Infow("keep alive too old, skipping", "senderTime", rm.SenderTime)
break
}
if err := prometheus.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil {
logger.Errorw("could not update node stats", err)
}
if err := r.RegisterNode(); err != nil {
logger.Errorw("could not update node", err)
}
default:
// route it to handler
if r.onRTCMessage != nil {
+78
View File
@@ -165,6 +165,19 @@ type FakeRouter struct {
writeRTCMessageReturnsOnCall map[int]struct {
result1 error
}
WriteRTCNodeMessageStub func(context.Context, string, *livekit.RTCNodeMessage) error
writeRTCNodeMessageMutex sync.RWMutex
writeRTCNodeMessageArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 *livekit.RTCNodeMessage
}
writeRTCNodeMessageReturns struct {
result1 error
}
writeRTCNodeMessageReturnsOnCall map[int]struct {
result1 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
@@ -939,6 +952,69 @@ func (fake *FakeRouter) WriteRTCMessageReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRouter) WriteRTCNodeMessage(arg1 context.Context, arg2 string, arg3 *livekit.RTCNodeMessage) error {
fake.writeRTCNodeMessageMutex.Lock()
ret, specificReturn := fake.writeRTCNodeMessageReturnsOnCall[len(fake.writeRTCNodeMessageArgsForCall)]
fake.writeRTCNodeMessageArgsForCall = append(fake.writeRTCNodeMessageArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 *livekit.RTCNodeMessage
}{arg1, arg2, arg3})
stub := fake.WriteRTCNodeMessageStub
fakeReturns := fake.writeRTCNodeMessageReturns
fake.recordInvocation("WriteRTCNodeMessage", []interface{}{arg1, arg2, arg3})
fake.writeRTCNodeMessageMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRouter) WriteRTCNodeMessageCallCount() int {
fake.writeRTCNodeMessageMutex.RLock()
defer fake.writeRTCNodeMessageMutex.RUnlock()
return len(fake.writeRTCNodeMessageArgsForCall)
}
func (fake *FakeRouter) WriteRTCNodeMessageCalls(stub func(context.Context, string, *livekit.RTCNodeMessage) error) {
fake.writeRTCNodeMessageMutex.Lock()
defer fake.writeRTCNodeMessageMutex.Unlock()
fake.WriteRTCNodeMessageStub = stub
}
func (fake *FakeRouter) WriteRTCNodeMessageArgsForCall(i int) (context.Context, string, *livekit.RTCNodeMessage) {
fake.writeRTCNodeMessageMutex.RLock()
defer fake.writeRTCNodeMessageMutex.RUnlock()
argsForCall := fake.writeRTCNodeMessageArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRouter) WriteRTCNodeMessageReturns(result1 error) {
fake.writeRTCNodeMessageMutex.Lock()
defer fake.writeRTCNodeMessageMutex.Unlock()
fake.WriteRTCNodeMessageStub = nil
fake.writeRTCNodeMessageReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteRTCNodeMessageReturnsOnCall(i int, result1 error) {
fake.writeRTCNodeMessageMutex.Lock()
defer fake.writeRTCNodeMessageMutex.Unlock()
fake.WriteRTCNodeMessageStub = nil
if fake.writeRTCNodeMessageReturnsOnCall == nil {
fake.writeRTCNodeMessageReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.writeRTCNodeMessageReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
@@ -972,6 +1048,8 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} {
defer fake.unregisterNodeMutex.RUnlock()
fake.writeRTCMessageMutex.RLock()
defer fake.writeRTCMessageMutex.RUnlock()
fake.writeRTCNodeMessageMutex.RLock()
defer fake.writeRTCNodeMessageMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
+24 -31
View File
@@ -63,21 +63,18 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
return nil, twirpAuthError(err)
}
// if the room is currently active, RTC node needs to disconnect clients
// here we are using any user's identity, due to how it works with routing
participants, err := s.roomStore.ListParticipants(ctx, req.Room)
node, err := s.router.GetNodeForRoom(ctx, req.Room)
if err != nil {
return nil, err
}
if len(participants) > 0 {
err = s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_DeleteRoom{
DeleteRoom: req,
},
})
if err != nil {
return nil, err
}
err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_DeleteRoom{
DeleteRoom: req,
},
})
if err != nil {
return nil, err
}
return &livekit.DeleteRoomResponse{}, nil
@@ -195,20 +192,18 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
// here we are using any user's identity, due to how it works with routing
participants, err := s.roomStore.ListParticipants(ctx, req.Room)
node, err := s.router.GetNodeForRoom(ctx, req.Room)
if err != nil {
return nil, err
}
if len(participants) > 0 {
err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_SendData{
SendData: req,
},
})
if err != nil {
return nil, err
}
err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_SendData{
SendData: req,
},
})
if err != nil {
return nil, err
}
return &livekit.SendDataResponse{}, nil
@@ -226,20 +221,18 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
room.Metadata = req.Metadata
participants, err := s.roomStore.ListParticipants(ctx, req.Room)
node, err := s.router.GetNodeForRoom(ctx, req.Room)
if err != nil {
return nil, err
}
if len(participants) > 0 {
err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
UpdateRoomMetadata: req,
},
})
if err != nil {
return nil, err
}
err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
UpdateRoomMetadata: req,
},
})
if err != nil {
return nil, err
}
return room, nil