This commit is contained in:
Evgeny Poberezkin
2025-09-09 19:32:48 +01:00
parent a1f4de9ecc
commit 65393a303a
@@ -578,20 +578,13 @@ instance MsgStoreClass (JournalMsgStore s) where
runFast MsgQueueState {writeState = ws, readState = rs, size}
| size > 0 =
readTVarIO (handles q) >>= \case
Just (MsgQueueHandles _ rh wh_) -> case wh_ of
Just wh -> (++) <$> getAllMsgs rh (bytePos rs) (byteCount rs - bytePos rs) <*> getAllMsgs wh 0 (bytePos ws)
Nothing -> getAllMsgs rh (bytePos rs) (bytePos ws - bytePos rs)
Just (MsgQueueHandles _ rh wh_) -> do
msgs <- getJournalRange rh (bytePos rs) (byteCount rs)
case wh_ of
Just wh -> (msgs ++) <$> getJournalRange wh 0 (bytePos ws)
Nothing -> pure msgs
Nothing -> pure []
| otherwise = pure []
getAllMsgs h pos sz
| sz > 0 = do
IO.hSeek h AbsoluteSeek $ fromIntegral pos
parseMsgs =<< B.hGet h (fromIntegral sz)
| otherwise = pure []
parseMsgs s = do
let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> B.unpack (strEncode $ recipientId' q')
pure msgs
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
@@ -1042,26 +1035,29 @@ closeOnException h a = a `E.onException` hClose h
getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message]
getJournalQueueMessages ms q =
readQueueState ms (msgQueueStatePath dir rId) >>= \case
(Just MsgQueueState {readState = rs, writeState = ws, size}, _)
| size == 0 -> pure []
| otherwise -> do
msgs <- getMsgs rs (bytePos rs) (byteCount rs - bytePos rs)
if journalId rs == journalId ws
then pure msgs
else (msgs ++) <$> getMsgs ws (0 :: Int64) (bytePos ws)
(Just MsgQueueState {readState = rs, writeState = ws, size}, _) | size > 0 -> do
msgs <- getMsgs (journalId rs) (bytePos rs) (byteCount rs)
if journalId rs == journalId ws
then pure msgs
else (msgs ++) <$> getMsgs (journalId ws) 0 (bytePos ws)
_ -> pure []
where
rId = recipientId' q
dir = msgQueueDirectory ms rId
getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message]
getMsgs js pos sz
| sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do
IO.hSeek h' AbsoluteSeek $ fromIntegral pos
parseMsgs =<< B.hGet h' (fromIntegral sz)
| otherwise = pure []
where
f = journalFilePath dir $ journalId js
parseMsgs s = do
let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
pure msgs
getMsgs jId from to =
IO.withFile (journalFilePath dir jId) ReadWriteMode $ \h' ->
getJournalRange h' from to
getJournalRange :: Handle -> Int64 -> Int64 -> IO [Message]
getJournalRange h from to
| to > from = do
IO.hSeek h AbsoluteSeek $ fromIntegral from
parseMsgs =<< B.hGet h (fromIntegral $ to - from)
| otherwise = pure []
where
parseMsgs s = do
let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s
unless (null errs) $ do
f <- IO.hShow h
putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
pure msgs