From 932af81f346d0fb05d6ef5848d64d6f37d3488f0 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 26 Sep 2022 11:16:27 -0700 Subject: [PATCH] Update stored version of an active ingress if no ingress server responds (#1031) This allows deleting and updating an ingress even if the ingress server that was handling it died. It does however mean that if the ingress responds again later, its state will be inconsistent. To somewhat make this less likely, also keep trying to contact the ingress for 1 min in the background. Also fixes a race where an active deleted Ingress would get recreated on delete because of the update triggered by the ingress session shutdown. --- go.mod | 2 +- go.sum | 6 +++-- pkg/service/ingress.go | 57 ++++++++++++++++++++++++++++++++++----- pkg/service/redisstore.go | 13 ++++++--- 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 6fd6141df..3ce1c9906 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.1.2 + github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 github.com/magefile/mage v1.14.0 diff --git a/go.sum b/go.sum index 76ba724b9..d49f94b4d 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v1.1.2 h1:LDEFKK16T57pwDwxlJkkWMpbgvR0DJ3PozjOnvq29CI= -github.com/livekit/protocol v1.1.2/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= +github.com/livekit/protocol 
v1.1.3-0.20220922204413-19ead11853f1 h1:HEh5B8U66t0h0QkMt65Udon3oldBJGfl7THeKf/9bmw= +github.com/livekit/protocol v1.1.3-0.20220922204413-19ead11853f1/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= +github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5 h1:ct0ellNBvJ67i1VSxI9NIYAypBGRBdaJqucVm5PWIE0= +github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index efed11a38..854cf4712 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "google.golang.org/protobuf/proto" @@ -15,6 +16,11 @@ import ( "github.com/livekit/protocol/utils" ) +var ( + initialTimeout = time.Second * 3 + retryTimeout = time.Minute * 1 +) + type IngressService struct { conf *config.IngressConfig rpcClient ingress.RPCClient @@ -93,6 +99,41 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref return info, nil } +func (s *IngressService) sendRPCWithRetry(ctx context.Context, req *livekit.IngressRequest) (*livekit.IngressInfo, error) { + type result struct { + info *livekit.IngressInfo + err error + } + + resChan := make(chan result, 1) + + go func() { + cctx, _ := context.WithTimeout(context.Background(), retryTimeout) + + for { + select { + case <-cctx.Done(): + resChan <- result{nil, ingress.ErrNoResponse} + return + default: + } + + i, err := s.rpcClient.SendRequest(cctx, req) + if err != ingress.ErrNoResponse { + resChan <- result{i, err} + return + } + } + }() + + select { + case res := <-resChan: + return res.info, res.err + case 
<-time.After(initialTimeout): + return nil, ingress.ErrNoResponse + } +} + func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateIngressRequest) (*livekit.IngressInfo, error) { roomName, err := EnsureJoinPermission(ctx) if err != nil { @@ -139,13 +180,14 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{ + i, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ IngressId: req.IngressId, Request: &livekit.IngressRequest_Update{Update: req}, }) if err != nil { - logger.Errorw("could not update active ingress", err) - return nil, err + logger.Warnw("could not update active ingress", err) + } else { + info = i } } @@ -193,13 +235,14 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI switch info.State.Status { case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{ + i, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ IngressId: req.IngressId, Request: &livekit.IngressRequest_Delete{Delete: req}, }) if err != nil { - logger.Errorw("could not stop active ingress", err) - return nil, err + logger.Warnw("could not stop active ingress", err) + } else { + info = i } } @@ -235,7 +278,7 @@ func (s *IngressService) updateWorker() { // save updated info to store err = s.store.UpdateIngress(context.Background(), res) if err != nil { - logger.Errorw("could not update egress", err) + logger.Errorw("could not update ingress", err) } case <-s.shutdown: diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 1db9bbb68..d022b3711 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -475,7 +475,11 @@ func parseEgressEnded(value string) (roomName string, endedAt int64, err error) return } -func (s 
*RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) error { +func (s *RedisStore) StoreIngress(ctx context.Context, info *livekit.IngressInfo) error { + return s.storeIngress(ctx, info, false) +} + +func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo, updateOnly bool) error { if info.IngressId == "" { return errors.New("Missing IngressId") } @@ -501,6 +505,9 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) switch err { case ErrIngressNotFound: // Ingress doesn't exist yet + if updateOnly { + return err + } case nil: oldRoom = oldInfo.RoomName oldStartedAt = oldInfo.State.StartedAt @@ -640,8 +647,8 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ( return infos, nil } -func (s *RedisStore) UpdateIngress(_ context.Context, info *livekit.IngressInfo) error { - return s.StoreIngress(s.ctx, info) +func (s *RedisStore) UpdateIngress(ctx context.Context, info *livekit.IngressInfo) error { + return s.storeIngress(ctx, info, true) } func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error {