smp server: store messages in PostgreSQL (#1622)

* smp server: store messages in PostgreSQL

* stored procedures to write and to expire messages

* function to export messages

* move all message functions to PostgreSQL, remove delete trigger

* comments

* import messages to db

* fix message import, add export

* fix export

* fix export

* fix compilation flags

* import messages line by line

* fix server start with database storage

* fix compilation

* comments
This commit is contained in:
Evgeny
2025-09-11 20:22:55 +01:00
committed by GitHub
parent 0c1030cf02
commit bac6ea6e91
22 changed files with 1277 additions and 206 deletions

View File

@@ -266,6 +266,7 @@ library
Simplex.Messaging.Notifications.Server.Store.Postgres
Simplex.Messaging.Notifications.Server.Store.Types
Simplex.Messaging.Notifications.Server.StoreLog
Simplex.Messaging.Server.MsgStore.Postgres
Simplex.Messaging.Server.QueueStore.Postgres
Simplex.Messaging.Server.QueueStore.Postgres.Migrations
other-modules:

View File

@@ -287,7 +287,7 @@ import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Agent.Store.Entity
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, tshow, ($>>=), (<$$>))
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, maybeFirstRow', tshow, ($>>=), (<$$>))
import Simplex.Messaging.Version.Internal
import qualified UnliftIO.Exception as E
import UnliftIO.STM
@@ -426,15 +426,12 @@ deleteConnRecord :: DB.Connection -> ConnId -> IO ()
deleteConnRecord db connId = DB.execute db "DELETE FROM connections WHERE conn_id = ?" (Only connId)
checkConfirmedSndQueueExists_ :: DB.Connection -> NewSndQueue -> IO Bool
checkConfirmedSndQueueExists_ db SndQueue {server, sndId} = do
fromMaybe False
<$> maybeFirstRow
fromOnly
( DB.query
db
"SELECT 1 FROM snd_queues WHERE host = ? AND port = ? AND snd_id = ? AND status != ? LIMIT 1"
(host server, port server, sndId, New)
)
checkConfirmedSndQueueExists_ db SndQueue {server, sndId} =
maybeFirstRow' False fromOnly $
DB.query
db
"SELECT 1 FROM snd_queues WHERE host = ? AND port = ? AND snd_id = ? AND status != ? LIMIT 1"
(host server, port server, sndId, New)
getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError (RcvQueue, SomeConn))
getRcvConn db ProtocolServer {host, port} rcvId = runExceptT $ do
@@ -1072,15 +1069,12 @@ toRcvMsg ((agentMsgId, internalTs, brokerId, brokerTs) :. (sndMsgId, integrity,
in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck}
checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkRcvMsgHashExists db connId hash = do
fromMaybe False
<$> maybeFirstRow
fromOnly
( DB.query
db
"SELECT 1 FROM encrypted_rcv_message_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
(connId, Binary hash)
)
checkRcvMsgHashExists db connId hash =
maybeFirstRow' False fromOnly $
DB.query
db
"SELECT 1 FROM encrypted_rcv_message_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
(connId, Binary hash)
getRcvMsgBrokerTs :: DB.Connection -> ConnId -> SMP.MsgId -> IO (Either StoreError BrokerTs)
getRcvMsgBrokerTs db connId msgId =
@@ -2119,15 +2113,12 @@ addProcessedRatchetKeyHash db connId hash =
DB.execute db "INSERT INTO processed_ratchet_key_hashes (conn_id, hash) VALUES (?,?)" (connId, Binary hash)
checkRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkRatchetKeyHashExists db connId hash = do
fromMaybe False
<$> maybeFirstRow
fromOnly
( DB.query
db
"SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
(connId, Binary hash)
)
checkRatchetKeyHashExists db connId hash =
maybeFirstRow' False fromOnly $
DB.query
db
"SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
(connId, Binary hash)
deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteRatchetKeyHashesExpired db ttl = do
@@ -2905,8 +2896,8 @@ deleteSndFile' db sndFileId =
getSndFileDeleted :: DB.Connection -> DBSndFileId -> IO Bool
getSndFileDeleted db sndFileId =
fromMaybe True
<$> maybeFirstRow fromOnlyBI (DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId))
maybeFirstRow' True fromOnlyBI $
DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)
createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO ()
createSndFileReplica db SndFileChunk {sndChunkId} = createSndFileReplica_ db sndChunkId

View File

@@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control
import Simplex.Messaging.Server.Env.STM as Env
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages)
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue (..), getJournalQueueMessages)
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
@@ -132,12 +132,17 @@ import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM
#if MIN_VERSION_base(4,18,0)
import Data.List (sort)
import GHC.Conc (listThreads, threadStatus)
import GHC.Conc.Sync (threadLabel)
#endif
#if defined(dbServerPostgres)
import Simplex.Messaging.Server.MsgStore.Postgres (exportDbMessages, getDbMessageStats)
#endif
-- | Runs an SMP server using passed configuration.
--
-- See a full server here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-server/Main.hs
@@ -477,7 +482,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
atomicWriteIORef (msgCount stats) stored
atomicModifyIORef'_ (msgExpired stats) (+ expired)
printMessageStats "STORE: messages" msgStats
Left e -> logError $ "STORE: withAllMsgQueues, error expiring messages, " <> tshow e
Left e -> logError $ "STORE: expireOldMessages, error expiring messages, " <> tshow e
expireNtfsThread :: ServerConfig s -> M s ()
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
@@ -2109,6 +2114,9 @@ saveServerMessages drainMsgs ms = case ms of
Just f -> exportMessages False ms f drainMsgs
Nothing -> logNote "undelivered messages are not saved"
StoreJournal _ -> logNote "closed journal message storage"
#if defined(dbServerPostgres)
StoreDatabase _ -> logNote "closed postgres message storage"
#endif
exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO ()
exportMessages tty st f drainMsgs = do
@@ -2116,6 +2124,9 @@ exportMessages tty st f drainMsgs = do
run $ case st of
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
#if defined(dbServerPostgres)
StoreDatabase ms -> exportDbMessages tty ms
#endif
where
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
run :: (Handle -> IO Int) -> IO ()
@@ -2125,7 +2136,7 @@ exportMessages tty st f drainMsgs = do
logError $ "error exporting messages: " <> tshow e
exitFailure
getJournalMsgs ms q =
readTVarIO (msgQueue q) >>= \case
readTVarIO (msgQueue' q) >>= \case
Just _ -> getMsgs ms q
Nothing -> getJournalQueueMessages ms q
getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message]
@@ -2149,6 +2160,9 @@ processServerMessages StartOptions {skipWarnings} = do
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_ skipWarnings) (pure Nothing)
Nothing -> pure Nothing
StoreJournal ms -> processJournalMessages old_ expire ms
#if defined(dbServerPostgres)
StoreDatabase ms -> processDbMessages old_ expire ms
#endif
processJournalMessages :: forall s. Maybe Int64 -> Bool -> JournalMsgStore s -> IO (Maybe MessageStats)
processJournalMessages old_ expire ms
| expire = Just <$> case old_ of
@@ -2171,6 +2185,17 @@ processServerMessages StartOptions {skipWarnings} = do
processValidateQueue q = unsafeRunStore q "processValidateQueue" $ do
storedMsgsCount <- getQueueSize_ =<< getMsgQueue ms q False
pure newMessageStats {storedMsgsCount, storedQueues = 1}
#if defined(dbServerPostgres)
processDbMessages old_ expire ms
| expire = Just <$> case old_ of
Just old -> do
-- TODO [messages] expire messages from all queues, not only recent
logNote "expiring database store messages..."
now <- systemSeconds <$> getSystemTime
expireOldMessages False ms now (now - old)
Nothing -> getDbMessageStats ms
| otherwise = logWarn "skipping message expiration" $> Nothing
#endif
importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> Bool -> IO MessageStats
importMessages tty ms f old_ skipWarnings = do

View File

@@ -33,7 +33,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), ProtocolServer (..), ProtocolTypeI)
import Simplex.Messaging.Server.Env.STM (ServerStoreCfg (..), StartOptions (..), StorePaths (..))
import Simplex.Messaging.Server.Env.STM (ServerStoreCfg (..), StartOptions (..), dbStoreCfg, storeLogFile')
import Simplex.Messaging.Server.Main.GitCommit
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), TLS, Transport (..), simplexMQVersion)
@@ -414,12 +414,13 @@ printServerTransports protocol ts = do
\Set `port` in smp-server.ini section [TRANSPORT] to `5223,443`\n"
printSMPServerConfig :: [(ServiceName, ASrvTransport, AddHTTP)] -> ServerStoreCfg s -> IO ()
printSMPServerConfig transports = \case
SSCMemory sp_ -> printServerConfig "SMP" transports $ (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_
SSCMemoryJournal {storeLogFile} -> printServerConfig "SMP" transports $ Just storeLogFile
SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbOpts = DBOpts {connstr, schema}}} -> do
B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema
printServerTransports "SMP" transports
printSMPServerConfig transports st = case dbStoreCfg st of
Just cfg -> printDBConfig cfg
Nothing -> printServerConfig "SMP" transports $ storeLogFile' st
where
printDBConfig PostgresStoreCfg {dbOpts = DBOpts {connstr, schema}} = do
B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema
printServerTransports "SMP" transports
deleteDirIfExists :: FilePath -> IO ()
deleteDirIfExists path = whenM (doesDirectoryExist path) $ removeDirectoryRecursive path

View File

