From 7b42aaa13207d4077cbd3acec0b6cd54f402491e Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 10 Mar 2025 09:31:50 +0000 Subject: [PATCH] smp server: expire messages in postgres database, mark queues as deleted, combine tables (#1471) * smp server: expire messages in postgres database * tty * fail if nothing updated in db * remove old deleted queues * index * fix tests --- src/Simplex/Messaging/Server.hs | 22 +- src/Simplex/Messaging/Server/CLI.hs | 3 +- src/Simplex/Messaging/Server/Env/STM.hs | 9 +- src/Simplex/Messaging/Server/Main.hs | 29 ++- .../Messaging/Server/MsgStore/Journal.hs | 65 +----- .../Messaging/Server/MsgStore/Types.hs | 2 +- .../Messaging/Server/QueueStore/Postgres.hs | 214 ++++++++++-------- .../Server/QueueStore/Postgres/Migrations.hs | 15 +- .../Messaging/Server/QueueStore/STM.hs | 4 +- .../Messaging/Server/QueueStore/Types.hs | 3 +- tests/CoreTests/MsgStoreTests.hs | 5 +- tests/SMPClient.hs | 4 +- 12 files changed, 184 insertions(+), 191 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index b0edfa2dd..f0b97529a 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -394,16 +394,20 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt let interval = checkInterval expCfg * 1000000 stats <- asks serverStats labelMyThread "expireMessagesThread" - liftIO $ forever $ do - threadDelay' interval - old <- expireBeforeEpoch expCfg - now <- systemSeconds <$> getSystemTime - msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <- - withActiveMsgQueues ms $ expireQueueMsgs now ms old - atomicWriteIORef (msgCount stats) stored - atomicModifyIORef'_ (msgExpired stats) (+ expired) - printMessageStats "STORE: messages" msgStats + liftIO $ forever $ expire ms stats interval where + expire :: forall s. MsgStoreClass s => s -> ServerStats -> Int64 -> IO () + expire ms stats interval = do + threadDelay' interval + n <- compactQueues @(StoreQueue s) $ queueStore ms + when (n > 0) $ logInfo $ "Removed " <> tshow n <> " old deleted queues from the database." + old <- expireBeforeEpoch expCfg + now <- systemSeconds <$> getSystemTime + msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <- + withAllMsgQueues False ms $ expireQueueMsgs now ms old + atomicWriteIORef (msgCount stats) stored + atomicModifyIORef'_ (msgExpired stats) (+ expired) + printMessageStats "STORE: messages" msgStats expireQueueMsgs now ms old q = fmap (fromRight newMessageStats) . runExceptT $ do (expired_, stored) <- idleDeleteExpiredMsgs now ms q old pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1} diff --git a/src/Simplex/Messaging/Server/CLI.hs b/src/Simplex/Messaging/Server/CLI.hs index 3e52d2001..a1e9f8d83 100644 --- a/src/Simplex/Messaging/Server/CLI.hs +++ b/src/Simplex/Messaging/Server/CLI.hs @@ -31,6 +31,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Common (DBOpts (..)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), ProtocolServer (..), ProtocolTypeI) import Simplex.Messaging.Server.Env.STM (AServerStoreCfg (..), ServerStoreCfg (..), StorePaths (..)) +import Simplex.Messaging.Server.QueueStore.Postgres (PostgresStoreCfg (..)) import Simplex.Messaging.Transport (ATransport (..), TLS, Transport (..)) import Simplex.Messaging.Transport.Server (AddHTTP, loadFileFingerprint) import Simplex.Messaging.Transport.WebSockets (WS) @@ -310,7 +311,7 @@ printSMPServerConfig :: [(ServiceName, ATransport, AddHTTP)] -> AServerStoreCfg printSMPServerConfig transports (ASSCfg _ _ cfg) = case cfg of SSCMemory sp_ -> printServerConfig transports $ (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_ SSCMemoryJournal {storeLogFile} -> printServerConfig transports $ Just storeLogFile - SSCDatabaseJournal {storeDBOpts = DBOpts {connstr, schema}} -> do + SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbOpts = DBOpts {connstr, schema}}} -> do B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema printServerTransports transports diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index da8c22b1a..3e890c236 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -16,6 +16,7 @@ #if __GLASGOW_HASKELL__ == 810 {-# LANGUAGE UndecidableInstances #-} #endif +{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} module Simplex.Messaging.Server.Env.STM where @@ -43,7 +44,6 @@ import Network.Socket (ServiceName) import qualified Network.TLS as T import Numeric.Natural import Simplex.Messaging.Agent.Lock -import Simplex.Messaging.Agent.Store.Postgres.Common (DBOpts) import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..)) import Simplex.Messaging.Client.Agent (SMPClientAgent, SMPClientAgentConfig, newSMPClientAgent) import Simplex.Messaging.Crypto (KeyHash (..)) @@ -56,6 +56,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.Server.QueueStore.Postgres (PostgresStoreCfg (..)) import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore, setStoreLog) import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.Stats @@ -216,7 +217,7 @@ data AStoreType = forall qs ms. SupportedStore qs ms => ASType (SQSType qs) (SMS data ServerStoreCfg qs ms where SSCMemory :: Maybe StorePaths -> ServerStoreCfg 'QSMemory 'MSMemory SSCMemoryJournal :: {storeLogFile :: FilePath, storeMsgsPath :: FilePath} -> ServerStoreCfg 'QSMemory 'MSJournal - SSCDatabaseJournal :: {storeDBOpts :: DBOpts, confirmMigrations :: MigrationConfirmation, storeMsgsPath' :: FilePath} -> ServerStoreCfg 'QSPostgres 'MSJournal + SSCDatabaseJournal :: {storeCfg :: PostgresStoreCfg, storeMsgsPath' :: FilePath} -> ServerStoreCfg 'QSPostgres 'MSJournal data StorePaths = StorePaths {storeLogFile :: FilePath, storeMsgsFile :: Maybe FilePath} @@ -338,9 +339,9 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp ms <- newMsgStore cfg loadStoreLog (mkQueue ms) storeLogFile $ stmQueueStore ms pure $ AMS qt mt ms - ASSCfg qt mt SSCDatabaseJournal {storeDBOpts, storeMsgsPath'} -> do + ASSCfg qt mt SSCDatabaseJournal {storeCfg, storeMsgsPath'} -> do let StartOptions {confirmMigrations} = startOptions - qsCfg = PQStoreCfg storeDBOpts confirmMigrations + qsCfg = PQStoreCfg (storeCfg {confirmMigrations} :: PostgresStoreCfg) cfg = mkJournalStoreConfig qsCfg storeMsgsPath' msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval ms <- newMsgStore cfg pure $ AMS qt mt ms diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 388a6a069..66dfba70b 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -24,6 +24,7 @@ import Data.Char (isAlpha, isAscii, toUpper) import Data.Either (fromRight) import Data.Functor (($>)) import Data.Ini (Ini, lookupValue, readIniFile) +import Data.Int (Int64) import Data.List (find, isPrefixOf) import qualified Data.List.NonEmpty as L import Data.Maybe (fromMaybe, isJust, isNothing) @@ -52,7 +53,7 @@ import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Information import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore (..), JournalQueue, QStoreCfg (..), postgresQueueStore, stmQueueStore) import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), QSType (..), SQSType (..), SMSType (..), newMsgStore) -import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, foldQueueRecs) +import Simplex.Messaging.Server.QueueStore.Postgres (PostgresStoreCfg (..), batchInsertQueues, foldQueueRecs) import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog (logCreateQueue, openWriteStoreLog) import Simplex.Messaging.Server.StoreLog.ReadWrite (readQueueStore) @@ -165,10 +166,11 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ms <- newJournalMsgStore MQStoreCfg readQueueStore True (mkQueue ms) storeLogFile (queueStore ms) queues <- readTVarIO $ loadedQueues $ stmQueueStore ms - ps <- newJournalMsgStore $ PQStoreCfg dbOpts {createSchema = True} MCConsole - (qCnt, nCnt) <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps + let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} + ps <- newJournalMsgStore $ PQStoreCfg storeCfg + qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps renameFile storeLogFile $ storeLogFile <> ".bak" - putStrLn $ "Import completed: " <> show qCnt <> " queues, " <> show nCnt <> " notifiers" + putStrLn $ "Import completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`.\nImport messages to journal to use PostgreSQL database for queues (`smp-server journal import`)" Right (ASType SQSMemory SMSJournal) -> "store_queues set to `memory`, update it to `database` in INI file" @@ -186,9 +188,10 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = confirmOrExit ("WARNING: PostrgreSQL database schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath) "Queue records not exported" - ps <- newJournalMsgStore $ PQStoreCfg dbOpts MCConsole + let storeCfg = PostgresStoreCfg {dbOpts, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} + ps <- newJournalMsgStore $ PQStoreCfg storeCfg sl <- openWriteStoreLog storeLogFilePath - Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \rId qr -> logCreateQueue sl rId qr $> Sum (1 :: Int) + Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int) putStrLn $ "Export completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file." @@ -242,14 +245,15 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = DBOpts { connstr = either (const defaultDBConnStr) encodeUtf8 $ lookupValue "STORE_LOG" "db_connection" ini, schema = either (const defaultDBSchema) encodeUtf8 $ lookupValue "STORE_LOG" "db_schema" ini, - poolSize = either (const defaultDBPoolSize) (read . T.unpack) $ lookupValue "STORE_LOG" "db_pool_size" ini, + poolSize = readIniDefault defaultDBPoolSize "STORE_LOG" "db_pool_size" ini, createSchema = False } dbOptsIniContent :: DBOpts -> Text - dbOptsIniContent DBOpts {connstr, schema, poolSize } = + dbOptsIniContent DBOpts {connstr, schema, poolSize} = (optDisabled' (connstr == defaultDBConnStr) <> "db_connection: " <> safeDecodeUtf8 connstr <> "\n") <> (optDisabled' (schema == defaultDBSchema) <> "db_schema: " <> safeDecodeUtf8 schema <> "\n") <> (optDisabled' (poolSize == defaultDBPoolSize) <> "db_pool_size: " <> tshow poolSize <> "\n\n") + iniDeletedTTL ini = readIniDefault (86400 * defaultDeletedTTL) "STORE_LOG" "db_deleted_ttl" ini httpsCertFile = combine cfgPath "web.crt" httpsKeyFile = combine cfgPath "web.key" defaultStaticPath = combine logPath "www" @@ -333,6 +337,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = \store_queues: memory\n\n\ \# Database connection settings for PostgreSQL database (`store_queues: database`).\n" <> dbOptsIniContent dbOptions + <> "# Time to retain deleted queues in the database, days.\n" + <> ("db_deleted_ttl: " <> tshow defaultDeletedTTL <> "\n\n") <> "# Message storage mode: `memory` or `journal`.\n\ \store_messages: memory\n\n\ \# When store_messages is `memory`, undelivered messages are optionally saved and restored\n\ @@ -498,7 +504,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ASType SQSMemory SMSJournal -> ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = storeLogFilePath, storeMsgsPath = storeMsgsJournalDir} ASType SQSPostgres SMSJournal -> - ASSCfg SQSPostgres SMSJournal $ SSCDatabaseJournal {storeDBOpts = iniDBOptions ini, confirmMigrations = MCYesUp, storeMsgsPath' = storeMsgsJournalDir}, + let storeCfg = PostgresStoreCfg {dbOpts = iniDBOptions ini, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} + in ASSCfg SQSPostgres SMSJournal $ SSCDatabaseJournal {storeCfg, storeMsgsPath' = storeMsgsJournalDir}, storeNtfsFile = restoreMessagesFile storeNtfsFilePath, -- allow creating new queues by default allowNewQueues = fromMaybe True $ iniOnOff "AUTH" "new_queues" ini, @@ -640,6 +647,10 @@ defaultDBSchema = "smp_server" defaultDBPoolSize :: Natural defaultDBPoolSize = 10 +-- time to retain deleted queues in the database (days), for debugging +defaultDeletedTTL :: Int64 +defaultDeletedTTL = 21 + defaultControlPort :: Int defaultControlPort = 5224 diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 32fefb386..f7a65488d 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -58,7 +58,7 @@ import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate, sort) -import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe) +import Data.Maybe (fromMaybe, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) @@ -67,8 +67,6 @@ import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM) import GHC.IO (catchAny) import Simplex.Messaging.Agent.Client (getMapLock, withLockMap) import Simplex.Messaging.Agent.Lock -import Simplex.Messaging.Agent.Store.Postgres.Common (DBOpts) -import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types @@ -81,9 +79,8 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory -import System.Exit import System.FilePath (takeFileName, ()) -import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout) +import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..)) import qualified System.IO as IO import System.Random (StdGen, genByteString, newStdGen) @@ -137,7 +134,7 @@ data JournalStoreConfig s = JournalStoreConfig data QStoreCfg s where MQStoreCfg :: QStoreCfg 'QSMemory - PQStoreCfg :: DBOpts -> MigrationConfirmation -> QStoreCfg 'QSPostgres + PQStoreCfg :: PostgresStoreCfg -> QStoreCfg 'QSPostgres data JournalQueue (s :: QSType) = JournalQueue { recipientId' :: RecipientId, @@ -289,10 +286,12 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where newQueueStore :: QStoreCfg s -> IO (QStore s) newQueueStore = \case MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) () - PQStoreCfg dbOpts confirmMigrations -> PQStore <$> newQueueStore @(JournalQueue s) (dbOpts, confirmMigrations) + PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) cfg loadedQueues = withQS loadedQueues {-# INLINE loadedQueues #-} + compactQueues = withQS (compactQueues @(JournalQueue s)) + {-# INLINE compactQueues #-} queueCounts = withQS (queueCounts @(JournalQueue s)) {-# INLINE queueCounts #-} addQueue_ = withQS addQueue_ @@ -341,54 +340,12 @@ instance MsgStoreClass (JournalMsgStore s) where closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a - withActiveMsgQueues ms f = case queueStore_ ms of - MQStore st -> withLoadedQueues st f - PQStore st -> withLoadedQueues st f + withActiveMsgQueues = withQS withLoadedQueues . queueStore_ - -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. - -- It is used to export storage to a single file and also to expire messages and validate all queues when server is started. - -- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID. - -- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name. - -- TODO [postgres] this should simply load all known queues and process them - withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a - withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty) - where - processStore = do - (!count, !res) <- foldQueues 0 processQueue (0, mempty) ("", storePath) - putStrLn $ progress count - pure res - JournalStoreConfig {storePath, pathParts} = config - processQueue :: (Int, a) -> (String, FilePath) -> IO (Int, a) - processQueue (!i, !r) (queueId, dir) = do - when (tty && i `mod` 100 == 0) $ putStr (progress i <> "\r") >> IO.hFlush stdout - r' <- case strDecode $ B.pack queueId of - Right rId -> - getQueue ms SRecipient rId >>= \case - Right q -> unStoreIO (getMsgQueue ms q False) *> action q <* closeMsgQueue q - Left AUTH -> do - logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir - removeQueueDirectory_ dir - pure mempty - Left e -> do - logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e - exitFailure - Left e -> do - logError $ "STORE: processQueue, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e - exitFailure - pure (i + 1, r <> r') - progress i = "Processed: " <> show i <> " queues" - foldQueues depth f acc (queueId, path) = do - let f' = if depth == pathParts - 1 then f else foldQueues (depth + 1) f - listDirs >>= foldM f' acc - where - listDirs = fmap catMaybes . mapM queuePath =<< listDirectory path - queuePath dir = do - let !path' = path dir - !queueId' = queueId <> dir - ifM - (doesDirectoryExist path') - (pure $ Just (queueId', path')) - (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) + withAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a + withAllMsgQueues tty ms action = case queueStore_ ms of + MQStore st -> withLoadedQueues st action + PQStore st -> foldQueues tty st (mkQueue ms) action logQueueStates :: JournalMsgStore s -> IO () logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 2aa741a8f..420517ac4 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -83,7 +83,7 @@ getQueueRec st party qId = $>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec q)) getQueueMessages :: MsgStoreClass s => Bool -> s -> StoreQueue s -> ExceptT ErrorType IO [Message] -getQueueMessages drainMsgs st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs q . fst) +getQueueMessages drainMsgs st q = withPeekMsgQueue st q "getQueueMessages" $ maybe (pure []) (getQueueMessages_ drainMsgs q . fst) {-# INLINE getQueueMessages #-} getQueueSize :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO Int diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 8988cb3f3..0923ab244 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -20,8 +20,10 @@ module Simplex.Messaging.Server.QueueStore.Postgres ( PostgresQueueStore (..), + PostgresStoreCfg (..), batchInsertQueues, foldQueueRecs, + foldQueues, ) where @@ -32,21 +34,23 @@ import Control.Monad.Except import Control.Monad.IO.Class import Control.Monad.Trans.Except import Data.Bitraversable (bimapM) +import Data.Either (fromRight) import Data.Functor (($>)) import Data.Int (Int64) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, mapMaybe) +import Data.Maybe (catMaybes) import qualified Data.Text as T -import Database.PostgreSQL.Simple (Binary (..), Only (..), Query, SqlError, (:.) (..)) -import qualified Database.PostgreSQL.Simple as PSQL +import Data.Time.Clock.System (SystemTime (..), getSystemTime) +import Database.PostgreSQL.Simple (Binary (..), Only (..), Query, SqlError) +import qualified Database.PostgreSQL.Simple as DB +import Database.PostgreSQL.Simple.FromField (FromField (..)) +import Database.PostgreSQL.Simple.ToField (ToField (..)) import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation) import Database.PostgreSQL.Simple.SqlQQ (sql) import Simplex.Messaging.Agent.Client (withLockMap) import Simplex.Messaging.Agent.Lock (Lock) import Simplex.Messaging.Agent.Store.Postgres (createDBStore) import Simplex.Messaging.Agent.Store.Postgres.Common -import Simplex.Messaging.Agent.Store.Postgres.DB (FromField (..), ToField (..)) -import qualified Simplex.Messaging.Agent.Store.Postgres.DB as DB import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore @@ -73,20 +77,27 @@ data PostgresQueueStore q = PostgresQueueStore senders :: TMap SenderId RecipientId, -- this map only cashes the queues that were attempted to be subscribed to, notifiers :: TMap NotifierId RecipientId, - notifierLocks :: TMap NotifierId Lock + notifierLocks :: TMap NotifierId Lock, + deletedTTL :: Int64 + } + +data PostgresStoreCfg = PostgresStoreCfg + { dbOpts :: DBOpts, + confirmMigrations :: MigrationConfirmation, + deletedTTL :: Int64 } instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where - type QueueStoreCfg (PostgresQueueStore q) = (DBOpts, MigrationConfirmation) + type QueueStoreCfg (PostgresQueueStore q) = PostgresStoreCfg - newQueueStore :: (DBOpts, MigrationConfirmation) -> IO (PostgresQueueStore q) - newQueueStore (dbOpts, confirmMigrations) = do + newQueueStore :: PostgresStoreCfg -> IO (PostgresQueueStore q) + newQueueStore PostgresStoreCfg {dbOpts, confirmMigrations, deletedTTL} = do dbStore <- either err pure =<< createDBStore dbOpts serverMigrations confirmMigrations queues <- TM.emptyIO senders <- TM.emptyIO notifiers <- TM.emptyIO notifierLocks <- TM.emptyIO - pure PostgresQueueStore {dbStore, queues, senders, notifiers, notifierLocks} + pure PostgresQueueStore {dbStore, queues, senders, notifiers, notifierLocks, deletedTTL} where err e = do logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e @@ -95,6 +106,12 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where loadedQueues = queues {-# INLINE loadedQueues #-} + compactQueues :: PostgresQueueStore q -> IO Int64 + compactQueues st@PostgresQueueStore {deletedTTL} = do + old <- subtract deletedTTL . systemSeconds <$> liftIO getSystemTime + fmap (fromRight 0) $ runExceptT $ withDB' "removeDeletedQueues" st $ \db -> + DB.execute db "DELETE FROM msg_queues WHERE deleted_at < ?" (Only old) + queueCounts :: PostgresQueueStore q -> IO QueueCounts queueCounts st = withConnection (dbStore st) $ \db -> do @@ -103,8 +120,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where db [sql| SELECT - (SELECT COUNT(1) FROM msg_queues) AS queue_count, - (SELECT COUNT(1) FROM msg_notifiers) AS notifier_count + (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL) AS queue_count, + (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL AND notifier_id IS NOT NULL) AS notifier_count |] pure QueueCounts {queueCount, notifierCount} @@ -114,8 +131,9 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where addQueue_ st mkQ rId qr = do sq <- mkQ rId qr withQueueLock sq "addQueue_" $ runExceptT $ do - withDB "addQueue_" st $ \db -> - E.try (insertQueueDB db rId qr) >>= bimapM handleDuplicate pure + void $ withDB "addQueue_" st $ \db -> + E.try (DB.execute db insertQueueQuery $ queueRecToRow (rId, qr)) + >>= bimapM handleDuplicate pure atomically $ TM.insert rId sq queues atomically $ TM.insert (senderId qr) rId senders pure sq @@ -134,32 +152,21 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where where PostgresQueueStore {queues, senders, notifiers} = st getRcvQueue rId = TM.lookupIO rId queues >>= maybe loadRcvQueue (pure . Right) - loadRcvQueue = loadQueue " WHERE q.recipient_id = ?" $ \_ -> pure () - loadSndQueue = loadQueue " WHERE q.sender_id = ?" $ \rId -> TM.insert qId rId senders - loadNtfQueue = loadQueue " WHERE n.notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare - loadQueue condition insertRef = runExceptT $ do - (rId, qRec) <- loadQueueRec - sq <- liftIO $ mkQ rId qRec - atomically $ - -- checking the cache again for concurrent reads, - -- use previously loaded queue if exists. - TM.lookup rId queues >>= \case - Just sq' -> pure sq' - Nothing -> do - insertRef rId - TM.insert rId sq queues - pure sq + loadRcvQueue = loadQueue " WHERE recipient_id = ?" $ \_ -> pure () + loadSndQueue = loadQueue " WHERE sender_id = ?" $ \rId -> TM.insert qId rId senders + loadNtfQueue = loadQueue " WHERE notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare + loadQueue condition insertRef = runExceptT $ loadQueueRec >>= liftIO . cachedOrLoadedQueue st mkQ insertRef where loadQueueRec = withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $ - DB.query db (queueRecQuery <> condition) (Only qId) + DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId) secureQueue :: PostgresQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = withQueueDB sq "secureQueue" $ \q -> do verify q - withDB' "secureQueue" st $ \db -> - DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ?" (sKey, recipientId sq) + assertUpdated $ withDB' "secureQueue" st $ \db -> + DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ? AND deleted_at IS NULL" (sKey, recipientId sq) atomically $ writeTVar (queueRec sq) $ Just q {senderKey = Just sKey} where verify q = case senderKey q of @@ -171,8 +178,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where withQueueDB sq "addQueueNotifier" $ \q -> ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $ ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do - withDB "addQueueNotifier" st $ \db -> - E.try (insert db) >>= bimapM handleDuplicate pure + assertUpdated $ withDB "addQueueNotifier" st $ \db -> + E.try (update db) >>= bimapM handleDuplicate pure nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId let !q' = q {notifier = Just ntfCreds} atomically $ writeTVar (queueRec sq) $ Just q' @@ -183,29 +190,35 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where PostgresQueueStore {notifiers} = st rId = recipientId sq -- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails) - insert db = + update db = DB.execute db [sql| - INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret) - VALUES (?, ?, ?, ?) - ON CONFLICT (recipient_id) DO UPDATE - SET notifier_id = EXCLUDED.notifier_id, - notifier_key = EXCLUDED.notifier_key, - rcv_ntf_dh_secret = EXCLUDED.rcv_ntf_dh_secret + UPDATE msg_queues + SET notifier_id = ?, notifier_key = ?, rcv_ntf_dh_secret = ? + WHERE recipient_id = ? AND deleted_at IS NULL |] - (rId, nId, notifierKey, rcvNtfDhSecret) + (nId, notifierKey, rcvNtfDhSecret, rId) deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier st sq = withQueueDB sq "deleteQueueNotifier" $ \q -> ExceptT $ fmap sequence $ forM (notifier q) $ \NtfCreds {notifierId = nId} -> withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do - withDB' "deleteQueueNotifier" st $ \db -> - DB.execute db "DELETE FROM msg_notifiers WHERE notifier_id = ?" (Only nId) + assertUpdated $ withDB' "deleteQueueNotifier" st update atomically $ TM.delete nId $ notifiers st atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing} pure nId + where + update db = + DB.execute + db + [sql| + UPDATE msg_queues + SET notifier_id = NULL, notifier_key = NULL, rcv_ntf_dh_secret = NULL + WHERE recipient_id = ? AND deleted_at IS NULL + |] + (Only $ recipientId sq) suspendQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ()) suspendQueue st sq = setStatusDB "suspendQueue" st sq EntityOff @@ -222,8 +235,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where if updatedAt == Just t then pure q else do - withDB' "updateQueueTime" st $ \db -> - DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ?" (t, recipientId sq) + assertUpdated $ withDB' "updateQueueTime" st $ \db -> + DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (t, recipientId sq) let !q' = q {updatedAt = Just t} atomically $ writeTVar (queueRec sq) $ Just q' pure q' @@ -232,8 +245,9 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) deleteStoreQueue st sq = runExceptT $ do q <- ExceptT $ readQueueRecIO qr - withDB' "deleteStoreQueue" st $ \db -> - DB.execute db "DELETE FROM msg_queues WHERE recipient_id = ?" (Only $ Binary $ unEntityId $ recipientId sq) + RoundedSystemTime ts <- liftIO getSystemDate + assertUpdated $ withDB' "deleteStoreQueue" st $ \db -> + DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, recipientId sq) atomically $ writeTVar qr Nothing atomically $ TM.delete (senderId q) $ senders st forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st @@ -241,32 +255,21 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where where qr = queueRec sq -insertQueueDB :: DB.Connection -> RecipientId -> QueueRec -> IO () -insertQueueDB db rId QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = do - DB.execute db insertQueueQuery (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt) - forM_ notifier $ \NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} -> - DB.execute db insertNotifierQuery (rId, notifierId, notifierKey, rcvNtfDhSecret) - -batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO (Int64, Int64) +batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO Int64 batchInsertQueues tty queues toStore = do qs <- catMaybes <$> mapM (\(rId, q) -> (rId,) <$$> readTVarIO (queueRec q)) (M.assocs queues) putStrLn $ "Importing " <> show (length qs) <> " queues..." let st = dbStore toStore - (ns, count) <- foldM (processChunk st) ((0, 0), 0) $ toChunks 1000000 qs + (qCnt, count) <- foldM (processChunk st) (0, 0) $ toChunks 1000000 qs putStrLn $ progress count - pure ns + pure qCnt where - processChunk st ((qCnt, nCnt), i) qs = do - qCnt' <- withConnection st $ \db -> PSQL.executeMany db insertQueueQuery $ map toQueueRow qs - nCnt' <- withConnection st $ \db -> PSQL.executeMany db insertNotifierQuery $ mapMaybe toNotifierRow qs + processChunk st (qCnt, i) qs = do + qCnt' <- withConnection st $ \db -> DB.executeMany db insertQueueQuery $ map queueRecToRow qs let i' = i + length qs when tty $ putStr (progress i' <> "\r") >> hFlush stdout - pure ((qCnt + qCnt', nCnt + nCnt'), i') + pure (qCnt + qCnt', i') progress i = "Imported: " <> show i <> " queues" - toQueueRow (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt}) = - (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt) - toNotifierRow (rId, QueueRec {notifier}) = - (\NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} -> (rId, notifierId, notifierKey, rcvNtfDhSecret)) <$> notifier toChunks :: Int -> [a] -> [[a]] toChunks _ [] = [] toChunks n xs = @@ -277,62 +280,81 @@ insertQueueQuery :: Query insertQueueQuery = [sql| INSERT INTO msg_queues - (recipient_id, recipient_key, rcv_dh_secret, sender_id, sender_key, snd_secure, status, updated_at) - VALUES (?,?,?,?,?,?,?,?) + (recipient_id, recipient_key, rcv_dh_secret, sender_id, sender_key, snd_secure, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?) |] -insertNotifierQuery :: Query -insertNotifierQuery = - [sql| - INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret) - VALUES (?, ?, ?, ?) - |] +foldQueues :: Monoid a => Bool -> PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> (q -> IO a) -> IO a +foldQueues tty st mkQ f = + foldQueueRecs tty st $ cachedOrLoadedQueue st mkQ (\_ -> pure ()) >=> f -foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> (RecipientId -> QueueRec -> IO a) -> IO a +foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a foldQueueRecs tty st f = do - fmap snd $ withConnection (dbStore st) $ \db -> - PSQL.fold_ db queueRecQuery (0 :: Int, mempty) $ \(!i, !acc) row -> do - r <- uncurry f (rowToQueueRec row) + (n, r) <- withConnection (dbStore st) $ \db -> + DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") (0 :: Int, mempty) $ \(!i, !acc) row -> do + r <- f $ rowToQueueRec row let i' = i + 1 - when (tty && i' `mod` 100000 == 0) $ putStr ("Processed: " <> show i <> " records\r") >> hFlush stdout + when (tty && i' `mod` 100000 == 0) $ putStr (progress i <> "\r") >> hFlush stdout pure (i', acc <> r) + when tty $ putStrLn $ progress n + pure r + where + progress i = "Processed: " <> show i <> " records" queueRecQuery :: Query queueRecQuery = [sql| - SELECT q.recipient_id, q.recipient_key, q.rcv_dh_secret, q.sender_id, q.sender_key, q.snd_secure, q.status, q.updated_at, - n.notifier_id, n.notifier_key, n.rcv_ntf_dh_secret - FROM msg_queues q - LEFT JOIN msg_notifiers n ON q.recipient_id = n.recipient_id + SELECT recipient_id, recipient_key, rcv_dh_secret, + sender_id, sender_key, snd_secure, + notifier_id, notifier_key, rcv_ntf_dh_secret, + status, updated_at + FROM msg_queues |] -rowToQueueRec :: ( (RecipientId, RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, SenderCanSecure, ServerEntityStatus, Maybe RoundedSystemTime) - :. (Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret) - ) -> (RecipientId, QueueRec) -rowToQueueRec ((rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt) :. (notifierId_, notifierKey_, rcvNtfDhSecret_)) = +cachedOrLoadedQueue :: PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> (RecipientId -> STM ()) -> (RecipientId, QueueRec) -> IO q +cachedOrLoadedQueue PostgresQueueStore {queues} mkQ insertRef (rId, qRec) = do + sq <- liftIO $ mkQ rId qRec -- loaded queue + atomically $ + -- checking the cache again for concurrent reads, + -- use previously loaded queue if exists. + TM.lookup rId queues >>= \case + Just sq' -> pure sq' + Nothing -> do + insertRef rId + TM.insert rId sq queues + pure sq + +type QueueRecRow = (RecipientId, RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, SenderCanSecure, Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret, ServerEntityStatus, Maybe RoundedSystemTime) + +queueRecToRow :: (RecipientId, QueueRec) -> QueueRecRow +queueRecToRow (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier = n, status, updatedAt}) = + (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifierId <$> n, notifierKey <$> n, rcvNtfDhSecret <$> n, status, updatedAt) + +rowToQueueRec :: QueueRecRow -> (RecipientId, QueueRec) +rowToQueueRec (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifierId_, notifierKey_, rcvNtfDhSecret_, status, updatedAt) = let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_ - in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}) + in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}) setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> IO (Either ErrorType ()) setStatusDB op st sq status = withQueueDB sq op $ \q -> do - withDB' op st $ \db -> - DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ?" (status, recipientId sq) + assertUpdated $ withDB' op st $ \db -> + DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ? AND deleted_at IS NULL" (status, recipientId sq) atomically $ writeTVar (queueRec sq) $ Just q {status} withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a) withQueueDB sq op action = - withQueueLock sq op $ runExceptT $ do - q <- ExceptT $ readQueueRecIO $ queueRec sq - action q + withQueueLock sq op $ runExceptT $ ExceptT (readQueueRecIO $ queueRec sq) >>= action + +assertUpdated :: ExceptT ErrorType IO Int64 -> ExceptT ErrorType IO () +assertUpdated = (>>= \n -> when (n == 0) (throwE AUTH)) withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> ExceptT ErrorType IO a -withDB' op st' action = withDB op st' $ fmap Right . action +withDB' op st action = withDB op st $ fmap Right . action --- TODO [postgres] possibly, use with connection if queries in addQueue_ are combined withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> ExceptT ErrorType IO a -withDB op st' action = - ExceptT $ E.try (withTransaction (dbStore st') action) >>= either logErr pure +withDB op st action = + ExceptT $ E.try (withConnection (dbStore st) action) >>= either logErr pure where logErr :: E.SomeException -> IO (Either ErrorType a) logErr e = logError ("STORE: " <> T.pack err) $> Left (STORE err) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs index 98027b5b6..03b6fecf6 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs @@ -31,19 +31,16 @@ CREATE TABLE msg_queues( sender_id BYTEA NOT NULL, sender_key BYTEA, snd_secure BOOLEAN NOT NULL, + notifier_id BYTEA, + notifier_key BYTEA, + rcv_ntf_dh_secret BYTEA, status TEXT NOT NULL, updated_at BIGINT, + deleted_at BIGINT, PRIMARY KEY (recipient_id) ); -CREATE TABLE msg_notifiers( - notifier_id BYTEA NOT NULL, - recipient_id BYTEA NOT NULL REFERENCES msg_queues(recipient_id) ON DELETE CASCADE ON UPDATE RESTRICT, - notifier_key BYTEA NOT NULL, - rcv_ntf_dh_secret BYTEA NOT NULL, - PRIMARY KEY (notifier_id) -); - CREATE UNIQUE INDEX idx_msg_queues_sender_id ON msg_queues(sender_id); -CREATE UNIQUE INDEX idx_msg_notifiers_recipient_id ON msg_notifiers(recipient_id); +CREATE UNIQUE INDEX idx_msg_queues_notifier_id ON msg_queues(notifier_id); +CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at); |] diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index bebadf7c5..648748162 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -64,8 +64,8 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where loadedQueues = queues {-# INLINE loadedQueues #-} - -- foldAllQueues = withLoadedQueues - -- {-# INLINE foldAllQueues #-} + compactQueues _ = pure 0 + {-# INLINE compactQueues #-} queueCounts :: STMQueueStore q -> IO QueueCounts queueCounts st = do diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 47283ac7c..2951e4038 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -9,6 +9,7 @@ module Simplex.Messaging.Server.QueueStore.Types where import Control.Concurrent.STM import Control.Monad +import Data.Int (Int64) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.TMap (TMap) @@ -25,7 +26,7 @@ class StoreQueueClass q => QueueStoreClass q s where newQueueStore :: QueueStoreCfg s -> IO s queueCounts :: s -> IO QueueCounts loadedQueues :: s -> TMap RecipientId q - -- foldAllQueues :: Monoid a => s -> (q -> IO a) -> IO a + compactQueues :: s -> IO Int64 addQueue_ :: s -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q) getQueue_ :: DirectParty p => s -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) secureQueue :: s -> q -> SndPublicAuthKey -> IO (Either ErrorType ()) diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index ccd8ea66d..bbb7a7e8c 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -217,11 +217,8 @@ testExportImportStore ms = do length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 exportMessages False ms testStoreMsgsFile False - renameFile testStoreMsgsFile (testStoreMsgsFile <> ".copy") closeMsgStore ms closeStoreLog sl - exportMessages False ms testStoreMsgsFile False - (B.readFile testStoreMsgsFile `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".copy") let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg readWriteQueueStore True (mkQueue ms') testStoreLogFile (queueStore ms') >>= closeStoreLog @@ -229,7 +226,7 @@ testExportImportStore ms = do importMessages False ms' testStoreMsgsFile Nothing False printMessageStats "Messages" stats length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 - length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 4 -- state file is backed up, 2 message files + length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 -- 2 message files exportMessages False ms' testStoreMsgsFile2 False (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 28639b0a5..4bae8215a 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -28,6 +28,7 @@ import Simplex.Messaging.Protocol import Simplex.Messaging.Server (runSMPServerBlocking) import Simplex.Messaging.Server.Env.STM import Simplex.Messaging.Server.MsgStore.Types (SMSType (..), SQSType (..)) +import Simplex.Messaging.Server.QueueStore.Postgres (PostgresStoreCfg (..)) import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client import qualified Simplex.Messaging.Transport.Client as Client @@ -180,7 +181,8 @@ cfgMS msType = ASType SQSMemory SMSJournal -> ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = testStoreLogFile, storeMsgsPath = testStoreMsgsDir} ASType SQSPostgres SMSJournal -> - ASSCfg SQSPostgres SMSJournal $ SSCDatabaseJournal {storeDBOpts = testStoreDBOpts, confirmMigrations = MCYesUp, storeMsgsPath' = testStoreMsgsDir}, + let storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, confirmMigrations = MCYesUp, deletedTTL = 86400} + in ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir}, storeNtfsFile = Nothing, allowNewQueues = True, newQueueBasicAuth = Nothing,