Revert "concurrent read messages"

This reverts commit 05a32e6491.
This commit is contained in:
Evgeny Poberezkin
2025-09-09 22:27:02 +01:00
parent 05a32e6491
commit d82308e320
2 changed files with 8 additions and 29 deletions
+7 -28
View File
@@ -126,7 +126,7 @@ import System.Environment (lookupEnv)
import System.Exit (exitFailure, exitSuccess)
import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (mapConcurrently, timeout)
import UnliftIO (timeout)
import UnliftIO.Concurrent
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
@@ -2114,32 +2114,10 @@ exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath ->
exportMessages tty st f drainMsgs = do
logNote $ "saving messages to file " <> T.pack f
run $ case st of
StoreMemory ms -> exportMessages_ getMsgs ms
StoreJournal ms -> exportMessages_' getJournalMsgs ms
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
where
exportMessages_ get ms h =
fmap (\(Sum n) -> n) $ unsafeWithAllMsgQueues tty ms $ \q ->
get ms q >>= saveQueueMsgs h q
exportMessages_' get ms h = do
qv <- newIORef []
Sum n <- unsafeWithAllMsgQueues tty ms $ \q -> do
qs <- (q :) <$> readIORef qv
if length qs < 8
then writeIORef qv qs $> Sum 0
else do
writeIORef qv []
saveQueues qs
qs <- readIORef qv
if null qs
then pure n
else do
Sum n' <- saveQueues qs
pure $ n + n'
where
saveQueues qs = do
let qs' = reverse qs
qMsgs <- zip qs' <$> mapConcurrently (get ms) qs'
mconcat <$> mapM (uncurry $ saveQueueMsgs h) qMsgs
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
run :: (Handle -> IO Int) -> IO ()
run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case
Right n -> logNote $ "messages saved: " <> tshow n
@@ -2152,8 +2130,9 @@ exportMessages tty st f drainMsgs = do
Nothing -> getJournalQueueMessages ms q
getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message]
getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
saveQueueMsgs :: Handle -> StoreQueue s -> [Message] -> IO (Sum Int)
saveQueueMsgs h q msgs = do
saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int)
saveQueueMsgs get h q = do
msgs <- get q
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
pure $ Sum $ length msgs
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
+1 -1
View File
@@ -54,7 +54,7 @@ msgStoreTests = do
around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests
around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do
someMsgStoreTests
fit "should export and import journal store" testExportImportStore
it "should export and import journal store" testExportImportStore
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState