From a8eab1f7e33e8d78c0a352938db3c68efac218c4 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 9 Sep 2025 22:53:42 +0100 Subject: [PATCH] concurrent read/write --- src/Simplex/Messaging/Server.hs | 27 +++++++++++++++++++-------- tests/CoreTests/MsgStoreTests.hs | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 61748dd5b..02455846c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -44,6 +44,7 @@ module Simplex.Messaging.Server ) where +import Control.Concurrent.Async (concurrently) import Control.Concurrent.STM (throwSTM) import Control.Logger.Simple import Control.Monad @@ -74,7 +75,6 @@ import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (fromMaybe, isJust, isNothing) -import Data.Semigroup (Sum (..)) import qualified Data.Set as S import Data.Text (Text) import qualified Data.Text as T @@ -2114,10 +2114,22 @@ exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> exportMessages tty st f drainMsgs = do logNote $ "saving messages to file " <> T.pack f run $ case st of - StoreMemory ms -> exportMessages_ ms $ getMsgs ms - StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms + StoreMemory ms -> exportMessages_ getMsgs ms + StoreJournal ms -> exportMessages_ getJournalMsgs ms where - exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get + exportMessages_ get ms h = do + saveQ <- newTBQueueIO 100 + let readMsgs = do + unsafeWithAllMsgQueues tty ms $ \q -> + get ms q >>= \msgs -> unless (null msgs) $ atomically $ writeTBQueue saveQ $ Just (q, msgs) + atomically $ writeTBQueue saveQ Nothing + writeMsgs n = + atomically (readTBQueue saveQ) >>= \case + Nothing -> pure n + Just (q, msgs) -> do + n' <- saveQueueMsgs h q msgs + writeMsgs $ n + n' + snd <$> concurrently readMsgs (writeMsgs 0) run :: (Handle -> IO Int) -> IO () run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case Right n -> logNote $ "messages saved: " <> tshow n @@ -2130,11 +2142,10 @@ exportMessages tty st f drainMsgs = do Nothing -> getJournalQueueMessages ms q getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message] getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False - saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int) - saveQueueMsgs get h q = do - msgs <- get q + saveQueueMsgs :: Handle -> StoreQueue s -> [Message] -> IO Int + saveQueueMsgs h q msgs = do unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs - pure $ Sum $ length msgs + pure $ length msgs encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') processServerMessages :: forall s'. StartOptions -> M s' (Maybe MessageStats) diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 2ed0da330..c5cf3a437 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -54,7 +54,7 @@ msgStoreTests = do around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do someMsgStoreTests - it "should export and import journal store" testExportImportStore + fit "should export and import journal store" testExportImportStore describe "queue state" $ do it "should restore queue state from the last line" testQueueState it "should recover when message is written and state is not" testMessageState