Fix running multiple migration process in parallel (#5329)

This commit is contained in:
Quentin Gliech
2025-12-10 22:08:51 +01:00
committed by GitHub
10 changed files with 381 additions and 41 deletions

2
Cargo.lock generated
View File

@@ -3657,6 +3657,7 @@ version = "1.8.0"
dependencies = [
"async-trait",
"chrono",
"crc",
"futures-util",
"mas-data-model",
"mas-iana",
@@ -3673,6 +3674,7 @@ dependencies = [
"sha2",
"sqlx",
"thiserror 2.0.17",
"tokio",
"tracing",
"ulid",
"url",

View File

@@ -12,10 +12,9 @@ use clap::Parser;
use figment::Figment;
use mas_config::{ConfigurationSection, RootConfig, SyncConfig};
use mas_data_model::{Clock as _, SystemClock};
use mas_storage_pg::MIGRATOR;
use rand::SeedableRng;
use tokio::io::AsyncWriteExt;
use tracing::{Instrument, info, info_span};
use tracing::{info, info_span};
use crate::util::database_connection_from_config;
@@ -129,9 +128,7 @@ impl Options {
// Grab a connection to the database
let mut conn = database_connection_from_config(&config.database).await?;
MIGRATOR
.run(&mut conn)
.instrument(info_span!("db.migrate"))
mas_storage_pg::migrate(&mut conn)
.await
.context("could not run migrations")?;

View File

@@ -10,8 +10,7 @@ use anyhow::Context;
use clap::Parser;
use figment::Figment;
use mas_config::{ConfigurationSectionExt, DatabaseConfig};
use mas_storage_pg::MIGRATOR;
use tracing::{Instrument, info_span};
use tracing::info_span;
use crate::util::database_connection_from_config;
@@ -35,9 +34,7 @@ impl Options {
let mut conn = database_connection_from_config(&config).await?;
// Run pending migrations
MIGRATOR
.run(&mut conn)
.instrument(info_span!("db.migrate"))
mas_storage_pg::migrate(&mut conn)
.await
.context("could not run migrations")?;

View File

@@ -4,7 +4,7 @@
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
use std::{collections::BTreeSet, process::ExitCode, sync::Arc, time::Duration};
use std::{process::ExitCode, sync::Arc, time::Duration};
use anyhow::Context;
use clap::Parser;
@@ -18,9 +18,8 @@ use mas_data_model::SystemClock;
use mas_handlers::{ActivityTracker, CookieManager, Limiter, MetadataCache};
use mas_listener::server::Server;
use mas_router::UrlBuilder;
use mas_storage_pg::{MIGRATOR, PgRepositoryFactory};
use sqlx::migrate::Migrate;
use tracing::{Instrument, info, info_span, warn};
use mas_storage_pg::PgRepositoryFactory;
use tracing::{info, info_span, warn};
use crate::{
app_state::AppState,
@@ -73,24 +72,20 @@ impl Options {
let pool = database_pool_from_config(&config.database).await?;
if self.no_migrate {
// Check that we applied all the migrations
let mut conn = pool.acquire().await?;
let applied = conn.list_applied_migrations().await?;
let applied: BTreeSet<_> = applied.into_iter().map(|m| m.version).collect();
let has_missing_migrations = MIGRATOR.iter().any(|m| !applied.contains(&m.version));
if has_missing_migrations {
let pending_migrations = mas_storage_pg::pending_migrations(&mut conn).await?;
if !pending_migrations.is_empty() {
// Refuse to start if there are pending migrations
return Err(anyhow::anyhow!(
"The server is running with `--no-migrate` but there are pending. Please run them first with `mas-cli database migrate`, or omit the `--no-migrate` flag to apply them automatically on startup."
"The server is running with `--no-migrate` but there are pending migrations. Please run them first with `mas-cli database migrate`, or omit the `--no-migrate` flag to apply them automatically on startup."
));
}
} else {
info!("Running pending database migrations");
MIGRATOR
.run(&pool)
.instrument(info_span!("db.migrate"))
let mut conn = pool.acquire().await?;
mas_storage_pg::migrate(&mut conn)
.await
.context("could not run database migrations")?;
.context("could not run migrations")?;
}
let encrypter = config.secrets.encrypter().await?;

View File

@@ -14,13 +14,12 @@ use mas_config::{
UpstreamOAuth2Config,
};
use mas_data_model::SystemClock;
use mas_storage_pg::MIGRATOR;
use rand::thread_rng;
use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::Uuid};
use syn2mas::{
LockedMasDatabase, MasWriter, Progress, ProgressStage, SynapseReader, synapse_config,
};
use tracing::{Instrument, error, info, info_span};
use tracing::{Instrument, error, info};
use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};
@@ -122,9 +121,7 @@ impl Options {
)
.await?;
MIGRATOR
.run(&mut mas_connection)
.instrument(info_span!("db.migrate"))
mas_storage_pg::migrate(&mut mas_connection)
.await
.context("could not run migrations")?;

View File

@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "SELECT current_database() as \"current_database!\"",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "current_database!",
"type_info": "Name"
}
],
"parameters": {
"Left": []
},
"nullable": [
null
]
},
"hash": "2f66991d7b9ba58f011d9aef0eb6a38f3b244c2f46444c0ab345de7feff54aba"
}

View File

@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT EXISTS (\n SELECT 1\n FROM information_schema.tables\n WHERE table_name = '_sqlx_migrations'\n ) AS \"exists!\"\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists!",
"type_info": "Bool"
}
],
"parameters": {
"Left": []
},
"nullable": [
null
]
},
"hash": "fbf926f630df5d588df4f1c9c0dc0f594332be5829d5d7c6b66183ac25b3d166"
}

