mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 14:30:22 +00:00
smp server: exception on invalid migration, check queue storage settings on server start (#1478)
* smp server: check queue storage settings on server start * fix incorrect postgres migration not throwing exception
This commit is contained in:
@@ -11,7 +11,9 @@ module Simplex.Messaging.Agent.Store.Postgres.Migrations
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Exception (throwIO)
|
||||
import Control.Monad (void)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.Encoding as TE
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
@@ -22,6 +24,7 @@ import Database.PostgreSQL.Simple.Internal (Connection (..))
|
||||
import Database.PostgreSQL.Simple.SqlQQ (sql)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Common
|
||||
import Simplex.Messaging.Agent.Store.Shared
|
||||
import Simplex.Messaging.Util (($>>=))
|
||||
import UnliftIO.MVar
|
||||
|
||||
initialize :: DBStore -> IO ()
|
||||
@@ -55,7 +58,9 @@ run st = \case
|
||||
void $ PSQL.execute db "DELETE FROM migrations WHERE name = ?" (Only downName)
|
||||
execSQL db query =
|
||||
withMVar (connectionHandle db) $ \pqConn ->
|
||||
void $ LibPQ.exec pqConn (TE.encodeUtf8 query)
|
||||
LibPQ.exec pqConn (TE.encodeUtf8 query) $>>= LibPQ.resultErrorMessage >>= \case
|
||||
Just e | not (B.null e) -> throwIO $ userError $ B.unpack e
|
||||
_ -> pure ()
|
||||
|
||||
getCurrentMigrations :: PSQL.Connection -> IO [Migration]
|
||||
getCurrentMigrations db = map toMigration <$> PSQL.query_ db "SELECT name, down FROM migrations ORDER BY name ASC;"
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiWayIf #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
@@ -431,7 +432,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
printServiceInfo serverVersion srv
|
||||
printSourceCode sourceCode'
|
||||
printSMPServerConfig transports serverStoreCfg
|
||||
checkMsgStoreMode iniStoreType
|
||||
checkMsgStoreMode ini iniStoreType
|
||||
putStrLn $ case messageExpiration of
|
||||
Just ExpirationConfig {ttl} -> "expiring messages after " <> showTTL ttl
|
||||
_ -> "not expiring messages"
|
||||
@@ -586,25 +587,43 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
pure WebHttpsParams {port, cert, key}
|
||||
webStaticPath' = eitherToMaybe $ T.unpack <$> lookupValue "WEB" "static_path" ini
|
||||
|
||||
checkMsgStoreMode :: AStoreType -> IO ()
|
||||
checkMsgStoreMode mode = do
|
||||
checkMsgStoreMode :: Ini -> AStoreType -> IO ()
|
||||
checkMsgStoreMode ini mode = do
|
||||
msgsDirExists <- doesDirectoryExist storeMsgsJournalDir
|
||||
msgsFileExists <- doesFileExist storeMsgsFilePath
|
||||
storeLogExists <- doesFileExist storeLogFilePath
|
||||
case mode of
|
||||
_ | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage
|
||||
ASType _ SMSJournal -- TODO [postgres]
|
||||
ASType qs SMSJournal
|
||||
| msgsFileExists && msgsDirExists -> exitConfigureMsgStorage
|
||||
| msgsFileExists -> do
|
||||
putStrLn $ "Error: store_messages is `journal` with " <> storeMsgsFilePath <> " file present."
|
||||
putStrLn "Set store_messages to `memory` or use `smp-server journal export` to migrate."
|
||||
exitFailure
|
||||
| not msgsDirExists ->
|
||||
putStrLn $ "store_messages is `journal`, " <> storeMsgsJournalDir <> " directory will be created."
|
||||
ASType _ SMSMemory
|
||||
| otherwise -> case qs of
|
||||
SQSMemory ->
|
||||
unless (storeLogExists) $ putStrLn $ "store_queues is `memory`, " <> storeLogFilePath <> " file will be created."
|
||||
SQSPostgres -> do
|
||||
let DBOpts {connstr, schema} = iniDBOptions ini
|
||||
schemaExists <- checkSchemaExists connstr schema
|
||||
if
|
||||
| storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema
|
||||
| storeLogExists -> do
|
||||
putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present."
|
||||
putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate."
|
||||
exitFailure
|
||||
| not schemaExists -> do
|
||||
putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr
|
||||
exitFailure
|
||||
| otherwise -> pure ()
|
||||
ASType SQSMemory SMSMemory
|
||||
| msgsFileExists && msgsDirExists -> exitConfigureMsgStorage
|
||||
| msgsDirExists -> do
|
||||
putStrLn $ "Error: store_messages is `memory` with " <> storeMsgsJournalDir <> " directory present."
|
||||
putStrLn "Set store_messages to `journal` or use `smp-server journal import` to migrate."
|
||||
exitFailure
|
||||
_ -> pure ()
|
||||
| otherwise -> pure ()
|
||||
|
||||
exitConfigureMsgStorage = do
|
||||
putStrLn $ "Error: both " <> storeMsgsFilePath <> " file and " <> storeMsgsJournalDir <> " directory are present."
|
||||
|
||||
@@ -40,7 +40,7 @@ import System.Info (os)
|
||||
import Test.Hspec
|
||||
import UnliftIO.Concurrent
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, takeTMVar)
|
||||
import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar)
|
||||
import UnliftIO.Timeout (timeout)
|
||||
import Util
|
||||
|
||||
@@ -278,10 +278,15 @@ serverBracket :: HasCallStack => (TMVar Bool -> IO ()) -> IO () -> (HasCallStack
|
||||
serverBracket process afterProcess f = do
|
||||
started <- newEmptyTMVarIO
|
||||
E.bracket
|
||||
(forkIOWithUnmask ($ process started))
|
||||
(forkIOWithUnmask (\unmask -> unmask (process started) `E.catchAny` handleStartError started))
|
||||
(\t -> killThread t >> afterProcess >> waitFor started "stop")
|
||||
(\t -> waitFor started "start" >> f t >>= \r -> r <$ threadDelay 100000)
|
||||
where
|
||||
-- it putTMVar is called twise to unlock both parts of the bracket in case of start failure
|
||||
handleStartError started e = do
|
||||
atomically $ putTMVar started False
|
||||
atomically $ putTMVar started False
|
||||
E.throwIO e
|
||||
waitFor started s =
|
||||
5_000_000 `timeout` atomically (takeTMVar started) >>= \case
|
||||
Nothing -> error $ "server did not " <> s
|
||||
|
||||
Reference in New Issue
Block a user