diff --git a/go.mod b/go.mod index 6fd6141df..3ce1c9906 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.1.2 + github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 github.com/magefile/mage v1.14.0 diff --git a/go.sum b/go.sum index 76ba724b9..d49f94b4d 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v1.1.2 h1:LDEFKK16T57pwDwxlJkkWMpbgvR0DJ3PozjOnvq29CI= -github.com/livekit/protocol v1.1.2/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= +github.com/livekit/protocol v1.1.3-0.20220922204413-19ead11853f1 h1:HEh5B8U66t0h0QkMt65Udon3oldBJGfl7THeKf/9bmw= +github.com/livekit/protocol v1.1.3-0.20220922204413-19ead11853f1/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= +github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5 h1:ct0ellNBvJ67i1VSxI9NIYAypBGRBdaJqucVm5PWIE0= +github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index efed11a38..854cf4712 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "google.golang.org/protobuf/proto" @@ -15,6 +16,11 @@ import ( "github.com/livekit/protocol/utils" ) +var ( + initialTimeout = time.Second * 3 + retryTimeout = time.Minute * 1 +) + type IngressService struct { conf *config.IngressConfig rpcClient ingress.RPCClient @@ -93,6 +99,41 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref return info, nil } +func (s *IngressService) sendRPCWithRetry(ctx context.Context, req *livekit.IngressRequest) (*livekit.IngressInfo, error) { + type result struct { + info *livekit.IngressInfo + err error + } + + resChan := make(chan result, 1) + + go func() { + cctx, _ := context.WithTimeout(context.Background(), retryTimeout) + + for { + select { + case <-cctx.Done(): + resChan <- result{nil, ingress.ErrNoResponse} + return + default: + } + + i, err := s.rpcClient.SendRequest(cctx, req) + if err != ingress.ErrNoResponse { + resChan <- result{i, err} + return + } + } + }() + + select { + case res := <-resChan: + return res.info, res.err + case <-time.After(initialTimeout): + return nil, ingress.ErrNoResponse + } +} + func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateIngressRequest) (*livekit.IngressInfo, error) { roomName, err := EnsureJoinPermission(ctx) if err != nil { @@ -139,13 +180,14 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{ + i, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ IngressId: req.IngressId, Request: &livekit.IngressRequest_Update{Update: req}, }) if err != nil { - logger.Errorw("could not update active ingress", err) - return nil, err + logger.Warnw("could not update active ingress", err) + } else { + info = i } } @@ -193,13 +235,14 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI switch info.State.Status { case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{ + i, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ IngressId: req.IngressId, Request: &livekit.IngressRequest_Delete{Delete: req}, }) if err != nil { - logger.Errorw("could not stop active ingress", err) - return nil, err + logger.Warnw("could not stop active ingress", err) + } else { + info = i } } @@ -235,7 +278,7 @@ func (s *IngressService) updateWorker() { // save updated info to store err = s.store.UpdateIngress(context.Background(), res) if err != nil { - logger.Errorw("could not update egress", err) + logger.Errorw("could not update ingress", err) } case <-s.shutdown: diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 1db9bbb68..d022b3711 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -475,7 +475,11 @@ func parseEgressEnded(value string) (roomName string, endedAt int64, err error) return } -func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) error { +func (s *RedisStore) StoreIngress(ctx context.Context, info *livekit.IngressInfo) error { + return s.storeIngress(ctx, info, false) +} + +func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo, updateOnly bool) error { if info.IngressId == "" { return errors.New("Missing IngressId") } @@ -501,6 +505,9 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) switch err { case ErrIngressNotFound: // Ingress doesn't exist yet + if updateOnly { + return err + } case nil: oldRoom = oldInfo.RoomName oldStartedAt = oldInfo.State.StartedAt @@ -640,8 +647,8 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ( return infos, nil } -func (s *RedisStore) UpdateIngress(_ context.Context, info *livekit.IngressInfo) error { - return s.StoreIngress(s.ctx, info) +func (s *RedisStore) UpdateIngress(ctx context.Context, info *livekit.IngressInfo) error { + return s.storeIngress(ctx, info, true) } func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error {