diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 71dbc0668..3f91f08ba 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control import Simplex.Messaging.Server.Env.STM as Env import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore -import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages) +import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, msgQueueDirectory) import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore @@ -128,7 +128,7 @@ import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO (timeout) import UnliftIO.Concurrent -import UnliftIO.Directory (doesFileExist, renameFile) +import UnliftIO.Directory (doesDirectoryExist, doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.IO import UnliftIO.STM @@ -2125,13 +2125,15 @@ exportMessages tty ms f drainMsgs = do StoreMemory s -> s StoreJournal s -> s getMessages q = case ms of - StoreMemory _ -> - unsafeRunStore q "saveQueueMsgs" $ - getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False + StoreMemory _ -> getMsgs StoreJournal _ -> - unsafeRunStore q "saveQueueMsgs" $ - getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False + ifM + (doesDirectoryExist $ msgQueueDirectory ms' $ recipientId q) + getMsgs + (pure []) -- getJournalQueueMessages ms' q + where + getMsgs = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False saveQueueMsgs h q = do msgs <- getMessages q unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index bcc7a7e0c..3fca45c04 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -1064,31 +1064,25 @@ exportJournalMessages tty ms@JournalMsgStore {config} h = ifM (doesDirectoryExis readQueueState ms statePath >>= \case (Just MsgQueueState {readState = rs, writeState = ws, size}, _) | size == 0 -> pure [] - | journalId rs == journalId ws -> do - -- just one journal - let f = journalFilePath dir $ journalId rs - s <- B.readFile f - let (errs, msgs) = parseMsgs $ B.take (bytePos' ws - bytePos' rs) $ B.drop (bytePos' rs) s - unless (null errs) $ do - when tty $ putStrLn $ progress i' - logErr errs f - pure msgs - | otherwise -> do - let rf = journalFilePath dir $ journalId rs - wf = journalFilePath dir $ journalId ws - r <- B.readFile rf - w <- B.readFile wf - let (rErrs, rMsgs) = parseMsgs $ B.take (fromIntegral $ byteCount rs) $ B.drop (bytePos' rs) r - (wErrs, wMsgs) = parseMsgs $ B.take (bytePos' ws) w - unless (null rErrs && null wErrs) $ do - when tty $ putStrLn $ progress i' - unless (null rErrs) $ logErr rErrs rf - unless (null wErrs) $ logErr wErrs wf - pure $ rMsgs ++ wMsgs + | journalId rs == journalId ws -> + getMsgs rs (bytePos rs) (bytePos ws - bytePos rs) + | otherwise -> + (++) + <$> getMsgs rs (byteCount rs - bytePos rs) (bytePos rs) + <*> getMsgs ws (0 :: Int64) (bytePos ws) where - bytePos' = fromIntegral . bytePos - parseMsgs = partitionEithers . map strDecode . B.lines - logErr errs f = putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f + getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message] + getMsgs js pos sz + | sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do + IO.hSeek h' AbsoluteSeek $ fromIntegral pos + parseMsgs f =<< B.hGet h' (fromIntegral sz) + | otherwise = pure [] + where + f = journalFilePath dir $ journalId js + parseMsgs f s = do + let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s + unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f + pure msgs _ -> pure [] unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages rId msgs pure (i', count + length msgs) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 5ed0754ec..174ff9ab6 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -559,11 +559,16 @@ foldQueueRecs tty withData st skipOld_ f = do where foldRecs db acc f' = case skipOld_ of Nothing - | withData -> DB.fold_ db (queueRecQueryWithData <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRecWithData - | otherwise -> DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRec + | withData -> DB.fold_ db (queueRecQueryWithData <> cond) acc $ \acc' -> f' acc' . rowToQueueRecWithData + | otherwise -> DB.fold_ db (queueRecQuery <> cond) acc $ \acc' -> f' acc' . rowToQueueRec + where + cond = " WHERE deleted_at IS NULL ORDER BY recipient_id ASC" Just old - | withData -> DB.fold db (queueRecQueryWithData <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRecWithData - | otherwise -> DB.fold db (queueRecQuery <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRec + | withData -> DB.fold db (queueRecQueryWithData <> cond) (Only old) acc $ \acc' -> f' acc' . rowToQueueRecWithData + | otherwise -> DB.fold db (queueRecQuery <> cond) (Only old) acc $ \acc' -> f' acc' . rowToQueueRec + where + cond = " WHERE deleted_at IS NULL AND updated_at > ? ORDER BY recipient_id ASC" + orderBy = " ORDER BY recipient_id ASC" progress i = "Processed: " <> show i <> " records" queueRecQuery :: Query