Only update an IngressInfo if the StartedAt field is newer or equal to the one currently in storage (#897)

This is meant to ensure that we will not overwrite the state of an ingress that already reconnected to a different server on failure or timeout
This commit is contained in:
Benjamin Pracht
2022-08-09 19:07:56 -07:00
committed by GitHub
parent 4ec7e71b2d
commit 67d3f21122
5 changed files with 41 additions and 8 deletions
+1 -1
View File
@@ -16,7 +16,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-version v1.6.0
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v1.0.0
github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a
github.com/mackerelio/go-osstat v0.2.2
github.com/magefile/mage v1.13.0
+2
View File
@@ -242,6 +242,8 @@ github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJV
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v1.0.0 h1:41aGhSIHtyPJBwzw4Aw1Y4BQpKxLBlS1wK31G8uME8A=
github.com/livekit/protocol v1.0.0/go.mod h1:x51sLXmdYpzHvw+xtaootF4EP5Tasg+CDOpv0UYA3DY=
github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e h1:J8Y/cu8AFW/1KbHXVaKpHEGxqActUQpmcaoxot2DmJQ=
github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e/go.mod h1:JBAOkbmwYmZc4yMTpDrjLFs4RVPApEmoWP1idrjBjdI=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.2 h1:7jVyXGXTkQL3+6lDVUDBY+Fpo8VQPfyOkZeXxxsXX4c=
+1
View File
@@ -16,4 +16,5 @@ var (
ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match")
ErrTrackNotFound = errors.New("track is not found")
ErrWebHookMissingAPIKey = errors.New("api_key is required to use webhooks")
ErrIngressOutOfDate = errors.New("trying to ovewrite an ingress with an older version")
)
+15 -7
View File
@@ -446,6 +446,10 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo)
return errors.New("Missing StreamKey")
}
if info.State == nil {
info.State = &livekit.IngressState{}
}
data, err := proto.Marshal(info)
if err != nil {
return err
@@ -454,6 +458,7 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo)
// Use a "transaction" to remove the old room association if it changed
txf := func(tx *redis.Tx) error {
var oldRoom string
var oldStartedAt int64
oldInfo, err := s.loadIngress(tx, info.IngressId)
switch err {
@@ -461,11 +466,17 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo)
// Ingress doesn't exist yet
case nil:
oldRoom = oldInfo.RoomName
oldStartedAt = oldInfo.State.StartedAt
default:
return err
}
results, err := tx.TxPipelined(s.ctx, func(p redis.Pipeliner) error {
if info.State.StartedAt < oldStartedAt {
// Do not overwrite the info and state of a more recent session
return ErrIngressOutOfDate
}
p.HSet(s.ctx, IngressKey, info.IngressId, data)
p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId)
@@ -497,16 +508,13 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo)
// Retry if the key has been changed.
for i := 0; i < maxRetries; i++ {
err := s.rc.Watch(s.ctx, txf, IngressKey, StreamKeyKey)
if err == nil {
// Success.
return nil
}
if err == redis.TxFailedErr {
switch err {
case redis.TxFailedErr:
// Optimistic lock lost. Retry.
continue
default:
return err
}
// Return any other error.
return err
}
return nil
+22
View File
@@ -214,11 +214,18 @@ func TestIngressStore(t *testing.T) {
info := &livekit.IngressInfo{
IngressId: "ingressId",
StreamKey: "streamKey",
State: &livekit.IngressState{
StartedAt: 2,
},
}
err := rs.StoreIngress(ctx, info)
require.NoError(t, err)
t.Cleanup(func() {
rs.DeleteIngress(ctx, info)
})
pulledInfo, err := rs.LoadIngress(ctx, "ingressId")
require.NoError(t, err)
compareIngressInfo(t, pulledInfo, info)
@@ -245,6 +252,21 @@ func TestIngressStore(t *testing.T) {
infos, err = rs.ListIngress(ctx, "room")
require.NoError(t, err)
require.Equal(t, 0, len(infos))
info.RoomName = "room2"
info.State.StartedAt = 1
err = rs.UpdateIngress(ctx, info)
require.Equal(t, service.ErrIngressOutOfDate, err)
info.RoomName = "room2"
info.State.StartedAt = 3
err = rs.UpdateIngress(ctx, info)
require.NoError(t, err)
infos, err = rs.ListIngress(ctx, "")
require.NoError(t, err)
require.Equal(t, 1, len(infos))
require.Equal(t, "room2", infos[0].RoomName)
}
func compareIngressInfo(t *testing.T, expected, v *livekit.IngressInfo) {