This commit is contained in:
Evgeny Poberezkin
2025-09-09 15:55:58 +01:00
parent bcd4f1b447
commit bedea07379
3 changed files with 36 additions and 35 deletions
+9 -7
View File
@@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control
import Simplex.Messaging.Server.Env.STM as Env
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages)
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, msgQueueDirectory)
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
@@ -128,7 +128,7 @@ import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (timeout)
import UnliftIO.Concurrent
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Directory (doesDirectoryExist, doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM
@@ -2125,13 +2125,15 @@ exportMessages tty ms f drainMsgs = do
StoreMemory s -> s
StoreJournal s -> s
getMessages q = case ms of
StoreMemory _ ->
unsafeRunStore q "saveQueueMsgs" $
getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False
StoreMemory _ -> getMsgs
StoreJournal _ ->
unsafeRunStore q "saveQueueMsgs" $
getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False
ifM
(doesDirectoryExist $ msgQueueDirectory ms' $ recipientId q)
getMsgs
(pure [])
-- getJournalQueueMessages ms' q
where
getMsgs = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False
saveQueueMsgs h q = do
msgs <- getMessages q
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
@@ -1064,31 +1064,25 @@ exportJournalMessages tty ms@JournalMsgStore {config} h = ifM (doesDirectoryExis
readQueueState ms statePath >>= \case
(Just MsgQueueState {readState = rs, writeState = ws, size}, _)
| size == 0 -> pure []
| journalId rs == journalId ws -> do
-- just one journal
let f = journalFilePath dir $ journalId rs
s <- B.readFile f
let (errs, msgs) = parseMsgs $ B.take (bytePos' ws - bytePos' rs) $ B.drop (bytePos' rs) s
unless (null errs) $ do
when tty $ putStrLn $ progress i'
logErr errs f
pure msgs
| otherwise -> do
let rf = journalFilePath dir $ journalId rs
wf = journalFilePath dir $ journalId ws
r <- B.readFile rf
w <- B.readFile wf
let (rErrs, rMsgs) = parseMsgs $ B.take (fromIntegral $ byteCount rs) $ B.drop (bytePos' rs) r
(wErrs, wMsgs) = parseMsgs $ B.take (bytePos' ws) w
unless (null rErrs && null wErrs) $ do
when tty $ putStrLn $ progress i'
unless (null rErrs) $ logErr rErrs rf
unless (null wErrs) $ logErr wErrs wf
pure $ rMsgs ++ wMsgs
| journalId rs == journalId ws ->
getMsgs rs (bytePos rs) (bytePos ws - bytePos rs)
| otherwise ->
(++)
<$> getMsgs rs (byteCount rs - bytePos rs) (bytePos rs)
<*> getMsgs ws (0 :: Int64) (bytePos ws)
where
bytePos' = fromIntegral . bytePos
parseMsgs = partitionEithers . map strDecode . B.lines
logErr errs f = putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message]
getMsgs js pos sz
| sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do
IO.hSeek h' AbsoluteSeek $ fromIntegral pos
parseMsgs f =<< B.hGet h' (fromIntegral sz)
| otherwise = pure []
where
f = journalFilePath dir $ journalId js
parseMsgs f s = do
let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
pure msgs
_ -> pure []
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages rId msgs
pure (i', count + length msgs)
@@ -559,11 +559,16 @@ foldQueueRecs tty withData st skipOld_ f = do
where
foldRecs db acc f' = case skipOld_ of
Nothing
| withData -> DB.fold_ db (queueRecQueryWithData <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRec
| withData -> DB.fold_ db (queueRecQueryWithData <> cond) acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold_ db (queueRecQuery <> cond) acc $ \acc' -> f' acc' . rowToQueueRec
where
cond = " WHERE deleted_at IS NULL ORDER BY recipient_id ASC"
Just old
| withData -> DB.fold db (queueRecQueryWithData <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold db (queueRecQuery <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRec
| withData -> DB.fold db (queueRecQueryWithData <> cond) (Only old) acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold db (queueRecQuery <> cond) (Only old) acc $ \acc' -> f' acc' . rowToQueueRec
where
cond = " WHERE deleted_at IS NULL AND updated_at > ? ORDER BY recipient_id ASC"
orderBy = " ORDER BY recipient_id ASC"
progress i = "Processed: " <> show i <> " records"
queueRecQuery :: Query