From 6b5de2c51bee62668dd7388a782bb5e7ec9f659f Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 10 Mar 2025 17:28:26 +0000 Subject: [PATCH] smp server: exception on invalid migration, check queue storage settings on server start (#1478) * smp server: check queue storage settings on server start * fix incorrect postgres migration not throwing exception --- .../Agent/Store/Postgres/Migrations.hs | 7 +++- src/Simplex/Messaging/Server/Main.hs | 33 +++++++++++++++---- tests/SMPClient.hs | 9 +++-- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations.hs index daa0c73f1..d6f552937 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations.hs @@ -11,7 +11,9 @@ module Simplex.Messaging.Agent.Store.Postgres.Migrations ) where +import Control.Exception (throwIO) import Control.Monad (void) +import qualified Data.ByteString.Char8 as B import qualified Data.Text as T import qualified Data.Text.Encoding as TE import Data.Time.Clock (getCurrentTime) @@ -22,6 +24,7 @@ import Database.PostgreSQL.Simple.Internal (Connection (..)) import Database.PostgreSQL.Simple.SqlQQ (sql) import Simplex.Messaging.Agent.Store.Postgres.Common import Simplex.Messaging.Agent.Store.Shared +import Simplex.Messaging.Util (($>>=)) import UnliftIO.MVar initialize :: DBStore -> IO () @@ -55,7 +58,9 @@ run st = \case void $ PSQL.execute db "DELETE FROM migrations WHERE name = ?" (Only downName) execSQL db query = withMVar (connectionHandle db) $ \pqConn -> - void $ LibPQ.exec pqConn (TE.encodeUtf8 query) + LibPQ.exec pqConn (TE.encodeUtf8 query) $>>= LibPQ.resultErrorMessage >>= \case + Just e | not (B.null e) -> throwIO $ userError $ B.unpack e + _ -> pure () getCurrentMigrations :: PSQL.Connection -> IO [Migration] getCurrentMigrations db = map toMigration <$> PSQL.query_ db "SELECT name, down FROM migrations ORDER BY name ASC;" diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 66dfba70b..806c7fc34 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -3,6 +3,7 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} @@ -431,7 +432,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = printServiceInfo serverVersion srv printSourceCode sourceCode' printSMPServerConfig transports serverStoreCfg - checkMsgStoreMode iniStoreType + checkMsgStoreMode ini iniStoreType putStrLn $ case messageExpiration of Just ExpirationConfig {ttl} -> "expiring messages after " <> showTTL ttl _ -> "not expiring messages" @@ -586,25 +587,43 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = pure WebHttpsParams {port, cert, key} webStaticPath' = eitherToMaybe $ T.unpack <$> lookupValue "WEB" "static_path" ini - checkMsgStoreMode :: AStoreType -> IO () - checkMsgStoreMode mode = do + checkMsgStoreMode :: Ini -> AStoreType -> IO () + checkMsgStoreMode ini mode = do msgsDirExists <- doesDirectoryExist storeMsgsJournalDir msgsFileExists <- doesFileExist storeMsgsFilePath + storeLogExists <- doesFileExist storeLogFilePath case mode of - _ | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage - ASType _ SMSJournal -- TODO [postgres] + ASType qs SMSJournal + | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage | msgsFileExists -> do putStrLn $ "Error: store_messages is `journal` with " <> storeMsgsFilePath <> " file present." putStrLn "Set store_messages to `memory` or use `smp-server journal export` to migrate." exitFailure | not msgsDirExists -> putStrLn $ "store_messages is `journal`, " <> storeMsgsJournalDir <> " directory will be created." - ASType _ SMSMemory + | otherwise -> case qs of + SQSMemory -> + unless (storeLogExists) $ putStrLn $ "store_queues is `memory`, " <> storeLogFilePath <> " file will be created." + SQSPostgres -> do + let DBOpts {connstr, schema} = iniDBOptions ini + schemaExists <- checkSchemaExists connstr schema + if + | storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema + | storeLogExists -> do + putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present." + putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate." + exitFailure + | not schemaExists -> do + putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr + exitFailure + | otherwise -> pure () + ASType SQSMemory SMSMemory + | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage | msgsDirExists -> do putStrLn $ "Error: store_messages is `memory` with " <> storeMsgsJournalDir <> " directory present." putStrLn "Set store_messages to `journal` or use `smp-server journal import` to migrate." exitFailure - _ -> pure () + | otherwise -> pure () exitConfigureMsgStorage = do putStrLn $ "Error: both " <> storeMsgsFilePath <> " file and " <> storeMsgsJournalDir <> " directory are present." diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 4bae8215a..a1527b78a 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -40,7 +40,7 @@ import System.Info (os) import Test.Hspec import UnliftIO.Concurrent import qualified UnliftIO.Exception as E -import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, takeTMVar) +import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar) import UnliftIO.Timeout (timeout) import Util @@ -278,10 +278,15 @@ serverBracket :: HasCallStack => (TMVar Bool -> IO ()) -> IO () -> (HasCallStack serverBracket process afterProcess f = do started <- newEmptyTMVarIO E.bracket - (forkIOWithUnmask ($ process started)) + (forkIOWithUnmask (\unmask -> unmask (process started) `E.catchAny` handleStartError started)) (\t -> killThread t >> afterProcess >> waitFor started "stop") (\t -> waitFor started "start" >> f t >>= \r -> r <$ threadDelay 100000) where + -- it putTMVar is called twise to unlock both parts of the bracket in case of start failure + handleStartError started e = do + atomically $ putTMVar started False + atomically $ putTMVar started False + E.throwIO e waitFor started s = 5_000_000 `timeout` atomically (takeTMVar started) >>= \case Nothing -> error $ "server did not " <> s