retry signal stream start (#2410)

This commit is contained in:
Paul Wells
2024-01-25 15:48:12 -08:00
committed by GitHub
parent d5b3bbac61
commit 025eb1164c
3 changed files with 25 additions and 5 deletions

3
go.mod
View File

@@ -3,6 +3,7 @@ module github.com/livekit/livekit-server
go 1.20
require (
github.com/avast/retry-go/v4 v4.5.1
github.com/bep/debounce v1.2.1
github.com/d5/tengo/v2 v2.16.1
github.com/dustin/go-humanize v1.0.1
@@ -101,7 +102,7 @@ require (
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/grpc v1.61.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

6
go.sum
View File

@@ -1,3 +1,5 @@
github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o=
github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY=
@@ -421,8 +423,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe h1:bQnxqljG/wqi4NTXu2+DJ3n7APcEA882QZ1JvhQAq9o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0=
google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

View File

@@ -20,6 +20,7 @@ import (
"sync"
"time"
"github.com/avast/retry-go/v4"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
@@ -99,13 +100,19 @@ func (r *signalClient) StartParticipantSignal(
l.Debugw("starting signal connection")
stream, err := r.client.RelaySignal(ctx, nodeID)
var stream psrpc.ClientStream[*rpc.RelaySignalRequest, *rpc.RelaySignalResponse]
err = r.retry(ctx, func() (err error) {
stream, err = r.client.RelaySignal(ctx, nodeID)
return
})
if err != nil {
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
return
}
err = stream.Send(&rpc.RelaySignalRequest{StartSession: ss})
err = r.retry(ctx, func() error {
return stream.Send(&rpc.RelaySignalRequest{StartSession: ss})
})
if err != nil {
stream.Close(err)
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
@@ -141,6 +148,16 @@ func (r *signalClient) StartParticipantSignal(
return connectionID, sink, resChan, nil
}
func (r *signalClient) retry(ctx context.Context, fn retry.RetryableFunc) error {
return retry.Do(
fn,
retry.Context(ctx),
retry.Delay(r.config.MinRetryInterval),
retry.MaxDelay(r.config.MaxRetryInterval),
retry.DelayType(retry.BackOffDelay),
)
}
type signalRequestMessageWriter struct{}
func (e signalRequestMessageWriter) Write(seq uint64, close bool, msgs []proto.Message) *rpc.RelaySignalRequest {