Use the ingress state updated_at field to ensure that out of order RPC do not overwrite state (#2657)

This commit is contained in:
Benjamin Pracht
2024-04-16 18:10:14 +02:00
committed by GitHub
parent b77f0256c7
commit 6afa63ded3
3 changed files with 14 additions and 6 deletions
+2 -2
View File
@@ -18,8 +18,8 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e
github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df
github.com/livekit/protocol v1.12.1-0.20240416154343-2d633a51d825
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
+4 -4
View File
@@ -118,10 +118,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e h1:ss4VwrouYiDpuNJ9BUTH+WsW+GDdJS70iZp8ii3/0Lc=
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce h1:cbuw8FQ5S1vX6avOmlj6f8IwliXsOGjOa3UR0YDucX8=
github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce/go.mod h1:jB6PWwf4tdMAwy+jxexqaVWuQiiklAtO4F5zZzWkTII=
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df h1:DVhJRlF6/CtiyxJVy3QsbS9bf7GyUMuRZONMwZxIWpY=
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.12.1-0.20240416154343-2d633a51d825 h1:HJWtEtuiRoKE+/cRjLY0UHKnB6iMOAp1/17CXG6e5W0=
github.com/livekit/protocol v1.12.1-0.20240416154343-2d633a51d825/go.mod h1:jB6PWwf4tdMAwy+jxexqaVWuQiiklAtO4F5zZzWkTII=
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be h1:W1nCFZ19rYAORMBNX82NeVPHjADN0UyORr6refbUXpU=
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
+8
View File
@@ -608,6 +608,7 @@ func (s *RedisStore) storeIngressState(_ context.Context, ingressId string, stat
// Use a "transaction" to remove the old room association if it changed
txf := func(tx *redis.Tx) error {
var oldStartedAt int64
var oldUpdatedAt int64
oldState, err := s.loadIngressState(tx, ingressId)
switch err {
@@ -615,6 +616,7 @@ func (s *RedisStore) storeIngressState(_ context.Context, ingressId string, stat
// Ingress state doesn't exist yet
case nil:
oldStartedAt = oldState.StartedAt
oldUpdatedAt = oldState.UpdatedAt
default:
return err
}
@@ -625,6 +627,12 @@ func (s *RedisStore) storeIngressState(_ context.Context, ingressId string, stat
return ingress.ErrIngressOutOfDate
}
if state.StartedAt == oldStartedAt && state.UpdatedAt < oldUpdatedAt {
// Do not overwrite with an old state in case RPCs were delivered out of order.
// All RPCs come from the same ingress server and should thus be on the same clock.
return nil
}
p.Set(s.ctx, IngressStatePrefix+ingressId, data, 0)
return nil