View File

@@ -19,6 +19,7 @@ workspace = true
[dependencies]
async-trait.workspace = true
chrono.workspace = true
crc.workspace = true
futures-util.workspace = true
opentelemetry-semantic-conventions.workspace = true
opentelemetry.workspace = true
@@ -31,6 +32,7 @@ sha2.workspace = true
sqlx.workspace = true
thiserror.workspace = true
tracing.workspace = true
tokio.workspace = true
ulid.workspace = true
url.workspace = true
uuid.workspace = true

View File

@@ -160,7 +160,15 @@
#![deny(clippy::future_not_send, missing_docs)]
#![allow(clippy::module_name_repetitions, clippy::blocks_in_conditions)]
use sqlx::migrate::Migrator;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use ::tracing::{Instrument, debug, info, info_span, warn};
use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
use sqlx::{
Either, PgConnection,
migrate::{AppliedMigration, Migrate, MigrateError, Migration, Migrator},
postgres::{PgAdvisoryLock, PgAdvisoryLockKey},
};
pub mod app_session;
pub mod compat;
@@ -186,14 +194,290 @@ pub use self::{
tracing::ExecuteExt,
};
/// Embedded migrations, allowing them to run on startup
pub static MIGRATOR: Migrator = {
// XXX: The macro does not let us ignore missing migrations, so we have to do it
// like this. See https://github.com/launchbadge/sqlx/issues/1788
let mut m = sqlx::migrate!();
/// Embedded migrations in the binary
pub static MIGRATOR: Migrator = sqlx::migrate!();
// We manually removed some migrations because they made us depend on the
// `pgcrypto` extension. See: https://github.com/matrix-org/matrix-authentication-service/issues/1557
m.ignore_missing = true;
m
};
/// Index the embedded migrations by their version number.
///
/// Uses a [`BTreeMap`] so iteration is naturally ordered by version.
fn available_migrations() -> BTreeMap<i64, &'static Migration> {
    let mut by_version = BTreeMap::new();
    for migration in MIGRATOR.iter() {
        by_version.insert(migration.version, migration);
    }
    by_version
}
/// This is the list of migrations we've removed from the migration history but
/// might have been applied in the past
#[allow(clippy::inconsistent_digit_grouping)]
const ALLOWED_MISSING_MIGRATIONS: &[i64] = &[
    // https://github.com/matrix-org/matrix-authentication-service/pull/1585
    20220709_210445,
    20230330_210841,
    20230408_110421,
];

