diff --git a/go.mod b/go.mod index 071cda10a..9a19b25fc 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e - github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce + github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df + github.com/livekit/protocol v1.12.1-0.20240416154343-2d633a51d825 github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 4ef72eab7..dc97b17ca 100644 --- a/go.sum +++ b/go.sum @@ -118,10 +118,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e h1:ss4VwrouYiDpuNJ9BUTH+WsW+GDdJS70iZp8ii3/0Lc= -github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce h1:cbuw8FQ5S1vX6avOmlj6f8IwliXsOGjOa3UR0YDucX8= -github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce/go.mod h1:jB6PWwf4tdMAwy+jxexqaVWuQiiklAtO4F5zZzWkTII= +github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df h1:DVhJRlF6/CtiyxJVy3QsbS9bf7GyUMuRZONMwZxIWpY= +github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= +github.com/livekit/protocol v1.12.1-0.20240416154343-2d633a51d825 h1:HJWtEtuiRoKE+/cRjLY0UHKnB6iMOAp1/17CXG6e5W0= +github.com/livekit/protocol v1.12.1-0.20240416154343-2d633a51d825/go.mod h1:jB6PWwf4tdMAwy+jxexqaVWuQiiklAtO4F5zZzWkTII= github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be h1:W1nCFZ19rYAORMBNX82NeVPHjADN0UyORr6refbUXpU= github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 91b437f40..67a573874 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -608,6 +608,7 @@ func (s *RedisStore) storeIngressState(_ context.Context, ingressId string, stat // Use a "transaction" to remove the old room association if it changed txf := func(tx *redis.Tx) error { var oldStartedAt int64 + var oldUpdatedAt int64 oldState, err := s.loadIngressState(tx, ingressId) switch err { @@ -615,6 +616,7 @@ func (s *RedisStore) storeIngressState(_ context.Context, ingressId string, stat // Ingress state doesn't exist yet case nil: oldStartedAt = oldState.StartedAt + oldUpdatedAt = oldState.UpdatedAt default: return err } @@ -625,6 +627,12 @@ func (s *RedisStore) storeIngressState(_ context.Context, ingressId string, stat return ingress.ErrIngressOutOfDate } + if state.StartedAt == oldStartedAt && state.UpdatedAt < oldUpdatedAt { + // Do not overwrite with an old state in case RPCs were delivered out of order. + // All RPCs come from the same ingress server and should thus be on the same clock. + return nil + } + p.Set(s.ctx, IngressStatePrefix+ingressId, data, 0) return nil