From 025eb1164ca3d51ec78a75ed0a5fabc3ac99fb74 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Thu, 25 Jan 2024 15:48:12 -0800 Subject: [PATCH] retry signal stream start (#2410) --- go.mod | 3 ++- go.sum | 6 ++++-- pkg/routing/signal.go | 21 +++++++++++++++++++-- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 784eba4eb..a10a85928 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/livekit/livekit-server go 1.20 require ( + github.com/avast/retry-go/v4 v4.5.1 github.com/bep/debounce v1.2.1 github.com/d5/tengo/v2 v2.16.1 github.com/dustin/go-humanize v1.0.1 @@ -101,7 +102,7 @@ require ( golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.17.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect google.golang.org/grpc v1.61.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index f927c8661..678a999e7 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o= +github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= @@ -421,8 +423,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe h1:bQnxqljG/wqi4NTXu2+DJ3n7APcEA882QZ1JvhQAq9o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index e7181038d..fbad8da97 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/avast/retry-go/v4" "go.uber.org/atomic" "google.golang.org/protobuf/proto" @@ -99,13 +100,19 @@ func (r *signalClient) StartParticipantSignal( l.Debugw("starting signal connection") - stream, err := r.client.RelaySignal(ctx, nodeID) + var stream psrpc.ClientStream[*rpc.RelaySignalRequest, *rpc.RelaySignalResponse] + err = r.retry(ctx, func() (err error) { + stream, err = r.client.RelaySignal(ctx, nodeID) + return + }) if err != nil { prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) return } - err = stream.Send(&rpc.RelaySignalRequest{StartSession: ss}) + err = r.retry(ctx, func() error { + return stream.Send(&rpc.RelaySignalRequest{StartSession: ss}) + }) if err != nil { stream.Close(err) prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) @@ -141,6 +148,16 @@ func (r *signalClient) StartParticipantSignal( return connectionID, sink, resChan, nil } +func (r *signalClient) retry(ctx context.Context, fn retry.RetryableFunc) error { + return retry.Do( + fn, + retry.Context(ctx), + retry.Delay(r.config.MinRetryInterval), + retry.MaxDelay(r.config.MaxRetryInterval), + retry.DelayType(retry.BackOffDelay), + ) +} + type signalRequestMessageWriter struct{} func (e signalRequestMessageWriter) Write(seq uint64, close bool, msgs []proto.Message) *rpc.RelaySignalRequest {