/// Collect [`ALLOWED_MISSING_MIGRATIONS`] into an ordered set for lookups.
fn allowed_missing_migrations() -> BTreeSet<i64> {
    let mut versions = BTreeSet::new();
    versions.extend(ALLOWED_MISSING_MIGRATIONS.iter().copied());
    versions
}
/// This is a list of possible additional checksums from previous versions of
/// migrations. The checksum we store in the database is 48 bytes long. We're
/// not really concerned with partial hash collisions, and to avoid this file to
/// be completely unreadable, we only store the upper 16 bytes of that hash.
#[allow(clippy::inconsistent_digit_grouping)]
const ALLOWED_ALTERNATE_CHECKSUMS: &[(i64, u128)] = &[
// https://github.com/element-hq/matrix-authentication-service/pull/5300
(20250410_000000, 0x8811_c3ef_dbee_8c00_5b49_25da_5d55_9c3f),
(20250410_000001, 0x7990_37b3_2193_8a5d_c72f_bccd_95fd_82e5),
(20250410_000002, 0xf2b8_f120_deae_27e7_60d0_79a3_0b77_eea3),
(20250410_000003, 0x06be_fc2b_cedc_acf4_b981_02c7_b40c_c469),
(20250410_000004, 0x0a90_9c6a_dba7_545c_10d9_60eb_6d30_2f50),
(20250410_000006, 0xcc7f_5152_6497_5729_d94b_be0d_9c95_8316),
(20250410_000007, 0x12e7_cfab_a017_a5a5_4f2c_18fa_541c_ce62),
(20250410_000008, 0x171d_62e5_ee1a_f0d9_3639_6c5a_277c_54cd),
(20250410_000009, 0xb1a0_93c7_6645_92ad_df45_b395_57bb_a281),
(20250410_000010, 0x8089_86ac_7cff_8d86_2850_d287_cdb1_2b57),
(20250410_000011, 0x8d9d_3fae_02c9_3d3f_81e4_6242_2b39_b5b8),
(20250410_000012, 0x9805_1372_41aa_d5b0_ebe1_ba9d_28c7_faf6),
(20250410_000013, 0x7291_9a97_e4d1_0d45_1791_6e8c_3f2d_e34d),
(20250410_000014, 0x811d_f965_8127_e168_4aa2_f177_a4e6_f077),
(20250410_000015, 0xa639_0780_aab7_d60d_5fcb_771d_13ed_73ee),
(20250410_000016, 0x22b6_e909_6de4_39e3_b2b9_c684_7417_fe07),
(20250410_000017, 0x9dfe_b6d3_89e4_e509_651b_2793_8d8d_cd32),
(20250410_000018, 0x638f_bdbc_2276_5094_020b_cec1_ab95_c07f),
(20250410_000019, 0xa283_84bc_5fd5_7cbd_b5fb_b5fe_0255_6845),
(20250410_000020, 0x17d1_54b1_7c6e_fc48_61dd_da3d_f8a5_9546),
(20250410_000022, 0xbc36_af82_994a_6f93_8aca_a46b_fc3c_ffde),
(20250410_000023, 0x54ec_3b07_ac79_443b_9e18_a2b3_2d17_5ab9),
(20250410_000024, 0x8ab4_4f80_00b6_58b2_d757_c40f_bc72_3d87),
(20250410_000025, 0x5dc4_2ff3_3042_2f45_046d_10af_ab3a_b583),
(20250410_000026, 0x5263_c547_0b64_6425_5729_48b2_ce84_7cad),
(20250410_000027, 0x0aad_cb50_1d6a_7794_9017_d24d_55e7_1b9d),
(20250410_000028, 0x8fc1_92f8_68df_ca4e_3e2b_cddf_bc12_cffe),
(20250410_000029, 0x416c_9446_b6a3_1b49_2940_a8ac_c1c2_665a),
(20250410_000030, 0x83a5_e51e_25a6_77fb_2b79_6ea5_db1e_364f),
(20250410_000031, 0xfa18_a707_9438_dbc7_2cde_b5f1_ee21_5c7e),
(20250410_000032, 0xd669_662e_8930_838a_b142_c3fa_7b39_d2a0),
(20250410_000033, 0x4019_1053_cabc_191c_c02e_9aa9_407c_0de5),
(20250410_000034, 0xdd59_e595_24e6_4dad_c5f7_fef2_90b8_df57),
(20250410_000035, 0x09b4_ea53_2da4_9c39_eb10_db33_6a6d_608b),
(20250410_000036, 0x3ca5_9c78_8480_e342_d729_907c_d293_2049),
(20250410_000037, 0xc857_2a10_450b_0612_822c_2b86_535a_ea7d),
(20250410_000038, 0x1642_39da_9c3b_d9fd_b1e1_72b1_db78_b978),
(20250410_000039, 0xdd70_b211_6016_bb84_0d84_f04e_eb8a_59d9),
(20250410_000040, 0xe435_ead6_c363_a0b6_e048_dd85_0ecb_9499),
(20250410_000041, 0xe9f3_122f_70d4_9839_c818_4b18_0192_ae26),
(20250410_000043, 0xec5e_1400_483d_c4bf_6014_aba4_ffc3_6236),
(20250410_000044, 0x4750_5eba_4095_6664_78d0_27f9_64bf_64f4),
(20250410_000045, 0x9a53_bd70_4cad_2bf1_61d4_f143_0c82_681d),
(20250410_121612, 0x25f0_9d20_a897_df18_162d_1c47_b68e_81bd),
(20250602_212101, 0xd1a8_782c_b3f0_5045_3f46_49a0_bab0_822b),
(20250708_155857, 0xb78e_6957_a588_c16a_d292_a0c7_cae9_f290),
(20250915_092635, 0x6854_d58b_99d7_3ac5_82f8_25e5_b1c3_cc0b),
(20251127_145951, 0x3bcd_d92e_8391_2a2c_8a18_1d76_354f_96c6),
];
/// Group [`ALLOWED_ALTERNATE_CHECKSUMS`] by migration version, collecting the
/// known checksum prefixes for each version into a set.
fn alternate_checksums_map() -> BTreeMap<i64, HashSet<u128>> {
    ALLOWED_ALTERNATE_CHECKSUMS
        .iter()
        .fold(BTreeMap::new(), |mut map, &(version, checksum)| {
            map.entry(version).or_default().insert(checksum);
            map
        })
}
/// Load the list of applied migrations into a map.
///
/// It's important to use a [`BTreeMap`] so that the migrations are naturally
/// ordered by version.
async fn applied_migrations_map(
    conn: &mut PgConnection,
) -> Result<BTreeMap<i64, AppliedMigration>, MigrateError> {
    let mut by_version = BTreeMap::new();
    for applied in conn.list_applied_migrations().await? {
        by_version.insert(applied.version, applied);
    }
    Ok(by_version)
}
/// Checks if the migration table exists
///
/// Unlike sqlx's `ensure_migrations_table`, this is a read-only check, so we
/// can decide whether the table needs creating without triggering the
/// `relation "_sqlx_migrations" already exists, skipping` notice.
///
/// # Errors
///
/// Returns an error if the query fails.
async fn migration_table_exists(conn: &mut PgConnection) -> Result<bool, sqlx::Error> {
    // NOTE(review): the query text must stay byte-identical to the prepared
    // statement recorded in the `.sqlx` directory for offline builds to work
    sqlx::query_scalar!(
        r#"
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_name = '_sqlx_migrations'
        ) AS "exists!"
        "#,
    )
    .fetch_one(conn)
    .await
}
/// Run the migrations on the given connection
///
/// This function acquires an advisory lock on the database to ensure that only
/// one migrator is running at a time.
///
/// # Errors
///
/// This function returns an error if the migration fails.
#[::tracing::instrument(name = "db.migrate", skip_all, err)]
pub async fn migrate(conn: &mut PgConnection) -> Result<(), MigrateError> {
    // Get the database name and use it to derive an advisory lock key. This
    // is the same lock key used by SQLx default migrator, so that it works even
    // with older versions of MAS, and when running through `cargo sqlx migrate run`
    let database_name = sqlx::query_scalar!(r#"SELECT current_database() as "current_database!""#)
        .fetch_one(&mut *conn)
        .await
        .map_err(MigrateError::from)?;

    let lock =
        PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(generate_lock_id(&database_name)));

    // Try to acquire the migration lock in a loop.
    //
    // The reason we do that with a `try_acquire` is because in Postgres, `CREATE
    // INDEX CONCURRENTLY` will *not* complete whilst an advisory lock is being
    // acquired on another connection. This then means that if we run two
    // migration processes at the same time, one of them will go through and block
    // on concurrent index creations, because the other will get stuck trying to
    // acquire this lock.
    //
    // To avoid this, we use `try_acquire`/`pg_advisory_lock_try` in a loop, which
    // will fail immediately if the lock is held by another connection, allowing
    // potential 'CREATE INDEX CONCURRENTLY' statements to complete.
    //
    // Exponential backoff between attempts: start at 250ms, double each time,
    // capped at 5s.
    let mut backoff = std::time::Duration::from_millis(250);
    let mut conn = conn;
    let mut locked_connection = loop {
        match lock.try_acquire(conn).await? {
            Either::Left(guard) => break guard,
            Either::Right(conn_) => {
                warn!(
                    "Another process is already running migrations on the database, waiting {duration}s and trying again…",
                    duration = backoff.as_secs_f32()
                );
                tokio::time::sleep(backoff).await;
                backoff = std::cmp::min(backoff * 2, std::time::Duration::from_secs(5));
                // `try_acquire` hands the connection back on failure, so we can
                // reuse it for the next attempt
                conn = conn_;
            }
        }
    };

    // Creates the migration table if missing
    // We check if the table exists before calling `ensure_migrations_table` to
    // avoid the pesky 'relation "_sqlx_migrations" already exists, skipping' notice
    if !migration_table_exists(locked_connection.as_mut()).await? {
        locked_connection.as_mut().ensure_migrations_table().await?;
    }

    // Apply each pending migration in version order, tracing each one with the
    // migration metadata and SQL text
    for migration in pending_migrations(locked_connection.as_mut()).await? {
        info!(
            "Applying migration {version}: {description}",
            version = migration.version,
            description = migration.description
        );

        locked_connection
            .as_mut()
            .apply(migration)
            .instrument(info_span!(
                "db.migrate.run_migration",
                db.migration.version = migration.version,
                db.migration.description = &*migration.description,
                { DB_QUERY_TEXT } = &*migration.sql,
            ))
            .await?;
    }

    // Explicitly release the advisory lock rather than relying on drop
    locked_connection.release_now().await?;

    Ok(())
}
/// Get the list of pending migrations
///
/// Also validates the applied-migration history: every applied migration must
/// either still be embedded (with a matching or known-alternate checksum) or
/// be in the list of intentionally removed migrations.
///
/// # Errors
///
/// This function returns an error if there is a problem checking the applied
/// migrations
pub async fn pending_migrations(
    conn: &mut PgConnection,
) -> Result<Vec<&'static Migration>, MigrateError> {
    // Load the maps of available migrations, applied migrations, migrations that
    // are allowed to be missing, alternate checksums for migrations that changed
    let available_migrations = available_migrations();
    let allowed_missing = allowed_missing_migrations();
    let alternate_checksums = alternate_checksums_map();
    // If the migrations table doesn't exist yet, nothing has been applied
    let applied_migrations = if migration_table_exists(&mut *conn).await? {
        applied_migrations_map(&mut *conn).await?
    } else {
        BTreeMap::new()
    };

    // Check that all applied migrations are still valid
    for applied_migration in applied_migrations.values() {
        // Check that we know about the applied migration
        if let Some(migration) = available_migrations.get(&applied_migration.version) {
            // Check the migration checksum
            if applied_migration.checksum != migration.checksum {
                // The checksum we have in the database doesn't match the one we
                // have embedded. This might be because a migration was
                // intentionally changed, so we check the alternate checksums
                if let Some(alternates) = alternate_checksums.get(&applied_migration.version) {
                    // This converts the first 16 bytes of the checksum into a u128
                    let Some(applied_checksum_prefix) = applied_migration
                        .checksum
                        .get(..16)
                        .and_then(|bytes| bytes.try_into().ok())
                        .map(u128::from_be_bytes)
                    else {
                        // The stored checksum is shorter than 16 bytes, so it
                        // can't have been written by a supported migrator
                        return Err(MigrateError::ExecuteMigration(
                            sqlx::Error::InvalidArgument(
                                "checksum stored in database is invalid".to_owned(),
                            ),
                            applied_migration.version,
                        ));
                    };

                    if !alternates.contains(&applied_checksum_prefix) {
                        warn!(
                            "The database has a migration applied ({version}) which has known alternative checksums {alternates:x?}, but none of them matched {applied_checksum_prefix:x}",
                            version = applied_migration.version,
                        );
                        return Err(MigrateError::VersionMismatch(applied_migration.version));
                    }
                } else {
                    // No alternate checksums are known for this version: the
                    // stored checksum is simply wrong
                    return Err(MigrateError::VersionMismatch(applied_migration.version));
                }
            }
        } else if allowed_missing.contains(&applied_migration.version) {
            // The migration is missing, but allowed to be missing
            debug!(
                "The database has a migration applied ({version}) that doesn't exist anymore, but it was intentionally removed",
                version = applied_migration.version
            );
        } else {
            // The migration is missing, warn about it
            warn!(
                "The database has a migration applied ({version}) that doesn't exist anymore! This should not happen, unless rolling back to an older version of MAS.",
                version = applied_migration.version
            );
        }
    }

    // Pending = embedded 'up' migrations not yet recorded as applied
    Ok(available_migrations
        .values()
        .copied()
        .filter(|migration| {
            !migration.migration_type.is_down_migration()
                && !applied_migrations.contains_key(&migration.version)
        })
        .collect())
}
/// Derive the advisory lock key for a database name.
///
/// Copied from the sqlx source code, so that we generate the same lock ID as
/// sqlx's own migrator (and therefore older versions of MAS).
fn generate_lock_id(database_name: &str) -> i64 {
    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
    // 0x3d32ad9e chosen by fair dice roll
    let name_crc = i64::from(CRC_IEEE.checksum(database_name.as_bytes()));
    0x3d32_ad9e * name_crc
}

