database migrations (#153)

* database migrations

* fix: reverse order of down migrations

* use positional parameters in queries

* simplify migrations

* typo

* rename SchemaMigration to Migration

* move store initialization to IO monad
This commit is contained in:
Evgeny Poberezkin
2021-05-31 16:38:35 +01:00
committed by GitHub
parent 92bd8ef335
commit 84ce001598
10 changed files with 248 additions and 196 deletions

View File

@@ -70,7 +70,7 @@ jobs:
- name: Build & test
id: build_test
run: |
stack build --test
stack build --test --force-dirty
echo "::set-output name=LOCAL_INSTALL_ROOT::$(stack path --local-install-root)"
- name: Upload binaries to release

1
.gitignore vendored
View File

@@ -1,4 +1,5 @@
*.lock
*.cabal
*.db
*.db.bak
*.session.sql

View File

@@ -0,0 +1,117 @@
CREATE TABLE IF NOT EXISTS servers(
host TEXT NOT NULL,
port TEXT NOT NULL,
key_hash BLOB,
PRIMARY KEY (host, port)
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS rcv_queues(
host TEXT NOT NULL,
port TEXT NOT NULL,
rcv_id BLOB NOT NULL,
conn_alias BLOB NOT NULL,
rcv_private_key BLOB NOT NULL,
snd_id BLOB,
snd_key BLOB,
decrypt_key BLOB NOT NULL,
verify_key BLOB,
status TEXT NOT NULL,
PRIMARY KEY (host, port, rcv_id),
FOREIGN KEY (host, port) REFERENCES servers (host, port),
FOREIGN KEY (conn_alias)
REFERENCES connections (conn_alias)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED,
UNIQUE (host, port, snd_id)
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS snd_queues(
host TEXT NOT NULL,
port TEXT NOT NULL,
snd_id BLOB NOT NULL,
conn_alias BLOB NOT NULL,
snd_private_key BLOB NOT NULL,
encrypt_key BLOB NOT NULL,
sign_key BLOB NOT NULL,
status TEXT NOT NULL,
PRIMARY KEY (host, port, snd_id),
FOREIGN KEY (host, port) REFERENCES servers (host, port),
FOREIGN KEY (conn_alias)
REFERENCES connections (conn_alias)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS connections(
conn_alias BLOB NOT NULL,
rcv_host TEXT,
rcv_port TEXT,
rcv_id BLOB,
snd_host TEXT,
snd_port TEXT,
snd_id BLOB,
last_internal_msg_id INTEGER NOT NULL,
last_internal_rcv_msg_id INTEGER NOT NULL,
last_internal_snd_msg_id INTEGER NOT NULL,
last_external_snd_msg_id INTEGER NOT NULL,
last_rcv_msg_hash BLOB NOT NULL,
last_snd_msg_hash BLOB NOT NULL,
PRIMARY KEY (conn_alias),
FOREIGN KEY (rcv_host, rcv_port, rcv_id) REFERENCES rcv_queues (host, port, rcv_id),
FOREIGN KEY (snd_host, snd_port, snd_id) REFERENCES snd_queues (host, port, snd_id)
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS messages(
conn_alias BLOB NOT NULL,
internal_id INTEGER NOT NULL,
internal_ts TEXT NOT NULL,
internal_rcv_id INTEGER,
internal_snd_id INTEGER,
body TEXT NOT NULL,
PRIMARY KEY (conn_alias, internal_id),
FOREIGN KEY (conn_alias)
REFERENCES connections (conn_alias)
ON DELETE CASCADE,
FOREIGN KEY (conn_alias, internal_rcv_id)
REFERENCES rcv_messages (conn_alias, internal_rcv_id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED,
FOREIGN KEY (conn_alias, internal_snd_id)
REFERENCES snd_messages (conn_alias, internal_snd_id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS rcv_messages(
conn_alias BLOB NOT NULL,
internal_rcv_id INTEGER NOT NULL,
internal_id INTEGER NOT NULL,
external_snd_id INTEGER NOT NULL,
external_snd_ts TEXT NOT NULL,
broker_id BLOB NOT NULL,
broker_ts TEXT NOT NULL,
rcv_status TEXT NOT NULL,
ack_brocker_ts TEXT,
ack_sender_ts TEXT,
internal_hash BLOB NOT NULL,
external_prev_snd_hash BLOB NOT NULL,
integrity BLOB NOT NULL,
PRIMARY KEY (conn_alias, internal_rcv_id),
FOREIGN KEY (conn_alias, internal_id)
REFERENCES messages (conn_alias, internal_id)
ON DELETE CASCADE
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS snd_messages(
conn_alias BLOB NOT NULL,
internal_snd_id INTEGER NOT NULL,
internal_id INTEGER NOT NULL,
snd_status TEXT NOT NULL,
sent_ts TEXT,
delivered_ts TEXT,
internal_hash BLOB NOT NULL,
PRIMARY KEY (conn_alias, internal_snd_id),
FOREIGN KEY (conn_alias, internal_id)
REFERENCES messages (conn_alias, internal_id)
ON DELETE CASCADE
) WITHOUT ROWID;

9
migrations/README.md Normal file
View File

@@ -0,0 +1,9 @@
# SQLite database migrations
These migrations are [embedded](../src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs) into the executable and run when SMP agent starts (as a separate executable or as a part of [simplex-chat](https://github.com/simplex-chat/simplex-chat) app).
Migration file names must have a format `YYYYMMDD-name.sql` - they will be executed in the order or lexicographic sorting of the names, the files with any other extension than `.sql` are ignored.
The proposed approach is to minimize the number of migrations and merge them together when possible, to align with the agent releases.
**Please note**: Adding or editing migrations will NOT update the migrations embedded into the executable, unless the [Migrations](../src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs) module is rebuilt - use `stack build --force-dirty` (in addition to edited files it seems to rebuild the files with TH splices and their dependencies, not all files as with `stack clean`).

View File

@@ -33,7 +33,9 @@ dependencies:
- constraints == 0.12.*
- containers == 0.6.*
- cryptonite == 0.27.*
- direct-sqlite == 2.3.*
- directory == 1.3.*
- file-embed == 0.0.14.*
- filepath == 1.4.*
- generic-random == 1.3.*
- iso8601-time == 0.1.*

View File

@@ -7,7 +7,6 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
-- |
-- Module : Simplex.Messaging.Agent
@@ -101,8 +100,8 @@ logConnection c connected =
runSMPAgentClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
runSMPAgentClient c = do
db <- asks $ dbFile . config
s1 <- connectSQLiteStore db
s2 <- connectSQLiteStore db
s1 <- liftIO $ connectSQLiteStore db
s2 <- liftIO $ connectSQLiteStore db
race_ (subscriber c s1) (client c s2)
receive :: forall c m. (Transport c, MonadUnliftIO m) => c -> AgentClient -> m ()

View File

@@ -37,7 +37,7 @@ data Env = Env
newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
newSMPAgentEnv config = do
idsDrg <- newTVarIO =<< drgNew
_ <- createSQLiteStore $ dbFile config
_ <- liftIO $ createSQLiteStore $ dbFile config
clientCounter <- newTVarIO 0
randomServer <- newTVarIO =<< liftIO newStdGen
return Env {config, idsDrg, clientCounter, reservedMsgSize, randomServer}

View File

@@ -22,10 +22,11 @@ module Simplex.Messaging.Agent.Store.SQLite
where
import Control.Concurrent (threadDelay)
import Control.Monad (when)
import Control.Monad (unless, when)
import Control.Monad.Except (MonadError (throwError), MonadIO (liftIO))
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Data.Bifunctor (first)
import Data.Char (toLower)
import Data.List (find)
import Data.Maybe (fromMaybe)
import Data.Text (isPrefixOf)
@@ -41,51 +42,70 @@ import Database.SQLite.Simple.ToField (ToField (..))
import Network.Socket (ServiceName)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite.Schema (createSchema)
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Parsers (blobFieldParser)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (bshow, liftIOEither)
import System.Exit (ExitCode (ExitFailure), exitWith)
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
import System.Exit (exitFailure)
import System.FilePath (takeDirectory)
import System.IO (hFlush, stdout)
import Text.Read (readMaybe)
import UnliftIO.Directory (createDirectoryIfMissing)
import qualified UnliftIO.Exception as E
-- * SQLite Store implementation
data SQLiteStore = SQLiteStore
{ dbFilePath :: FilePath,
dbConn :: DB.Connection
dbConn :: DB.Connection,
dbNew :: Bool
}
createSQLiteStore :: MonadUnliftIO m => FilePath -> m SQLiteStore
createSQLiteStore :: FilePath -> IO SQLiteStore
createSQLiteStore dbFilePath = do
let dbDir = takeDirectory dbFilePath
createDirectoryIfMissing False dbDir
store <- connectSQLiteStore dbFilePath
compileOptions <- liftIO (DB.query_ (dbConn store) "pragma COMPILE_OPTIONS;" :: IO [[T.Text]])
compileOptions <- DB.query_ (dbConn store) "pragma COMPILE_OPTIONS;" :: IO [[T.Text]]
let threadsafeOption = find (isPrefixOf "THREADSAFE=") (concat compileOptions)
liftIO $ case threadsafeOption of
Just "THREADSAFE=0" -> do
putStrLn "SQLite compiled with not threadsafe code, continue (y/n):"
s <- getLine
when (s /= "y") (exitWith $ ExitFailure 2)
case threadsafeOption of
Just "THREADSAFE=0" -> confirmOrExit "SQLite compiled with non-threadsafe code."
Nothing -> putStrLn "Warning: SQLite THREADSAFE compile option not found"
_ -> return ()
liftIO . createSchema $ dbConn store
return store
migrateSchema store
pure store
connectSQLiteStore :: MonadUnliftIO m => FilePath -> m SQLiteStore
migrateSchema :: SQLiteStore -> IO ()
migrateSchema SQLiteStore {dbConn, dbFilePath, dbNew} = do
Migrations.initialize dbConn
Migrations.get dbConn Migrations.app >>= \case
Left e -> confirmOrExit $ "Database error: " <> e
Right [] -> pure ()
Right ms -> do
unless dbNew $ do
confirmOrExit "The app has a newer version than the database - it will be backed up and upgraded."
copyFile dbFilePath $ dbFilePath <> ".bak"
Migrations.run dbConn ms
confirmOrExit :: String -> IO ()
confirmOrExit s = do
putStrLn s
putStr "Continue (y/N): "
hFlush stdout
ok <- getLine
when (map toLower ok /= "y") exitFailure
connectSQLiteStore :: FilePath -> IO SQLiteStore
connectSQLiteStore dbFilePath = do
dbConn <- liftIO $ DB.open dbFilePath
liftIO $
DB.execute_
dbConn
[sql|
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;
|]
return SQLiteStore {dbFilePath, dbConn}
dbNew <- not <$> doesFileExist dbFilePath
dbConn <- DB.open dbFilePath
DB.execute_
dbConn
[sql|
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;
|]
pure SQLiteStore {dbFilePath, dbConn, dbNew}
checkDuplicate :: (MonadUnliftIO m, MonadError StoreError m) => IO () -> m ()
checkDuplicate action = liftIOEither $ first handleError <$> E.try action

View File

@@ -0,0 +1,71 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations
( app,
initialize,
get,
run,
)
where
import Control.Monad (forM_)
import Data.FileEmbed (embedDir, makeRelativeToProject)
import Data.Function (on)
import Data.List (intercalate, sortBy)
import Data.Text (Text)
import Data.Text.Encoding (decodeUtf8)
import Data.Time.Clock (getCurrentTime)
import Database.SQLite.Simple (Connection, Only (..))
import qualified Database.SQLite.Simple as DB
import Database.SQLite.Simple.QQ (sql)
import qualified Database.SQLite3 as SQLite3
import System.FilePath (takeBaseName, takeExtension)
data Migration = Migration {name :: String, up :: Text}
deriving (Show)
-- | The list of migrations in ascending order by date
app :: [Migration]
app =
sortBy (compare `on` name) . map migration . filter sqlFile $
$(makeRelativeToProject "migrations" >>= embedDir)
where
sqlFile (file, _) = takeExtension file == ".sql"
migration (file, qStr) = Migration {name = takeBaseName file, up = decodeUtf8 qStr}
get :: Connection -> [Migration] -> IO (Either String [Migration])
get conn migrations =
migrationsToRun migrations . map fromOnly
<$> DB.query_ conn "SELECT name FROM migrations ORDER BY name ASC;"
run :: Connection -> [Migration] -> IO ()
run conn ms = DB.withImmediateTransaction conn . forM_ ms $
\Migration {name, up} -> insert name >> execSQL up
where
insert name = DB.execute conn "INSERT INTO migrations (name, ts) VALUES (?, ?);" . (name,) =<< getCurrentTime
execSQL = SQLite3.exec $ DB.connectionHandle conn
initialize :: Connection -> IO ()
initialize conn =
DB.execute_
conn
[sql|
CREATE TABLE IF NOT EXISTS migrations (
name TEXT NOT NULL,
ts TEXT NOT NULL,
PRIMARY KEY (name)
);
|]
migrationsToRun :: [Migration] -> [String] -> Either String [Migration]
migrationsToRun appMs [] = Right appMs
migrationsToRun [] dbMs = Left $ "database version is newer than the app: " <> intercalate ", " dbMs
migrationsToRun (a : as) (d : ds)
| name a == d = migrationsToRun as ds
| otherwise = Left $ "different migration in the app/database: " <> name a <> " / " <> d

View File

@@ -1,167 +0,0 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Schema (createSchema) where
import Database.SQLite.Simple (Connection, Query, execute_)
import Database.SQLite.Simple.QQ (sql)
createSchema :: Connection -> IO ()
createSchema conn =
mapM_
(execute_ conn)
[ servers,
rcvQueues,
sndQueues,
connections,
messages,
rcvMessages,
sndMessages
]
-- port is either a port number or a service name, see Network.Socket.Info.ServiceName
servers :: Query
servers =
[sql|
CREATE TABLE IF NOT EXISTS servers(
host TEXT NOT NULL,
port TEXT NOT NULL,
key_hash BLOB,
PRIMARY KEY (host, port)
) WITHOUT ROWID;
|]
rcvQueues :: Query
rcvQueues =
[sql|
CREATE TABLE IF NOT EXISTS rcv_queues(
host TEXT NOT NULL,
port TEXT NOT NULL,
rcv_id BLOB NOT NULL,
conn_alias BLOB NOT NULL,
rcv_private_key BLOB NOT NULL,
snd_id BLOB,
snd_key BLOB,
decrypt_key BLOB NOT NULL,
verify_key BLOB,
status TEXT NOT NULL,
PRIMARY KEY (host, port, rcv_id),
FOREIGN KEY (host, port) REFERENCES servers (host, port),
FOREIGN KEY (conn_alias)
REFERENCES connections (conn_alias)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED,
UNIQUE (host, port, snd_id)
) WITHOUT ROWID;
|]
sndQueues :: Query
sndQueues =
[sql|
CREATE TABLE IF NOT EXISTS snd_queues(
host TEXT NOT NULL,
port TEXT NOT NULL,
snd_id BLOB NOT NULL,
conn_alias BLOB NOT NULL,
snd_private_key BLOB NOT NULL,
encrypt_key BLOB NOT NULL,
sign_key BLOB NOT NULL,
status TEXT NOT NULL,
PRIMARY KEY (host, port, snd_id),
FOREIGN KEY (host, port) REFERENCES servers (host, port),
FOREIGN KEY (conn_alias)
REFERENCES connections (conn_alias)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED
) WITHOUT ROWID;
|]
connections :: Query
connections =
[sql|
CREATE TABLE IF NOT EXISTS connections(
conn_alias BLOB NOT NULL,
rcv_host TEXT,
rcv_port TEXT,
rcv_id BLOB,
snd_host TEXT,
snd_port TEXT,
snd_id BLOB,
last_internal_msg_id INTEGER NOT NULL,
last_internal_rcv_msg_id INTEGER NOT NULL,
last_internal_snd_msg_id INTEGER NOT NULL,
last_external_snd_msg_id INTEGER NOT NULL,
last_rcv_msg_hash BLOB NOT NULL,
last_snd_msg_hash BLOB NOT NULL,
PRIMARY KEY (conn_alias),
FOREIGN KEY (rcv_host, rcv_port, rcv_id) REFERENCES rcv_queues (host, port, rcv_id),
FOREIGN KEY (snd_host, snd_port, snd_id) REFERENCES snd_queues (host, port, snd_id)
) WITHOUT ROWID;
|]
messages :: Query
messages =
[sql|
CREATE TABLE IF NOT EXISTS messages(
conn_alias BLOB NOT NULL,
internal_id INTEGER NOT NULL,
internal_ts TEXT NOT NULL,
internal_rcv_id INTEGER,
internal_snd_id INTEGER,
body TEXT NOT NULL,
PRIMARY KEY (conn_alias, internal_id),
FOREIGN KEY (conn_alias)
REFERENCES connections (conn_alias)
ON DELETE CASCADE,
FOREIGN KEY (conn_alias, internal_rcv_id)
REFERENCES rcv_messages (conn_alias, internal_rcv_id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED,
FOREIGN KEY (conn_alias, internal_snd_id)
REFERENCES snd_messages (conn_alias, internal_snd_id)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED
) WITHOUT ROWID;
|]
rcvMessages :: Query
rcvMessages =
[sql|
CREATE TABLE IF NOT EXISTS rcv_messages(
conn_alias BLOB NOT NULL,
internal_rcv_id INTEGER NOT NULL,
internal_id INTEGER NOT NULL,
external_snd_id INTEGER NOT NULL,
external_snd_ts TEXT NOT NULL,
broker_id BLOB NOT NULL,
broker_ts TEXT NOT NULL,
rcv_status TEXT NOT NULL,
ack_brocker_ts TEXT,
ack_sender_ts TEXT,
internal_hash BLOB NOT NULL,
external_prev_snd_hash BLOB NOT NULL,
integrity BLOB NOT NULL,
PRIMARY KEY (conn_alias, internal_rcv_id),
FOREIGN KEY (conn_alias, internal_id)
REFERENCES messages (conn_alias, internal_id)
ON DELETE CASCADE
) WITHOUT ROWID;
|]
sndMessages :: Query
sndMessages =
[sql|
CREATE TABLE IF NOT EXISTS snd_messages(
conn_alias BLOB NOT NULL,
internal_snd_id INTEGER NOT NULL,
internal_id INTEGER NOT NULL,
snd_status TEXT NOT NULL,
sent_ts TEXT,
delivered_ts TEXT,
internal_hash BLOB NOT NULL,
PRIMARY KEY (conn_alias, internal_snd_id),
FOREIGN KEY (conn_alias, internal_id)
REFERENCES messages (conn_alias, internal_id)
ON DELETE CASCADE
) WITHOUT ROWID;
|]