From bac6ea6e91000119d932a4e0c3181bb7868d158f Mon Sep 17 00:00:00 2001 From: Evgeny Date: Thu, 11 Sep 2025 20:22:55 +0100 Subject: [PATCH] smp server: store messages in PostgreSQL (#1622) * smp server: store messages in PostgreSQL * stored procedures to write and to expire messages * function to export messages * move all message functions to PostgreSQL, remove delete trigger * comments * import messages to db * fix message import, add export * fix export * fix export * fix compilation flags * import messages line by line * fix server start with database storage * fix compilation * comments --- simplexmq.cabal | 1 + .../Messaging/Agent/Store/AgentStore.hs | 51 ++- src/Simplex/Messaging/Server.hs | 31 +- src/Simplex/Messaging/Server/CLI.hs | 15 +- src/Simplex/Messaging/Server/Env/STM.hs | 67 +++- src/Simplex/Messaging/Server/Main.hs | 165 +++++++-- .../Messaging/Server/MsgStore/Journal.hs | 19 +- .../Messaging/Server/MsgStore/Postgres.hs | 325 ++++++++++++++++++ src/Simplex/Messaging/Server/MsgStore/STM.hs | 17 +- .../Messaging/Server/MsgStore/Types.hs | 91 ++--- .../Messaging/Server/QueueStore/Postgres.hs | 27 +- .../Server/QueueStore/Postgres/Migrations.hs | 239 ++++++++++++- .../QueueStore/Postgres/server_schema.sql | 241 ++++++++++++- .../Messaging/Server/QueueStore/STM.hs | 7 +- .../Messaging/Server/QueueStore/Types.hs | 4 +- src/Simplex/Messaging/Util.hs | 3 + tests/AgentTests/FunctionalAPITests.hs | 35 +- tests/CLITests.hs | 2 +- tests/CoreTests/MsgStoreTests.hs | 94 ++++- tests/SMPClient.hs | 11 +- tests/ServerTests.hs | 27 +- tests/Test.hs | 11 +- 22 files changed, 1277 insertions(+), 206 deletions(-) create mode 100644 src/Simplex/Messaging/Server/MsgStore/Postgres.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 6dc3d0be4..1dcffaca1 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -266,6 +266,7 @@ library Simplex.Messaging.Notifications.Server.Store.Postgres Simplex.Messaging.Notifications.Server.Store.Types Simplex.Messaging.Notifications.Server.StoreLog + Simplex.Messaging.Server.MsgStore.Postgres Simplex.Messaging.Server.QueueStore.Postgres Simplex.Messaging.Server.QueueStore.Postgres.Migrations other-modules: diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index fef829c66..c958ae710 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -287,7 +287,7 @@ import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Agent.Store.Entity import Simplex.Messaging.Transport.Client (TransportHost) -import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, tshow, ($>>=), (<$$>)) +import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, maybeFirstRow', tshow, ($>>=), (<$$>)) import Simplex.Messaging.Version.Internal import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -426,15 +426,12 @@ deleteConnRecord :: DB.Connection -> ConnId -> IO () deleteConnRecord db connId = DB.execute db "DELETE FROM connections WHERE conn_id = ?" (Only connId) checkConfirmedSndQueueExists_ :: DB.Connection -> NewSndQueue -> IO Bool -checkConfirmedSndQueueExists_ db SndQueue {server, sndId} = do - fromMaybe False - <$> maybeFirstRow - fromOnly - ( DB.query - db - "SELECT 1 FROM snd_queues WHERE host = ? AND port = ? AND snd_id = ? AND status != ? LIMIT 1" - (host server, port server, sndId, New) - ) +checkConfirmedSndQueueExists_ db SndQueue {server, sndId} = + maybeFirstRow' False fromOnly $ + DB.query + db + "SELECT 1 FROM snd_queues WHERE host = ? AND port = ? AND snd_id = ? AND status != ? LIMIT 1" + (host server, port server, sndId, New) getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError (RcvQueue, SomeConn)) getRcvConn db ProtocolServer {host, port} rcvId = runExceptT $ do @@ -1072,15 +1069,12 @@ toRcvMsg ((agentMsgId, internalTs, brokerId, brokerTs) :. (sndMsgId, integrity, in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck} checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool -checkRcvMsgHashExists db connId hash = do - fromMaybe False - <$> maybeFirstRow - fromOnly - ( DB.query - db - "SELECT 1 FROM encrypted_rcv_message_hashes WHERE conn_id = ? AND hash = ? LIMIT 1" - (connId, Binary hash) - ) +checkRcvMsgHashExists db connId hash = + maybeFirstRow' False fromOnly $ + DB.query + db + "SELECT 1 FROM encrypted_rcv_message_hashes WHERE conn_id = ? AND hash = ? LIMIT 1" + (connId, Binary hash) getRcvMsgBrokerTs :: DB.Connection -> ConnId -> SMP.MsgId -> IO (Either StoreError BrokerTs) getRcvMsgBrokerTs db connId msgId = @@ -2119,15 +2113,12 @@ addProcessedRatchetKeyHash db connId hash = DB.execute db "INSERT INTO processed_ratchet_key_hashes (conn_id, hash) VALUES (?,?)" (connId, Binary hash) checkRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool -checkRatchetKeyHashExists db connId hash = do - fromMaybe False - <$> maybeFirstRow - fromOnly - ( DB.query - db - "SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1" - (connId, Binary hash) - ) +checkRatchetKeyHashExists db connId hash = + maybeFirstRow' False fromOnly $ + DB.query + db + "SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1" + (connId, Binary hash) deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO () deleteRatchetKeyHashesExpired db ttl = do @@ -2905,8 +2896,8 @@ deleteSndFile' db sndFileId = getSndFileDeleted :: DB.Connection -> DBSndFileId -> IO Bool getSndFileDeleted db sndFileId = - fromMaybe True - <$> maybeFirstRow fromOnlyBI (DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)) + maybeFirstRow' True fromOnlyBI $ + DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId) createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO () createSndFileReplica db SndFileChunk {sndChunkId} = createSndFileReplica_ db sndChunkId diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 61748dd5b..7d6e00ab0 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control import Simplex.Messaging.Server.Env.STM as Env import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore -import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages) +import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue (..), getJournalQueueMessages) import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore @@ -132,12 +132,17 @@ import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.IO import UnliftIO.STM + #if MIN_VERSION_base(4,18,0) import Data.List (sort) import GHC.Conc (listThreads, threadStatus) import GHC.Conc.Sync (threadLabel) #endif +#if defined(dbServerPostgres) +import Simplex.Messaging.Server.MsgStore.Postgres (exportDbMessages, getDbMessageStats) +#endif + -- | Runs an SMP server using passed configuration. -- -- See a full server here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-server/Main.hs @@ -477,7 +482,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt atomicWriteIORef (msgCount stats) stored atomicModifyIORef'_ (msgExpired stats) (+ expired) printMessageStats "STORE: messages" msgStats - Left e -> logError $ "STORE: withAllMsgQueues, error expiring messages, " <> tshow e + Left e -> logError $ "STORE: expireOldMessages, error expiring messages, " <> tshow e expireNtfsThread :: ServerConfig s -> M s () expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do @@ -2109,6 +2114,9 @@ saveServerMessages drainMsgs ms = case ms of Just f -> exportMessages False ms f drainMsgs Nothing -> logNote "undelivered messages are not saved" StoreJournal _ -> logNote "closed journal message storage" +#if defined(dbServerPostgres) + StoreDatabase _ -> logNote "closed postgres message storage" +#endif exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO () exportMessages tty st f drainMsgs = do @@ -2116,6 +2124,9 @@ exportMessages tty st f drainMsgs = do run $ case st of StoreMemory ms -> exportMessages_ ms $ getMsgs ms StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms +#if defined(dbServerPostgres) + StoreDatabase ms -> exportDbMessages tty ms +#endif where exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get run :: (Handle -> IO Int) -> IO () @@ -2125,7 +2136,7 @@ exportMessages tty st f drainMsgs = do logError $ "error exporting messages: " <> tshow e exitFailure getJournalMsgs ms q = - readTVarIO (msgQueue q) >>= \case + readTVarIO (msgQueue' q) >>= \case Just _ -> getMsgs ms q Nothing -> getJournalQueueMessages ms q getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message] @@ -2149,6 +2160,9 @@ processServerMessages StartOptions {skipWarnings} = do Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_ skipWarnings) (pure Nothing) Nothing -> pure Nothing StoreJournal ms -> processJournalMessages old_ expire ms +#if defined(dbServerPostgres) + StoreDatabase ms -> processDbMessages old_ expire ms +#endif processJournalMessages :: forall s. Maybe Int64 -> Bool -> JournalMsgStore s -> IO (Maybe MessageStats) processJournalMessages old_ expire ms | expire = Just <$> case old_ of @@ -2171,6 +2185,17 @@ processServerMessages StartOptions {skipWarnings} = do processValidateQueue q = unsafeRunStore q "processValidateQueue" $ do storedMsgsCount <- getQueueSize_ =<< getMsgQueue ms q False pure newMessageStats {storedMsgsCount, storedQueues = 1} +#if defined(dbServerPostgres) + processDbMessages old_ expire ms + | expire = Just <$> case old_ of + Just old -> do + -- TODO [messages] expire messages from all queues, not only recent + logNote "expiring database store messages..." + now <- systemSeconds <$> getSystemTime + expireOldMessages False ms now (now - old) + Nothing -> getDbMessageStats ms + | otherwise = logWarn "skipping message expiration" $> Nothing +#endif importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> Bool -> IO MessageStats importMessages tty ms f old_ skipWarnings = do diff --git a/src/Simplex/Messaging/Server/CLI.hs b/src/Simplex/Messaging/Server/CLI.hs index 04db0231c..89077ea47 100644 --- a/src/Simplex/Messaging/Server/CLI.hs +++ b/src/Simplex/Messaging/Server/CLI.hs @@ -33,7 +33,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..)) import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), ProtocolServer (..), ProtocolTypeI) -import Simplex.Messaging.Server.Env.STM (ServerStoreCfg (..), StartOptions (..), StorePaths (..)) +import Simplex.Messaging.Server.Env.STM (ServerStoreCfg (..), StartOptions (..), dbStoreCfg, storeLogFile') import Simplex.Messaging.Server.Main.GitCommit import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..)) import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), TLS, Transport (..), simplexMQVersion) @@ -414,12 +414,13 @@ printServerTransports protocol ts = do \Set `port` in smp-server.ini section [TRANSPORT] to `5223,443`\n" printSMPServerConfig :: [(ServiceName, ASrvTransport, AddHTTP)] -> ServerStoreCfg s -> IO () -printSMPServerConfig transports = \case - SSCMemory sp_ -> printServerConfig "SMP" transports $ (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_ - SSCMemoryJournal {storeLogFile} -> printServerConfig "SMP" transports $ Just storeLogFile - SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbOpts = DBOpts {connstr, schema}}} -> do - B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema - printServerTransports "SMP" transports +printSMPServerConfig transports st = case dbStoreCfg st of + Just cfg -> printDBConfig cfg + Nothing -> printServerConfig "SMP" transports $ storeLogFile' st + where + printDBConfig PostgresStoreCfg {dbOpts = DBOpts {connstr, schema}} = do + B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema + printServerTransports "SMP" transports deleteDirIfExists :: FilePath -> IO () deleteDirIfExists path = whenM (doesDirectoryExist path) $ removeDirectoryRecursive path diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 3bc535b8c..b72922f04 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -72,7 +72,10 @@ module Simplex.Messaging.Server.Env.STM defaultIdleQueueInterval, journalMsgStoreDepth, readWriteQueueStore, + noPostgresExitStr, noPostgresExit, + dbStoreCfg, + storeLogFile', ) where @@ -131,6 +134,10 @@ import System.IO (IOMode (..)) import System.Mem.Weak (Weak) import UnliftIO.STM +#if defined(dbServerPostgres) +import Simplex.Messaging.Server.MsgStore.Postgres +#endif + data ServerConfig s = ServerConfig { transports :: [(ServiceName, ASrvTransport, AddHTTP)], smpHandshakeTimeout :: Int, @@ -275,14 +282,25 @@ fromMsgStore :: MsgStore s -> s fromMsgStore = \case StoreMemory s -> s StoreJournal s -> s +#if defined(dbServerPostgres) + StoreDatabase s -> s +#endif {-# INLINE fromMsgStore #-} type family SupportedStore (qs :: QSType) (ms :: MSType) :: Constraint where SupportedStore 'QSMemory 'MSMemory = () SupportedStore 'QSMemory 'MSJournal = () - SupportedStore 'QSPostgres 'MSJournal = () + SupportedStore 'QSMemory 'MSPostgres = + (Int ~ Bool, TypeError ('TE.Text "Storing messages in Postgres DB with queues in memory is not supported")) SupportedStore 'QSPostgres 'MSMemory = - (Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with Postgres DB is not supported")) + (Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with queues in Postgres DB is not supported")) + SupportedStore 'QSPostgres 'MSJournal = () +#if defined(dbServerPostgres) + SupportedStore 'QSPostgres 'MSPostgres = () +#else + SupportedStore 'QSPostgres 'MSPostgres = + (Int ~ Bool, TypeError ('TE.Text "Server compiled without server_postgres flag")) +#endif data AStoreType = forall qs ms. (SupportedStore qs ms, MsgStoreClass (MsgStoreType qs ms)) => @@ -292,16 +310,43 @@ data ServerStoreCfg s where SSCMemory :: Maybe StorePaths -> ServerStoreCfg STMMsgStore SSCMemoryJournal :: {storeLogFile :: FilePath, storeMsgsPath :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSMemory) SSCDatabaseJournal :: {storeCfg :: PostgresStoreCfg, storeMsgsPath' :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSPostgres) +#if defined(dbServerPostgres) + SSCDatabase :: PostgresStoreCfg -> ServerStoreCfg PostgresMsgStore +#endif + +dbStoreCfg :: ServerStoreCfg s -> Maybe PostgresStoreCfg +dbStoreCfg = \case + SSCMemory _ -> Nothing + SSCMemoryJournal {} -> Nothing + SSCDatabaseJournal {storeCfg} -> Just storeCfg +#if defined(dbServerPostgres) + SSCDatabase cfg -> Just cfg +#endif + +storeLogFile' :: ServerStoreCfg s -> Maybe FilePath +storeLogFile' = \case + SSCMemory sp_ -> (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_ + SSCMemoryJournal {storeLogFile} -> Just storeLogFile + SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbStoreLogPath}} -> dbStoreLogPath +#if defined(dbServerPostgres) + SSCDatabase (PostgresStoreCfg {dbStoreLogPath}) -> dbStoreLogPath +#endif data StorePaths = StorePaths {storeLogFile :: FilePath, storeMsgsFile :: Maybe FilePath} type family MsgStoreType (qs :: QSType) (ms :: MSType) where MsgStoreType 'QSMemory 'MSMemory = STMMsgStore MsgStoreType qs 'MSJournal = JournalMsgStore qs +#if defined(dbServerPostgres) + MsgStoreType 'QSPostgres 'MSPostgres = PostgresMsgStore +#endif data MsgStore s where StoreMemory :: STMMsgStore -> MsgStore STMMsgStore StoreJournal :: JournalMsgStore qs -> MsgStore (JournalMsgStore qs) +#if defined(dbServerPostgres) + StoreDatabase :: PostgresMsgStore -> MsgStore PostgresMsgStore +#endif data Server s = Server { clients :: ServerClients s, @@ -533,8 +578,12 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp qsCfg = PQStoreCfg (storeCfg {confirmMigrations} :: PostgresStoreCfg) cfg = mkJournalStoreConfig qsCfg storeMsgsPath' msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg - ms <- newMsgStore cfg - pure $ StoreJournal ms + StoreJournal <$> newMsgStore cfg + SSCDatabase storeCfg -> do + let StartOptions {compactLog, confirmMigrations} = startOptions config + cfg = PostgresMsgStoreCfg storeCfg {confirmMigrations} msgQueueQuota + when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg + StoreDatabase <$> newMsgStore cfg #else SSCDatabaseJournal {} -> noPostgresExit #endif @@ -628,10 +677,12 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp _ -> SPMMessages noPostgresExit :: IO a -noPostgresExit = do - putStrLn "Error: server binary is compiled without support for PostgreSQL database." - putStrLn "Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`." - exitFailure +noPostgresExit = putStrLn noPostgresExitStr >> exitFailure + +noPostgresExitStr :: String +noPostgresExitStr = + "Error: server binary is compiled without support for PostgreSQL database.\n" + <> "Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`." mkJournalStoreConfig :: QStoreCfg s -> FilePath -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s mkJournalStoreConfig queueStoreCfg storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval = diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 59aacc2c4..a4164421f 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -18,9 +18,10 @@ module Simplex.Messaging.Server.Main where import Control.Concurrent.STM -import Control.Exception (finally) +import Control.Exception (SomeException, finally, try) import Control.Logger.Simple import Control.Monad +import qualified Data.Attoparsec.ByteString.Char8 as A import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Char (isAlpha, isAscii, toUpper) @@ -64,7 +65,7 @@ import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM) import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist) import System.Exit (exitFailure) import System.FilePath (combine) -import System.IO (BufferMode (..), hSetBuffering, stderr, stdout) +import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile) import Text.Read (readMaybe) #if defined(dbServerPostgres) @@ -73,6 +74,7 @@ import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists) import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue) import Simplex.Messaging.Server.MsgStore.Types (QSType (..)) import Simplex.Messaging.Server.MsgStore.Journal (postgresQueueStore) +import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStoreCfg (..), batchInsertMessages, exportDbMessages) import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchInsertServices, foldQueueRecs, foldServiceRecs) import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..)) import Simplex.Messaging.Server.QueueStore.Types @@ -129,6 +131,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = printMessageStats "Messages" msgStats putStrLn $ case readStoreType ini of Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, update it to `journal` in INI file" + Right (ASType SQSPostgres SMSPostgres) -> "store_messages set to `database`, update it to `journal` in INI file" Right (ASType _ SMSJournal) -> "store_messages set to `journal`" Left e -> e <> ", configure storage correctly" SCExport @@ -149,8 +152,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = putStrLn $ case msType of SMSMemory -> "store_messages set to `memory`, start the server." SMSJournal -> "store_messages set to `journal`, update it to `memory` in INI file" - Right (ASType SQSPostgres SMSJournal) -> do #if defined(dbServerPostgres) + Right (ASType SQSPostgres SMSJournal) -> do let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath dbOpts@DBOpts {connstr, schema} = iniDBOptions ini defaultDBOpts unlessM (checkSchemaExists connstr schema) $ do @@ -160,8 +163,11 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = exportMessages True (StoreJournal ms) storeMsgsFilePath False putStrLn "Export completed" putStrLn "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)." + Right (ASType SQSPostgres SMSPostgres) -> do + putStrLn $ "Messages can be exported with `dabatase export --table messages`." + exitFailure #else - noPostgresExit + Right (ASType SQSPostgres SMSJournal) -> noPostgresExit #endif Left e -> putStrLn $ e <> ", configure storage correctly" SCDelete @@ -175,11 +181,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = deleteDirIfExists storeMsgsJournalDir putStrLn $ "Deleted all messages in journal " <> storeMsgsJournalDir #if defined(dbServerPostgres) - Database cmd dbOpts@DBOpts {connstr, schema} -> withIniFile $ \ini -> do + Database cmd tables dbOpts@DBOpts {connstr, schema} -> withIniFile $ \ini -> do schemaExists <- checkSchemaExists connstr schema storeLogExists <- doesFileExist storeLogFilePath - case cmd of - SCImport + msgsFileExists <- doesFileExist storeMsgsFilePath + case (cmd, tables) of + (SCImport, DTQueues) | schemaExists && storeLogExists -> exitConfigureQueueStore connstr schema | schemaExists -> do putStrLn $ "Schema " <> B.unpack schema <> " already exists in PostrgreSQL database: " <> B.unpack connstr @@ -197,12 +204,29 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = putStrLn $ case readStoreType ini of Right (ASType SQSMemory SMSMemory) -> setToDbStr <> "\nstore_messages set to `memory`, import messages to journal to use PostgreSQL database for queues (`smp-server journal import`)" Right (ASType SQSMemory SMSJournal) -> setToDbStr - Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, start the server." + Right (ASType SQSPostgres _) -> "store_queues set to `database`, start the server." Left e -> e <> ", configure storage correctly" where setToDbStr :: String setToDbStr = "store_queues set to `memory`, update it to `database` in INI file" - SCExport + (SCImport, DTMessages) + | not schemaExists -> do + putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr + exitFailure + | not msgsFileExists -> do + putStrLn $ storeMsgsFilePath <> " file does not exist." + exitFailure + | otherwise -> do + confirmOrExit + ("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to PostrgreSQL database " <> B.unpack connstr <> ", schema: " <> B.unpack schema) + "Message records not imported" + mCnt <- importMessagesToDatabase storeMsgsFilePath dbOpts + putStrLn $ "Import completed: " <> show mCnt <> " messages" + putStrLn $ case readStoreType ini of + Right (ASType SQSPostgres SMSPostgres) -> "store_queues and store_messages set to `database`, start the server." + Right _ -> "set store_queues and store_messages set to `database` in INI file" + Left e -> e <> ", configure storage correctly" + (SCExport, DTQueues) | schemaExists && storeLogExists -> exitConfigureQueueStore connstr schema | not schemaExists -> do putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr @@ -212,15 +236,33 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = exitFailure | otherwise -> do confirmOrExit - ("WARNING: PostrgreSQL database schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath) + ("WARNING: PostrgreSQL schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath) "Queue records not exported" (sCnt, qCnt) <- exportDatabaseToStoreLog logPath dbOpts storeLogFilePath putStrLn $ "Export completed: " <> show sCnt <> " services, " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of - Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file." + Right (ASType SQSPostgres _) -> "store_queues or store_messages set to `database`, update it to `memory` in INI file." Right (ASType SQSMemory _) -> "store_queues set to `memory`, start the server" Left e -> e <> ", configure storage correctly" - SCDelete + (SCExport, DTMessages) + | not schemaExists -> do + putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr + exitFailure + | msgsFileExists -> do + putStrLn $ storeMsgsFilePath <> " file already exists." + exitFailure + | otherwise -> do + confirmOrExit + ("WARNING: Messages from PostrgreSQL schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to message log file " <> storeMsgsFilePath) + "Message records not exported" + let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL} + ms <- newMsgStore $ PostgresMsgStoreCfg storeCfg defaultMsgQueueQuota + withFile storeMsgsFilePath WriteMode (try . exportDbMessages True ms) >>= \case + Right mCnt -> do + putStrLn $ "Export completed: " <> show mCnt <> " messages" + putStrLn "Export queues with `smp-server database export queues`" + Left (e :: SomeException) -> putStrLn $ "Error exporting messages: " <> show e + (SCDelete, _) | not schemaExists -> do putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr exitFailure @@ -254,8 +296,14 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = readStoreType ini = case (iniStoreQueues, iniStoreMessage) of ("memory", "memory") -> Right $ ASType SQSMemory SMSMemory ("memory", "journal") -> Right $ ASType SQSMemory SMSJournal + ("memory", "database") -> Left "Database and memory storage are not compatible." + ("database", "memory") -> Left "Database and memory storage are not compatible." ("database", "journal") -> Right $ ASType SQSPostgres SMSJournal - ("database", "memory") -> Left "Using PostgreSQL database requires journal memory storage." +#if defined(dbServerPostgres) + ("database", "database") -> Right $ ASType SQSPostgres SMSPostgres +#else + ("database", "database") -> Left noPostgresExitStr +#endif (q, m) -> Left $ T.unpack $ "Invalid storage settings: store_queues: " <> q <> ", store_messages: " <> m where iniStoreQueues = fromRight "memory" $ lookupValue "STORE_LOG" "store_queues" ini @@ -405,6 +453,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath storeCfg = PostgresStoreCfg {dbOpts = iniDBOptions ini defaultDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} in SSCDatabaseJournal {storeCfg, storeMsgsPath' = storeMsgsJournalDir} +#if defined(dbServerPostgres) + iniStoreCfg SQSPostgres SMSPostgres = + let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath + storeCfg = PostgresStoreCfg {dbOpts = iniDBOptions ini defaultDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} + in SSCDatabase storeCfg +#endif serverConfig :: ServerStoreCfg s -> ServerConfig s serverConfig serverStoreCfg = ServerConfig @@ -514,6 +568,14 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = msgsFileExists <- doesFileExist storeMsgsFilePath storeLogExists <- doesFileExist storeLogFilePath case mode of +#if defined(dbServerPostgres) + ASType SQSPostgres SMSPostgres + | msgsFileExists || msgsDirExists -> do + putStrLn $ "Error: " <> storeMsgsFilePath <> " file or " <> storeMsgsJournalDir <> " directory are present." + putStrLn "Configure memory storage." + exitFailure + | otherwise -> checkDbStorage ini storeLogExists +#endif ASType qs SMSJournal | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage | msgsFileExists -> do @@ -526,28 +588,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = SQSMemory -> unless (storeLogExists) $ putStrLn $ "store_queues is `memory`, " <> storeLogFilePath <> " file will be created." #if defined(dbServerPostgres) - SQSPostgres -> do - let DBOpts {connstr, schema} = iniDBOptions ini defaultDBOpts - schemaExists <- checkSchemaExists connstr schema - case enableDbStoreLog' ini of - Just () - | not schemaExists -> noDatabaseSchema connstr schema - | not storeLogExists -> do - putStrLn $ "Error: db_store_log is `on`, " <> storeLogFilePath <> " does not exist" - exitFailure - | otherwise -> pure () - Nothing - | storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema - | storeLogExists -> do - putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present." - putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate." - exitFailure - | not schemaExists -> noDatabaseSchema connstr schema - | otherwise -> pure () - where - noDatabaseSchema connstr schema = do - putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr - exitFailure + SQSPostgres -> checkDbStorage ini storeLogExists #else SQSPostgres -> noPostgresExit #endif @@ -565,6 +606,29 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = exitFailure #if defined(dbServerPostgres) + checkDbStorage ini storeLogExists = do + let DBOpts {connstr, schema} = iniDBOptions ini defaultDBOpts + schemaExists <- checkSchemaExists connstr schema + case enableDbStoreLog' ini of + Just () + | not schemaExists -> noDatabaseSchema connstr schema + | not storeLogExists -> do + putStrLn $ "Error: db_store_log is `on`, " <> storeLogFilePath <> " does not exist" + exitFailure + | otherwise -> pure () + Nothing + | storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema + | storeLogExists -> do + putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present." + putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate." + exitFailure + | not schemaExists -> noDatabaseSchema connstr schema + | otherwise -> pure () + where + noDatabaseSchema connstr schema = do + putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr + exitFailure + exitConfigureQueueStore connstr schema = do putStrLn $ "Error: both " <> storeLogFilePath <> " file and " <> B.unpack schema <> " schema are present (database: " <> B.unpack connstr <> ")." putStrLn "Configure queue storage." @@ -585,6 +649,14 @@ importStoreLogToDatabase logPath storeLogFile dbOpts = do renameFile storeLogFile $ storeLogFile <> ".bak" pure (sCnt, qCnt) +importMessagesToDatabase :: FilePath -> DBOpts -> IO Int64 +importMessagesToDatabase msgsLogFile dbOpts = do + let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL} + ms <- newMsgStore $ PostgresMsgStoreCfg storeCfg defaultMsgQueueQuota + mCnt <- batchInsertMessages True msgsLogFile $ queueStore ms + renameFile msgsLogFile $ msgsLogFile <> ".bak" + pure mCnt + exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO (Int, Int) exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL} @@ -677,10 +749,22 @@ data CliCommand | Start StartOptions | Delete | Journal StoreCmd - | Database StoreCmd DBOpts + | Database StoreCmd DatabaseTable DBOpts data StoreCmd = SCImport | SCExport | SCDelete +data DatabaseTable = DTQueues | DTMessages + +instance StrEncoding DatabaseTable where + strEncode = \case + DTQueues -> "queues" + DTMessages -> "messages" + strP = + A.takeTill (== ' ') >>= \case + "queues" -> pure DTQueues + "messages" -> pure DTMessages + _ -> fail "DatabaseTable" + cliCommandP :: FilePath -> FilePath -> FilePath -> Parser CliCommand cliCommandP cfgPath logPath iniFile = hsubparser @@ -689,7 +773,7 @@ cliCommandP cfgPath logPath iniFile = <> command "start" (info (Start <$> startOptionsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")")) <> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files")) <> command "journal" (info (Journal <$> journalCmdP) (progDesc "Import/export messages to/from journal storage")) - <> command "database" (info (Database <$> databaseCmdP <*> dbOptsP defaultDBOpts) (progDesc "Import/export queues to/from PostgreSQL database storage")) + <> command "database" (info (Database <$> databaseCmdP <*> dbTableP <*> dbOptsP defaultDBOpts) (progDesc "Import/export queues to/from PostgreSQL database storage")) ) where initP :: Parser InitOptions @@ -843,6 +927,13 @@ cliCommandP cfgPath logPath iniFile = <> command "export" (info (pure SCExport) (progDesc $ "Export " <> dest <> " to " <> src)) <> command "delete" (info (pure SCDelete) (progDesc $ "Delete " <> dest)) ) + dbTableP = + option + strParse + ( long "table" + <> help "Database tables: queues/messages" + <> metavar "TABLE" + ) parseBasicAuth :: ReadM ServerPassword parseBasicAuth = eitherReader $ fmap ServerPassword . strDecode . B.pack entityP :: String -> String -> String -> Parser (Maybe Entity, Maybe Text) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index be73ab681..3a639238b 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -24,7 +24,7 @@ module Simplex.Messaging.Server.MsgStore.Journal ( JournalMsgStore (random, expireBackupsBefore), QStore (..), QStoreCfg (..), - JournalQueue, + JournalQueue (msgQueue'), -- msgQueue' is used in tests JournalMsgQueue (queue, state), JMQueue (queueDirectory, statePath), JournalStoreConfig (..), @@ -291,13 +291,10 @@ newtype StoreIO (s :: QSType) a = StoreIO {unStoreIO :: IO a} deriving newtype (Functor, Applicative, Monad) instance StoreQueueClass (JournalQueue s) where - type MsgQueue (JournalQueue s) = JournalMsgQueue s recipientId = recipientId' {-# INLINE recipientId #-} queueRec = queueRec' {-# INLINE queueRec #-} - msgQueue = msgQueue' - {-# INLINE msgQueue #-} withQueueLock :: JournalQueue s -> Text -> IO a -> IO a withQueueLock JournalQueue {recipientId', queueLock, sharedLock} = withLockWaitShared recipientId' queueLock sharedLock @@ -379,6 +376,7 @@ makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do instance MsgStoreClass (JournalMsgStore s) where type StoreMonad (JournalMsgStore s) = StoreIO s + type MsgQueue (JournalMsgStore s) = JournalMsgQueue s type QueueStore (JournalMsgStore s) = QStore s type StoreQueue (JournalMsgStore s) = JournalQueue s type MsgStoreConfig (JournalMsgStore s) = JournalStoreConfig s @@ -422,7 +420,7 @@ instance MsgStoreClass (JournalMsgStore s) where expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats expireOldMessages tty ms now ttl = case queueStore_ ms of MQStore st -> - withLoadedQueues st $ \q -> run $ isolateQueue q "deleteExpiredMsgs" $ do + withLoadedQueues st $ \q -> run $ isolateQueue ms q "deleteExpiredMsgs" $ do StoreIO (readTVarIO $ queueRec q) >>= \case Just QueueRec {updatedAt = Just (RoundedSystemTime t)} | t > veryOld -> expireQueueMsgs ms now old q @@ -587,7 +585,7 @@ instance MsgStoreClass (JournalMsgStore s) where | otherwise = pure [] writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) - writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do + writeMsg ms q' logState msg = isolateQueue ms q' "writeMsg" $ do q <- getMsgQueue ms q' True StoreIO $ (`E.finally` updateActiveAt q') $ do st@MsgQueueState {canWrite, size} <- readTVarIO (state q) @@ -661,8 +659,8 @@ instance MsgStoreClass (JournalMsgStore s) where $>>= \len -> readTVarIO handles $>>= \hs -> updateReadPos q mq logState len hs $> Just () - isolateQueue :: JournalQueue s -> Text -> StoreIO s a -> ExceptT ErrorType IO a - isolateQueue sq op = tryStore' op (recipientId' sq) . withQueueLock sq op . unStoreIO + isolateQueue :: JournalMsgStore s -> JournalQueue s -> Text -> StoreIO s a -> ExceptT ErrorType IO a + isolateQueue _ sq op = tryStore' op (recipientId' sq) . withQueueLock sq op . unStoreIO unsafeRunStore :: JournalQueue s -> Text -> StoreIO s a -> IO a unsafeRunStore sq op a = @@ -977,10 +975,11 @@ deleteQueue_ ms q = pure r where rId = recipientId q - remove r@(_, mq_) = do + remove qr = do + mq_ <- atomically $ swapTVar (msgQueue' q) Nothing mapM_ (closeMsgQueueHandles ms) mq_ removeQueueDirectory ms rId - pure r + pure (qr, mq_) closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO () closeMsgQueue ms JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ (closeMsgQueueHandles ms) diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs new file mode 100644 index 000000000..0c714e089 --- /dev/null +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -0,0 +1,325 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeFamilies #-} + +module Simplex.Messaging.Server.MsgStore.Postgres + ( PostgresMsgStore, + PostgresMsgStoreCfg (..), + PostgresQueue, + exportDbMessages, + getDbMessageStats, + batchInsertMessages, + ) +where + +import Control.Concurrent.STM +import Control.Monad +import Control.Monad.Reader +import Control.Monad.Trans.Except +import qualified Data.ByteString as B +import qualified Data.ByteString.Builder as BB +import qualified Data.ByteString.Lazy as LB +import Data.Functor (($>)) +import Data.IORef +import Data.Int (Int64) +import Data.List (intersperse) +import qualified Data.Map.Strict as M +import Data.Text (Text) +import Data.Time.Clock.System (SystemTime (..)) +import Database.PostgreSQL.Simple (Binary (..), Only (..), (:.) (..)) +import qualified Database.PostgreSQL.Simple as DB +import qualified Database.PostgreSQL.Simple.Copy as DB +import Database.PostgreSQL.Simple.SqlQQ (sql) +import Database.PostgreSQL.Simple.ToField (ToField (..)) +import Simplex.Messaging.Agent.Store.Postgres.Common +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Protocol +import Simplex.Messaging.Server.MsgStore +import Simplex.Messaging.Server.MsgStore.Types +import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.Server.QueueStore.Postgres +import Simplex.Messaging.Server.QueueStore.Types +import Simplex.Messaging.Server.StoreLog (foldLogLines) +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Util (maybeFirstRow, maybeFirstRow', (<$$>)) +import System.IO (Handle, hFlush, stdout) + +data PostgresMsgStore = PostgresMsgStore + { config :: PostgresMsgStoreCfg, + queueStore_ :: PostgresQueueStore' + } + +data PostgresMsgStoreCfg = PostgresMsgStoreCfg + { queueStoreCfg :: PostgresStoreCfg, + quota :: Int + } + +type PostgresQueueStore' = PostgresQueueStore PostgresQueue + +data PostgresQueue = PostgresQueue + { recipientId' :: RecipientId, + queueRec' :: TVar (Maybe QueueRec) + } + +instance StoreQueueClass PostgresQueue where + recipientId = recipientId' + {-# INLINE recipientId #-} + queueRec = queueRec' + {-# INLINE queueRec #-} + withQueueLock PostgresQueue {} _ = id -- TODO [messages] maybe it's just transaction? + {-# INLINE withQueueLock #-} + +newtype DBTransaction = DBTransaction {dbConn :: DB.Connection} + +type DBStoreIO a = ReaderT DBTransaction IO a + +instance MsgStoreClass PostgresMsgStore where + type StoreMonad PostgresMsgStore = ReaderT DBTransaction IO + type MsgQueue PostgresMsgStore = () + type QueueStore PostgresMsgStore = PostgresQueueStore' + type StoreQueue PostgresMsgStore = PostgresQueue + type MsgStoreConfig PostgresMsgStore = PostgresMsgStoreCfg + + newMsgStore :: PostgresMsgStoreCfg -> IO PostgresMsgStore + newMsgStore config = do + queueStore_ <- newQueueStore @PostgresQueue $ queueStoreCfg config + pure PostgresMsgStore {config, queueStore_} + + closeMsgStore :: PostgresMsgStore -> IO () + closeMsgStore = closeQueueStore @PostgresQueue . queueStore_ + + withActiveMsgQueues _ _ = error "withActiveMsgQueues not used" + + unsafeWithAllMsgQueues _ _ _ = error "unsafeWithAllMsgQueues not used" + + expireOldMessages :: Bool -> PostgresMsgStore -> Int64 -> Int64 -> IO MessageStats + expireOldMessages _tty ms now ttl = + maybeFirstRow' newMessageStats toMessageStats $ withConnection st $ \db -> + DB.query db "CALL expire_old_messages(?,?,0,0,0)" (now, ttl) + where + st = dbStore $ queueStore_ ms + toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) = + MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues} + + logQueueStates _ = error "logQueueStates not used" + + logQueueState _ = error "logQueueState not used" + + queueStore = queueStore_ + {-# INLINE queueStore #-} + + loadedQueueCounts :: PostgresMsgStore -> IO LoadedQueueCounts + loadedQueueCounts ms = do + loadedQueueCount <- M.size <$> readTVarIO queues + loadedNotifierCount <- M.size <$> readTVarIO notifiers + notifierLockCount <- M.size <$> readTVarIO notifierLocks + pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount = 0, queueLockCount = 0, notifierLockCount} + where + PostgresQueueStore {queues, notifiers, notifierLocks} = queueStore_ ms + + mkQueue :: PostgresMsgStore -> Bool -> RecipientId -> QueueRec -> IO PostgresQueue + mkQueue _ _keepLock rId qr = PostgresQueue rId <$> newTVarIO (Just qr) + {-# INLINE mkQueue #-} + + getMsgQueue _ _ _ = pure () + {-# INLINE getMsgQueue #-} + + getPeekMsgQueue :: PostgresMsgStore -> PostgresQueue -> DBStoreIO (Maybe ((), Message)) + getPeekMsgQueue _ q = ((),) <$$> tryPeekMsg_ q () + + withIdleMsgQueue :: Int64 -> PostgresMsgStore -> PostgresQueue -> (() -> DBStoreIO a) -> DBStoreIO (Maybe a, Int) + withIdleMsgQueue _ _ _ _ = error "withIdleMsgQueue not used" + + deleteQueue :: PostgresMsgStore -> PostgresQueue -> IO (Either ErrorType QueueRec) + deleteQueue ms q = deleteStoreQueue (queueStore_ ms) q + {-# INLINE deleteQueue #-} + + deleteQueueSize :: PostgresMsgStore -> PostgresQueue -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize ms q = runExceptT $ do + size <- getQueueSize ms q + qr <- ExceptT $ deleteStoreQueue (queueStore_ ms) q + pure (qr, size) + + getQueueMessages_ _ _ _ = error "getQueueMessages_ not used" + + writeMsg :: PostgresMsgStore -> PostgresQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + writeMsg ms q _ msg = + withDB' "writeMsg" (queueStore_ ms) $ \db -> do + let (msgQuota, ntf, body) = case msg of + Message {msgFlags = MsgFlags ntf', msgBody = C.MaxLenBS body'} -> (False, ntf', body') + MessageQuota {} -> (True, False, B.empty) + toResult <$> + DB.query + db + "SELECT quota_written, was_empty FROM write_message(?,?,?,?,?,?,?)" + (recipientId' q, Binary (messageId msg), systemSeconds (messageTs msg), msgQuota, ntf, Binary body, quota) + where + toResult = \case + ((msgQuota, wasEmpty) : _) -> if msgQuota then Nothing else Just (msg, wasEmpty) + [] -> Nothing + PostgresMsgStore {config = PostgresMsgStoreCfg {quota}} = ms + + setOverQuota_ :: PostgresQueue -> IO () -- can ONLY be used while restoring messages, not while server running + setOverQuota_ _ = error "TODO setOverQuota_" -- TODO [messages] + + getQueueSize_ :: () -> DBStoreIO Int + getQueueSize_ _ = error "getQueueSize_ not used" + + getQueueSize :: PostgresMsgStore -> PostgresQueue -> ExceptT ErrorType IO Int + getQueueSize ms q = + withDB' "getQueueSize" (queueStore_ ms) $ \db -> + maybeFirstRow' 0 fromOnly $ + DB.query db "SELECT msg_queue_size FROM msg_queues WHERE recipient_id = ? AND deleted_at IS NULL" (Only (recipientId' q)) + + tryPeekMsg_ :: PostgresQueue -> () -> DBStoreIO (Maybe Message) + tryPeekMsg_ q _ = do + db <- asks dbConn + liftIO $ maybeFirstRow toMessage $ + DB.query + db + [sql| + SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + FROM messages + WHERE recipient_id = ? + ORDER BY message_id ASC LIMIT 1 + |] + (Only (recipientId' q)) + + tryDeleteMsg_ :: PostgresQueue -> () -> Bool -> DBStoreIO () + tryDeleteMsg_ _q _ _ = error "tryDeleteMsg_ not used" -- do + + isolateQueue :: PostgresMsgStore -> PostgresQueue -> Text -> DBStoreIO a -> ExceptT ErrorType IO a + isolateQueue ms _q op a = withDB' op (queueStore_ ms) $ runReaderT a . DBTransaction + + unsafeRunStore _ _ _ = error "unsafeRunStore not used" + + tryPeekMsg :: PostgresMsgStore -> PostgresQueue -> ExceptT ErrorType IO (Maybe Message) + tryPeekMsg ms q = isolateQueue ms q "tryPeekMsg" $ tryPeekMsg_ q () + {-# INLINE tryPeekMsg #-} + + tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message) + tryDelMsg ms q msgId = + withDB' "tryDelMsg" (queueStore_ ms) $ \db -> + maybeFirstRow toMessage $ + DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_msg(?, ?)" (recipientId' q, Binary msgId) + + tryDelPeekMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message) + tryDelPeekMsg ms q msgId = + withDB' "tryDelPeekMsg" (queueStore_ ms) $ \db -> + toResult . map toMessage + <$> DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_peek_msg(?, ?)" (recipientId' q, Binary msgId) + where + toResult = \case + [] -> (Nothing, Nothing) + [msg] + | messageId msg == msgId -> (Just msg, Nothing) + | otherwise -> (Nothing, Just msg) + deleted : next : _ -> (Just deleted, Just next) + + deleteExpiredMsgs :: PostgresMsgStore -> PostgresQueue -> Int64 -> ExceptT ErrorType IO Int + deleteExpiredMsgs ms q old = + maybeFirstRow' 0 (fromIntegral @Int64 . fromOnly) $ withDB' "deleteExpiredMsgs" (queueStore_ ms) $ \db -> + DB.query db "SELECT delete_expired_msgs(?, ?)" (recipientId' q, old) + +toMessage :: (Binary MsgId, Int64, Bool, Bool, Binary MsgBody) -> Message +toMessage (Binary msgId, ts, msgQuota, ntf, Binary body) + | msgQuota = MessageQuota {msgId, msgTs} + | otherwise = Message {msgId, msgTs, msgFlags = MsgFlags ntf, msgBody = C.unsafeMaxLenBS body} -- TODO [messages] unsafeMaxLenBS? + where + msgTs = MkSystemTime ts 0 + +exportDbMessages :: Bool -> PostgresMsgStore -> Handle -> IO Int +exportDbMessages tty ms h = do + rows <- newIORef [] + n <- withConnection st $ \db -> DB.foldWithOptions_ opts db query 0 $ \i r -> do + let i' = i + 1 + if i' `mod` 1000 > 0 + then modifyIORef rows (r :) + else do + readIORef rows >>= writeMessages . (r :) + writeIORef rows [] + when tty $ putStr (progress i' <> "\r") >> hFlush stdout + pure i' + readIORef rows >>= \rs -> unless (null rs) $ writeMessages rs + when tty $ putStrLn $ progress n + pure n + where + st = dbStore $ queueStore_ ms + opts = DB.defaultFoldOptions {DB.fetchQuantity = DB.Fixed 1000} + query = + [sql| + SELECT recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + FROM messages + ORDER BY recipient_id, message_id ASC + |] + writeMessages = BB.hPutBuilder h . encodeMessages . reverse + encodeMessages = mconcat . map (\(Only rId :. msg) -> BB.byteString (strEncode $ MLRv3 rId $ toMessage msg) <> BB.char8 '\n') + progress i = "Processed: " <> show i <> " records" + +getDbMessageStats :: PostgresMsgStore -> IO MessageStats +getDbMessageStats ms = + maybeFirstRow' newMessageStats toMessageStats $ withConnection st $ \db -> + DB.query_ + db + [sql| + SELECT + (SELECT COUNT (1) FROM msg_queues WHERE deleted_at IS NULL), + (SELECT COUNT (1) FROM messages m JOIN msg_queues q USING recipient_id WHERE deleted_at IS NULL) + |] + where + st = dbStore $ queueStore_ ms + toMessageStats (storedQueues, storedMsgsCount) = + MessageStats {storedQueues, storedMsgsCount, expiredMsgsCount = 0} + +-- TODO [messages] update counts +batchInsertMessages :: StoreQueueClass q => Bool -> FilePath -> PostgresQueueStore q -> IO Int64 +batchInsertMessages tty f toStore = do + putStrLn "Importing messages..." + let st = dbStore toStore + (_, inserted) <- + withTransaction st $ \db -> do + DB.copy_ + db + [sql| + COPY messages (recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body) + FROM STDIN WITH (FORMAT CSV) + |] + foldLogLines tty f (putMessage db) (0 :: Int, 0) >>= (DB.putCopyEnd db $>) + Only mCnt : _ <- withTransaction st (`DB.query_` "SELECT count(*) FROM messages") + unless (inserted == mCnt) $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt <> " messages." + pure inserted + where + putMessage db (!i, !cnt) _eof s = do + let i' = i + 1 + cnt' <- case strDecode s of + Right (MLRv3 rId msg) -> (cnt + 1) <$ DB.putCopyData db (messageRecToText rId msg) + Left e -> cnt <$ putStrLn ("Error parsing line " <> show i' <> ": " <> e) + pure (i', cnt') + +messageRecToText :: RecipientId -> Message -> B.ByteString +messageRecToText rId msg = + LB.toStrict $ BB.toLazyByteString $ mconcat tabFields <> BB.char7 '\n' + where + tabFields = BB.char7 ',' `intersperse` fields + fields = + [ renderField (toField rId), + renderField (toField $ Binary (messageId msg)), + renderField (toField $ systemSeconds (messageTs msg)), + renderField (toField msgQuota), + renderField (toField ntf), + renderField (toField $ Binary body) + ] + (msgQuota, ntf, body) = case msg of + Message {msgFlags = MsgFlags ntf', msgBody = C.MaxLenBS body'} -> (False, ntf', body') + MessageQuota {} -> (True, False, B.empty) diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 837e0481d..73e1bf398 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -57,18 +57,16 @@ data STMStoreConfig = STMStoreConfig } instance StoreQueueClass STMQueue where - type MsgQueue STMQueue = STMMsgQueue recipientId = recipientId' {-# INLINE recipientId #-} queueRec = queueRec' {-# INLINE queueRec #-} - msgQueue = msgQueue' - {-# INLINE msgQueue #-} withQueueLock _ _ = id {-# INLINE withQueueLock #-} instance MsgStoreClass STMMsgStore where type StoreMonad STMMsgStore = STM + type MsgQueue STMMsgStore = STMMsgQueue type QueueStore STMMsgStore = STMQueueStore STMQueue type StoreQueue STMMsgStore = STMQueue type MsgStoreConfig STMMsgStore = STMStoreConfig @@ -129,10 +127,10 @@ instance MsgStoreClass STMMsgStore where Nothing -> pure (Nothing, 0) deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec) - deleteQueue ms q = fst <$$> deleteStoreQueue (queueStore_ ms) q + deleteQueue ms q = fst <$$> deleteQueue_ ms q deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) - deleteQueueSize ms q = deleteStoreQueue (queueStore_ ms) q >>= mapM (traverse getSize) + deleteQueueSize ms q = deleteQueue_ ms q >>= mapM (traverse getSize) -- traverse operates on the second tuple element where getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size) @@ -179,10 +177,15 @@ instance MsgStoreClass STMMsgStore where Just _ -> modifyTVar' size (subtract 1) _ -> pure () - isolateQueue :: STMQueue -> Text -> STM a -> ExceptT ErrorType IO a - isolateQueue _ _ = liftIO . atomically + isolateQueue :: STMMsgStore -> STMQueue -> Text -> STM a -> ExceptT ErrorType IO a + isolateQueue _ _ _ = liftIO . atomically {-# INLINE isolateQueue #-} unsafeRunStore :: STMQueue -> Text -> STM a -> IO a unsafeRunStore _ _ = atomically {-# INLINE unsafeRunStore #-} + +deleteQueue_ :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Maybe STMMsgQueue)) +deleteQueue_ ms q = deleteStoreQueue (queueStore_ ms) q >>= mapM remove + where + remove qr = (qr,) <$> atomically (swapTVar (msgQueue' q) Nothing) diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index fbb2e194b..98c12d4be 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -34,6 +34,7 @@ import Simplex.Messaging.Util ((<$$>), ($>>=)) class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => MsgStoreClass s where type StoreMonad s = (m :: Type -> Type) | m -> s type MsgStoreConfig s = c | c -> s + type MsgQueue s = q | q -> s type StoreQueue s = q | q -> s type QueueStore s = qs | qs -> s newMsgStore :: MsgStoreConfig s -> IO s @@ -51,29 +52,62 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M -- message store methods mkQueue :: s -> Bool -> RecipientId -> QueueRec -> IO (StoreQueue s) - getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue (StoreQueue s)) - getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue (StoreQueue s), Message)) + getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s) + getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message)) -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config - withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue (StoreQueue s) -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) + withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec) deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) - getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s [Message] + getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue s -> StoreMonad s [Message] writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running - getQueueSize_ :: MsgQueue (StoreQueue s) -> StoreMonad s Int - tryPeekMsg_ :: StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s (Maybe Message) - tryDeleteMsg_ :: StoreQueue s -> MsgQueue (StoreQueue s) -> Bool -> StoreMonad s () - isolateQueue :: StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a + getQueueSize_ :: MsgQueue s -> StoreMonad s Int + tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message) + tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s () + isolateQueue :: s -> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a unsafeRunStore :: StoreQueue s -> Text -> StoreMonad s a -> IO a -data MSType = MSMemory | MSJournal + -- default implementations are overridden for PostgreSQL storage of messages + tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message) + tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure + {-# INLINE tryPeekMsg #-} + + tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) + tryDelMsg st q msgId' = + withPeekMsgQueue st q "tryDelMsg" $ + maybe (pure Nothing) $ \(mq, msg) -> + if + | messageId msg == msgId' -> + tryDeleteMsg_ q mq True $> Just msg + | otherwise -> pure Nothing + + -- atomic delete (== read) last and peek next message if available + tryDelPeekMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message) + tryDelPeekMsg st q msgId' = + withPeekMsgQueue st q "tryDelPeekMsg" $ + maybe (pure (Nothing, Nothing)) $ \(mq, msg) -> + if + | messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq) + | otherwise -> pure (Nothing, Just msg) + + deleteExpiredMsgs :: s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int + deleteExpiredMsgs st q old = + isolateQueue st q "deleteExpiredMsgs" $ + getMsgQueue st q False >>= deleteExpireMsgs_ old q + + getQueueSize :: s -> StoreQueue s -> ExceptT ErrorType IO Int + getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst) + {-# INLINE getQueueSize #-} + +data MSType = MSMemory | MSJournal | MSPostgres data QSType = QSMemory | QSPostgres data SMSType :: MSType -> Type where SMSMemory :: SMSType 'MSMemory SMSJournal :: SMSType 'MSJournal + SMSPostgres :: SMSType 'MSPostgres data SQSType :: QSType -> Type where SQSMemory :: SQSType 'QSMemory @@ -127,48 +161,19 @@ readQueueRec :: StoreQueueClass q => q -> IO (Either ErrorType (q, QueueRec)) readQueueRec q = maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec q) {-# INLINE readQueueRec #-} -getQueueSize :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO Int -getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst) -{-# INLINE getQueueSize #-} - -tryPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message) -tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure -{-# INLINE tryPeekMsg #-} - -tryDelMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) -tryDelMsg st q msgId' = - withPeekMsgQueue st q "tryDelMsg" $ - maybe (pure Nothing) $ \(mq, msg) -> - if - | messageId msg == msgId' -> - tryDeleteMsg_ q mq True $> Just msg - | otherwise -> pure Nothing - --- atomic delete (== read) last and peek next message if available -tryDelPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message) -tryDelPeekMsg st q msgId' = - withPeekMsgQueue st q "tryDelPeekMsg" $ - maybe (pure (Nothing, Nothing)) $ \(mq, msg) -> - if - | messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq) - | otherwise -> pure (Nothing, Just msg) - -- The action is called with Nothing when it is known that the queue is empty -withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> Text -> (Maybe (MsgQueue (StoreQueue s), Message) -> StoreMonad s a) -> ExceptT ErrorType IO a -withPeekMsgQueue st q op a = isolateQueue q op $ getPeekMsgQueue st q >>= a +withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> Text -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a +withPeekMsgQueue st q op a = isolateQueue st q op $ getPeekMsgQueue st q >>= a {-# INLINE withPeekMsgQueue #-} -deleteExpiredMsgs :: MsgStoreClass s => s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int -deleteExpiredMsgs st q old = - isolateQueue q "deleteExpiredMsgs" $ - getMsgQueue st q False >>= deleteExpireMsgs_ old q - +-- not used with PostgreSQL message store expireQueueMsgs :: MsgStoreClass s => s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats expireQueueMsgs st now old q = do (expired_, stored) <- withIdleMsgQueue now st q $ deleteExpireMsgs_ old q pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1} -deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s Int +-- not used with PostgreSQL message store +deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int deleteExpireMsgs_ old q mq = do n <- loop 0 logQueueState q diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index ff76759d8..68142eab2 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -28,7 +28,10 @@ module Simplex.Messaging.Server.QueueStore.Postgres foldRecentQueueRecs, handleDuplicate, withLog_, + withDB, withDB', + assertUpdated, + renderField, ) where @@ -84,7 +87,7 @@ import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPServiceRole (..)) -import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>)) +import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>)) import System.Exit (exitFailure) import System.IO (IOMode (..), hFlush, stdout) import UnliftIO.STM @@ -409,7 +412,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where rId = recipientId sq -- this method is called from JournalMsgStore deleteQueue that already locks the queue - deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) + deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType QueueRec) deleteStoreQueue st sq = E.uninterruptibleMask_ $ runExceptT $ do q <- ExceptT $ readQueueRecIO qr RoundedSystemTime ts <- liftIO getSystemDate @@ -420,9 +423,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where forM_ (notifier q) $ \NtfCreds {notifierId} -> do atomically $ TM.delete notifierId $ notifiers st atomically $ TM.delete notifierId $ notifierLocks st - mq_ <- atomically $ swapTVar (msgQueue sq) Nothing withLog "deleteStoreQueue" st (`logDeleteQueue` rId) - pure (q, mq_) + pure q where rId = recipientId sq qr = queueRec sq @@ -488,7 +490,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64) getServiceQueueCount st party serviceId = E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db -> - fmap (fromMaybe 0) $ maybeFirstRow fromOnly $ + maybeFirstRow' 0 fromOnly $ DB.query db query (Only serviceId) where query = case party of @@ -641,13 +643,14 @@ queueRecToText (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, (linkId_, queueData_) = queueDataColumns queueData nullable :: ToField a => Maybe a -> Builder nullable = maybe mempty (renderField . toField) - renderField :: Action -> Builder - renderField = \case - Plain bld -> bld - Escape s -> BB.byteString s - EscapeByteA s -> BB.string7 "\\x" <> BB.byteStringHex s - EscapeIdentifier s -> BB.byteString s -- Not used in COPY data - Many as -> mconcat (map renderField as) + +renderField :: Action -> Builder +renderField = \case + Plain bld -> bld + Escape s -> BB.byteString s + EscapeByteA s -> BB.string7 "\\x" <> BB.byteStringHex s + EscapeIdentifier s -> BB.byteString s -- Not used in COPY data + Many as -> mconcat (map renderField as) queueDataColumns :: Maybe (LinkId, QueueLinkData) -> (Maybe LinkId, Maybe QueueLinkData) queueDataColumns = \case diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs index e8469d1cc..ae150614c 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs @@ -14,7 +14,8 @@ serverSchemaMigrations = [ ("20250207_initial", m20250207_initial, Nothing), ("20250319_updated_index", m20250319_updated_index, Just down_m20250319_updated_index), ("20250320_short_links", m20250320_short_links, Just down_m20250320_short_links), - ("20250514_service_certs", m20250514_service_certs, Just down_m20250514_service_certs) + ("20250514_service_certs", m20250514_service_certs, Just down_m20250514_service_certs), + ("20250903_store_messages", m20250903_store_messages, Just down_m20250903_store_messages) ] -- | The list of migrations in ascending order by date @@ -159,3 +160,239 @@ DROP INDEX idx_services_service_role; DROP TABLE services; |] + +m20250903_store_messages :: Text +m20250903_store_messages = + T.pack + [r| +CREATE TABLE messages( + message_id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + recipient_id BYTEA NOT NULL REFERENCES msg_queues ON DELETE CASCADE ON UPDATE RESTRICT, + msg_id BYTEA NOT NULL, + msg_ts BIGINT NOT NULL, + msg_quota BOOLEAN NOT NULL, + msg_ntf_flag BOOLEAN NOT NULL, + msg_body BYTEA NOT NULL +); + +ALTER TABLE msg_queues + ADD COLUMN msg_can_write BOOLEAN NOT NULL DEFAULT TRUE, + ADD COLUMN msg_queue_size BIGINT NOT NULL DEFAULT 0; + +CREATE INDEX idx_messages_recipient_id_message_id ON messages (recipient_id, message_id); +CREATE INDEX idx_messages_recipient_id_msg_ts on messages(recipient_id, msg_ts); +CREATE INDEX idx_messages_recipient_id_msg_quota on messages(recipient_id, msg_quota); + +CREATE FUNCTION write_message( + p_recipient_id BYTEA, + p_msg_id BYTEA, + p_msg_ts BIGINT, + p_msg_quota BOOLEAN, + p_msg_ntf_flag BOOLEAN, + p_msg_body BYTEA, + p_quota INT +) +RETURNS TABLE (quota_written BOOLEAN, was_empty BOOLEAN) +LANGUAGE plpgsql AS $$ +DECLARE + q_can_write BOOLEAN; + q_size BIGINT; +BEGIN + SELECT msg_can_write, msg_queue_size INTO q_can_write, q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE; + + IF q_can_write OR q_size = 0 THEN + quota_written := p_msg_quota OR q_size >= p_quota; + was_empty := q_size = 0; + + INSERT INTO messages(recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body) + VALUES (p_recipient_id, p_msg_id, p_msg_ts, quota_written, p_msg_ntf_flag AND NOT quota_written, CASE WHEN quota_written THEN '' :: BYTEA ELSE p_msg_body END); + + UPDATE msg_queues + SET msg_can_write = NOT quota_written, + msg_queue_size = msg_queue_size + 1 + WHERE recipient_id = p_recipient_id; + + RETURN QUERY VALUES (quota_written, was_empty); + END IF; +END; +$$; + +CREATE FUNCTION try_del_msg(p_recipient_id BYTEA, p_msg_id BYTEA) +RETURNS TABLE (r_msg_id BYTEA, r_msg_ts BIGINT, r_msg_quota BOOLEAN, r_msg_ntf_flag BOOLEAN, r_msg_body BYTEA) +LANGUAGE plpgsql AS $$ +DECLARE + q_size BIGINT; + msg RECORD; +BEGIN + SELECT msg_queue_size INTO q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE; + + IF FOUND THEN + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF FOUND AND msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + IF FOUND THEN + CALL dec_msg_count(p_recipient_id, q_size, 1); + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + END IF; + END IF; +END; +$$; + +CREATE FUNCTION try_del_peek_msg(p_recipient_id BYTEA, p_msg_id BYTEA) +RETURNS TABLE (r_msg_id BYTEA, r_msg_ts BIGINT, r_msg_quota BOOLEAN, r_msg_ntf_flag BOOLEAN, r_msg_body BYTEA) +LANGUAGE plpgsql AS $$ +DECLARE + q_size BIGINT; + msg RECORD; +BEGIN + SELECT msg_queue_size INTO q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE; + + IF FOUND THEN + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF FOUND THEN + IF msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + + IF FOUND THEN + CALL dec_msg_count(p_recipient_id, q_size, 1); + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + + RETURN QUERY ( + SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1 + ); + ELSE + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + END IF; + END IF; +END; +$$; + +CREATE FUNCTION delete_expired_msgs(p_recipient_id BYTEA, p_old_ts BIGINT) RETURNS BIGINT +LANGUAGE plpgsql AS $$ +DECLARE + q_size BIGINT; + min_id BIGINT; + del_count BIGINT; +BEGIN + SELECT msg_queue_size INTO q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE SKIP LOCKED; + + IF NOT FOUND OR q_size = 0 THEN + RETURN 0; + END IF; + + SELECT LEAST( -- ignores NULLs + (SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts), + (SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = TRUE) + ) INTO min_id; + + IF min_id IS NULL THEN + DELETE FROM messages WHERE recipient_id = p_recipient_id; + ELSE + DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < min_id; + END IF; + + GET DIAGNOSTICS del_count = ROW_COUNT; + IF del_count > 0 THEN + CALL dec_msg_count(p_recipient_id, q_size, del_count); + END IF; + RETURN del_count; +END; +$$; + +CREATE PROCEDURE expire_old_messages( + p_now_ts BIGINT, + p_ttl BIGINT, + OUT r_expired_msgs_count BIGINT, + OUT r_stored_msgs_count BIGINT, + OUT r_stored_queues BIGINT +) +LANGUAGE plpgsql AS $$ +DECLARE + old_ts BIGINT := p_now_ts - p_ttl; + very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400; + rid BYTEA; + min_id BIGINT; + q_size BIGINT; + del_count BIGINT; + total_deleted BIGINT := 0; +BEGIN + FOR rid IN + SELECT recipient_id + FROM msg_queues + WHERE deleted_at IS NULL AND updated_at > very_old_ts + LOOP + BEGIN -- sub-transaction for each queue + del_count := delete_expired_msgs(rid, old_ts); + total_deleted := total_deleted + del_count; + EXCEPTION WHEN OTHERS THEN + ROLLBACK; + RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM; + CONTINUE; + END; + COMMIT; + END LOOP; + + r_expired_msgs_count := total_deleted; + r_stored_msgs_count := (SELECT COUNT(1) FROM messages); + r_stored_queues := (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL); +END; +$$; + +CREATE PROCEDURE dec_msg_count(p_recipient_id BYTEA, p_size BIGINT, p_change BIGINT) +LANGUAGE plpgsql AS $$ +BEGIN + UPDATE msg_queues + SET msg_can_write = msg_can_write OR p_size <= p_change, + msg_queue_size = GREATEST(p_size - p_change, 0) + WHERE recipient_id = p_recipient_id; +END; +$$; + |] + +down_m20250903_store_messages :: Text +down_m20250903_store_messages = + T.pack + [r| +DROP FUNCTION write_message; +DROP FUNCTION try_del_msg; +DROP FUNCTION try_del_peek_msg; +DROP FUNCTION delete_expired_msgs; +DROP PROCEDURE expire_old_messages; +DROP PROCEDURE dec_msg_count; + +DROP INDEX idx_messages_recipient_id_message_id; +DROP INDEX idx_messages_recipient_id_msg_ts; +DROP INDEX idx_messages_recipient_id_msg_quota; + +ALTER TABLE msg_queues + DROP COLUMN msg_can_write, + DROP COLUMN msg_queue_size; + +DROP TABLE messages; + |] diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql b/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql index 6c0501d8b..db49006e2 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql @@ -15,9 +15,224 @@ SET row_security = off; CREATE SCHEMA smp_server; + +CREATE PROCEDURE smp_server.dec_msg_count(IN p_recipient_id bytea, IN p_size bigint, IN p_change bigint) + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE msg_queues + SET msg_can_write = msg_can_write OR p_size <= p_change, + msg_queue_size = GREATEST(p_size - p_change, 0) + WHERE recipient_id = p_recipient_id; +END; +$$; + + + +CREATE FUNCTION smp_server.delete_expired_msgs(p_recipient_id bytea, p_old_ts bigint) RETURNS bigint + LANGUAGE plpgsql + AS $$ +DECLARE + q_size BIGINT; + min_id BIGINT; + del_count BIGINT; +BEGIN + SELECT msg_queue_size INTO q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE SKIP LOCKED; + + IF NOT FOUND OR q_size = 0 THEN + RETURN 0; + END IF; + + SELECT LEAST( -- ignores NULLs + (SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts), + (SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = TRUE) + ) INTO min_id; + + IF min_id IS NULL THEN + DELETE FROM messages WHERE recipient_id = p_recipient_id; + ELSE + DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < min_id; + END IF; + + GET DIAGNOSTICS del_count = ROW_COUNT; + IF del_count > 0 THEN + CALL dec_msg_count(p_recipient_id, q_size, del_count); + END IF; + RETURN del_count; +END; +$$; + + + +CREATE PROCEDURE smp_server.expire_old_messages(IN p_now_ts bigint, IN p_ttl bigint, OUT r_expired_msgs_count bigint, OUT r_stored_msgs_count bigint, OUT r_stored_queues bigint) + LANGUAGE plpgsql + AS $$ +DECLARE + old_ts BIGINT := p_now_ts - p_ttl; + very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400; + rid BYTEA; + min_id BIGINT; + q_size BIGINT; + del_count BIGINT; + total_deleted BIGINT := 0; +BEGIN + FOR rid IN + SELECT recipient_id + FROM msg_queues + WHERE deleted_at IS NULL AND updated_at > very_old_ts + LOOP + BEGIN -- sub-transaction for each queue + del_count := delete_expired_msgs(rid, old_ts); + total_deleted := total_deleted + del_count; + EXCEPTION WHEN OTHERS THEN + ROLLBACK; + RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM; + CONTINUE; + END; + COMMIT; + END LOOP; + + r_expired_msgs_count := total_deleted; + r_stored_msgs_count := (SELECT COUNT(1) FROM messages); + r_stored_queues := (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL); +END; +$$; + + + +CREATE FUNCTION smp_server.try_del_msg(p_recipient_id bytea, p_msg_id bytea) RETURNS TABLE(r_msg_id bytea, r_msg_ts bigint, r_msg_quota boolean, r_msg_ntf_flag boolean, r_msg_body bytea) + LANGUAGE plpgsql + AS $$ +DECLARE + q_size BIGINT; + msg RECORD; +BEGIN + SELECT msg_queue_size INTO q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE; + + IF FOUND THEN + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF FOUND AND msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + IF FOUND THEN + CALL dec_msg_count(p_recipient_id, q_size, 1); + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + END IF; + END IF; +END; +$$; + + + +CREATE FUNCTION smp_server.try_del_peek_msg(p_recipient_id bytea, p_msg_id bytea) RETURNS TABLE(r_msg_id bytea, r_msg_ts bigint, r_msg_quota boolean, r_msg_ntf_flag boolean, r_msg_body bytea) + LANGUAGE plpgsql + AS $$ +DECLARE + q_size BIGINT; + msg RECORD; +BEGIN + SELECT msg_queue_size INTO q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE; + + IF FOUND THEN + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF FOUND THEN + IF msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + + IF FOUND THEN + CALL dec_msg_count(p_recipient_id, q_size, 1); + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + + RETURN QUERY ( + SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1 + ); + ELSE + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + END IF; + END IF; +END; +$$; + + + +CREATE FUNCTION smp_server.write_message(p_recipient_id bytea, p_msg_id bytea, p_msg_ts bigint, p_msg_quota boolean, p_msg_ntf_flag boolean, p_msg_body bytea, p_quota integer) RETURNS TABLE(quota_written boolean, was_empty boolean) + LANGUAGE plpgsql + AS $$ +DECLARE + q_can_write BOOLEAN; + q_size BIGINT; +BEGIN + SELECT msg_can_write, msg_queue_size INTO q_can_write, q_size + FROM msg_queues + WHERE recipient_id = p_recipient_id AND deleted_at IS NULL + FOR UPDATE; + + IF q_can_write OR q_size = 0 THEN + quota_written := p_msg_quota OR q_size >= p_quota; + was_empty := q_size = 0; + + INSERT INTO messages(recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body) + VALUES (p_recipient_id, p_msg_id, p_msg_ts, quota_written, p_msg_ntf_flag AND NOT quota_written, CASE WHEN quota_written THEN '' :: BYTEA ELSE p_msg_body END); + + UPDATE msg_queues + SET msg_can_write = NOT quota_written, + msg_queue_size = msg_queue_size + 1 + WHERE recipient_id = p_recipient_id; + + RETURN QUERY VALUES (quota_written, was_empty); + END IF; +END; +$$; + + SET default_table_access_method = heap; +CREATE TABLE smp_server.messages ( + message_id bigint NOT NULL, + recipient_id bytea NOT NULL, + msg_id bytea NOT NULL, + msg_ts bigint NOT NULL, + msg_quota boolean NOT NULL, + msg_ntf_flag boolean NOT NULL, + msg_body bytea NOT NULL +); + + + +ALTER TABLE smp_server.messages ALTER COLUMN message_id ADD GENERATED ALWAYS AS IDENTITY ( + SEQUENCE NAME smp_server.messages_message_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1 +); + + + CREATE TABLE smp_server.migrations ( name text NOT NULL, ts timestamp without time zone NOT NULL, @@ -43,7 +258,9 @@ CREATE TABLE smp_server.msg_queues ( fixed_data bytea, user_data bytea, rcv_service_id bytea, - ntf_service_id bytea + ntf_service_id bytea, + msg_can_write boolean DEFAULT true NOT NULL, + msg_queue_size bigint DEFAULT 0 NOT NULL ); @@ -58,6 +275,11 @@ CREATE TABLE smp_server.services ( +ALTER TABLE ONLY smp_server.messages + ADD CONSTRAINT messages_pkey PRIMARY KEY (message_id); + + + ALTER TABLE ONLY smp_server.migrations ADD CONSTRAINT migrations_pkey PRIMARY KEY (name); @@ -78,6 +300,18 @@ ALTER TABLE ONLY smp_server.services +CREATE INDEX idx_messages_recipient_id_message_id ON smp_server.messages USING btree (recipient_id, message_id); + + + +CREATE INDEX idx_messages_recipient_id_msg_quota ON smp_server.messages USING btree (recipient_id, msg_quota); + + + +CREATE INDEX idx_messages_recipient_id_msg_ts ON smp_server.messages USING btree (recipient_id, msg_ts); + + + CREATE UNIQUE INDEX idx_msg_queues_link_id ON smp_server.msg_queues USING btree (link_id); @@ -106,6 +340,11 @@ CREATE INDEX idx_services_service_role ON smp_server.services USING btree (servi +ALTER TABLE ONLY smp_server.messages + ADD CONSTRAINT messages_recipient_id_fkey FOREIGN KEY (recipient_id) REFERENCES smp_server.msg_queues(recipient_id) ON UPDATE RESTRICT ON DELETE CASCADE; + + + ALTER TABLE ONLY smp_server.msg_queues ADD CONSTRAINT msg_queues_ntf_service_id_fkey FOREIGN KEY (ntf_service_id) REFERENCES smp_server.services(service_id) ON UPDATE RESTRICT ON DELETE SET NULL; diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 0cbd33b0c..515a0ee77 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -228,7 +228,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where deleteQueueNotifier :: STMQueueStore q -> q -> IO (Either ErrorType (Maybe NtfCreds)) deleteQueueNotifier st sq = withQueueRec qr delete - $>>= \nc_ -> nc_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId sq) + $>>= (<$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId sq)) where qr = queueRec sq delete q = forM (notifier q) $ \nc -> do @@ -264,11 +264,10 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId sq) t) | otherwise = pure $ Right q - deleteStoreQueue :: STMQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) + deleteStoreQueue :: STMQueueStore q -> q -> IO (Either ErrorType QueueRec) deleteStoreQueue st sq = withQueueRec qr delete - $>>= \q -> withLog "deleteStoreQueue" st (`logDeleteQueue` rId) - >>= mapM (\_ -> (q,) <$> atomically (swapTVar (msgQueue sq) Nothing)) + $>>= (<$$ withLog "deleteStoreQueue" st (`logDeleteQueue` rId)) where rId = recipientId sq qr = queueRec sq diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 55be4d21d..ee155cf91 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -17,10 +17,8 @@ import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.TMap (TMap) class StoreQueueClass q where - type MsgQueue q = mq | mq -> q recipientId :: q -> RecipientId queueRec :: q -> TVar (Maybe QueueRec) - msgQueue :: q -> TVar (Maybe (MsgQueue q)) withQueueLock :: q -> Text -> IO a -> IO a class StoreQueueClass q => QueueStoreClass q s where @@ -44,7 +42,7 @@ class StoreQueueClass q => QueueStoreClass q s where blockQueue :: s -> q -> BlockingInfo -> IO (Either ErrorType ()) unblockQueue :: s -> q -> IO (Either ErrorType ()) updateQueueTime :: s -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec) - deleteStoreQueue :: s -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) + deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec) getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId) setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ()) getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index f93119b3c..1fcee0783 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -211,6 +211,9 @@ firstRow f e a = second f . listToEither e <$> a maybeFirstRow :: Functor f => (a -> b) -> f [a] -> f (Maybe b) maybeFirstRow f q = fmap f . listToMaybe <$> q +maybeFirstRow' :: Functor f => b -> (a -> b) -> f [a] -> f b +maybeFirstRow' def f q = maybe def f . listToMaybe <$> q + firstRow' :: (a -> Either e b) -> e -> IO [a] -> IO (Either e b) firstRow' f e a = (f <=< listToEither e) <$> a diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index d1d0eb344..7357cb3f5 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -114,6 +114,7 @@ import Test.Hspec hiding (fit, it) import UnliftIO import Util import XFTPClient (testXFTPServer) + #if defined(dbPostgres) import Fixtures #endif @@ -122,6 +123,7 @@ import qualified Database.PostgreSQL.Simple as PSQL import Simplex.Messaging.Agent.Store (Connection (..), StoredRcvQueue (..), SomeConn (..)) import Simplex.Messaging.Agent.Store.AgentStore (getConn) import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue) +import Simplex.Messaging.Server.MsgStore.Postgres (PostgresQueue) import Simplex.Messaging.Server.MsgStore.Types (QSType (..)) import Simplex.Messaging.Server.QueueStore.Postgres import Simplex.Messaging.Server.QueueStore.Types (QueueStoreClass (..)) @@ -1524,20 +1526,29 @@ testOldContactQueueShortLink ps@(_, msType) = withAgentClients2 $ \a b -> do A.createConnection a NRMInteractive 1 True SCMContact Nothing Nothing CR.IKPQOn SMOnlyCreate -- make it an "old" queue let updateStoreLog f = replaceSubstringInFile f " queue_mode=C" "" - () <- case testServerStoreConfig msType of - ASSCfg _ _ (SSCMemory (Just StorePaths {storeLogFile})) -> updateStoreLog storeLogFile - ASSCfg _ _ (SSCMemoryJournal {storeLogFile}) -> updateStoreLog storeLogFile - ASSCfg _ _ (SSCDatabaseJournal {storeCfg}) -> do #if defined(dbServerPostgres) - let AgentClient {agentEnv = Env {store}} = a - Right (SomeConn _ (ContactConnection _ RcvQueue {rcvId})) <- withTransaction store (`getConn` contactId) - st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) storeCfg - Right 1 <- runExceptT $ withDB' "test" st $ \db -> PSQL.execute db "UPDATE msg_queues SET queue_mode = ? WHERE recipient_id = ?" (Nothing :: Maybe QueueMode, rcvId) - closeQueueStore @(JournalQueue 'QSPostgres) st -#else - error "no dbServerPostgres flag" + updateDbStore :: PostgresQueueStore s -> IO () + updateDbStore st = do + let AgentClient {agentEnv = Env {store}} = a + Right (SomeConn _ (ContactConnection _ RcvQueue {rcvId})) <- withTransaction store (`getConn` contactId) + Right 1 <- runExceptT $ withDB' "test" st $ \db -> PSQL.execute db "UPDATE msg_queues SET queue_mode = ? WHERE recipient_id = ?" (Nothing :: Maybe QueueMode, rcvId) + pure () +#endif + () <- case testServerStoreConfig msType of + ASSCfg _ _ (SSCMemory sp_) -> mapM_ (\StorePaths {storeLogFile} -> updateStoreLog storeLogFile) sp_ + ASSCfg _ _ SSCMemoryJournal {storeLogFile} -> updateStoreLog storeLogFile +#if defined(dbServerPostgres) + ASSCfg _ _ SSCDatabaseJournal {storeCfg} -> do + st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) storeCfg + updateDbStore st + closeQueueStore @(JournalQueue 'QSPostgres) st + ASSCfg _ _ (SSCDatabase storeCfg) -> do + st :: PostgresQueueStore PostgresQueue <- newQueueStore @PostgresQueue storeCfg + updateDbStore st + closeQueueStore @PostgresQueue st +#else + ASSCfg _ _ SSCDatabaseJournal {} -> error "no dbServerPostgres flag" #endif - _ -> pure () withSmpServer ps $ do let userData = UserLinkData "some user data" diff --git a/tests/CLITests.hs b/tests/CLITests.hs index c0c7c04d2..30d798ca7 100644 --- a/tests/CLITests.hs +++ b/tests/CLITests.hs @@ -10,7 +10,6 @@ import AgentTests.FunctionalAPITests (runRight_) import Control.Logger.Simple import Control.Monad import qualified Crypto.PubKey.RSA as RSA -import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy as BL import qualified Data.HashMap.Strict as HM import Data.Ini (Ini (..), lookupValue, readIniFile, writeIniFile) @@ -46,6 +45,7 @@ import UnliftIO.Exception (bracket) import Util #if defined(dbServerPostgres) +import qualified Data.ByteString.Char8 as B import qualified Database.PostgreSQL.Simple as PSQL import Database.PostgreSQL.Simple.Types (Query (..)) import NtfClient (ntfTestServerDBConnectInfo, ntfTestServerDBConnstr, ntfTestStoreDBOpts) diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 2ed0da330..dd5d22d2e 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} @@ -41,7 +42,6 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo -import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath (()) @@ -49,28 +49,48 @@ import System.IO (IOMode (..), withFile) import Test.Hspec hiding (fit, it) import Util +#if defined(dbServerPostgres) +import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..)) +import Simplex.Messaging.Server.MsgStore.Postgres +import Simplex.Messaging.Server.QueueStore.Postgres.Config +import SMPClient (postgressBracket, testServerDBConnectInfo, testStoreDBOpts) +#endif + msgStoreTests :: Spec msgStoreTests = do around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do someMsgStoreTests + journalMsgStoreTests it "should export and import journal store" testExportImportStore - describe "queue state" $ do - it "should restore queue state from the last line" testQueueState - it "should recover when message is written and state is not" testMessageState - it "should remove journal files when queue is empty" testRemoveJournals - describe "missing files" $ do - it "should create read file when missing" testReadFileMissing - it "should switch to write file when read file missing" testReadFileMissingSwitch - it "should create write file when missing" testWriteFileMissing - it "should create read file when read and write files are missing" testReadAndWriteFilesMissing +#if defined(dbServerPostgres) + around_ (postgressBracket testServerDBConnectInfo) $ do + around (withMsgStore $ testJournalStoreCfg $ PQStoreCfg testPostgresStoreCfg) $ + describe "Postgres+journal message store" $ do + someMsgStoreTests + journalMsgStoreTests + around (withMsgStore testPostgresStoreConfig) $ + describe "Postgres-only message store" $ someMsgStoreTests +#endif describe "Journal message store: queue state backup expiration" $ do it "should remove old queue state backups" testRemoveQueueStateBackups it "should expire messages in idle queues" testExpireIdleQueues where + journalMsgStoreTests :: SpecWith (JournalMsgStore s) + journalMsgStoreTests = do + describe "queue state" $ do + it "should restore queue state from the last line" testQueueState + it "should recover when message is written and state is not" testMessageState + it "should remove journal files when queue is empty" testRemoveJournals + describe "missing files" $ do + it "should create read file when missing" testReadFileMissing + it "should switch to write file when read file missing" testReadFileMissingSwitch + it "should create write file when missing" testWriteFileMissing + it "should create read file when read and write files are missing" testReadAndWriteFilesMissing someMsgStoreTests :: MsgStoreClass s => SpecWith s someMsgStoreTests = do it "should get queue and store/read messages" testGetQueue + it "should write/ack messages" testWriteAckMessages it "should not fail on EOF when changing read journal" testChangeReadJournal -- TODO constrain to STM stores? @@ -95,6 +115,24 @@ testJournalStoreCfg queueStoreCfg = keepMinBackups = 1 } +#if defined(dbServerPostgres) +testPostgresStoreConfig :: PostgresMsgStoreCfg +testPostgresStoreConfig = + PostgresMsgStoreCfg + { queueStoreCfg = testPostgresStoreCfg, + quota = 3 + } + +testPostgresStoreCfg :: PostgresStoreCfg +testPostgresStoreCfg = + PostgresStoreCfg + { dbOpts = testStoreDBOpts, + dbStoreLogPath = Nothing, + confirmMigrations = MCYesUp, + deletedTTL = 86400 + } +#endif + mkMessage :: MonadIO m => ByteString -> m Message mkMessage body = liftIO $ do g <- C.newRandom @@ -137,7 +175,6 @@ testNewQueueRecData g qm queueData = do where rndId = atomically $ EntityId <$> C.randomBytes 24 g --- TODO constrain to STM stores testGetQueue :: MsgStoreClass s => s -> IO () testGetQueue ms = do g <- C.newRandom @@ -180,7 +217,28 @@ testGetQueue ms = do (Nothing, Nothing) <- tryDelPeekMsg ms q mId8 void $ ExceptT $ deleteQueue ms q --- TODO constrain to STM stores +-- TODO [messages] test concurrent writing and reading +testWriteAckMessages :: MsgStoreClass s => s -> IO () +testWriteAckMessages ms = do + g <- C.newRandom + (rId1, qr1) <- testNewQueueRec g QMMessaging + (rId2, qr2) <- testNewQueueRec g QMMessaging + runRight_ $ do + q1 <- ExceptT $ addQueue ms rId1 qr1 + q2 <- ExceptT $ addQueue ms rId2 qr2 + let write q s = writeMsg ms q True =<< mkMessage s + 0 <- deleteExpiredMsgs ms q1 0 -- won't expire anything, used here to mimic message sending with expiration on SEND + Just (Message {msgId = mId1}, True) <- write q1 "message 1" + (Msg "message 1", Nothing) <- tryDelPeekMsg ms q1 mId1 + 0 <- deleteExpiredMsgs ms q2 0 + Just (Message {msgId = mId2}, True) <- write q2 "message 2" + (Msg "message 2", Nothing) <- tryDelPeekMsg ms q2 mId2 + 0 <- deleteExpiredMsgs ms q2 0 + Just (Message {msgId = mId3}, True) <- write q2 "message 3" + (Msg "message 3", Nothing) <- tryDelPeekMsg ms q2 mId3 + void $ ExceptT $ deleteQueue ms q1 + void $ ExceptT $ deleteQueue ms q2 + testChangeReadJournal :: MsgStoreClass s => s -> IO () testChangeReadJournal ms = do g <- C.newRandom @@ -367,7 +425,7 @@ testRemoveJournals ms = do Nothing <- tryPeekMsg ms q -- still not removed, queue is empty and not opened liftIO $ journalFilesCount dir `shouldReturn` 1 - _mq <- isolateQueue q "test" $ getMsgQueue ms q False + _mq <- isolateQueue ms q "test" $ getMsgQueue ms q False -- journal is removed liftIO $ journalFilesCount dir `shouldReturn` 0 liftIO $ stateBackupCount dir `shouldReturn` 1 @@ -467,7 +525,7 @@ testExpireIdleQueues = do old <- expireBeforeEpoch ExpirationConfig {ttl = 1, checkInterval = 1} -- no old messages now <- systemSeconds <$> getSystemTime - (expired_, stored) <- runRight $ isolateQueue q "" $ withIdleMsgQueue now ms q $ deleteExpireMsgs_ old q + (expired_, stored) <- runRight $ isolateQueue ms q "" $ withIdleMsgQueue now ms q $ deleteExpireMsgs_ old q expired_ `shouldBe` Just 0 stored `shouldBe` 0 (Nothing, False) <- readQueueState ms statePath @@ -484,7 +542,7 @@ testReadFileMissing ms = do Msg "message 1" <- tryPeekMsg ms q pure q - mq <- fromJust <$> readTVarIO (msgQueue q) + mq <- fromJust <$> readTVarIO (msgQueue' q) MsgQueueState {readState = rs} <- readTVarIO $ state mq closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs @@ -503,7 +561,7 @@ testReadFileMissingSwitch ms = do (rId, qr) <- testNewQueueRec g QMMessaging q <- writeMessages ms rId qr - mq <- fromJust <$> readTVarIO (msgQueue q) + mq <- fromJust <$> readTVarIO (msgQueue' q) MsgQueueState {readState = rs} <- readTVarIO $ state mq closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs @@ -521,7 +579,7 @@ testWriteFileMissing ms = do (rId, qr) <- testNewQueueRec g QMMessaging q <- writeMessages ms rId qr - mq <- fromJust <$> readTVarIO (msgQueue q) + mq <- fromJust <$> readTVarIO (msgQueue' q) MsgQueueState {writeState = ws} <- readTVarIO $ state mq closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId ws @@ -544,7 +602,7 @@ testReadAndWriteFilesMissing ms = do (rId, qr) <- testNewQueueRec g QMMessaging q <- writeMessages ms rId qr - mq <- fromJust <$> readTVarIO (msgQueue q) + mq <- fromJust <$> readTVarIO (msgQueue' q) MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq closeMsgQueue ms q removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId rs diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 1f867744a..3c1ac0150 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -274,9 +274,14 @@ serverStoreConfig_ useDbStoreLog = \case ASType SQSMemory SMSJournal -> ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = testStoreLogFile, storeMsgsPath = testStoreMsgsDir} ASType SQSPostgres SMSJournal -> - let dbStoreLogPath = if useDbStoreLog then Just testStoreLogFile else Nothing - storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = 86400} - in ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir} + ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir} +#if defined(dbServerPostgres) + ASType SQSPostgres SMSPostgres -> + ASSCfg SQSPostgres SMSPostgres $ SSCDatabase storeCfg +#endif + where + dbStoreLogPath = if useDbStoreLog then Just testStoreLogFile else Nothing + storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = 86400} cfgV7 :: AServerConfig cfgV7 = updateCfg cfg $ \cfg' -> cfg' {smpServerVRange = mkVersionRange minServerSMPRelayVersion authCmdsSMPVersion} diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 6fd617bdd..53269d6f6 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -14,6 +14,7 @@ {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -Wno-orphans #-} +{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} module ServerTests where @@ -45,7 +46,7 @@ import Simplex.Messaging.Server (exportMessages) import Simplex.Messaging.Server.Env.STM (AStoreType (..), MsgStore (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..), QStoreCfg (..), stmQueueStore) -import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..), newMsgStore) +import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), QSType (..), SMSType (..), SQSType (..), newMsgStore) import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..)) import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..), closeStoreLog) import Simplex.Messaging.Transport @@ -59,6 +60,11 @@ import Test.HUnit import Test.Hspec hiding (fit, it) import Util +#if defined(dbServerPostgres) +import CoreTests.MsgStoreTests (testPostgresStoreConfig) +import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStoreCfg (..), exportDbMessages) +#endif + serverTests :: SpecWith (ASrvTransport, AStoreType) serverTests = do describe "SMP queues" $ do @@ -915,14 +921,27 @@ testRestoreExpireMessages = exportStoreMessages :: AStoreType -> IO () exportStoreMessages = \case ASType _ SMSJournal -> export + ASType _ SMSPostgres -> exportDB ASType _ SMSMemory -> pure () where export = do - ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {quota = 4} - readWriteQueueStore True (mkQueue ms True) testStoreLogFile (stmQueueStore ms) >>= closeStoreLog - removeFileIfExists testStoreMsgsFile + ms <- readWriteQueues exportMessages False (StoreJournal ms) testStoreMsgsFile False closeMsgStore ms +#if defined(dbServerPostgres) + exportDB = do + readWriteQueues >>= closeMsgStore + ms' <- newMsgStore (testPostgresStoreConfig {quota = 4} :: PostgresMsgStoreCfg) + _n <- withFile testStoreMsgsFile WriteMode $ exportDbMessages False ms' + closeMsgStore ms' +#else + exportDB = error "compiled without server_postgres flag" +#endif + readWriteQueues = do + ms <- newMsgStore ((testJournalStoreCfg MQStoreCfg) {quota = 4} :: JournalStoreConfig 'QSMemory) + readWriteQueueStore True (mkQueue ms True) testStoreLogFile (stmQueueStore ms) >>= closeStoreLog + removeFileIfExists testStoreMsgsFile + pure ms runTest :: Transport c => TProxy c 'TServer -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do testSMPClient test' `shouldReturn` () diff --git a/tests/Test.hs b/tests/Test.hs index b88eb3bf5..364080e0c 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -102,9 +102,11 @@ main = do ] -- skipComparisonForDownMigrations testStoreDBOpts "src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql" - around_ (postgressBracket testServerDBConnectInfo) $ + around_ (postgressBracket testServerDBConnectInfo) $ do describe "SMP server via TLS, postgres+jornal message store" $ before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests + describe "SMP server via TLS, postgres-only message store" $ + before (pure (transport @TLS, ASType SQSPostgres SMSPostgres)) serverTests #endif describe "SMP server via TLS, jornal message store" $ do describe "SMP syntax" $ serverSyntaxTests (transport @TLS) @@ -125,13 +127,18 @@ main = do around_ (postgressBracket ntfTestServerDBConnectInfo) $ do describe "Notifications server (SMP server: jornal store)" $ ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal) - around_ (postgressBracket testServerDBConnectInfo) $ + around_ (postgressBracket testServerDBConnectInfo) $ do describe "Notifications server (SMP server: postgres+jornal store)" $ ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal) + describe "Notifications server (SMP server: postgres-only store)" $ + ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres) around_ (postgressBracket testServerDBConnectInfo) $ do describe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) + describe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres) describe "SMP proxy, postgres+jornal message store" $ before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests + describe "SMP proxy, postgres-only message store" $ + before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests #endif describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) describe "SMP proxy, jornal message store" $