Update strored version of an active ingress if no ingress server responds (#1031)

This allows deleting and updating an ingress even if the ingress server that was handling it died. It does however mean that if the ingress responds again later, its state will be inconsistent. To somewhat make this less likely, also keep trying contacting the ingress for 1 min in the background.

Also fixing a race where an active deleted Ingress would get recreated on delete because of the update triggered by the ingress session shutdown
This commit is contained in:
Benjamin Pracht
2022-09-26 11:16:27 -07:00
committed by GitHub
parent dfc71d5bf8
commit 932af81f34
4 changed files with 65 additions and 13 deletions
+1 -1
View File
@@ -16,7 +16,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-version v1.6.0
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v1.1.2
github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3
github.com/magefile/mage v1.14.0
+4 -2
View File
@@ -240,8 +240,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v1.1.2 h1:LDEFKK16T57pwDwxlJkkWMpbgvR0DJ3PozjOnvq29CI=
github.com/livekit/protocol v1.1.2/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0=
github.com/livekit/protocol v1.1.3-0.20220922204413-19ead11853f1 h1:HEh5B8U66t0h0QkMt65Udon3oldBJGfl7THeKf/9bmw=
github.com/livekit/protocol v1.1.3-0.20220922204413-19ead11853f1/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0=
github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5 h1:ct0ellNBvJ67i1VSxI9NIYAypBGRBdaJqucVm5PWIE0=
github.com/livekit/protocol v1.1.3-0.20220926180122-bafacf7041a5/go.mod h1:eburCdz6ZtbgKSKYkAeCdWP1z33DB9clTphz7uNaxp0=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw=
+50 -7
View File
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"
"google.golang.org/protobuf/proto"
@@ -15,6 +16,11 @@ import (
"github.com/livekit/protocol/utils"
)
var (
initialTimeout = time.Second * 3
retryTimeout = time.Minute * 1
)
type IngressService struct {
conf *config.IngressConfig
rpcClient ingress.RPCClient
@@ -93,6 +99,41 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
return info, nil
}
func (s *IngressService) sendRPCWithRetry(ctx context.Context, req *livekit.IngressRequest) (*livekit.IngressInfo, error) {
type result struct {
info *livekit.IngressInfo
err error
}
resChan := make(chan result, 1)
go func() {
cctx, _ := context.WithTimeout(context.Background(), retryTimeout)
for {
select {
case <-cctx.Done():
resChan <- result{nil, ingress.ErrNoResponse}
return
default:
}
i, err := s.rpcClient.SendRequest(cctx, req)
if err != ingress.ErrNoResponse {
resChan <- result{i, err}
return
}
}
}()
select {
case res := <-resChan:
return res.info, res.err
case <-time.After(initialTimeout):
return nil, ingress.ErrNoResponse
}
}
func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateIngressRequest) (*livekit.IngressInfo, error) {
roomName, err := EnsureJoinPermission(ctx)
if err != nil {
@@ -139,13 +180,14 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{
i, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Update{Update: req},
})
if err != nil {
logger.Errorw("could not update active ingress", err)
return nil, err
logger.Warnw("could not update active ingress", err)
} else {
info = i
}
}
@@ -193,13 +235,14 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
switch info.State.Status {
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{
i, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Delete{Delete: req},
})
if err != nil {
logger.Errorw("could not stop active ingress", err)
return nil, err
logger.Warnw("could not stop active ingress", err)
} else {
info = i
}
}
@@ -235,7 +278,7 @@ func (s *IngressService) updateWorker() {
// save updated info to store
err = s.store.UpdateIngress(context.Background(), res)
if err != nil {
logger.Errorw("could not update egress", err)
logger.Errorw("could not update ingress", err)
}
case <-s.shutdown:
+10 -3
View File
@@ -475,7 +475,11 @@ func parseEgressEnded(value string) (roomName string, endedAt int64, err error)
return
}
func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) error {
func (s *RedisStore) StoreIngress(ctx context.Context, info *livekit.IngressInfo) error {
return s.storeIngress(ctx, info, false)
}
func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo, updateOnly bool) error {
if info.IngressId == "" {
return errors.New("Missing IngressId")
}
@@ -501,6 +505,9 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo)
switch err {
case ErrIngressNotFound:
// Ingress doesn't exist yet
if updateOnly {
return err
}
case nil:
oldRoom = oldInfo.RoomName
oldStartedAt = oldInfo.State.StartedAt
@@ -640,8 +647,8 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) (
return infos, nil
}
func (s *RedisStore) UpdateIngress(_ context.Context, info *livekit.IngressInfo) error {
return s.StoreIngress(s.ctx, info)
func (s *RedisStore) UpdateIngress(ctx context.Context, info *livekit.IngressInfo) error {
return s.storeIngress(ctx, info, true)
}
func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error {