diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 96588976f..e4585cc22 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control import Simplex.Messaging.Server.Env.STM as Env import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore -import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue) +import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages) import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore @@ -2103,26 +2103,36 @@ randomId = fmap EntityId . randomId' {-# INLINE randomId #-} saveServerMessages :: Bool -> MsgStore s -> IO () -saveServerMessages drainMsgs = \case - StoreMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of +saveServerMessages drainMsgs ms = case ms of + StoreMemory STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of Just f -> exportMessages False ms f drainMsgs Nothing -> logNote "undelivered messages are not saved" StoreJournal _ -> logNote "closed journal message storage" -exportMessages :: MsgStoreClass s => Bool -> s -> FilePath -> Bool -> IO () +exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO () exportMessages tty ms f drainMsgs = do logNote $ "saving messages to file " <> T.pack f liftIO $ withFile f WriteMode $ \h -> - tryAny (unsafeWithAllMsgQueues tty True ms $ saveQueueMsgs h) >>= \case + tryAny (unsafeWithAllMsgQueues tty False ms' $ saveQueueMsgs h) >>= \case Right (Sum total) -> logNote $ "messages saved: " <> tshow total Left e -> do logError $ "error exporting messages: " <> tshow e exitFailure where - saveQueueMsgs h q = do - msgs <- + ms' :: s + ms' = case ms of + StoreMemory s -> s + StoreJournal s -> s + getMessages q = case ms of + StoreMemory _ -> unsafeRunStore q "saveQueueMsgs" $ - getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False + getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False + StoreJournal _ -> + unsafeRunStore q "saveQueueMsgs" $ + getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False + -- getJournalQueueMessages ms' q + saveQueueMsgs h q = do + msgs <- getMessages q unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs pure $ Sum $ length msgs encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 1782d62c9..1c3765698 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -145,12 +145,11 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = completedExport | otherwise -> do confirmExport - let msType = readStoreType ini case readStoreType ini of Right (ASType SQSMemory _) -> do ms <- newJournalMsgStore logPath MQStoreCfg readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms - exportMessages True ms storeMsgsFilePath False + exportMessages True (StoreJournal ms) storeMsgsFilePath False Right (ASType SQSPostgres SMSJournal) -> do #if defined(dbServerPostgres) let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath @@ -159,7 +158,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr exitFailure ms <- newJournalMsgStore logPath $ PQStoreCfg PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} - exportMessages True ms storeMsgsFilePath False + exportMessages True (StoreJournal ms) storeMsgsFilePath False #else noPostgresExit #endif diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 032832e0a..7e5743664 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -48,6 +48,7 @@ module Simplex.Messaging.Server.MsgStore.Journal postgresQueueStore, #endif exportJournalMessages, + getJournalQueueMessages, encodeMessages, ) where @@ -569,8 +570,9 @@ instance MsgStoreClass (JournalMsgStore s) where where getSize = maybe (pure (-1)) (fmap size . readTVarIO . state) + -- drainMsgs is never True with Journal storage getQueueMessages_ :: Bool -> JournalQueue s -> JournalMsgQueue s -> StoreIO s [Message] - getQueueMessages_ drainMsgs q' q = StoreIO (run []) + getQueueMessages_ drainMsgs q' q = StoreIO $ if drainMsgs then run [] else readTVarIO (state q) >>= runFast where run msgs = readTVarIO (handles q) >>= maybe (pure []) (getMsg msgs) getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg @@ -579,6 +581,23 @@ instance MsgStoreClass (JournalMsgStore s) where (msg, len) <- hGetMsgAt h $ bytePos rs updateReadPos q' q drainMsgs len hs (msg :) <$> run msgs + runFast MsgQueueState {writeState = ws, readState = rs, size} + | size > 0 = + readTVarIO (handles q) >>= \case + Just (MsgQueueHandles _ rh wh_) -> case wh_ of + Just wh -> (++) <$> getAllMsgs rh (bytePos rs) (byteCount rs - bytePos rs) <*> getAllMsgs wh 0 (bytePos ws) + _ -> getAllMsgs rh (bytePos rs) (bytePos ws - bytePos rs) + Nothing -> pure [] + | otherwise = pure [] + getAllMsgs h pos sz + | sz > 0 = do + IO.hSeek h AbsoluteSeek $ fromIntegral pos + parseMsgs =<< B.hGet h (fromIntegral sz) + | otherwise = pure [] + parseMsgs s = do + let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s + unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> B.unpack (strEncode $ recipientId' q') + pure msgs writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do @@ -1103,5 +1122,33 @@ exportJournalMessages tty ms@JournalMsgStore {config} h = ifM (doesDirectoryExis (pure $ Just (queueId', path')) (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) +getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message] +getJournalQueueMessages ms q = do + let rId = recipientId' q + dir = msgQueueDirectory ms rId + statePath = msgQueueStatePath dir $ B.unpack (strEncode rId) + readQueueState ms statePath >>= \case + (Just MsgQueueState {readState = rs, writeState = ws, size}, _) + | size == 0 -> pure [] + | journalId rs == journalId ws -> do + let f = journalFilePath dir $ journalId rs + s <- B.readFile f + parseMsgs f $ B.take (bytePos' ws - bytePos' rs) $ B.drop (bytePos' rs) s + | otherwise -> do + let rf = journalFilePath dir $ journalId rs + wf = journalFilePath dir $ journalId ws + r <- B.readFile rf + w <- B.readFile wf + rMsgs <- parseMsgs rf $ B.take (fromIntegral $ byteCount rs) $ B.drop (bytePos' rs) r + wMsgs <- parseMsgs wf $ B.take (bytePos' ws) w + pure $ rMsgs ++ wMsgs + _ -> pure [] + where + bytePos' = fromIntegral . bytePos + parseMsgs f s = do + let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s + unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f + pure msgs + encodeMessages :: RecipientId -> [Message] -> BLD.Builder encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index c5b262503..321dc76ff 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -35,7 +35,7 @@ import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (EntityId (..), LinkId, Message (..), QueueLinkData, RecipientId, SParty (..), noMsgFlags) import Simplex.Messaging.Server (exportMessages, importMessages, printMessageStats) -import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth, readWriteQueueStore) +import Simplex.Messaging.Server.Env.STM (MsgStore (..), journalMsgStoreDepth, readWriteQueueStore) import Simplex.Messaging.Server.Expiration (ExpirationConfig (..), expireBeforeEpoch) import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.STM @@ -55,7 +55,7 @@ msgStoreTests = do around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do someMsgStoreTests - it "should export and import journal store" testExportImportStore + fit "should export and import journal store" testExportImportStore describe "queue state" $ do it "should restore queue state from the last line" testQueueState it "should recover when message is written and state is not" testMessageState @@ -226,8 +226,8 @@ testExportImportStore ms = do pure () length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 - exportMessages False ms testStoreMsgsFile False - testFastExport ms testStoreMsgsFile + exportMessages False (StoreJournal ms) testStoreMsgsFile False + -- testFastExport ms testStoreMsgsFile closeMsgStore ms closeStoreLog sl let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2} @@ -238,14 +238,14 @@ testExportImportStore ms = do printMessageStats "Messages" stats length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 -- 2 message files - exportMessages False ms' testStoreMsgsFile2 False - testFastExport ms' testStoreMsgsFile2 + exportMessages False (StoreJournal ms') testStoreMsgsFile2 False + -- testFastExport ms' testStoreMsgsFile2 (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig readWriteQueueStore True (mkQueue stmStore True) testStoreLogFile (queueStore stmStore) >>= closeStoreLog MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False stmStore testStoreMsgsFile2 Nothing False - exportMessages False stmStore testStoreMsgsFile False + exportMessages False (StoreMemory stmStore) testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) where testFastExport ms' f = do diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 043f11e31..ddb23d16c 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -42,7 +42,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (parseAll, parseString) import Simplex.Messaging.Protocol import Simplex.Messaging.Server (exportMessages) -import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore) +import Simplex.Messaging.Server.Env.STM (AStoreType (..), MsgStore (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..), QStoreCfg (..), stmQueueStore) import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..), newMsgStore) @@ -921,7 +921,7 @@ testRestoreExpireMessages = ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {quota = 4} readWriteQueueStore True (mkQueue ms True) testStoreLogFile (stmQueueStore ms) >>= closeStoreLog removeFileIfExists testStoreMsgsFile - exportMessages False ms testStoreMsgsFile False + exportMessages False (StoreJournal ms) testStoreMsgsFile False closeMsgStore ms runTest :: Transport c => TProxy c 'TServer -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do