View File

@@ -40,7 +40,7 @@ cargo sqlx prepare
## Migrations
Migration files live in the `migrations` folder in the `mas-core` crate.
Migration files live in the `migrations` folder in the `mas-storage-pg` crate.
```sh
cd crates/storage-pg/ # Again, in the mas-storage-pg crate folder
@@ -50,3 +50,29 @@ cargo sqlx migrate add [description] # Add new migration files
```
Note that migrations are embedded in the final binary and can be run from the service CLI tool.
### Removing migrations
For various reasons, we may want to delete migrations.
In case we do, we *must* declare that migration version as allowed to be missing.
This is because on startup, MAS will validate that all the applied migrations are known, and warn if some are missing.
To do so, get the migration version and add it to the `ALLOWED_MISSING_MIGRATIONS` array in the `mas-storage-pg` crate.
### Modifying existing migrations
We may want to modify existing migrations to fix mistakes.
In case we do, we *must* save the hash of the original migration file so that MAS can validate it on startup.
To do so, extract the first 16 bytes of the checksum of the existing applied migration and append them to the `ALLOWED_ALTERNATE_CHECKSUMS` array in the `mas-storage-pg` crate.
```sql
SELECT version, ENCODE(SUBSTRING(checksum FOR 16), 'hex') AS short_checksum
FROM _sqlx_migrations
WHERE version = 20250410000002;
```
```
version | short_checksum
----------------+----------------------------------
20250410000002 | f2b8f120deae27e760d079a30b77eea3
```