diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 82eb13d1d..89b181748 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -578,20 +578,13 @@ instance MsgStoreClass (JournalMsgStore s) where runFast MsgQueueState {writeState = ws, readState = rs, size} | size > 0 = readTVarIO (handles q) >>= \case - Just (MsgQueueHandles _ rh wh_) -> case wh_ of - Just wh -> (++) <$> getAllMsgs rh (bytePos rs) (byteCount rs - bytePos rs) <*> getAllMsgs wh 0 (bytePos ws) - Nothing -> getAllMsgs rh (bytePos rs) (bytePos ws - bytePos rs) + Just (MsgQueueHandles _ rh wh_) -> do + msgs <- getJournalRange rh (bytePos rs) (byteCount rs) + case wh_ of + Just wh -> (msgs ++) <$> getJournalRange wh 0 (bytePos ws) + Nothing -> pure msgs Nothing -> pure [] | otherwise = pure [] - getAllMsgs h pos sz - | sz > 0 = do - IO.hSeek h AbsoluteSeek $ fromIntegral pos - parseMsgs =<< B.hGet h (fromIntegral sz) - | otherwise = pure [] - parseMsgs s = do - let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s - unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> B.unpack (strEncode $ recipientId' q') - pure msgs writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do @@ -1042,26 +1035,29 @@ closeOnException h a = a `E.onException` hClose h getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message] getJournalQueueMessages ms q = readQueueState ms (msgQueueStatePath dir rId) >>= \case - (Just MsgQueueState {readState = rs, writeState = ws, size}, _) - | size == 0 -> pure [] - | otherwise -> do - msgs <- getMsgs rs (bytePos rs) (byteCount rs - bytePos rs) - if journalId rs == journalId ws - then pure msgs - else (msgs ++) <$> getMsgs ws (0 :: Int64) (bytePos ws) + (Just MsgQueueState {readState = rs, writeState = ws, size}, _) | size > 0 -> do + msgs <- getMsgs (journalId rs) (bytePos rs) (byteCount rs) + if journalId rs == journalId ws + then pure msgs + else (msgs ++) <$> getMsgs (journalId ws) 0 (bytePos ws) _ -> pure [] where rId = recipientId' q dir = msgQueueDirectory ms rId - getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message] - getMsgs js pos sz - | sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do - IO.hSeek h' AbsoluteSeek $ fromIntegral pos - parseMsgs =<< B.hGet h' (fromIntegral sz) - | otherwise = pure [] - where - f = journalFilePath dir $ journalId js - parseMsgs s = do - let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s - unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f - pure msgs + getMsgs jId from to = + IO.withFile (journalFilePath dir jId) ReadWriteMode $ \h' -> + getJournalRange h' from to + +getJournalRange :: Handle -> Int64 -> Int64 -> IO [Message] +getJournalRange h from to + | to > from = do + IO.hSeek h AbsoluteSeek $ fromIntegral from + parseMsgs =<< B.hGet h (fromIntegral $ to - from) + | otherwise = pure [] + where + parseMsgs s = do + let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s + unless (null errs) $ do + f <- IO.hShow h + putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f + pure msgs