diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 95f1e8b5b..61748dd5b 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -126,7 +126,7 @@ import System.Environment (lookupEnv) import System.Exit (exitFailure, exitSuccess) import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) -import UnliftIO (mapConcurrently, timeout) +import UnliftIO (timeout) import UnliftIO.Concurrent import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception @@ -2114,32 +2114,10 @@ exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> exportMessages tty st f drainMsgs = do logNote $ "saving messages to file " <> T.pack f run $ case st of - StoreMemory ms -> exportMessages_ getMsgs ms - StoreJournal ms -> exportMessages_' getJournalMsgs ms + StoreMemory ms -> exportMessages_ ms $ getMsgs ms + StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms where - exportMessages_ get ms h = - fmap (\(Sum n) -> n) $ unsafeWithAllMsgQueues tty ms $ \q -> - get ms q >>= saveQueueMsgs h q - exportMessages_' get ms h = do - qv <- newIORef [] - Sum n <- unsafeWithAllMsgQueues tty ms $ \q -> do - qs <- (q :) <$> readIORef qv - if length qs < 8 - then writeIORef qv qs $> Sum 0 - else do - writeIORef qv [] - saveQueues qs - qs <- readIORef qv - if null qs - then pure n - else do - Sum n' <- saveQueues qs - pure $ n + n' - where - saveQueues qs = do - let qs' = reverse qs - qMsgs <- zip qs' <$> mapConcurrently (get ms) qs' - mconcat <$> mapM (uncurry $ saveQueueMsgs h) qMsgs + exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get run :: (Handle -> IO Int) -> IO () run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case Right n -> logNote $ "messages saved: " <> tshow n @@ -2152,8 +2130,9 @@ exportMessages tty st f drainMsgs = do Nothing -> getJournalQueueMessages ms q getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message] getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False - saveQueueMsgs :: Handle -> StoreQueue s -> [Message] -> IO (Sum Int) - saveQueueMsgs h q msgs = do + saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int) + saveQueueMsgs get h q = do + msgs <- get q unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs pure $ Sum $ length msgs encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index c5cf3a437..2ed0da330 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -54,7 +54,7 @@ msgStoreTests = do around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do someMsgStoreTests - fit "should export and import journal store" testExportImportStore + it "should export and import journal store" testExportImportStore describe "queue state" $ do it "should restore queue state from the last line" testQueueState it "should recover when message is written and state is not" testMessageState