@@ -72,7 +72,10 @@ module Simplex.Messaging.Server.Env.STM
defaultIdleQueueInterval,
journalMsgStoreDepth,
readWriteQueueStore,
noPostgresExitStr,
noPostgresExit,
dbStoreCfg,
storeLogFile',
)
where
@@ -131,6 +134,10 @@ import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
import UnliftIO.STM
#if defined(dbServerPostgres)
import Simplex.Messaging.Server.MsgStore.Postgres
#endif
data ServerConfig s = ServerConfig
{ transports :: [(ServiceName, ASrvTransport, AddHTTP)],
smpHandshakeTimeout :: Int,
@@ -275,14 +282,25 @@ fromMsgStore :: MsgStore s -> s
fromMsgStore = \case
StoreMemory s -> s
StoreJournal s -> s
#if defined(dbServerPostgres)
StoreDatabase s -> s
#endif
{-# INLINE fromMsgStore #-}
type family SupportedStore (qs :: QSType) (ms :: MSType) :: Constraint where
SupportedStore 'QSMemory 'MSMemory = ()
SupportedStore 'QSMemory 'MSJournal = ()
SupportedStore 'QSPostgres 'MSJournal = ()
SupportedStore 'QSMemory 'MSPostgres =
(Int ~ Bool, TypeError ('TE.Text "Storing messages in Postgres DB with queues in memory is not supported"))
SupportedStore 'QSPostgres 'MSMemory =
(Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with Postgres DB is not supported"))
(Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with queues in Postgres DB is not supported"))
SupportedStore 'QSPostgres 'MSJournal = ()
#if defined(dbServerPostgres)
SupportedStore 'QSPostgres 'MSPostgres = ()
#else
SupportedStore 'QSPostgres 'MSPostgres =
(Int ~ Bool, TypeError ('TE.Text "Server compiled without server_postgres flag"))
#endif
data AStoreType =
forall qs ms. (SupportedStore qs ms, MsgStoreClass (MsgStoreType qs ms)) =>
@@ -292,16 +310,43 @@ data ServerStoreCfg s where
SSCMemory :: Maybe StorePaths -> ServerStoreCfg STMMsgStore
SSCMemoryJournal :: {storeLogFile :: FilePath, storeMsgsPath :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSMemory)
SSCDatabaseJournal :: {storeCfg :: PostgresStoreCfg, storeMsgsPath' :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSPostgres)
#if defined(dbServerPostgres)
SSCDatabase :: PostgresStoreCfg -> ServerStoreCfg PostgresMsgStore
#endif
dbStoreCfg :: ServerStoreCfg s -> Maybe PostgresStoreCfg
dbStoreCfg = \case
SSCMemory _ -> Nothing
SSCMemoryJournal {} -> Nothing
SSCDatabaseJournal {storeCfg} -> Just storeCfg
#if defined(dbServerPostgres)
SSCDatabase cfg -> Just cfg
#endif
storeLogFile' :: ServerStoreCfg s -> Maybe FilePath
storeLogFile' = \case
SSCMemory sp_ -> (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_
SSCMemoryJournal {storeLogFile} -> Just storeLogFile
SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbStoreLogPath}} -> dbStoreLogPath
#if defined(dbServerPostgres)
SSCDatabase (PostgresStoreCfg {dbStoreLogPath}) -> dbStoreLogPath
#endif
data StorePaths = StorePaths {storeLogFile :: FilePath, storeMsgsFile :: Maybe FilePath}
type family MsgStoreType (qs :: QSType) (ms :: MSType) where
MsgStoreType 'QSMemory 'MSMemory = STMMsgStore
MsgStoreType qs 'MSJournal = JournalMsgStore qs
#if defined(dbServerPostgres)
MsgStoreType 'QSPostgres 'MSPostgres = PostgresMsgStore
#endif
data MsgStore s where
StoreMemory :: STMMsgStore -> MsgStore STMMsgStore
StoreJournal :: JournalMsgStore qs -> MsgStore (JournalMsgStore qs)
#if defined(dbServerPostgres)
StoreDatabase :: PostgresMsgStore -> MsgStore PostgresMsgStore
#endif
data Server s = Server
{ clients :: ServerClients s,
@@ -533,8 +578,12 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
qsCfg = PQStoreCfg (storeCfg {confirmMigrations} :: PostgresStoreCfg)
cfg = mkJournalStoreConfig qsCfg storeMsgsPath' msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval
when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
ms <- newMsgStore cfg
pure $ StoreJournal ms
StoreJournal <$> newMsgStore cfg
SSCDatabase storeCfg -> do
let StartOptions {compactLog, confirmMigrations} = startOptions config
cfg = PostgresMsgStoreCfg storeCfg {confirmMigrations} msgQueueQuota
when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
StoreDatabase <$> newMsgStore cfg
#else
SSCDatabaseJournal {} -> noPostgresExit
#endif
@@ -628,10 +677,12 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
_ -> SPMMessages
noPostgresExit :: IO a
noPostgresExit = do
putStrLn "Error: server binary is compiled without support for PostgreSQL database."
putStrLn "Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`."
exitFailure
noPostgresExit = putStrLn noPostgresExitStr >> exitFailure
noPostgresExitStr :: String
noPostgresExitStr =
"Error: server binary is compiled without support for PostgreSQL database.\n"
<> "Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`."
mkJournalStoreConfig :: QStoreCfg s -> FilePath -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s
mkJournalStoreConfig queueStoreCfg storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval =

View File

@@ -18,9 +18,10 @@
module Simplex.Messaging.Server.Main where
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Exception (SomeException, finally, try)
import Control.Logger.Simple
import Control.Monad
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Char (isAlpha, isAscii, toUpper)
@@ -64,7 +65,7 @@ import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM)
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
import System.Exit (exitFailure)
import System.FilePath (combine)
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile)
import Text.Read (readMaybe)
#if defined(dbServerPostgres)
@@ -73,6 +74,7 @@ import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists)
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
import Simplex.Messaging.Server.MsgStore.Types (QSType (..))
import Simplex.Messaging.Server.MsgStore.Journal (postgresQueueStore)
import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStoreCfg (..), batchInsertMessages, exportDbMessages)
import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchInsertServices, foldQueueRecs, foldServiceRecs)
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..))
import Simplex.Messaging.Server.QueueStore.Types
@@ -129,6 +131,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
printMessageStats "Messages" msgStats
putStrLn $ case readStoreType ini of
Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, update it to `journal` in INI file"
Right (ASType SQSPostgres SMSPostgres) -> "store_messages set to `database`, update it to `journal` in INI file"
Right (ASType _ SMSJournal) -> "store_messages set to `journal`"
Left e -> e <> ", configure storage correctly"
SCExport
@@ -149,8 +152,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
putStrLn $ case msType of
SMSMemory -> "store_messages set to `memory`, start the server."
SMSJournal -> "store_messages set to `journal`, update it to `memory` in INI file"
Right (ASType SQSPostgres SMSJournal) -> do
#if defined(dbServerPostgres)
Right (ASType SQSPostgres SMSJournal) -> do
let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath
dbOpts@DBOpts {connstr, schema} = iniDBOptions ini defaultDBOpts
unlessM (checkSchemaExists connstr schema) $ do
@@ -160,8 +163,11 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
exportMessages True (StoreJournal ms) storeMsgsFilePath False
putStrLn "Export completed"
putStrLn "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)."
Right (ASType SQSPostgres SMSPostgres) -> do
putStrLn $ "Messages can be exported with `dabatase export --table messages`."
exitFailure
#else
noPostgresExit
Right (ASType SQSPostgres SMSJournal) -> noPostgresExit
#endif
Left e -> putStrLn $ e <> ", configure storage correctly"
SCDelete
@@ -175,11 +181,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
deleteDirIfExists storeMsgsJournalDir
putStrLn $ "Deleted all messages in journal " <> storeMsgsJournalDir
#if defined(dbServerPostgres)
Database cmd dbOpts@DBOpts {connstr, schema} -> withIniFile $ \ini -> do
Database cmd tables dbOpts@DBOpts {connstr, schema} -> withIniFile $ \ini -> do
schemaExists <- checkSchemaExists connstr schema
storeLogExists <- doesFileExist storeLogFilePath
case cmd of
SCImport
msgsFileExists <- doesFileExist storeMsgsFilePath
case (cmd, tables) of
(SCImport, DTQueues)
| schemaExists && storeLogExists -> exitConfigureQueueStore connstr schema
| schemaExists -> do
putStrLn $ "Schema " <> B.unpack schema <> " already exists in PostrgreSQL database: " <> B.unpack connstr
@@ -197,12 +204,29 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
putStrLn $ case readStoreType ini of
Right (ASType SQSMemory SMSMemory) -> setToDbStr <> "\nstore_messages set to `memory`, import messages to journal to use PostgreSQL database for queues (`smp-server journal import`)"
Right (ASType SQSMemory SMSJournal) -> setToDbStr
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, start the server."
Right (ASType SQSPostgres _) -> "store_queues set to `database`, start the server."
Left e -> e <> ", configure storage correctly"
where
setToDbStr :: String
setToDbStr = "store_queues set to `memory`, update it to `database` in INI file"
SCExport
(SCImport, DTMessages)
| not schemaExists -> do
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
exitFailure
| not msgsFileExists -> do
putStrLn $ storeMsgsFilePath <> " file does not exist."
exitFailure
| otherwise -> do
confirmOrExit
("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to PostrgreSQL database " <> B.unpack connstr <> ", schema: " <> B.unpack schema)
"Message records not imported"
mCnt <- importMessagesToDatabase storeMsgsFilePath dbOpts
putStrLn $ "Import completed: " <> show mCnt <> " messages"
putStrLn $ case readStoreType ini of
Right (ASType SQSPostgres SMSPostgres) -> "store_queues and store_messages set to `database`, start the server."
Right _ -> "set store_queues and store_messages set to `database` in INI file"
Left e -> e <> ", configure storage correctly"
(SCExport, DTQueues)
| schemaExists && storeLogExists -> exitConfigureQueueStore connstr schema
| not schemaExists -> do
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
@@ -212,15 +236,33 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
exitFailure
| otherwise -> do
confirmOrExit
("WARNING: PostrgreSQL database schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath)
("WARNING: PostrgreSQL schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath)
"Queue records not exported"
(sCnt, qCnt) <- exportDatabaseToStoreLog logPath dbOpts storeLogFilePath
putStrLn $ "Export completed: " <> show sCnt <> " services, " <> show qCnt <> " queues"
putStrLn $ case readStoreType ini of
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file."
Right (ASType SQSPostgres _) -> "store_queues or store_messages set to `database`, update it to `memory` in INI file."
Right (ASType SQSMemory _) -> "store_queues set to `memory`, start the server"
Left e -> e <> ", configure storage correctly"
SCDelete
(SCExport, DTMessages)
| not schemaExists -> do
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
exitFailure
| msgsFileExists -> do
putStrLn $ storeMsgsFilePath <> " file already exists."
exitFailure
| otherwise -> do
confirmOrExit
("WARNING: Messages from PostrgreSQL schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to message log file " <> storeMsgsFilePath)
"Message records not exported"
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
ms <- newMsgStore $ PostgresMsgStoreCfg storeCfg defaultMsgQueueQuota
withFile storeMsgsFilePath WriteMode (try . exportDbMessages True ms) >>= \case
Right mCnt -> do
putStrLn $ "Export completed: " <> show mCnt <> " messages"
putStrLn "Export queues with `smp-server database export queues`"
Left (e :: SomeException) -> putStrLn $ "Error exporting messages: " <> show e
(SCDelete, _)
| not schemaExists -> do
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
exitFailure
@@ -254,8 +296,14 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
readStoreType ini = case (iniStoreQueues, iniStoreMessage) of
("memory", "memory") -> Right $ ASType SQSMemory SMSMemory
("memory", "journal") -> Right $ ASType SQSMemory SMSJournal
("memory", "database") -> Left "Database and memory storage are not compatible."
("database", "memory") -> Left "Database and memory storage are not compatible."
("database", "journal") -> Right $ ASType SQSPostgres SMSJournal
("database", "memory") -> Left "Using PostgreSQL database requires journal memory storage."
#if defined(dbServerPostgres)
("database", "database") -> Right $ ASType SQSPostgres SMSPostgres
#else
("database", "database") -> Left noPostgresExitStr
#endif
(q, m) -> Left $ T.unpack $ "Invalid storage settings: store_queues: " <> q <> ", store_messages: " <> m
where
iniStoreQueues = fromRight "memory" $ lookupValue "STORE_LOG" "store_queues" ini
@@ -405,6 +453,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath
storeCfg = PostgresStoreCfg {dbOpts = iniDBOptions ini defaultDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini}
in SSCDatabaseJournal {storeCfg, storeMsgsPath' = storeMsgsJournalDir}
#if defined(dbServerPostgres)
iniStoreCfg SQSPostgres SMSPostgres =
let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath
storeCfg = PostgresStoreCfg {dbOpts = iniDBOptions ini defaultDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini}
in SSCDatabase storeCfg
#endif
serverConfig :: ServerStoreCfg s -> ServerConfig s
serverConfig serverStoreCfg =
ServerConfig
@@ -514,6 +568,14 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
msgsFileExists <- doesFileExist storeMsgsFilePath
storeLogExists <- doesFileExist storeLogFilePath
case mode of
#if defined(dbServerPostgres)
ASType SQSPostgres SMSPostgres
| msgsFileExists || msgsDirExists -> do
putStrLn $ "Error: " <> storeMsgsFilePath <> " file or " <> storeMsgsJournalDir <> " directory are present."
putStrLn "Configure memory storage."
exitFailure
| otherwise -> checkDbStorage ini storeLogExists
#endif
ASType qs SMSJournal
| msgsFileExists && msgsDirExists -> exitConfigureMsgStorage
| msgsFileExists -> do
@@ -526,28 +588,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
SQSMemory ->
unless (storeLogExists) $ putStrLn $ "store_queues is `memory`, " <> storeLogFilePath <> " file will be created."
#if defined(dbServerPostgres)
SQSPostgres -> do
let DBOpts {connstr, schema} = iniDBOptions ini defaultDBOpts
schemaExists <- checkSchemaExists connstr schema
case enableDbStoreLog' ini of
Just ()
| not schemaExists -> noDatabaseSchema connstr schema
| not storeLogExists -> do
putStrLn $ "Error: db_store_log is `on`, " <> storeLogFilePath <> " does not exist"
exitFailure
| otherwise -> pure ()
Nothing
| storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema
| storeLogExists -> do
putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present."
putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate."
exitFailure
| not schemaExists -> noDatabaseSchema connstr schema
| otherwise -> pure ()
where
noDatabaseSchema connstr schema = do
putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr
exitFailure
SQSPostgres -> checkDbStorage ini storeLogExists
#else
SQSPostgres -> noPostgresExit
#endif
@@ -565,6 +606,29 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
exitFailure
#if defined(dbServerPostgres)
checkDbStorage ini storeLogExists = do
let DBOpts {connstr, schema} = iniDBOptions ini defaultDBOpts
schemaExists <- checkSchemaExists connstr schema
case enableDbStoreLog' ini of
Just ()
| not schemaExists -> noDatabaseSchema connstr schema
| not storeLogExists -> do
putStrLn $ "Error: db_store_log is `on`, " <> storeLogFilePath <> " does not exist"
exitFailure
| otherwise -> pure ()
Nothing
| storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema
| storeLogExists -> do
putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present."
putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate."
exitFailure
| not schemaExists -> noDatabaseSchema connstr schema
| otherwise -> pure ()
where
noDatabaseSchema connstr schema = do
putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr
exitFailure
exitConfigureQueueStore connstr schema = do
putStrLn $ "Error: both " <> storeLogFilePath <> " file and " <> B.unpack schema <> " schema are present (database: " <> B.unpack connstr <> ")."
putStrLn "Configure queue storage."
@@ -585,6 +649,14 @@ importStoreLogToDatabase logPath storeLogFile dbOpts = do
renameFile storeLogFile $ storeLogFile <> ".bak"
pure (sCnt, qCnt)
importMessagesToDatabase :: FilePath -> DBOpts -> IO Int64
importMessagesToDatabase msgsLogFile dbOpts = do
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
ms <- newMsgStore $ PostgresMsgStoreCfg storeCfg defaultMsgQueueQuota
mCnt <- batchInsertMessages True msgsLogFile $ queueStore ms
renameFile msgsLogFile $ msgsLogFile <> ".bak"
pure mCnt
exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO (Int, Int)
exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
@@ -677,10 +749,22 @@ data CliCommand
| Start StartOptions
| Delete
| Journal StoreCmd
| Database StoreCmd DBOpts
| Database StoreCmd DatabaseTable DBOpts
data StoreCmd = SCImport | SCExport | SCDelete
data DatabaseTable = DTQueues | DTMessages
instance StrEncoding DatabaseTable where
strEncode = \case
DTQueues -> "queues"
DTMessages -> "messages"
strP =
A.takeTill (== ' ') >>= \case
"queues" -> pure DTQueues
"messages" -> pure DTMessages
_ -> fail "DatabaseTable"
cliCommandP :: FilePath -> FilePath -> FilePath -> Parser CliCommand
cliCommandP cfgPath logPath iniFile =
hsubparser
@@ -689,7 +773,7 @@ cliCommandP cfgPath logPath iniFile =
<> command "start" (info (Start <$> startOptionsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")"))
<> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files"))
<> command "journal" (info (Journal <$> journalCmdP) (progDesc "Import/export messages to/from journal storage"))
<> command "database" (info (Database <$> databaseCmdP <*> dbOptsP defaultDBOpts) (progDesc "Import/export queues to/from PostgreSQL database storage"))
<> command "database" (info (Database <$> databaseCmdP <*> dbTableP <*> dbOptsP defaultDBOpts) (progDesc "Import/export queues to/from PostgreSQL database storage"))
)
where
initP :: Parser InitOptions
@@ -843,6 +927,13 @@ cliCommandP cfgPath logPath iniFile =
<> command "export" (info (pure SCExport) (progDesc $ "Export " <> dest <> " to " <> src))
<> command "delete" (info (pure SCDelete) (progDesc $ "Delete " <> dest))
)
dbTableP =
option
strParse
( long "table"
<> help "Database tables: queues/messages"
<> metavar "TABLE"
)
parseBasicAuth :: ReadM ServerPassword
parseBasicAuth = eitherReader $ fmap ServerPassword . strDecode . B.pack
entityP :: String -> String -> String -> Parser (Maybe Entity, Maybe Text)

View File

@@ -24,7 +24,7 @@ module Simplex.Messaging.Server.MsgStore.Journal
( JournalMsgStore (random, expireBackupsBefore),
QStore (..),
QStoreCfg (..),
JournalQueue,
JournalQueue (msgQueue'), -- msgQueue' is used in tests
JournalMsgQueue (queue, state),
JMQueue (queueDirectory, statePath),
JournalStoreConfig (..),
@@ -291,13 +291,10 @@ newtype StoreIO (s :: QSType) a = StoreIO {unStoreIO :: IO a}
deriving newtype (Functor, Applicative, Monad)
instance StoreQueueClass (JournalQueue s) where
type MsgQueue (JournalQueue s) = JournalMsgQueue s
recipientId = recipientId'
{-# INLINE recipientId #-}
queueRec = queueRec'
{-# INLINE queueRec #-}
msgQueue = msgQueue'
{-# INLINE msgQueue #-}
withQueueLock :: JournalQueue s -> Text -> IO a -> IO a
withQueueLock JournalQueue {recipientId', queueLock, sharedLock} =
withLockWaitShared recipientId' queueLock sharedLock
@@ -379,6 +376,7 @@ makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
instance MsgStoreClass (JournalMsgStore s) where
type StoreMonad (JournalMsgStore s) = StoreIO s
type MsgQueue (JournalMsgStore s) = JournalMsgQueue s
type QueueStore (JournalMsgStore s) = QStore s
type StoreQueue (JournalMsgStore s) = JournalQueue s
type MsgStoreConfig (JournalMsgStore s) = JournalStoreConfig s
@@ -422,7 +420,7 @@ instance MsgStoreClass (JournalMsgStore s) where
expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats
expireOldMessages tty ms now ttl = case queueStore_ ms of
MQStore st ->
withLoadedQueues st $ \q -> run $ isolateQueue q "deleteExpiredMsgs" $ do
withLoadedQueues st $ \q -> run $ isolateQueue ms q "deleteExpiredMsgs" $ do
StoreIO (readTVarIO $ queueRec q) >>= \case
Just QueueRec {updatedAt = Just (RoundedSystemTime t)} | t > veryOld ->
expireQueueMsgs ms now old q
@@ -587,7 +585,7 @@ instance MsgStoreClass (JournalMsgStore s) where
| otherwise = pure []
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
writeMsg ms q' logState msg = isolateQueue ms q' "writeMsg" $ do
q <- getMsgQueue ms q' True
StoreIO $ (`E.finally` updateActiveAt q') $ do
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
@@ -661,8 +659,8 @@ instance MsgStoreClass (JournalMsgStore s) where
$>>= \len -> readTVarIO handles
$>>= \hs -> updateReadPos q mq logState len hs $> Just ()
isolateQueue :: JournalQueue s -> Text -> StoreIO s a -> ExceptT ErrorType IO a
isolateQueue sq op = tryStore' op (recipientId' sq) . withQueueLock sq op . unStoreIO
isolateQueue :: JournalMsgStore s -> JournalQueue s -> Text -> StoreIO s a -> ExceptT ErrorType IO a
isolateQueue _ sq op = tryStore' op (recipientId' sq) . withQueueLock sq op . unStoreIO
unsafeRunStore :: JournalQueue s -> Text -> StoreIO s a -> IO a
unsafeRunStore sq op a =
@@ -977,10 +975,11 @@ deleteQueue_ ms q =
pure r
where
rId = recipientId q
remove r@(_, mq_) = do
remove qr = do
mq_ <- atomically $ swapTVar (msgQueue' q) Nothing
mapM_ (closeMsgQueueHandles ms) mq_
removeQueueDirectory ms rId
pure r
pure (qr, mq_)
closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO ()
closeMsgQueue ms JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ (closeMsgQueueHandles ms)

View File

@@ -0,0 +1,325 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Simplex.Messaging.Server.MsgStore.Postgres
( PostgresMsgStore,
PostgresMsgStoreCfg (..),
PostgresQueue,
exportDbMessages,
getDbMessageStats,
batchInsertMessages,
)
where
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Trans.Except
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as LB
import Data.Functor (($>))
import Data.IORef
import Data.Int (Int64)
import Data.List (intersperse)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import Data.Time.Clock.System (SystemTime (..))
import Database.PostgreSQL.Simple (Binary (..), Only (..), (:.) (..))
import qualified Database.PostgreSQL.Simple as DB
import qualified Database.PostgreSQL.Simple.Copy as DB
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.ToField (ToField (..))
import Simplex.Messaging.Agent.Store.Postgres.Common
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Postgres
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog (foldLogLines)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Util (maybeFirstRow, maybeFirstRow', (<$$>))
import System.IO (Handle, hFlush, stdout)
data PostgresMsgStore = PostgresMsgStore
{ config :: PostgresMsgStoreCfg,
queueStore_ :: PostgresQueueStore'
}
data PostgresMsgStoreCfg = PostgresMsgStoreCfg
{ queueStoreCfg :: PostgresStoreCfg,
quota :: Int
}
type PostgresQueueStore' = PostgresQueueStore PostgresQueue
data PostgresQueue = PostgresQueue
{ recipientId' :: RecipientId,
queueRec' :: TVar (Maybe QueueRec)
}
instance StoreQueueClass PostgresQueue where
recipientId = recipientId'
{-# INLINE recipientId #-}
queueRec = queueRec'
{-# INLINE queueRec #-}
withQueueLock PostgresQueue {} _ = id -- TODO [messages] maybe it's just transaction?
{-# INLINE withQueueLock #-}
newtype DBTransaction = DBTransaction {dbConn :: DB.Connection}
type DBStoreIO a = ReaderT DBTransaction IO a
instance MsgStoreClass PostgresMsgStore where
type StoreMonad PostgresMsgStore = ReaderT DBTransaction IO
type MsgQueue PostgresMsgStore = ()
type QueueStore PostgresMsgStore = PostgresQueueStore'
type StoreQueue PostgresMsgStore = PostgresQueue
type MsgStoreConfig PostgresMsgStore = PostgresMsgStoreCfg
newMsgStore :: PostgresMsgStoreCfg -> IO PostgresMsgStore
newMsgStore config = do
queueStore_ <- newQueueStore @PostgresQueue $ queueStoreCfg config
pure PostgresMsgStore {config, queueStore_}
closeMsgStore :: PostgresMsgStore -> IO ()
closeMsgStore = closeQueueStore @PostgresQueue . queueStore_
withActiveMsgQueues _ _ = error "withActiveMsgQueues not used"
unsafeWithAllMsgQueues _ _ _ = error "unsafeWithAllMsgQueues not used"
expireOldMessages :: Bool -> PostgresMsgStore -> Int64 -> Int64 -> IO MessageStats
expireOldMessages _tty ms now ttl =
maybeFirstRow' newMessageStats toMessageStats $ withConnection st $ \db ->
DB.query db "CALL expire_old_messages(?,?,0,0,0)" (now, ttl)
where
st = dbStore $ queueStore_ ms
toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) =
MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues}
logQueueStates _ = error "logQueueStates not used"
logQueueState _ = error "logQueueState not used"
queueStore = queueStore_
{-# INLINE queueStore #-}
loadedQueueCounts :: PostgresMsgStore -> IO LoadedQueueCounts
loadedQueueCounts ms = do
loadedQueueCount <- M.size <$> readTVarIO queues
loadedNotifierCount <- M.size <$> readTVarIO notifiers
notifierLockCount <- M.size <$> readTVarIO notifierLocks
pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount = 0, queueLockCount = 0, notifierLockCount}
where
PostgresQueueStore {queues, notifiers, notifierLocks} = queueStore_ ms
mkQueue :: PostgresMsgStore -> Bool -> RecipientId -> QueueRec -> IO PostgresQueue
mkQueue _ _keepLock rId qr = PostgresQueue rId <$> newTVarIO (Just qr)
{-# INLINE mkQueue #-}
getMsgQueue _ _ _ = pure ()
{-# INLINE getMsgQueue #-}
getPeekMsgQueue :: PostgresMsgStore -> PostgresQueue -> DBStoreIO (Maybe ((), Message))
getPeekMsgQueue _ q = ((),) <$$> tryPeekMsg_ q ()
withIdleMsgQueue :: Int64 -> PostgresMsgStore -> PostgresQueue -> (() -> DBStoreIO a) -> DBStoreIO (Maybe a, Int)
withIdleMsgQueue _ _ _ _ = error "withIdleMsgQueue not used"
deleteQueue :: PostgresMsgStore -> PostgresQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms q = deleteStoreQueue (queueStore_ ms) q
{-# INLINE deleteQueue #-}
deleteQueueSize :: PostgresMsgStore -> PostgresQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q = runExceptT $ do
size <- getQueueSize ms q
qr <- ExceptT $ deleteStoreQueue (queueStore_ ms) q
pure (qr, size)
getQueueMessages_ _ _ _ = error "getQueueMessages_ not used"
writeMsg :: PostgresMsgStore -> PostgresQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q _ msg =
withDB' "writeMsg" (queueStore_ ms) $ \db -> do
let (msgQuota, ntf, body) = case msg of
Message {msgFlags = MsgFlags ntf', msgBody = C.MaxLenBS body'} -> (False, ntf', body')
MessageQuota {} -> (True, False, B.empty)
toResult <$>
DB.query
db
"SELECT quota_written, was_empty FROM write_message(?,?,?,?,?,?,?)"
(recipientId' q, Binary (messageId msg), systemSeconds (messageTs msg), msgQuota, ntf, Binary body, quota)
where
toResult = \case
((msgQuota, wasEmpty) : _) -> if msgQuota then Nothing else Just (msg, wasEmpty)
[] -> Nothing
PostgresMsgStore {config = PostgresMsgStoreCfg {quota}} = ms
setOverQuota_ :: PostgresQueue -> IO () -- can ONLY be used while restoring messages, not while server running
setOverQuota_ _ = error "TODO setOverQuota_" -- TODO [messages]
getQueueSize_ :: () -> DBStoreIO Int
getQueueSize_ _ = error "getQueueSize_ not used"
getQueueSize :: PostgresMsgStore -> PostgresQueue -> ExceptT ErrorType IO Int
getQueueSize ms q =
withDB' "getQueueSize" (queueStore_ ms) $ \db ->
maybeFirstRow' 0 fromOnly $
DB.query db "SELECT msg_queue_size FROM msg_queues WHERE recipient_id = ? AND deleted_at IS NULL" (Only (recipientId' q))
tryPeekMsg_ :: PostgresQueue -> () -> DBStoreIO (Maybe Message)
tryPeekMsg_ q _ = do
db <- asks dbConn
liftIO $ maybeFirstRow toMessage $
DB.query
db
[sql|
SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
WHERE recipient_id = ?
ORDER BY message_id ASC LIMIT 1
|]
(Only (recipientId' q))
tryDeleteMsg_ :: PostgresQueue -> () -> Bool -> DBStoreIO ()
tryDeleteMsg_ _q _ _ = error "tryDeleteMsg_ not used" -- do
isolateQueue :: PostgresMsgStore -> PostgresQueue -> Text -> DBStoreIO a -> ExceptT ErrorType IO a
isolateQueue ms _q op a = withDB' op (queueStore_ ms) $ runReaderT a . DBTransaction
unsafeRunStore _ _ _ = error "unsafeRunStore not used"
tryPeekMsg :: PostgresMsgStore -> PostgresQueue -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg ms q = isolateQueue ms q "tryPeekMsg" $ tryPeekMsg_ q ()
{-# INLINE tryPeekMsg #-}
tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg ms q msgId =
withDB' "tryDelMsg" (queueStore_ ms) $ \db ->
maybeFirstRow toMessage $
DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_msg(?, ?)" (recipientId' q, Binary msgId)
tryDelPeekMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg ms q msgId =
withDB' "tryDelPeekMsg" (queueStore_ ms) $ \db ->
toResult . map toMessage
<$> DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_peek_msg(?, ?)" (recipientId' q, Binary msgId)
where
toResult = \case
[] -> (Nothing, Nothing)
[msg]
| messageId msg == msgId -> (Just msg, Nothing)
| otherwise -> (Nothing, Just msg)
deleted : next : _ -> (Just deleted, Just next)
deleteExpiredMsgs :: PostgresMsgStore -> PostgresQueue -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs ms q old =
maybeFirstRow' 0 (fromIntegral @Int64 . fromOnly) $ withDB' "deleteExpiredMsgs" (queueStore_ ms) $ \db ->
DB.query db "SELECT delete_expired_msgs(?, ?)" (recipientId' q, old)
toMessage :: (Binary MsgId, Int64, Bool, Bool, Binary MsgBody) -> Message
toMessage (Binary msgId, ts, msgQuota, ntf, Binary body)
| msgQuota = MessageQuota {msgId, msgTs}
| otherwise = Message {msgId, msgTs, msgFlags = MsgFlags ntf, msgBody = C.unsafeMaxLenBS body} -- TODO [messages] unsafeMaxLenBS?
where
msgTs = MkSystemTime ts 0
exportDbMessages :: Bool -> PostgresMsgStore -> Handle -> IO Int
exportDbMessages tty ms h = do
rows <- newIORef []
n <- withConnection st $ \db -> DB.foldWithOptions_ opts db query 0 $ \i r -> do
let i' = i + 1
if i' `mod` 1000 > 0
then modifyIORef rows (r :)
else do
readIORef rows >>= writeMessages . (r :)
writeIORef rows []
when tty $ putStr (progress i' <> "\r") >> hFlush stdout
pure i'
readIORef rows >>= \rs -> unless (null rs) $ writeMessages rs
when tty $ putStrLn $ progress n
pure n
where
st = dbStore $ queueStore_ ms
opts = DB.defaultFoldOptions {DB.fetchQuantity = DB.Fixed 1000}
query =
[sql|
SELECT recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
ORDER BY recipient_id, message_id ASC
|]
writeMessages = BB.hPutBuilder h . encodeMessages . reverse
encodeMessages = mconcat . map (\(Only rId :. msg) -> BB.byteString (strEncode $ MLRv3 rId $ toMessage msg) <> BB.char8 '\n')
progress i = "Processed: " <> show i <> " records"
getDbMessageStats :: PostgresMsgStore -> IO MessageStats
getDbMessageStats ms =
maybeFirstRow' newMessageStats toMessageStats $ withConnection st $ \db ->
DB.query_
db
[sql|
SELECT
(SELECT COUNT (1) FROM msg_queues WHERE deleted_at IS NULL),
(SELECT COUNT (1) FROM messages m JOIN msg_queues q USING recipient_id WHERE deleted_at IS NULL)
|]
where
st = dbStore $ queueStore_ ms
toMessageStats (storedQueues, storedMsgsCount) =
MessageStats {storedQueues, storedMsgsCount, expiredMsgsCount = 0}
-- TODO [messages] update counts
batchInsertMessages :: StoreQueueClass q => Bool -> FilePath -> PostgresQueueStore q -> IO Int64
batchInsertMessages tty f toStore = do
putStrLn "Importing messages..."
let st = dbStore toStore
(_, inserted) <-
withTransaction st $ \db -> do
DB.copy_
db
[sql|
COPY messages (recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body)
FROM STDIN WITH (FORMAT CSV)
|]
foldLogLines tty f (putMessage db) (0 :: Int, 0) >>= (DB.putCopyEnd db $>)
Only mCnt : _ <- withTransaction st (`DB.query_` "SELECT count(*) FROM messages")
unless (inserted == mCnt) $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt <> " messages."
pure inserted
where
putMessage db (!i, !cnt) _eof s = do
let i' = i + 1
cnt' <- case strDecode s of
Right (MLRv3 rId msg) -> (cnt + 1) <$ DB.putCopyData db (messageRecToText rId msg)
Left e -> cnt <$ putStrLn ("Error parsing line " <> show i' <> ": " <> e)
pure (i', cnt')
messageRecToText :: RecipientId -> Message -> B.ByteString
messageRecToText rId msg =
LB.toStrict $ BB.toLazyByteString $ mconcat tabFields <> BB.char7 '\n'
where
tabFields = BB.char7 ',' `intersperse` fields
fields =
[ renderField (toField rId),
renderField (toField $ Binary (messageId msg)),
renderField (toField $ systemSeconds (messageTs msg)),
renderField (toField msgQuota),
renderField (toField ntf),
renderField (toField $ Binary body)
]
(msgQuota, ntf, body) = case msg of
Message {msgFlags = MsgFlags ntf', msgBody = C.MaxLenBS body'} -> (False, ntf', body')
MessageQuota {} -> (True, False, B.empty)

View File

@@ -57,18 +57,16 @@ data STMStoreConfig = STMStoreConfig
}
instance StoreQueueClass STMQueue where
type MsgQueue STMQueue = STMMsgQueue
recipientId = recipientId'
{-# INLINE recipientId #-}
queueRec = queueRec'
{-# INLINE queueRec #-}
msgQueue = msgQueue'
{-# INLINE msgQueue #-}
withQueueLock _ _ = id
{-# INLINE withQueueLock #-}
instance MsgStoreClass STMMsgStore where
type StoreMonad STMMsgStore = STM
type MsgQueue STMMsgStore = STMMsgQueue
type QueueStore STMMsgStore = STMQueueStore STMQueue
type StoreQueue STMMsgStore = STMQueue
type MsgStoreConfig STMMsgStore = STMStoreConfig
@@ -129,10 +127,10 @@ instance MsgStoreClass STMMsgStore where
Nothing -> pure (Nothing, 0)
deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms q = fst <$$> deleteStoreQueue (queueStore_ ms) q
deleteQueue ms q = fst <$$> deleteQueue_ ms q
deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q = deleteStoreQueue (queueStore_ ms) q >>= mapM (traverse getSize)
deleteQueueSize ms q = deleteQueue_ ms q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)
@@ -179,10 +177,15 @@ instance MsgStoreClass STMMsgStore where
Just _ -> modifyTVar' size (subtract 1)
_ -> pure ()
isolateQueue :: STMQueue -> Text -> STM a -> ExceptT ErrorType IO a
isolateQueue _ _ = liftIO . atomically
isolateQueue :: STMMsgStore -> STMQueue -> Text -> STM a -> ExceptT ErrorType IO a
isolateQueue _ _ _ = liftIO . atomically
{-# INLINE isolateQueue #-}
unsafeRunStore :: STMQueue -> Text -> STM a -> IO a
unsafeRunStore _ _ = atomically
{-# INLINE unsafeRunStore #-}
deleteQueue_ :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Maybe STMMsgQueue))
deleteQueue_ ms q = deleteStoreQueue (queueStore_ ms) q >>= mapM remove
where
remove qr = (qr,) <$> atomically (swapTVar (msgQueue' q) Nothing)

View File

@@ -34,6 +34,7 @@ import Simplex.Messaging.Util ((<$$>), ($>>=))
class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => MsgStoreClass s where
type StoreMonad s = (m :: Type -> Type) | m -> s
type MsgStoreConfig s = c | c -> s
type MsgQueue s = q | q -> s
type StoreQueue s = q | q -> s
type QueueStore s = qs | qs -> s
newMsgStore :: MsgStoreConfig s -> IO s
@@ -51,29 +52,62 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
-- message store methods
mkQueue :: s -> Bool -> RecipientId -> QueueRec -> IO (StoreQueue s)
getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue (StoreQueue s))
getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue (StoreQueue s), Message))
getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s)
getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue (StoreQueue s) -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s [Message]
getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue s -> StoreMonad s [Message]
writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
getQueueSize_ :: MsgQueue (StoreQueue s) -> StoreMonad s Int
tryPeekMsg_ :: StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s (Maybe Message)
tryDeleteMsg_ :: StoreQueue s -> MsgQueue (StoreQueue s) -> Bool -> StoreMonad s ()
isolateQueue :: StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
getQueueSize_ :: MsgQueue s -> StoreMonad s Int
tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message)
tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
isolateQueue :: s -> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
unsafeRunStore :: StoreQueue s -> Text -> StoreMonad s a -> IO a
data MSType = MSMemory | MSJournal
-- default implementations are overridden for PostgreSQL storage of messages
tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
{-# INLINE tryPeekMsg #-}
tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st q msgId' =
withPeekMsgQueue st q "tryDelMsg" $
maybe (pure Nothing) $ \(mq, msg) ->
if
| messageId msg == msgId' ->
tryDeleteMsg_ q mq True $> Just msg
| otherwise -> pure Nothing
-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st q msgId' =
withPeekMsgQueue st q "tryDelPeekMsg" $
maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
if
| messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
| otherwise -> pure (Nothing, Just msg)
deleteExpiredMsgs :: s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st q old =
isolateQueue st q "deleteExpiredMsgs" $
getMsgQueue st q False >>= deleteExpireMsgs_ old q
getQueueSize :: s -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
{-# INLINE getQueueSize #-}
data MSType = MSMemory | MSJournal | MSPostgres
data QSType = QSMemory | QSPostgres
data SMSType :: MSType -> Type where
SMSMemory :: SMSType 'MSMemory
SMSJournal :: SMSType 'MSJournal
SMSPostgres :: SMSType 'MSPostgres
data SQSType :: QSType -> Type where
SQSMemory :: SQSType 'QSMemory
@@ -127,48 +161,19 @@ readQueueRec :: StoreQueueClass q => q -> IO (Either ErrorType (q, QueueRec))
readQueueRec q = maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec q)
{-# INLINE readQueueRec #-}
getQueueSize :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
{-# INLINE getQueueSize #-}
tryPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
{-# INLINE tryPeekMsg #-}
tryDelMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st q msgId' =
withPeekMsgQueue st q "tryDelMsg" $
maybe (pure Nothing) $ \(mq, msg) ->
if
| messageId msg == msgId' ->
tryDeleteMsg_ q mq True $> Just msg
| otherwise -> pure Nothing
-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st q msgId' =
withPeekMsgQueue st q "tryDelPeekMsg" $
maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
if
| messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
| otherwise -> pure (Nothing, Just msg)
-- The action is called with Nothing when it is known that the queue is empty
withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> Text -> (Maybe (MsgQueue (StoreQueue s), Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st q op a = isolateQueue q op $ getPeekMsgQueue st q >>= a
withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> Text -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st q op a = isolateQueue st q op $ getPeekMsgQueue st q >>= a
{-# INLINE withPeekMsgQueue #-}
deleteExpiredMsgs :: MsgStoreClass s => s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st q old =
isolateQueue q "deleteExpiredMsgs" $
getMsgQueue st q False >>= deleteExpireMsgs_ old q
-- not used with PostgreSQL message store
expireQueueMsgs :: MsgStoreClass s => s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats
expireQueueMsgs st now old q = do
(expired_, stored) <- withIdleMsgQueue now st q $ deleteExpireMsgs_ old q
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s Int
-- not used with PostgreSQL message store
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
n <- loop 0
logQueueState q

View File

@@ -28,7 +28,10 @@ module Simplex.Messaging.Server.QueueStore.Postgres
foldRecentQueueRecs,
handleDuplicate,
withLog_,
withDB,
withDB',
assertUpdated,
renderField,
)
where
@@ -84,7 +87,7 @@ import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>))
import System.Exit (exitFailure)
import System.IO (IOMode (..), hFlush, stdout)
import UnliftIO.STM
@@ -409,7 +412,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
rId = recipientId sq
-- this method is called from JournalMsgStore deleteQueue that already locks the queue
deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType QueueRec)
deleteStoreQueue st sq = E.uninterruptibleMask_ $ runExceptT $ do
q <- ExceptT $ readQueueRecIO qr
RoundedSystemTime ts <- liftIO getSystemDate
@@ -420,9 +423,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
forM_ (notifier q) $ \NtfCreds {notifierId} -> do
atomically $ TM.delete notifierId $ notifiers st
atomically $ TM.delete notifierId $ notifierLocks st
mq_ <- atomically $ swapTVar (msgQueue sq) Nothing
withLog "deleteStoreQueue" st (`logDeleteQueue` rId)
pure (q, mq_)
pure q
where
rId = recipientId sq
qr = queueRec sq
@@ -488,7 +490,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount st party serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db ->
fmap (fromMaybe 0) $ maybeFirstRow fromOnly $
maybeFirstRow' 0 fromOnly $
DB.query db query (Only serviceId)
where
query = case party of
@@ -641,13 +643,14 @@ queueRecToText (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey,
(linkId_, queueData_) = queueDataColumns queueData
nullable :: ToField a => Maybe a -> Builder
nullable = maybe mempty (renderField . toField)
renderField :: Action -> Builder
renderField = \case
Plain bld -> bld
Escape s -> BB.byteString s
EscapeByteA s -> BB.string7 "\\x" <> BB.byteStringHex s
EscapeIdentifier s -> BB.byteString s -- Not used in COPY data
Many as -> mconcat (map renderField as)
renderField :: Action -> Builder
renderField = \case
Plain bld -> bld
Escape s -> BB.byteString s
EscapeByteA s -> BB.string7 "\\x" <> BB.byteStringHex s
EscapeIdentifier s -> BB.byteString s -- Not used in COPY data
Many as -> mconcat (map renderField as)
queueDataColumns :: Maybe (LinkId, QueueLinkData) -> (Maybe LinkId, Maybe QueueLinkData)
queueDataColumns = \case

View File

@@ -14,7 +14,8 @@ serverSchemaMigrations =
[ ("20250207_initial", m20250207_initial, Nothing),
("20250319_updated_index", m20250319_updated_index, Just down_m20250319_updated_index),
("20250320_short_links", m20250320_short_links, Just down_m20250320_short_links),
("20250514_service_certs", m20250514_service_certs, Just down_m20250514_service_certs)
("20250514_service_certs", m20250514_service_certs, Just down_m20250514_service_certs),
("20250903_store_messages", m20250903_store_messages, Just down_m20250903_store_messages)
]
-- | The list of migrations in ascending order by date
@@ -159,3 +160,239 @@ DROP INDEX idx_services_service_role;
DROP TABLE services;
|]
m20250903_store_messages :: Text
m20250903_store_messages =
T.pack
[r|
CREATE TABLE messages(
message_id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
recipient_id BYTEA NOT NULL REFERENCES msg_queues ON DELETE CASCADE ON UPDATE RESTRICT,
msg_id BYTEA NOT NULL,
msg_ts BIGINT NOT NULL,
msg_quota BOOLEAN NOT NULL,
msg_ntf_flag BOOLEAN NOT NULL,
msg_body BYTEA NOT NULL
);
ALTER TABLE msg_queues
ADD COLUMN msg_can_write BOOLEAN NOT NULL DEFAULT TRUE,
ADD COLUMN msg_queue_size BIGINT NOT NULL DEFAULT 0;
CREATE INDEX idx_messages_recipient_id_message_id ON messages (recipient_id, message_id);
CREATE INDEX idx_messages_recipient_id_msg_ts on messages(recipient_id, msg_ts);
CREATE INDEX idx_messages_recipient_id_msg_quota on messages(recipient_id, msg_quota);
CREATE FUNCTION write_message(
p_recipient_id BYTEA,
p_msg_id BYTEA,
p_msg_ts BIGINT,
p_msg_quota BOOLEAN,
p_msg_ntf_flag BOOLEAN,
p_msg_body BYTEA,
p_quota INT
)
RETURNS TABLE (quota_written BOOLEAN, was_empty BOOLEAN)
LANGUAGE plpgsql AS $$
DECLARE
q_can_write BOOLEAN;
q_size BIGINT;
BEGIN
SELECT msg_can_write, msg_queue_size INTO q_can_write, q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE;
IF q_can_write OR q_size = 0 THEN
quota_written := p_msg_quota OR q_size >= p_quota;
was_empty := q_size = 0;
INSERT INTO messages(recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body)
VALUES (p_recipient_id, p_msg_id, p_msg_ts, quota_written, p_msg_ntf_flag AND NOT quota_written, CASE WHEN quota_written THEN '' :: BYTEA ELSE p_msg_body END);
UPDATE msg_queues
SET msg_can_write = NOT quota_written,
msg_queue_size = msg_queue_size + 1
WHERE recipient_id = p_recipient_id;
RETURN QUERY VALUES (quota_written, was_empty);
END IF;
END;
$$;
CREATE FUNCTION try_del_msg(p_recipient_id BYTEA, p_msg_id BYTEA)
RETURNS TABLE (r_msg_id BYTEA, r_msg_ts BIGINT, r_msg_quota BOOLEAN, r_msg_ntf_flag BOOLEAN, r_msg_body BYTEA)
LANGUAGE plpgsql AS $$
DECLARE
q_size BIGINT;
msg RECORD;
BEGIN
SELECT msg_queue_size INTO q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE;
IF FOUND THEN
SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg
FROM messages
WHERE recipient_id = p_recipient_id
ORDER BY message_id ASC LIMIT 1;
IF FOUND AND msg.msg_id = p_msg_id THEN
DELETE FROM messages WHERE message_id = msg.message_id;
IF FOUND THEN
CALL dec_msg_count(p_recipient_id, q_size, 1);
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
END IF;
END IF;
END IF;
END;
$$;
CREATE FUNCTION try_del_peek_msg(p_recipient_id BYTEA, p_msg_id BYTEA)
RETURNS TABLE (r_msg_id BYTEA, r_msg_ts BIGINT, r_msg_quota BOOLEAN, r_msg_ntf_flag BOOLEAN, r_msg_body BYTEA)
LANGUAGE plpgsql AS $$
DECLARE
q_size BIGINT;
msg RECORD;
BEGIN
SELECT msg_queue_size INTO q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE;
IF FOUND THEN
SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg
FROM messages
WHERE recipient_id = p_recipient_id
ORDER BY message_id ASC LIMIT 1;
IF FOUND THEN
IF msg.msg_id = p_msg_id THEN
DELETE FROM messages WHERE message_id = msg.message_id;
IF FOUND THEN
CALL dec_msg_count(p_recipient_id, q_size, 1);
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
END IF;
RETURN QUERY (
SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
WHERE recipient_id = p_recipient_id
ORDER BY message_id ASC LIMIT 1
);
ELSE
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
END IF;
END IF;
END IF;
END;
$$;
CREATE FUNCTION delete_expired_msgs(p_recipient_id BYTEA, p_old_ts BIGINT) RETURNS BIGINT
LANGUAGE plpgsql AS $$
DECLARE
q_size BIGINT;
min_id BIGINT;
del_count BIGINT;
BEGIN
SELECT msg_queue_size INTO q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE SKIP LOCKED;
IF NOT FOUND OR q_size = 0 THEN
RETURN 0;
END IF;
SELECT LEAST( -- ignores NULLs
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts),
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = TRUE)
) INTO min_id;
IF min_id IS NULL THEN
DELETE FROM messages WHERE recipient_id = p_recipient_id;
ELSE
DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < min_id;
END IF;
GET DIAGNOSTICS del_count = ROW_COUNT;
IF del_count > 0 THEN
CALL dec_msg_count(p_recipient_id, q_size, del_count);
END IF;
RETURN del_count;
END;
$$;
CREATE PROCEDURE expire_old_messages(
p_now_ts BIGINT,
p_ttl BIGINT,
OUT r_expired_msgs_count BIGINT,
OUT r_stored_msgs_count BIGINT,
OUT r_stored_queues BIGINT
)
LANGUAGE plpgsql AS $$
DECLARE
old_ts BIGINT := p_now_ts - p_ttl;
very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400;
rid BYTEA;
min_id BIGINT;
q_size BIGINT;
del_count BIGINT;
total_deleted BIGINT := 0;
BEGIN
FOR rid IN
SELECT recipient_id
FROM msg_queues
WHERE deleted_at IS NULL AND updated_at > very_old_ts
LOOP
BEGIN -- sub-transaction for each queue
del_count := delete_expired_msgs(rid, old_ts);
total_deleted := total_deleted + del_count;
EXCEPTION WHEN OTHERS THEN
ROLLBACK;
RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM;
CONTINUE;
END;
COMMIT;
END LOOP;
r_expired_msgs_count := total_deleted;
r_stored_msgs_count := (SELECT COUNT(1) FROM messages);
r_stored_queues := (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL);
END;
$$;
CREATE PROCEDURE dec_msg_count(p_recipient_id BYTEA, p_size BIGINT, p_change BIGINT)
LANGUAGE plpgsql AS $$
BEGIN
UPDATE msg_queues
SET msg_can_write = msg_can_write OR p_size <= p_change,
msg_queue_size = GREATEST(p_size - p_change, 0)
WHERE recipient_id = p_recipient_id;
END;
$$;
|]
down_m20250903_store_messages :: Text
down_m20250903_store_messages =
T.pack
[r|
DROP FUNCTION write_message;
DROP FUNCTION try_del_msg;
DROP FUNCTION try_del_peek_msg;
DROP FUNCTION delete_expired_msgs;
DROP PROCEDURE expire_old_messages;
DROP PROCEDURE dec_msg_count;
DROP INDEX idx_messages_recipient_id_message_id;
DROP INDEX idx_messages_recipient_id_msg_ts;
DROP INDEX idx_messages_recipient_id_msg_quota;
ALTER TABLE msg_queues
DROP COLUMN msg_can_write,
DROP COLUMN msg_queue_size;
DROP TABLE messages;
|]

View File

@@ -15,9 +15,224 @@ SET row_security = off;
CREATE SCHEMA smp_server;
CREATE PROCEDURE smp_server.dec_msg_count(IN p_recipient_id bytea, IN p_size bigint, IN p_change bigint)
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE msg_queues
SET msg_can_write = msg_can_write OR p_size <= p_change,
msg_queue_size = GREATEST(p_size - p_change, 0)
WHERE recipient_id = p_recipient_id;
END;
$$;
CREATE FUNCTION smp_server.delete_expired_msgs(p_recipient_id bytea, p_old_ts bigint) RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
q_size BIGINT;
min_id BIGINT;
del_count BIGINT;
BEGIN
SELECT msg_queue_size INTO q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE SKIP LOCKED;
IF NOT FOUND OR q_size = 0 THEN
RETURN 0;
END IF;
SELECT LEAST( -- ignores NULLs
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_ts >= p_old_ts),
(SELECT MIN(message_id) FROM messages WHERE recipient_id = p_recipient_id AND msg_quota = TRUE)
) INTO min_id;
IF min_id IS NULL THEN
DELETE FROM messages WHERE recipient_id = p_recipient_id;
ELSE
DELETE FROM messages WHERE recipient_id = p_recipient_id AND message_id < min_id;
END IF;
GET DIAGNOSTICS del_count = ROW_COUNT;
IF del_count > 0 THEN
CALL dec_msg_count(p_recipient_id, q_size, del_count);
END IF;
RETURN del_count;
END;
$$;
CREATE PROCEDURE smp_server.expire_old_messages(IN p_now_ts bigint, IN p_ttl bigint, OUT r_expired_msgs_count bigint, OUT r_stored_msgs_count bigint, OUT r_stored_queues bigint)
LANGUAGE plpgsql
AS $$
DECLARE
old_ts BIGINT := p_now_ts - p_ttl;
very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400;
rid BYTEA;
min_id BIGINT;
q_size BIGINT;
del_count BIGINT;
total_deleted BIGINT := 0;
BEGIN
FOR rid IN
SELECT recipient_id
FROM msg_queues
WHERE deleted_at IS NULL AND updated_at > very_old_ts
LOOP
BEGIN -- sub-transaction for each queue
del_count := delete_expired_msgs(rid, old_ts);
total_deleted := total_deleted + del_count;
EXCEPTION WHEN OTHERS THEN
ROLLBACK;
RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM;
CONTINUE;
END;
COMMIT;
END LOOP;
r_expired_msgs_count := total_deleted;
r_stored_msgs_count := (SELECT COUNT(1) FROM messages);
r_stored_queues := (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL);
END;
$$;
CREATE FUNCTION smp_server.try_del_msg(p_recipient_id bytea, p_msg_id bytea) RETURNS TABLE(r_msg_id bytea, r_msg_ts bigint, r_msg_quota boolean, r_msg_ntf_flag boolean, r_msg_body bytea)
LANGUAGE plpgsql
AS $$
DECLARE
q_size BIGINT;
msg RECORD;
BEGIN
SELECT msg_queue_size INTO q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE;
IF FOUND THEN
SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg
FROM messages
WHERE recipient_id = p_recipient_id
ORDER BY message_id ASC LIMIT 1;
IF FOUND AND msg.msg_id = p_msg_id THEN
DELETE FROM messages WHERE message_id = msg.message_id;
IF FOUND THEN
CALL dec_msg_count(p_recipient_id, q_size, 1);
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
END IF;
END IF;
END IF;
END;
$$;
CREATE FUNCTION smp_server.try_del_peek_msg(p_recipient_id bytea, p_msg_id bytea) RETURNS TABLE(r_msg_id bytea, r_msg_ts bigint, r_msg_quota boolean, r_msg_ntf_flag boolean, r_msg_body bytea)
LANGUAGE plpgsql
AS $$
DECLARE
q_size BIGINT;
msg RECORD;
BEGIN
SELECT msg_queue_size INTO q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE;
IF FOUND THEN
SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg
FROM messages
WHERE recipient_id = p_recipient_id
ORDER BY message_id ASC LIMIT 1;
IF FOUND THEN
IF msg.msg_id = p_msg_id THEN
DELETE FROM messages WHERE message_id = msg.message_id;
IF FOUND THEN
CALL dec_msg_count(p_recipient_id, q_size, 1);
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
END IF;
RETURN QUERY (
SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
WHERE recipient_id = p_recipient_id
ORDER BY message_id ASC LIMIT 1
);
ELSE
RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body);
END IF;
END IF;
END IF;
END;
$$;
CREATE FUNCTION smp_server.write_message(p_recipient_id bytea, p_msg_id bytea, p_msg_ts bigint, p_msg_quota boolean, p_msg_ntf_flag boolean, p_msg_body bytea, p_quota integer) RETURNS TABLE(quota_written boolean, was_empty boolean)
LANGUAGE plpgsql
AS $$
DECLARE
q_can_write BOOLEAN;
q_size BIGINT;
BEGIN
SELECT msg_can_write, msg_queue_size INTO q_can_write, q_size
FROM msg_queues
WHERE recipient_id = p_recipient_id AND deleted_at IS NULL
FOR UPDATE;
IF q_can_write OR q_size = 0 THEN
quota_written := p_msg_quota OR q_size >= p_quota;
was_empty := q_size = 0;
INSERT INTO messages(recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body)
VALUES (p_recipient_id, p_msg_id, p_msg_ts, quota_written, p_msg_ntf_flag AND NOT quota_written, CASE WHEN quota_written THEN '' :: BYTEA ELSE p_msg_body END);
UPDATE msg_queues
SET msg_can_write = NOT quota_written,
msg_queue_size = msg_queue_size + 1
WHERE recipient_id = p_recipient_id;
RETURN QUERY VALUES (quota_written, was_empty);
END IF;
END;
$$;
SET default_table_access_method = heap;
CREATE TABLE smp_server.messages (
message_id bigint NOT NULL,
recipient_id bytea NOT NULL,
msg_id bytea NOT NULL,
msg_ts bigint NOT NULL,
msg_quota boolean NOT NULL,
msg_ntf_flag boolean NOT NULL,
msg_body bytea NOT NULL
);
ALTER TABLE smp_server.messages ALTER COLUMN message_id ADD GENERATED ALWAYS AS IDENTITY (
SEQUENCE NAME smp_server.messages_message_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
CREATE TABLE smp_server.migrations (
name text NOT NULL,
ts timestamp without time zone NOT NULL,
@@ -43,7 +258,9 @@ CREATE TABLE smp_server.msg_queues (
fixed_data bytea,
user_data bytea,
rcv_service_id bytea,
ntf_service_id bytea
ntf_service_id bytea,
msg_can_write boolean DEFAULT true NOT NULL,
msg_queue_size bigint DEFAULT 0 NOT NULL
);
@@ -58,6 +275,11 @@ CREATE TABLE smp_server.services (
ALTER TABLE ONLY smp_server.messages
ADD CONSTRAINT messages_pkey PRIMARY KEY (message_id);
ALTER TABLE ONLY smp_server.migrations
ADD CONSTRAINT migrations_pkey PRIMARY KEY (name);
@@ -78,6 +300,18 @@ ALTER TABLE ONLY smp_server.services
CREATE INDEX idx_messages_recipient_id_message_id ON smp_server.messages USING btree (recipient_id, message_id);
CREATE INDEX idx_messages_recipient_id_msg_quota ON smp_server.messages USING btree (recipient_id, msg_quota);
CREATE INDEX idx_messages_recipient_id_msg_ts ON smp_server.messages USING btree (recipient_id, msg_ts);
CREATE UNIQUE INDEX idx_msg_queues_link_id ON smp_server.msg_queues USING btree (link_id);
@@ -106,6 +340,11 @@ CREATE INDEX idx_services_service_role ON smp_server.services USING btree (servi
ALTER TABLE ONLY smp_server.messages
ADD CONSTRAINT messages_recipient_id_fkey FOREIGN KEY (recipient_id) REFERENCES smp_server.msg_queues(recipient_id) ON UPDATE RESTRICT ON DELETE CASCADE;
ALTER TABLE ONLY smp_server.msg_queues
ADD CONSTRAINT msg_queues_ntf_service_id_fkey FOREIGN KEY (ntf_service_id) REFERENCES smp_server.services(service_id) ON UPDATE RESTRICT ON DELETE SET NULL;

View File

@@ -228,7 +228,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
deleteQueueNotifier :: STMQueueStore q -> q -> IO (Either ErrorType (Maybe NtfCreds))
deleteQueueNotifier st sq =
withQueueRec qr delete
$>>= \nc_ -> nc_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId sq)
$>>= (<$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId sq))
where
qr = queueRec sq
delete q = forM (notifier q) $ \nc -> do
@@ -264,11 +264,10 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId sq) t)
| otherwise = pure $ Right q
deleteStoreQueue :: STMQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
deleteStoreQueue :: STMQueueStore q -> q -> IO (Either ErrorType QueueRec)
deleteStoreQueue st sq =
withQueueRec qr delete
$>>= \q -> withLog "deleteStoreQueue" st (`logDeleteQueue` rId)
>>= mapM (\_ -> (q,) <$> atomically (swapTVar (msgQueue sq) Nothing))
$>>= (<$$ withLog "deleteStoreQueue" st (`logDeleteQueue` rId))
where
rId = recipientId sq
qr = queueRec sq

View File

@@ -17,10 +17,8 @@ import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.TMap (TMap)
class StoreQueueClass q where
type MsgQueue q = mq | mq -> q
recipientId :: q -> RecipientId
queueRec :: q -> TVar (Maybe QueueRec)
msgQueue :: q -> TVar (Maybe (MsgQueue q))
withQueueLock :: q -> Text -> IO a -> IO a
class StoreQueueClass q => QueueStoreClass q s where
@@ -44,7 +42,7 @@ class StoreQueueClass q => QueueStoreClass q s where
blockQueue :: s -> q -> BlockingInfo -> IO (Either ErrorType ())
unblockQueue :: s -> q -> IO (Either ErrorType ())
updateQueueTime :: s -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
deleteStoreQueue :: s -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec)
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))

View File

@@ -211,6 +211,9 @@ firstRow f e a = second f . listToEither e <$> a
maybeFirstRow :: Functor f => (a -> b) -> f [a] -> f (Maybe b)
maybeFirstRow f q = fmap f . listToMaybe <$> q
maybeFirstRow' :: Functor f => b -> (a -> b) -> f [a] -> f b
maybeFirstRow' def f q = maybe def f . listToMaybe <$> q
firstRow' :: (a -> Either e b) -> e -> IO [a] -> IO (Either e b)
firstRow' f e a = (f <=< listToEither e) <$> a

View File

@@ -114,6 +114,7 @@ import Test.Hspec hiding (fit, it)
import UnliftIO
import Util
import XFTPClient (testXFTPServer)
#if defined(dbPostgres)
import Fixtures
#endif
@@ -122,6 +123,7 @@ import qualified Database.PostgreSQL.Simple as PSQL
import Simplex.Messaging.Agent.Store (Connection (..), StoredRcvQueue (..), SomeConn (..))
import Simplex.Messaging.Agent.Store.AgentStore (getConn)
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
import Simplex.Messaging.Server.MsgStore.Postgres (PostgresQueue)
import Simplex.Messaging.Server.MsgStore.Types (QSType (..))
import Simplex.Messaging.Server.QueueStore.Postgres
import Simplex.Messaging.Server.QueueStore.Types (QueueStoreClass (..))
@@ -1524,20 +1526,29 @@ testOldContactQueueShortLink ps@(_, msType) = withAgentClients2 $ \a b -> do
A.createConnection a NRMInteractive 1 True SCMContact Nothing Nothing CR.IKPQOn SMOnlyCreate
-- make it an "old" queue
let updateStoreLog f = replaceSubstringInFile f " queue_mode=C" ""
() <- case testServerStoreConfig msType of
ASSCfg _ _ (SSCMemory (Just StorePaths {storeLogFile})) -> updateStoreLog storeLogFile
ASSCfg _ _ (SSCMemoryJournal {storeLogFile}) -> updateStoreLog storeLogFile
ASSCfg _ _ (SSCDatabaseJournal {storeCfg}) -> do
#if defined(dbServerPostgres)
let AgentClient {agentEnv = Env {store}} = a
Right (SomeConn _ (ContactConnection _ RcvQueue {rcvId})) <- withTransaction store (`getConn` contactId)
st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) storeCfg
Right 1 <- runExceptT $ withDB' "test" st $ \db -> PSQL.execute db "UPDATE msg_queues SET queue_mode = ? WHERE recipient_id = ?" (Nothing :: Maybe QueueMode, rcvId)
closeQueueStore @(JournalQueue 'QSPostgres) st
#else
error "no dbServerPostgres flag"
updateDbStore :: PostgresQueueStore s -> IO ()
updateDbStore st = do
let AgentClient {agentEnv = Env {store}} = a
Right (SomeConn _ (ContactConnection _ RcvQueue {rcvId})) <- withTransaction store (`getConn` contactId)
Right 1 <- runExceptT $ withDB' "test" st $ \db -> PSQL.execute db "UPDATE msg_queues SET queue_mode = ? WHERE recipient_id = ?" (Nothing :: Maybe QueueMode, rcvId)
pure ()
#endif
() <- case testServerStoreConfig msType of
ASSCfg _ _ (SSCMemory sp_) -> mapM_ (\StorePaths {storeLogFile} -> updateStoreLog storeLogFile) sp_
ASSCfg _ _ SSCMemoryJournal {storeLogFile} -> updateStoreLog storeLogFile
#if defined(dbServerPostgres)
ASSCfg _ _ SSCDatabaseJournal {storeCfg} -> do
st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) storeCfg
updateDbStore st
closeQueueStore @(JournalQueue 'QSPostgres) st
ASSCfg _ _ (SSCDatabase storeCfg) -> do
st :: PostgresQueueStore PostgresQueue <- newQueueStore @PostgresQueue storeCfg
updateDbStore st
closeQueueStore @PostgresQueue st
#else
ASSCfg _ _ SSCDatabaseJournal {} -> error "no dbServerPostgres flag"
#endif
_ -> pure ()
withSmpServer ps $ do
let userData = UserLinkData "some user data"

View File

@@ -10,7 +10,6 @@ import AgentTests.FunctionalAPITests (runRight_)
import Control.Logger.Simple
import Control.Monad
import qualified Crypto.PubKey.RSA as RSA
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as HM
import Data.Ini (Ini (..), lookupValue, readIniFile, writeIniFile)
@@ -46,6 +45,7 @@ import UnliftIO.Exception (bracket)
import Util
#if defined(dbServerPostgres)
import qualified Data.ByteString.Char8 as B
import qualified Database.PostgreSQL.Simple as PSQL
import Database.PostgreSQL.Simple.Types (Query (..))
import NtfClient (ntfTestServerDBConnectInfo, ntfTestServerDBConnstr, ntfTestStoreDBOpts)

View File

@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
@@ -41,7 +42,6 @@ import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue)
import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile)
import System.FilePath ((</>))
@@ -49,28 +49,48 @@ import System.IO (IOMode (..), withFile)
import Test.Hspec hiding (fit, it)
import Util
#if defined(dbServerPostgres)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
import Simplex.Messaging.Server.MsgStore.Postgres
import Simplex.Messaging.Server.QueueStore.Postgres.Config
import SMPClient (postgressBracket, testServerDBConnectInfo, testStoreDBOpts)
#endif
msgStoreTests :: Spec
msgStoreTests = do
around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests
around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do
someMsgStoreTests
journalMsgStoreTests
it "should export and import journal store" testExportImportStore
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState
it "should remove journal files when queue is empty" testRemoveJournals
describe "missing files" $ do
it "should create read file when missing" testReadFileMissing
it "should switch to write file when read file missing" testReadFileMissingSwitch
it "should create write file when missing" testWriteFileMissing
it "should create read file when read and write files are missing" testReadAndWriteFilesMissing
#if defined(dbServerPostgres)
around_ (postgressBracket testServerDBConnectInfo) $ do
around (withMsgStore $ testJournalStoreCfg $ PQStoreCfg testPostgresStoreCfg) $
describe "Postgres+journal message store" $ do
someMsgStoreTests
journalMsgStoreTests
around (withMsgStore testPostgresStoreConfig) $
describe "Postgres-only message store" $ someMsgStoreTests
#endif
describe "Journal message store: queue state backup expiration" $ do
it "should remove old queue state backups" testRemoveQueueStateBackups
it "should expire messages in idle queues" testExpireIdleQueues
where
journalMsgStoreTests :: SpecWith (JournalMsgStore s)
journalMsgStoreTests = do
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState
it "should remove journal files when queue is empty" testRemoveJournals
describe "missing files" $ do
it "should create read file when missing" testReadFileMissing
it "should switch to write file when read file missing" testReadFileMissingSwitch
it "should create write file when missing" testWriteFileMissing
it "should create read file when read and write files are missing" testReadAndWriteFilesMissing
someMsgStoreTests :: MsgStoreClass s => SpecWith s
someMsgStoreTests = do
it "should get queue and store/read messages" testGetQueue
it "should write/ack messages" testWriteAckMessages
it "should not fail on EOF when changing read journal" testChangeReadJournal
-- TODO constrain to STM stores?
@@ -95,6 +115,24 @@ testJournalStoreCfg queueStoreCfg =
keepMinBackups = 1
}
#if defined(dbServerPostgres)
testPostgresStoreConfig :: PostgresMsgStoreCfg
testPostgresStoreConfig =
PostgresMsgStoreCfg
{ queueStoreCfg = testPostgresStoreCfg,
quota = 3
}
testPostgresStoreCfg :: PostgresStoreCfg
testPostgresStoreCfg =
PostgresStoreCfg
{ dbOpts = testStoreDBOpts,
dbStoreLogPath = Nothing,
confirmMigrations = MCYesUp,
deletedTTL = 86400
}
#endif
mkMessage :: MonadIO m => ByteString -> m Message
mkMessage body = liftIO $ do
g <- C.newRandom
@@ -137,7 +175,6 @@ testNewQueueRecData g qm queueData = do
where
rndId = atomically $ EntityId <$> C.randomBytes 24 g
-- TODO constrain to STM stores
testGetQueue :: MsgStoreClass s => s -> IO ()
testGetQueue ms = do
g <- C.newRandom
@@ -180,7 +217,28 @@ testGetQueue ms = do
(Nothing, Nothing) <- tryDelPeekMsg ms q mId8
void $ ExceptT $ deleteQueue ms q
-- TODO constrain to STM stores
-- TODO [messages] test concurrent writing and reading
testWriteAckMessages :: MsgStoreClass s => s -> IO ()
testWriteAckMessages ms = do
g <- C.newRandom
(rId1, qr1) <- testNewQueueRec g QMMessaging
(rId2, qr2) <- testNewQueueRec g QMMessaging
runRight_ $ do
q1 <- ExceptT $ addQueue ms rId1 qr1
q2 <- ExceptT $ addQueue ms rId2 qr2
let write q s = writeMsg ms q True =<< mkMessage s
0 <- deleteExpiredMsgs ms q1 0 -- won't expire anything, used here to mimic message sending with expiration on SEND
Just (Message {msgId = mId1}, True) <- write q1 "message 1"
(Msg "message 1", Nothing) <- tryDelPeekMsg ms q1 mId1
0 <- deleteExpiredMsgs ms q2 0
Just (Message {msgId = mId2}, True) <- write q2 "message 2"
(Msg "message 2", Nothing) <- tryDelPeekMsg ms q2 mId2
0 <- deleteExpiredMsgs ms q2 0
Just (Message {msgId = mId3}, True) <- write q2 "message 3"
(Msg "message 3", Nothing) <- tryDelPeekMsg ms q2 mId3
void $ ExceptT $ deleteQueue ms q1
void $ ExceptT $ deleteQueue ms q2
testChangeReadJournal :: MsgStoreClass s => s -> IO ()
testChangeReadJournal ms = do
g <- C.newRandom
@@ -367,7 +425,7 @@ testRemoveJournals ms = do
Nothing <- tryPeekMsg ms q
-- still not removed, queue is empty and not opened
liftIO $ journalFilesCount dir `shouldReturn` 1
_mq <- isolateQueue q "test" $ getMsgQueue ms q False
_mq <- isolateQueue ms q "test" $ getMsgQueue ms q False
-- journal is removed
liftIO $ journalFilesCount dir `shouldReturn` 0
liftIO $ stateBackupCount dir `shouldReturn` 1
@@ -467,7 +525,7 @@ testExpireIdleQueues = do
old <- expireBeforeEpoch ExpirationConfig {ttl = 1, checkInterval = 1} -- no old messages
now <- systemSeconds <$> getSystemTime
(expired_, stored) <- runRight $ isolateQueue q "" $ withIdleMsgQueue now ms q $ deleteExpireMsgs_ old q
(expired_, stored) <- runRight $ isolateQueue ms q "" $ withIdleMsgQueue now ms q $ deleteExpireMsgs_ old q
expired_ `shouldBe` Just 0
stored `shouldBe` 0
(Nothing, False) <- readQueueState ms statePath
@@ -484,7 +542,7 @@ testReadFileMissing ms = do
Msg "message 1" <- tryPeekMsg ms q
pure q
mq <- fromJust <$> readTVarIO (msgQueue q)
mq <- fromJust <$> readTVarIO (msgQueue' q)
MsgQueueState {readState = rs} <- readTVarIO $ state mq
closeMsgQueue ms q
let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs
@@ -503,7 +561,7 @@ testReadFileMissingSwitch ms = do
(rId, qr) <- testNewQueueRec g QMMessaging
q <- writeMessages ms rId qr
mq <- fromJust <$> readTVarIO (msgQueue q)
mq <- fromJust <$> readTVarIO (msgQueue' q)
MsgQueueState {readState = rs} <- readTVarIO $ state mq
closeMsgQueue ms q
let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs
@@ -521,7 +579,7 @@ testWriteFileMissing ms = do
(rId, qr) <- testNewQueueRec g QMMessaging
q <- writeMessages ms rId qr
mq <- fromJust <$> readTVarIO (msgQueue q)
mq <- fromJust <$> readTVarIO (msgQueue' q)
MsgQueueState {writeState = ws} <- readTVarIO $ state mq
closeMsgQueue ms q
let path = journalFilePath (queueDirectory $ queue mq) $ journalId ws
@@ -544,7 +602,7 @@ testReadAndWriteFilesMissing ms = do
(rId, qr) <- testNewQueueRec g QMMessaging
q <- writeMessages ms rId qr
mq <- fromJust <$> readTVarIO (msgQueue q)
mq <- fromJust <$> readTVarIO (msgQueue' q)
MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq
closeMsgQueue ms q
removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId rs

View File

@@ -274,9 +274,14 @@ serverStoreConfig_ useDbStoreLog = \case
ASType SQSMemory SMSJournal ->
ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = testStoreLogFile, storeMsgsPath = testStoreMsgsDir}
ASType SQSPostgres SMSJournal ->
let dbStoreLogPath = if useDbStoreLog then Just testStoreLogFile else Nothing
storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = 86400}
in ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir}
ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir}
#if defined(dbServerPostgres)
ASType SQSPostgres SMSPostgres ->
ASSCfg SQSPostgres SMSPostgres $ SSCDatabase storeCfg
#endif
where
dbStoreLogPath = if useDbStoreLog then Just testStoreLogFile else Nothing
storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = 86400}
cfgV7 :: AServerConfig
cfgV7 = updateCfg cfg $ \cfg' -> cfg' {smpServerVRange = mkVersionRange minServerSMPRelayVersion authCmdsSMPVersion}

View File

@@ -14,6 +14,7 @@
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wno-orphans #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module ServerTests where
@@ -45,7 +46,7 @@ import Simplex.Messaging.Server (exportMessages)
import Simplex.Messaging.Server.Env.STM (AStoreType (..), MsgStore (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..), QStoreCfg (..), stmQueueStore)
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..), newMsgStore)
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), QSType (..), SMSType (..), SQSType (..), newMsgStore)
import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..))
import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..), closeStoreLog)
import Simplex.Messaging.Transport
@@ -59,6 +60,11 @@ import Test.HUnit
import Test.Hspec hiding (fit, it)
import Util
#if defined(dbServerPostgres)
import CoreTests.MsgStoreTests (testPostgresStoreConfig)
import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStoreCfg (..), exportDbMessages)
#endif
serverTests :: SpecWith (ASrvTransport, AStoreType)
serverTests = do
describe "SMP queues" $ do
@@ -915,14 +921,27 @@ testRestoreExpireMessages =
exportStoreMessages :: AStoreType -> IO ()
exportStoreMessages = \case
ASType _ SMSJournal -> export
ASType _ SMSPostgres -> exportDB
ASType _ SMSMemory -> pure ()
where
export = do
ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {quota = 4}
readWriteQueueStore True (mkQueue ms True) testStoreLogFile (stmQueueStore ms) >>= closeStoreLog
removeFileIfExists testStoreMsgsFile
ms <- readWriteQueues
exportMessages False (StoreJournal ms) testStoreMsgsFile False
closeMsgStore ms
#if defined(dbServerPostgres)
exportDB = do
readWriteQueues >>= closeMsgStore
ms' <- newMsgStore (testPostgresStoreConfig {quota = 4} :: PostgresMsgStoreCfg)
_n <- withFile testStoreMsgsFile WriteMode $ exportDbMessages False ms'
closeMsgStore ms'
#else
exportDB = error "compiled without server_postgres flag"
#endif
readWriteQueues = do
ms <- newMsgStore ((testJournalStoreCfg MQStoreCfg) {quota = 4} :: JournalStoreConfig 'QSMemory)
readWriteQueueStore True (mkQueue ms True) testStoreLogFile (stmQueueStore ms) >>= closeStoreLog
removeFileIfExists testStoreMsgsFile
pure ms
runTest :: Transport c => TProxy c 'TServer -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation
runTest _ test' server = do
testSMPClient test' `shouldReturn` ()

View File

@@ -102,9 +102,11 @@ main = do
] -- skipComparisonForDownMigrations
testStoreDBOpts
"src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql"
around_ (postgressBracket testServerDBConnectInfo) $
around_ (postgressBracket testServerDBConnectInfo) $ do
describe "SMP server via TLS, postgres+jornal message store" $
before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests
describe "SMP server via TLS, postgres-only message store" $
before (pure (transport @TLS, ASType SQSPostgres SMSPostgres)) serverTests
#endif
describe "SMP server via TLS, jornal message store" $ do
describe "SMP syntax" $ serverSyntaxTests (transport @TLS)
@@ -125,13 +127,18 @@ main = do
around_ (postgressBracket ntfTestServerDBConnectInfo) $ do
describe "Notifications server (SMP server: jornal store)" $
ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal)
around_ (postgressBracket testServerDBConnectInfo) $
around_ (postgressBracket testServerDBConnectInfo) $ do
describe "Notifications server (SMP server: postgres+jornal store)" $
ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal)
describe "Notifications server (SMP server: postgres-only store)" $
ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres)
around_ (postgressBracket testServerDBConnectInfo) $ do
describe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
describe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres)
describe "SMP proxy, postgres+jornal message store" $
before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests
describe "SMP proxy, postgres-only message store" $
before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests
#endif
describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
describe "SMP proxy, jornal message store" $