diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 806c7fc34..e3db02259 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -167,7 +167,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ms <- newJournalMsgStore MQStoreCfg readQueueStore True (mkQueue ms) storeLogFile (queueStore ms) queues <- readTVarIO $ loadedQueues $ stmQueueStore ms - let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} + let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} ps <- newJournalMsgStore $ PQStoreCfg storeCfg qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps renameFile storeLogFile $ storeLogFile <> ".bak" @@ -189,9 +189,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = confirmOrExit ("WARNING: PostrgreSQL database schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath) "Queue records not exported" - let storeCfg = PostgresStoreCfg {dbOpts, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} + let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} ps <- newJournalMsgStore $ PQStoreCfg storeCfg - sl <- openWriteStoreLog storeLogFilePath + sl <- openWriteStoreLog False storeLogFilePath Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int) putStrLn $ "Export completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of @@ -211,8 +211,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = True -> readIniFile iniFile >>= either exitError a _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." getRequiredStoreLogFile ini = do - let enableStoreLog = settingIsOn "STORE_LOG" "enable" ini - case enableStoreLog $> storeLogFilePath of + case enableStoreLog' ini $> storeLogFilePath of Just storeLogFile -> do ifM (doesFileExist storeLogFile) @@ -258,6 +257,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = httpsCertFile = combine cfgPath "web.crt" httpsKeyFile = combine cfgPath "web.key" defaultStaticPath = combine logPath "www" + enableStoreLog' = settingIsOn "STORE_LOG" "enable" + enableDbStoreLog' = settingIsOn "STORE_LOG" "db_store_log" initializeServer opts@InitOptions {ip, fqdn, sourceCode = src', webStaticPath = sp', disableWeb = noWeb', scripted} | scripted = initialize opts | otherwise = do @@ -338,7 +339,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = \store_queues: memory\n\n\ \# Database connection settings for PostgreSQL database (`store_queues: database`).\n" <> dbOptsIniContent dbOptions - <> "# Time to retain deleted queues in the database, days.\n" + <> "# Write database changes to store log file\n\ + \# db_store_log: off\n\n\ + \# Time to retain deleted queues in the database, days.\n" <> ("db_deleted_ttl: " <> tshow defaultDeletedTTL <> "\n\n") <> "# Message storage mode: `memory` or `journal`.\n\ \store_messages: memory\n\n\ @@ -471,14 +474,13 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = runSMPServer cfg Nothing logDebug "Bye" where - enableStoreLog = settingIsOn "STORE_LOG" "enable" ini logStats = settingIsOn "STORE_LOG" "log_stats" ini c = combine cfgPath . ($ defaultX509Config) restoreMessagesFile path = case iniOnOff "STORE_LOG" "restore_messages" ini of Just True -> Just path Just False -> Nothing -- if the setting is not set, it is enabled when store log is enabled - _ -> enableStoreLog $> path + _ -> enableStoreLog' ini $> path transports = iniTransports ini sharedHTTP = any (\(_, _, addHTTP) -> addHTTP) transports iniStoreType = either error id $! readStoreType ini @@ -501,11 +503,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = httpCredentials = (\WebHttpsParams {key, cert} -> ServerCredentials {caCertificateFile = Nothing, privateKeyFile = key, certificateFile = cert}) <$> webHttpsParams', serverStoreCfg = case iniStoreType of ASType SQSMemory SMSMemory -> - ASSCfg SQSMemory SMSMemory $ SSCMemory $ enableStoreLog $> StorePaths {storeLogFile = storeLogFilePath, storeMsgsFile = restoreMessagesFile storeMsgsFilePath} + ASSCfg SQSMemory SMSMemory $ SSCMemory $ enableStoreLog' ini $> StorePaths {storeLogFile = storeLogFilePath, storeMsgsFile = restoreMessagesFile storeMsgsFilePath} ASType SQSMemory SMSJournal -> ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = storeLogFilePath, storeMsgsPath = storeMsgsJournalDir} ASType SQSPostgres SMSJournal -> - let storeCfg = PostgresStoreCfg {dbOpts = iniDBOptions ini, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} + let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath + storeCfg = PostgresStoreCfg {dbOpts = iniDBOptions ini, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} in ASSCfg SQSPostgres SMSJournal $ SSCDatabaseJournal {storeCfg, storeMsgsPath' = storeMsgsJournalDir}, storeNtfsFile = restoreMessagesFile storeNtfsFilePath, -- allow creating new queues by default @@ -607,16 +610,25 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = SQSPostgres -> do let DBOpts {connstr, schema} = iniDBOptions ini schemaExists <- checkSchemaExists connstr schema - if - | storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema - | storeLogExists -> do - putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present." - putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate." - exitFailure - | not schemaExists -> do - putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr - exitFailure - | otherwise -> pure () + case enableDbStoreLog' ini of + Just () + | not schemaExists -> noDatabaseSchema connstr schema + | not storeLogExists -> do + putStrLn $ "Error: db_store_log is `on`, " <> storeLogFilePath <> " does not exist" + exitFailure + | otherwise -> pure () + Nothing + | storeLogExists && schemaExists -> exitConfigureQueueStore connstr schema + | storeLogExists -> do + putStrLn $ "Error: store_queues is `database` with " <> storeLogFilePath <> " file present." + putStrLn "Set store_queues to `memory` or use `smp-server database import` to migrate." + exitFailure + | not schemaExists -> noDatabaseSchema connstr schema + | otherwise -> pure () + where + noDatabaseSchema connstr schema = do + putStrLn $ "Error: store_queues is `database`, create schema " <> B.unpack schema <> " in PostgreSQL database " <> B.unpack connstr + exitFailure ASType SQSMemory SMSMemory | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage | msgsDirExists -> do diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index f7a65488d..74ebac2d3 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -76,7 +76,6 @@ import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory import System.FilePath (takeFileName, ()) @@ -288,6 +287,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) () PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) cfg + closeQueueStore = withQS (closeQueueStore @(JournalQueue s)) + {-# INLINE closeQueueStore #-} loadedQueues = withQS loadedQueues {-# INLINE loadedQueues #-} compactQueues = withQS (compactQueues @(JournalQueue s)) @@ -330,12 +331,10 @@ instance MsgStoreClass (JournalMsgStore s) where pure JournalMsgStore {config, random, queueLocks, queueStore_, expireBackupsBefore} closeMsgStore :: JournalMsgStore s -> IO () - closeMsgStore ms = case queueStore_ ms of - MQStore st -> do - readTVarIO (storeLog st) >>= mapM_ closeStoreLog - closeQueues $ loadedQueues @(JournalQueue s) st - PQStore st -> - closeQueues $ loadedQueues @(JournalQueue s) st + closeMsgStore ms = do + let st = queueStore_ ms + closeQueues $ loadedQueues @(JournalQueue s) st + closeQueueStore @(JournalQueue s) st where closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index d2a31d3d2..ff7a93db8 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -28,7 +28,6 @@ import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.QueueStore.Types -import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util ((<$$>), ($>>=)) data STMMsgStore = STMMsgStore @@ -77,8 +76,8 @@ instance MsgStoreClass STMMsgStore where queueStore_ <- newQueueStore @STMQueue () pure STMMsgStore {storeConfig, queueStore_} - closeMsgStore st = readTVarIO (storeLog $ queueStore_ st) >>= mapM_ closeStoreLog - + closeMsgStore = closeQueueStore @STMQueue . queueStore_ + {-# INLINE closeMsgStore #-} withActiveMsgQueues = withLoadedQueues . queueStore_ {-# INLINE withActiveMsgQueues #-} withAllMsgQueues _ = withLoadedQueues . queueStore_ diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 0923ab244..0e30ed66d 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -47,9 +47,10 @@ import Database.PostgreSQL.Simple.FromField (FromField (..)) import Database.PostgreSQL.Simple.ToField (ToField (..)) import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation) import Database.PostgreSQL.Simple.SqlQQ (sql) +import GHC.IO (catchAny) import Simplex.Messaging.Agent.Client (withLockMap) import Simplex.Messaging.Agent.Lock (Lock) -import Simplex.Messaging.Agent.Store.Postgres (createDBStore) +import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore) import Simplex.Messaging.Agent.Store.Postgres.Common import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation) import Simplex.Messaging.Protocol @@ -57,11 +58,12 @@ import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations) import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO) import Simplex.Messaging.Server.QueueStore.Types +import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (firstRow, ifM, tshow, (<$$>)) import System.Exit (exitFailure) -import System.IO (hFlush, stdout) +import System.IO (IOMode (..), hFlush, stdout) import UnliftIO.STM #if !defined(dbPostgres) import Simplex.Messaging.Agent.Store.Postgres.DB (blobFieldDecoder) @@ -71,6 +73,7 @@ import Simplex.Messaging.Encoding.String data PostgresQueueStore q = PostgresQueueStore { dbStore :: DBStore, + dbStoreLog :: Maybe (StoreLog 'WriteMode), -- this map caches all created and opened queues queues :: TMap RecipientId q, -- this map only cashes the queues that were attempted to send messages to, @@ -83,6 +86,7 @@ data PostgresQueueStore q = PostgresQueueStore data PostgresStoreCfg = PostgresStoreCfg { dbOpts :: DBOpts, + dbStoreLogPath :: Maybe FilePath, confirmMigrations :: MigrationConfirmation, deletedTTL :: Int64 } @@ -91,18 +95,24 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where type QueueStoreCfg (PostgresQueueStore q) = PostgresStoreCfg newQueueStore :: PostgresStoreCfg -> IO (PostgresQueueStore q) - newQueueStore PostgresStoreCfg {dbOpts, confirmMigrations, deletedTTL} = do + newQueueStore PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL} = do dbStore <- either err pure =<< createDBStore dbOpts serverMigrations confirmMigrations + dbStoreLog <- mapM (openWriteStoreLog True) dbStoreLogPath queues <- TM.emptyIO senders <- TM.emptyIO notifiers <- TM.emptyIO notifierLocks <- TM.emptyIO - pure PostgresQueueStore {dbStore, queues, senders, notifiers, notifierLocks, deletedTTL} + pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, notifiers, notifierLocks, deletedTTL} where err e = do logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e exitFailure + closeQueueStore :: PostgresQueueStore q -> IO () + closeQueueStore PostgresQueueStore {dbStore, dbStoreLog} = do + closeDBStore dbStore + mapM_ closeStoreLog dbStoreLog + loadedQueues = queues {-# INLINE loadedQueues #-} @@ -136,6 +146,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where >>= bimapM handleDuplicate pure atomically $ TM.insert rId sq queues atomically $ TM.insert (senderId qr) rId senders + withLog "addStoreQueue" st $ \s -> logCreateQueue s rId qr pure sq where PostgresQueueStore {queues, senders} = st @@ -166,9 +177,11 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where withQueueDB sq "secureQueue" $ \q -> do verify q assertUpdated $ withDB' "secureQueue" st $ \db -> - DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ? AND deleted_at IS NULL" (sKey, recipientId sq) + DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ? AND deleted_at IS NULL" (sKey, rId) atomically $ writeTVar (queueRec sq) $ Just q {senderKey = Just sKey} + withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey where + rId = recipientId sq verify q = case senderKey q of Just k | sKey /= k -> throwE AUTH _ -> pure () @@ -185,6 +198,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where atomically $ writeTVar (queueRec sq) $ Just q' -- cache queue notifier ID – after notifier is added ntf server will likely subscribe atomically $ TM.insert nId rId notifiers + withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds pure nId_ where PostgresQueueStore {notifiers} = st @@ -208,8 +222,10 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where assertUpdated $ withDB' "deleteQueueNotifier" st update atomically $ TM.delete nId $ notifiers st atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing} + withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId) pure nId where + rId = recipientId sq update db = DB.execute db @@ -218,16 +234,22 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where SET notifier_id = NULL, notifier_key = NULL, rcv_ntf_dh_secret = NULL WHERE recipient_id = ? AND deleted_at IS NULL |] - (Only $ recipientId sq) + (Only rId) suspendQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ()) - suspendQueue st sq = setStatusDB "suspendQueue" st sq EntityOff + suspendQueue st sq = + setStatusDB "suspendQueue" st sq EntityOff $ + withLog "suspendQueue" st (`logSuspendQueue` recipientId sq) blockQueue :: PostgresQueueStore q -> q -> BlockingInfo -> IO (Either ErrorType ()) - blockQueue st sq info = setStatusDB "blockQueue" st sq (EntityBlocked info) + blockQueue st sq info = + setStatusDB "blockQueue" st sq (EntityBlocked info) $ + withLog "blockQueue" st $ \sl -> logBlockQueue sl (recipientId sq) info unblockQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ()) - unblockQueue st sq = setStatusDB "unblockQueue" st sq EntityActive + unblockQueue st sq = + setStatusDB "unblockQueue" st sq EntityActive $ + withLog "unblockQueue" st (`logUnblockQueue` recipientId sq) updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec) updateQueueTime st sq t = @@ -236,10 +258,13 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where then pure q else do assertUpdated $ withDB' "updateQueueTime" st $ \db -> - DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (t, recipientId sq) + DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (t, rId) let !q' = q {updatedAt = Just t} atomically $ writeTVar (queueRec sq) $ Just q' + withLog "updateQueueTime" st $ \sl -> logUpdateQueueTime sl rId t pure q' + where + rId = recipientId sq -- this method is called from JournalMsgStore deleteQueue that already locks the queue deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) @@ -247,12 +272,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where q <- ExceptT $ readQueueRecIO qr RoundedSystemTime ts <- liftIO getSystemDate assertUpdated $ withDB' "deleteStoreQueue" st $ \db -> - DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, recipientId sq) + DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, rId) atomically $ writeTVar qr Nothing atomically $ TM.delete (senderId q) $ senders st forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st - (q,) <$> atomically (swapTVar (msgQueue sq) Nothing) + mq_ <- atomically $ swapTVar (msgQueue sq) Nothing + withLog "deleteStoreQueue" st (`logDeleteQueue` rId) + pure (q, mq_) where + rId = recipientId sq qr = queueRec sq batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO Int64 @@ -335,12 +363,13 @@ rowToQueueRec (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, n let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_ in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}) -setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> IO (Either ErrorType ()) -setStatusDB op st sq status = +setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> ExceptT ErrorType IO () -> IO (Either ErrorType ()) +setStatusDB op st sq status writeLog = withQueueDB sq op $ \q -> do assertUpdated $ withDB' op st $ \db -> DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ? AND deleted_at IS NULL" (status, recipientId sq) atomically $ writeTVar (queueRec sq) $ Just q {status} + writeLog withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a) withQueueDB sq op action = @@ -361,6 +390,11 @@ withDB op st action = where err = op <> ", withLog, " <> show e +withLog :: MonadIO m => String -> PostgresQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> m () +withLog op PostgresQueueStore {dbStoreLog} action = + forM_ dbStoreLog $ \sl -> liftIO $ E.uninterruptibleMask_ (action sl) `catchAny` \e -> + logWarn $ "STORE: " <> T.pack (op <> ", withLog, " <> show e) + handleDuplicate :: SqlError -> IO ErrorType handleDuplicate e = case constraintViolation e of Just (UniqueViolation _) -> pure AUTH diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 648748162..db415e297 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -62,6 +62,9 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where storeLog <- newTVarIO Nothing pure STMQueueStore {queues, senders, notifiers, storeLog} + closeQueueStore :: STMQueueStore q -> IO () + closeQueueStore st = readTVarIO (storeLog st) >>= mapM_ closeStoreLog + loadedQueues = queues {-# INLINE loadedQueues #-} compactQueues _ = pure 0 @@ -194,7 +197,7 @@ readQueueRecIO qr = maybe (Left AUTH) Right <$> readTVarIO qr withLog' :: String -> TVar (Maybe (StoreLog 'WriteMode)) -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) withLog' name sl action = readTVarIO sl - >>= maybe (pure $ Right ()) (E.try . action >=> bimapM logErr pure) + >>= maybe (pure $ Right ()) (E.try . E.uninterruptibleMask_ . action >=> bimapM logErr pure) where logErr :: E.SomeException -> IO ErrorType logErr e = logError ("STORE: " <> T.pack err) $> STORE err diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 2951e4038..8af65a335 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -24,6 +24,7 @@ class StoreQueueClass q where class StoreQueueClass q => QueueStoreClass q s where type QueueStoreCfg s newQueueStore :: QueueStoreCfg s -> IO s + closeQueueStore :: s -> IO () queueCounts :: s -> IO QueueCounts loadedQueues :: s -> TMap RecipientId q compactQueues :: s -> IO Int64 diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index d82062ec3..1e0565a61 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -158,9 +158,9 @@ instance StrEncoding StoreLogRecord where DeleteNotifier_ -> DeleteNotifier <$> strP UpdateTime_ -> UpdateTime <$> strP_ <*> strP -openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode) -openWriteStoreLog f = do - h <- openFile f WriteMode +openWriteStoreLog :: Bool -> FilePath -> IO (StoreLog 'WriteMode) +openWriteStoreLog append f = do + h <- openFile f $ if append then AppendMode else WriteMode hSetBuffering h LineBuffering pure $ WriteStoreLog f h @@ -239,7 +239,7 @@ readWriteStoreLog readStore writeStore f st = removeStoreLogBackups f pure s writeLog msg = do - s <- openWriteStoreLog f + s <- openWriteStoreLog False f logInfo msg writeStore s st pure s diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index ec2d07039..d871f5b0a 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -101,7 +101,7 @@ storeLogTests = testSMPStoreLog :: String -> [SMPStoreLogTestCase] -> Spec testSMPStoreLog testSuite tests = describe testSuite $ forM_ tests $ \t@SLTC {name, saved} -> it name $ do - l <- openWriteStoreLog testStoreLogFile + l <- openWriteStoreLog False testStoreLogFile mapM_ (writeStoreLogRecord l) saved closeStoreLog l replicateM_ 3 $ testReadWrite t diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 653a03b84..2eb3fe17b 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -162,7 +162,7 @@ journalCfg cfg' storeLogFile storeMsgsPath = cfg' {serverStoreCfg = ASSCfg SQSMe journalCfgDB :: ServerConfig -> DBOpts -> FilePath -> ServerConfig journalCfgDB cfg' dbOpts storeMsgsPath' = - let storeCfg = PostgresStoreCfg {dbOpts, confirmMigrations = MCYesUp, deletedTTL = 86400} + let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCYesUp, deletedTTL = 86400} in cfg' {serverStoreCfg = ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath'}} cfgMS :: AStoreType -> ServerConfig @@ -213,13 +213,17 @@ cfgMS msType = } serverStoreConfig :: AStoreType -> AServerStoreCfg -serverStoreConfig = \case +serverStoreConfig = serverStoreConfig_ False + +serverStoreConfig_ :: Bool -> AStoreType -> AServerStoreCfg +serverStoreConfig_ useDbStoreLog = \case ASType SQSMemory SMSMemory -> ASSCfg SQSMemory SMSMemory $ SSCMemory $ Just StorePaths {storeLogFile = testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile} ASType SQSMemory SMSJournal -> ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = testStoreLogFile, storeMsgsPath = testStoreMsgsDir} ASType SQSPostgres SMSJournal -> - let storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, confirmMigrations = MCYesUp, deletedTTL = 86400} + let dbStoreLogPath = if useDbStoreLog then Just testStoreLogFile else Nothing + storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = 86400} in ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir} cfgV7 :: ServerConfig diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index fcf1943ad..824338452 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -564,7 +564,7 @@ testExceedQueueQuota = testWithStoreLog :: SpecWith (ATransport, AStoreType) testWithStoreLog = - it "should store simplex queues to log and restore them after server restart" $ \ps@(at@(ATransport t), _) -> do + it "should store simplex queues to log and restore them after server restart" $ \(at@(ATransport t), msType) -> do g <- C.newRandom (sPub1, sKey1) <- atomically $ C.generateAuthKeyPair C.SEd25519 g (sPub2, sKey2) <- atomically $ C.generateAuthKeyPair C.SEd25519 g @@ -576,7 +576,8 @@ testWithStoreLog = senderId2 <- newTVarIO NoEntity notifierId <- newTVarIO NoEntity - withSmpServerStoreLogOn ps testPort . runTest t $ \h -> runClient t $ \h1 -> do + let (cfg', compacting) = serverStoreLogCfg msType + withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> runClient t $ \h1 -> do (sId1, rId1, rKey1, dhShared) <- createAndSecureQueue h sPub1 (rcvNtfPubDhKey, _) <- atomically $ C.generateKeyPair g Resp "abcd" _ (NID nId _) <- signSendRecv h rKey1 ("abcd", rId1, NKEY nPub rcvNtfPubDhKey) @@ -607,16 +608,16 @@ testWithStoreLog = Resp "dabc" _ OK <- signSendRecv h rKey2 ("dabc", rId2, DEL) pure () - when (usesStoreLog ps) $ logSize testStoreLogFile `shouldReturn` 6 + logSize testStoreLogFile `shouldReturn` 6 - let cfg' = cfg {serverStoreCfg = ASSCfg SQSMemory SMSMemory $ SSCMemory Nothing} - withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do + let cfg'' = cfg {serverStoreCfg = ASSCfg SQSMemory SMSMemory $ SSCMemory Nothing} + withSmpServerConfigOn at cfg'' testPort . runTest t $ \h -> do sId1 <- readTVarIO senderId1 -- fails if store log is disabled Resp "bcda" _ (ERR AUTH) <- signSendRecv h sKey1 ("bcda", sId1, _SEND "hello") pure () - withSmpServerStoreLogOn ps testPort . runTest t $ \h -> runClient t $ \h1 -> do + withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> runClient t $ \h1 -> do -- this queue is restored rId1 <- readTVarIO recipientId1 Just rKey1 <- readTVarIO recipientKey1 @@ -633,9 +634,9 @@ testWithStoreLog = Resp "cdab" _ (ERR AUTH) <- signSendRecv h sKey2 ("cdab", sId2, _SEND "hello too") pure () - when (usesStoreLog ps) $ do - logSize testStoreLogFile `shouldReturn` 1 - removeFile testStoreLogFile + -- when (usesStoreLog ps) $ do + logSize testStoreLogFile `shouldReturn` (if compacting then 1 else 6) + removeFile testStoreLogFile where runTest :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do @@ -645,10 +646,14 @@ testWithStoreLog = runClient :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> Expectation runClient _ test' = testSMPClient test' `shouldReturn` () -usesStoreLog :: (ATransport, AStoreType) -> Bool -usesStoreLog (_, ASType qsType _) = case qsType of - SQSMemory -> True - SQSPostgres -> False +serverStoreLogCfg :: AStoreType -> (ServerConfig, Bool) +serverStoreLogCfg msType = + let serverStoreCfg = serverStoreConfig_ True msType + cfg' = (cfgMS msType) {serverStoreCfg, storeNtfsFile = Just testStoreNtfsFile, serverStatsBackupFile = Just testServerStatsBackupFile} + compacting = case msType of + ASType SQSPostgres _ -> False + _ -> True + in (cfg', compacting) logSize :: FilePath -> IO Int logSize f = go (10 :: Int) @@ -662,7 +667,7 @@ logSize f = go (10 :: Int) testRestoreMessages :: SpecWith (ATransport, AStoreType) testRestoreMessages = - it "should store messages on exit and restore on start" $ \ps@(ATransport t, _) -> do + it "should store messages on exit and restore on start" $ \(at@(ATransport t), msType) -> do removeFileIfExists testStoreLogFile removeFileIfExists testStoreMsgsFile whenM (doesDirectoryExist testStoreMsgsDir) $ removeDirectoryRecursive testStoreMsgsDir @@ -674,7 +679,8 @@ testRestoreMessages = recipientKey <- newTVarIO Nothing dhShared <- newTVarIO Nothing senderId <- newTVarIO NoEntity - withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do + let (cfg', compacting) = serverStoreLogCfg msType + withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do runClient t $ \h1 -> do (sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub atomically $ do @@ -695,11 +701,11 @@ testRestoreMessages = Resp "6" _ (ERR QUOTA) <- signSendRecv h sKey ("6", sId, _SEND "hello 6") pure () rId <- readTVarIO recipientId - when (usesStoreLog ps) $ logSize testStoreLogFile `shouldReturn` 2 + logSize testStoreLogFile `shouldReturn` 2 logSize testServerStatsBackupFile `shouldReturn` 76 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 - withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do + withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -709,14 +715,14 @@ testRestoreMessages = (dec mId3 msg3, Right "hello 3") #== "restored message delivered" Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, ACK mId3) (dec mId4 msg4, Right "hello 4") #== "restored message delivered" - when (usesStoreLog ps) $ logSize testStoreLogFile `shouldReturn` 1 + logSize testStoreLogFile `shouldReturn` (if compacting then 1 else 2) -- the last message is not removed because it was not ACK'd -- logSize testStoreMsgsFile `shouldReturn` 3 logSize testServerStatsBackupFile `shouldReturn` 76 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 - withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do + withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -728,9 +734,8 @@ testRestoreMessages = (dec mId6 msg6, Left "ClientRcvMsgQuota") #== "restored message delivered" Resp "7" _ OK <- signSendRecv h rKey ("7", rId, ACK mId6) pure () - when (usesStoreLog ps) $ do - logSize testStoreLogFile `shouldReturn` 1 - removeFile testStoreLogFile + logSize testStoreLogFile `shouldReturn` (if compacting then 1 else 2) + removeFile testStoreLogFile logSize testServerStatsBackupFile `shouldReturn` 76 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5 @@ -764,14 +769,15 @@ checkStats s qs sent received = do testRestoreExpireMessages :: SpecWith (ATransport, AStoreType) testRestoreExpireMessages = - it "should store messages on exit and restore on start (old / v2)" $ \ps@(at@(ATransport t), msType) -> do + it "should store messages on exit and restore on start (old / v2)" $ \(at@(ATransport t), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g recipientId <- newTVarIO NoEntity recipientKey <- newTVarIO Nothing dhShared <- newTVarIO Nothing senderId <- newTVarIO NoEntity - withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do + let (cfg', _compacting) = serverStoreLogCfg msType + withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do runClient t $ \h1 -> do (sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub atomically $ do @@ -786,36 +792,29 @@ testRestoreExpireMessages = Resp "3" _ OK <- signSendRecv h sKey ("3", sId, _SEND "hello 3") Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4") pure () - msgs <- - if usesStoreLog ps - then do - logSize testStoreLogFile `shouldReturn` 2 - exportStoreMessages msType - msgs <- B.readFile testStoreMsgsFile - length (B.lines msgs) `shouldBe` 4 - pure msgs - else pure [] + logSize testStoreLogFile `shouldReturn` 2 + exportStoreMessages msType + msgs <- B.readFile testStoreMsgsFile + length (B.lines msgs) `shouldBe` 4 let expCfg1 = Just ExpirationConfig {ttl = 86400, checkInterval = 43200} - cfg1 = (cfgMS msType) {messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile} + cfg1 = cfg' {messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure () - when (usesStoreLog ps) $ do - logSize testStoreLogFile `shouldReturn` 1 - exportStoreMessages msType - msgs' <- B.readFile testStoreMsgsFile - msgs' `shouldBe` msgs + logSize testStoreLogFile `shouldReturn` 1 + exportStoreMessages msType + msgs' <- B.readFile testStoreMsgsFile + msgs' `shouldBe` msgs let expCfg2 = Just ExpirationConfig {ttl = 2, checkInterval = 43200} - cfg2 = (cfgMS msType) {messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile} + cfg2 = cfg' {messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure () - when (usesStoreLog ps) $ do - logSize testStoreLogFile `shouldReturn` 1 - -- two messages expired - exportStoreMessages msType - msgs'' <- B.readFile testStoreMsgsFile - length (B.lines msgs'') `shouldBe` 2 - B.lines msgs'' `shouldBe` drop 2 (B.lines msgs) + logSize testStoreLogFile `shouldReturn` 1 + -- two messages expired + exportStoreMessages msType + msgs'' <- B.readFile testStoreMsgsFile + length (B.lines msgs'') `shouldBe` 2 + B.lines msgs'' `shouldBe` drop 2 (B.lines msgs) Right ServerStatsData {_msgExpired} <- strDecode <$> B.readFile testServerStatsBackupFile _msgExpired `shouldBe` 2 where