mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 10:10:06 +00:00
support for additional database migrations (#1644)
This commit is contained in:
@@ -2516,7 +2516,7 @@ execAgentStoreSQL :: AgentClient -> Text -> AE [Text]
|
||||
execAgentStoreSQL c sql = withAgentEnv c $ withStore' c (`execSQL` sql)
|
||||
|
||||
getAgentMigrations :: AgentClient -> AE [UpMigration]
|
||||
getAgentMigrations c = withAgentEnv c $ map upMigration <$> withStore' c getCurrentMigrations
|
||||
getAgentMigrations c = withAgentEnv c $ map upMigration <$> withStore' c (getCurrentMigrations Nothing)
|
||||
|
||||
debugAgentLocks :: AgentClient -> IO AgentLocks
|
||||
debugAgentLocks AgentClient {connLocks = cs, invLocks = is, deleteLock = d} = do
|
||||
|
||||
@@ -8,6 +8,7 @@ module Simplex.Messaging.Agent.Store.Postgres
|
||||
( DBOpts (..),
|
||||
Migrations.getCurrentMigrations,
|
||||
checkSchemaExists,
|
||||
migrateDBSchema,
|
||||
createDBStore,
|
||||
closeDBStore,
|
||||
reopenDBStore,
|
||||
@@ -38,18 +39,20 @@ import System.Exit (exitFailure)
|
||||
-- If passed schema does not exist in connectInfo database, it will be created.
|
||||
-- Applies necessary migrations to schema.
|
||||
createDBStore :: DBOpts -> [Migration] -> MigrationConfig -> IO (Either MigrationError DBStore)
|
||||
createDBStore opts migrations MigrationConfig {confirm} = do
|
||||
createDBStore opts migrations migrationConfig = do
|
||||
st <- connectPostgresStore opts
|
||||
r <- migrateSchema st `onException` closeDBStore st
|
||||
r <- migrateDBSchema st opts Nothing migrations migrationConfig `onException` closeDBStore st
|
||||
case r of
|
||||
Right () -> pure $ Right st
|
||||
Left e -> closeDBStore st $> Left e
|
||||
where
|
||||
migrateSchema st =
|
||||
let initialize = Migrations.initialize st
|
||||
getCurrent = withTransaction st Migrations.getCurrentMigrations
|
||||
dbm = DBMigrate {initialize, getCurrent, run = Migrations.run st, backup = Nothing}
|
||||
in sharedMigrateSchema dbm (dbNew st) migrations confirm
|
||||
|
||||
migrateDBSchema :: DBStore -> DBOpts -> Maybe Query -> [Migration] -> MigrationConfig -> IO (Either MigrationError ())
|
||||
migrateDBSchema st _opts migrationsTable migrations MigrationConfig {confirm} =
|
||||
let initialize = Migrations.initialize st migrationsTable
|
||||
getCurrent = withTransaction st $ Migrations.getCurrentMigrations migrationsTable
|
||||
run = Migrations.run st migrationsTable
|
||||
dbm = DBMigrate {initialize, getCurrent, run, backup = Nothing}
|
||||
in sharedMigrateSchema dbm (dbNew st) migrations confirm
|
||||
|
||||
connectPostgresStore :: DBOpts -> IO DBStore
|
||||
connectPostgresStore DBOpts {connstr, schema, poolSize, createSchema} = do
|
||||
|
||||
@@ -14,55 +14,50 @@ where
|
||||
import Control.Exception (throwIO)
|
||||
import Control.Monad (void)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Maybe (fromMaybe)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.Encoding as TE
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
import qualified Database.PostgreSQL.LibPQ as LibPQ
|
||||
import Database.PostgreSQL.Simple (Only (..))
|
||||
import Database.PostgreSQL.Simple (Only (..), Query)
|
||||
import qualified Database.PostgreSQL.Simple as PSQL
|
||||
import Database.PostgreSQL.Simple.Internal (Connection (..))
|
||||
import Database.PostgreSQL.Simple.SqlQQ (sql)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Common
|
||||
import Simplex.Messaging.Agent.Store.Shared
|
||||
import Simplex.Messaging.Util (($>>=))
|
||||
import UnliftIO.MVar
|
||||
|
||||
initialize :: DBStore -> IO ()
|
||||
initialize st = withTransaction' st $ \db ->
|
||||
void $
|
||||
PSQL.execute_
|
||||
db
|
||||
[sql|
|
||||
CREATE TABLE IF NOT EXISTS migrations (
|
||||
name TEXT NOT NULL,
|
||||
ts TIMESTAMP NOT NULL,
|
||||
down TEXT,
|
||||
PRIMARY KEY (name)
|
||||
)
|
||||
|]
|
||||
initialize :: DBStore -> Maybe Query -> IO ()
|
||||
initialize st migrationsTable = withTransaction' st $ \db ->
|
||||
void $ PSQL.execute_ db $
|
||||
"CREATE TABLE IF NOT EXISTS "
|
||||
<> fromMaybe "migrations" migrationsTable
|
||||
<> " (name TEXT NOT NULL PRIMARY KEY, ts TIMESTAMP NOT NULL, down TEXT)"
|
||||
|
||||
run :: DBStore -> MigrationsToRun -> IO ()
|
||||
run st = \case
|
||||
run :: DBStore -> Maybe Query -> MigrationsToRun -> IO ()
|
||||
run st migrationsTable = \case
|
||||
MTRUp [] -> pure ()
|
||||
MTRUp ms -> mapM_ runUp ms
|
||||
MTRDown ms -> mapM_ runDown $ reverse ms
|
||||
MTRNone -> pure ()
|
||||
where
|
||||
table = fromMaybe "migrations" migrationsTable
|
||||
runUp Migration {name, up, down} = withTransaction' st $ \db -> do
|
||||
insert db
|
||||
execSQL db up
|
||||
where
|
||||
insert db = void $ PSQL.execute db "INSERT INTO migrations (name, down, ts) VALUES (?,?,?)" . (name,down,) =<< getCurrentTime
|
||||
insert db = void $ PSQL.execute db ("INSERT INTO " <> table <> " (name, down, ts) VALUES (?,?,?)") . (name,down,) =<< getCurrentTime
|
||||
runDown DownMigration {downName, downQuery} = withTransaction' st $ \db -> do
|
||||
execSQL db downQuery
|
||||
void $ PSQL.execute db "DELETE FROM migrations WHERE name = ?" (Only downName)
|
||||
void $ PSQL.execute db ("DELETE FROM " <> table <> " WHERE name = ?") (Only downName)
|
||||
execSQL db query =
|
||||
withMVar (connectionHandle db) $ \pqConn ->
|
||||
LibPQ.exec pqConn (TE.encodeUtf8 query) $>>= LibPQ.resultErrorMessage >>= \case
|
||||
Just e | not (B.null e) -> throwIO $ userError $ B.unpack e
|
||||
_ -> pure ()
|
||||
|
||||
getCurrentMigrations :: PSQL.Connection -> IO [Migration]
|
||||
getCurrentMigrations db = map toMigration <$> PSQL.query_ db "SELECT name, down FROM migrations ORDER BY name ASC;"
|
||||
getCurrentMigrations :: Maybe Query -> PSQL.Connection -> IO [Migration]
|
||||
getCurrentMigrations migrationsTable db = map toMigration <$> PSQL.query_ db ("SELECT name, down FROM " <> table <> " ORDER BY name ASC;")
|
||||
where
|
||||
table = fromMaybe "migrations" migrationsTable
|
||||
toMigration (name, down) = Migration {name, up = T.pack "", down}
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
module Simplex.Messaging.Agent.Store.SQLite
|
||||
( DBOpts (..),
|
||||
Migrations.getCurrentMigrations,
|
||||
migrateDBSchema,
|
||||
createDBStore,
|
||||
closeDBStore,
|
||||
reopenDBStore,
|
||||
@@ -68,25 +69,27 @@ import UnliftIO.STM
|
||||
-- * SQLite Store implementation
|
||||
|
||||
createDBStore :: DBOpts -> [Migration] -> MigrationConfig -> IO (Either MigrationError DBStore)
|
||||
createDBStore DBOpts {dbFilePath, dbKey, keepKey, track, vacuum} migrations MigrationConfig {confirm, backupPath} = do
|
||||
createDBStore opts@DBOpts {dbFilePath, dbKey, keepKey, track} migrations migrationConfig = do
|
||||
let dbDir = takeDirectory dbFilePath
|
||||
createDirectoryIfMissing True dbDir
|
||||
st <- connectSQLiteStore dbFilePath dbKey keepKey track
|
||||
r <- migrateSchema st `onException` closeDBStore st
|
||||
r <- migrateDBSchema st opts Nothing migrations migrationConfig `onException` closeDBStore st
|
||||
case r of
|
||||
Right () -> pure $ Right st
|
||||
Left e -> closeDBStore st $> Left e
|
||||
where
|
||||
migrateSchema st =
|
||||
let initialize = Migrations.initialize st
|
||||
getCurrent = withTransaction st Migrations.getCurrentMigrations
|
||||
run = Migrations.run st vacuum
|
||||
backup = mkBackup <$> backupPath
|
||||
mkBackup bp =
|
||||
let f = if null bp then dbFilePath else bp </> takeFileName dbFilePath
|
||||
in copyFile dbFilePath $ f <> ".bak"
|
||||
dbm = DBMigrate {initialize, getCurrent, run, backup}
|
||||
in sharedMigrateSchema dbm (dbNew st) migrations confirm
|
||||
|
||||
migrateDBSchema :: DBStore -> DBOpts -> Maybe Query -> [Migration] -> MigrationConfig -> IO (Either MigrationError ())
|
||||
migrateDBSchema st DBOpts {dbFilePath, vacuum} migrationsTable migrations MigrationConfig {confirm, backupPath} =
|
||||
let initialize = Migrations.initialize st migrationsTable
|
||||
getCurrent = withTransaction st $ Migrations.getCurrentMigrations migrationsTable
|
||||
run = Migrations.run st migrationsTable vacuum
|
||||
backup = mkBackup <$> backupPath
|
||||
mkBackup bp =
|
||||
let f = if null bp then dbFilePath else bp </> takeFileName dbFilePath
|
||||
in copyFile dbFilePath $ f <> ".bak"
|
||||
dbm = DBMigrate {initialize, getCurrent, run, backup}
|
||||
in sharedMigrateSchema dbm (dbNew st) migrations confirm
|
||||
|
||||
connectSQLiteStore :: FilePath -> ScrubbedBytes -> Bool -> DB.TrackQueries -> IO DBStore
|
||||
connectSQLiteStore dbFilePath key keepKey track = do
|
||||
|
||||
@@ -17,12 +17,12 @@ where
|
||||
import Control.Monad (forM_, when)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Text (Text)
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
import Database.SQLite.Simple (Only (..), Query (..))
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
import qualified Database.SQLite3 as SQLite3
|
||||
import Simplex.Messaging.Agent.Protocol (extraSMPServerHosts)
|
||||
import qualified Simplex.Messaging.Agent.Store.DB as DB
|
||||
@@ -32,13 +32,16 @@ import Simplex.Messaging.Agent.Store.Shared
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
|
||||
getCurrentMigrations :: DB.Connection -> IO [Migration]
|
||||
getCurrentMigrations DB.Connection {DB.conn} = map toMigration <$> SQL.query_ conn "SELECT name, down FROM migrations ORDER BY name ASC;"
|
||||
getCurrentMigrations :: Maybe Query -> DB.Connection -> IO [Migration]
|
||||
getCurrentMigrations migrationsTable DB.Connection {DB.conn} =
|
||||
map toMigration
|
||||
<$> SQL.query_ conn ("SELECT name, down FROM " <> table <> " ORDER BY name ASC;")
|
||||
where
|
||||
table = fromMaybe "migrations" migrationsTable
|
||||
toMigration (name, down) = Migration {name, up = "", down}
|
||||
|
||||
run :: DBStore -> Bool -> MigrationsToRun -> IO ()
|
||||
run st vacuum = \case
|
||||
run :: DBStore -> Maybe Query -> Bool -> MigrationsToRun -> IO ()
|
||||
run st migrationsTable vacuum = \case
|
||||
MTRUp [] -> pure ()
|
||||
MTRUp ms -> do
|
||||
mapM_ runUp ms
|
||||
@@ -46,11 +49,12 @@ run st vacuum = \case
|
||||
MTRDown ms -> mapM_ runDown $ reverse ms
|
||||
MTRNone -> pure ()
|
||||
where
|
||||
table = fromMaybe "migrations" migrationsTable
|
||||
runUp Migration {name, up, down} = withTransaction' st $ \db -> do
|
||||
when (name == "m20220811_onion_hosts") $ updateServers db
|
||||
insert db >> execSQL db up'
|
||||
where
|
||||
insert db = SQL.execute db "INSERT INTO migrations (name, down, ts) VALUES (?,?,?)" . (name,down,) =<< getCurrentTime
|
||||
insert db = SQL.execute db ("INSERT INTO " <> table <> " (name, down, ts) VALUES (?,?,?)") . (name,down,) =<< getCurrentTime
|
||||
up'
|
||||
| dbNew st && name == "m20230110_users" = fromQuery new_m20230110_users
|
||||
| otherwise = up
|
||||
@@ -59,24 +63,19 @@ run st vacuum = \case
|
||||
in SQL.execute db "UPDATE servers SET host = ? WHERE host = ?" (hs, decodeLatin1 $ strEncode h)
|
||||
runDown DownMigration {downName, downQuery} = withTransaction' st $ \db -> do
|
||||
execSQL db downQuery
|
||||
SQL.execute db "DELETE FROM migrations WHERE name = ?" (Only downName)
|
||||
SQL.execute db ("DELETE FROM " <> table <> " WHERE name = ?") (Only downName)
|
||||
execSQL db = SQLite3.exec $ SQL.connectionHandle db
|
||||
|
||||
initialize :: DBStore -> IO ()
|
||||
initialize st = withTransaction' st $ \db -> do
|
||||
cs :: [Text] <- map fromOnly <$> SQL.query_ db "SELECT name FROM pragma_table_info('migrations')"
|
||||
initialize :: DBStore -> Maybe Query -> IO ()
|
||||
initialize st migrationsTable = withTransaction' st $ \db -> do
|
||||
cs :: [Text] <- map fromOnly <$> SQL.query_ db ("SELECT name FROM pragma_table_info('" <> table <> "')")
|
||||
case cs of
|
||||
[] -> createMigrations db
|
||||
_ -> when ("down" `notElem` cs) $ SQL.execute_ db "ALTER TABLE migrations ADD COLUMN down TEXT"
|
||||
_ -> when ("down" `notElem` cs) $ SQL.execute_ db $ "ALTER TABLE " <> table <> " ADD COLUMN down TEXT"
|
||||
where
|
||||
table = fromMaybe "migrations" migrationsTable
|
||||
createMigrations db =
|
||||
SQL.execute_
|
||||
db
|
||||
[sql|
|
||||
CREATE TABLE IF NOT EXISTS migrations (
|
||||
name TEXT NOT NULL,
|
||||
ts TEXT NOT NULL,
|
||||
down TEXT,
|
||||
PRIMARY KEY (name)
|
||||
);
|
||||
|]
|
||||
SQL.execute_ db $
|
||||
"CREATE TABLE IF NOT EXISTS "
|
||||
<> table
|
||||
<> " (name TEXT NOT NULL PRIMARY KEY, ts TEXT NOT NULL, down TEXT)"
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
CREATE TABLE migrations(
|
||||
name TEXT NOT NULL,
|
||||
name TEXT NOT NULL PRIMARY KEY,
|
||||
ts TEXT NOT NULL,
|
||||
down TEXT,
|
||||
PRIMARY KEY(name)
|
||||
down TEXT
|
||||
);
|
||||
CREATE TABLE servers(
|
||||
host TEXT NOT NULL,
|
||||
|
||||
@@ -642,7 +642,7 @@ testReopenEncryptedStoreKeepKey = do
|
||||
hasMigrations st
|
||||
|
||||
getMigrations :: DBStore -> IO Bool
|
||||
getMigrations st = not . null <$> withTransaction st getCurrentMigrations
|
||||
getMigrations st = not . null <$> withTransaction st (getCurrentMigrations Nothing)
|
||||
|
||||
hasMigrations :: DBStore -> Expectation
|
||||
hasMigrations st = getMigrations st `shouldReturn` True
|
||||
|
||||
@@ -76,14 +76,14 @@ testSchemaMigrations = do
|
||||
putStrLn $ "down migration " <> name m
|
||||
let downMigr = fromJust $ toDownMigration m
|
||||
schema <- getSchema testDB testSchema
|
||||
Migrations.run st True $ MTRUp [m]
|
||||
Migrations.run st Nothing True $ MTRUp [m]
|
||||
schema' <- getSchema testDB testSchema
|
||||
schema' `shouldNotBe` schema
|
||||
Migrations.run st True $ MTRDown [downMigr]
|
||||
Migrations.run st Nothing True $ MTRDown [downMigr]
|
||||
unless (name m `elem` skipComparisonForDownMigrations) $ do
|
||||
schema'' <- getSchema testDB testSchema
|
||||
schema'' `shouldBe` schema
|
||||
Migrations.run st True $ MTRUp [m]
|
||||
Migrations.run st Nothing True $ MTRUp [m]
|
||||
schema''' <- getSchema testDB testSchema
|
||||
schema''' `shouldBe` schema'
|
||||
|
||||
|
||||
@@ -44,14 +44,14 @@ postgresSchemaDumpTest migrations skipComparisonForDownMigrations testDBOpts@DBO
|
||||
putStrLn $ "down migration " <> name m
|
||||
let downMigr = fromJust $ toDownMigration m
|
||||
schema <- getSchema testSchemaPath
|
||||
Migrations.run st $ MTRUp [m]
|
||||
Migrations.run st Nothing $ MTRUp [m]
|
||||
schema' <- getSchema testSchemaPath
|
||||
schema' `shouldNotBe` schema
|
||||
Migrations.run st $ MTRDown [downMigr]
|
||||
Migrations.run st Nothing $ MTRDown [downMigr]
|
||||
unless (name m `elem` skipComparisonForDownMigrations) $ do
|
||||
schema'' <- getSchema testSchemaPath
|
||||
schema'' `shouldBe` schema
|
||||
Migrations.run st $ MTRUp [m]
|
||||
Migrations.run st Nothing $ MTRUp [m]
|
||||
schema''' <- getSchema testSchemaPath
|
||||
schema''' `shouldBe` schema'
|
||||
|
||||
|
||||
Reference in New Issue
Block a user