From 67d3f21122d3d90557d2c3facf352dc78ee1b4b8 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Tue, 9 Aug 2022 19:07:56 -0700 Subject: [PATCH] Only update an IngressInfo if the StartedAt field is newer or equal to the one currently in storage (#897) This is meant to ensure that we will not overwrite the state of an ingress that already reconnected to a different server on failure or timeout --- go.mod | 2 +- go.sum | 2 ++ pkg/service/errors.go | 1 + pkg/service/redisstore.go | 22 +++++++++++++++------- pkg/service/redisstore_test.go | 22 ++++++++++++++++++++++ 5 files changed, 41 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 7f3b4e665..cc115bc1a 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.0.0 + github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.2 github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index 9d47222d2..48b33b6a6 100644 --- a/go.sum +++ b/go.sum @@ -242,6 +242,8 @@ github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJV github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/protocol v1.0.0 h1:41aGhSIHtyPJBwzw4Aw1Y4BQpKxLBlS1wK31G8uME8A= github.com/livekit/protocol v1.0.0/go.mod h1:x51sLXmdYpzHvw+xtaootF4EP5Tasg+CDOpv0UYA3DY= +github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e h1:J8Y/cu8AFW/1KbHXVaKpHEGxqActUQpmcaoxot2DmJQ= +github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e/go.mod h1:JBAOkbmwYmZc4yMTpDrjLFs4RVPApEmoWP1idrjBjdI= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.2 h1:7jVyXGXTkQL3+6lDVUDBY+Fpo8VQPfyOkZeXxxsXX4c= diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 44d802606..e2249e7ab 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -16,4 +16,5 @@ var ( ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match") ErrTrackNotFound = errors.New("track is not found") ErrWebHookMissingAPIKey = errors.New("api_key is required to use webhooks") + ErrIngressOutOfDate = errors.New("trying to ovewrite an ingress with an older version") ) diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index b3f6709a5..82109bb06 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -446,6 +446,10 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) return errors.New("Missing StreamKey") } + if info.State == nil { + info.State = &livekit.IngressState{} + } + data, err := proto.Marshal(info) if err != nil { return err @@ -454,6 +458,7 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) // Use a "transaction" to remove the old room association if it changed txf := func(tx *redis.Tx) error { var oldRoom string + var oldStartedAt int64 oldInfo, err := s.loadIngress(tx, info.IngressId) switch err { @@ -461,11 +466,17 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) // Ingress doesn't exist yet case nil: oldRoom = oldInfo.RoomName + oldStartedAt = oldInfo.State.StartedAt default: return err } results, err := tx.TxPipelined(s.ctx, func(p redis.Pipeliner) error { + if info.State.StartedAt < oldStartedAt { + // Do not overwrite the info and state of a more recent session + return ErrIngressOutOfDate + } + p.HSet(s.ctx, IngressKey, info.IngressId, data) p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId) @@ -497,16 +508,13 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) // Retry if the key has been changed. for i := 0; i < maxRetries; i++ { err := s.rc.Watch(s.ctx, txf, IngressKey, StreamKeyKey) - if err == nil { - // Success. - return nil - } - if err == redis.TxFailedErr { + switch err { + case redis.TxFailedErr: // Optimistic lock lost. Retry. continue + default: + return err } - // Return any other error. - return err } return nil diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index 17c689042..38cbde8ba 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -214,11 +214,18 @@ func TestIngressStore(t *testing.T) { info := &livekit.IngressInfo{ IngressId: "ingressId", StreamKey: "streamKey", + State: &livekit.IngressState{ + StartedAt: 2, + }, } err := rs.StoreIngress(ctx, info) require.NoError(t, err) + t.Cleanup(func() { + rs.DeleteIngress(ctx, info) + }) + pulledInfo, err := rs.LoadIngress(ctx, "ingressId") require.NoError(t, err) compareIngressInfo(t, pulledInfo, info) @@ -245,6 +252,21 @@ func TestIngressStore(t *testing.T) { infos, err = rs.ListIngress(ctx, "room") require.NoError(t, err) require.Equal(t, 0, len(infos)) + + info.RoomName = "room2" + info.State.StartedAt = 1 + err = rs.UpdateIngress(ctx, info) + require.Equal(t, service.ErrIngressOutOfDate, err) + + info.RoomName = "room2" + info.State.StartedAt = 3 + err = rs.UpdateIngress(ctx, info) + require.NoError(t, err) + + infos, err = rs.ListIngress(ctx, "") + require.NoError(t, err) + require.Equal(t, 1, len(infos)) + require.Equal(t, "room2", infos[0].RoomName) } func compareIngressInfo(t *testing.T, expected, v *livekit.IngressInfo) {