smp server: skip invalid message(s) when reading from journal

This commit is contained in:
Evgeny Poberezkin
2025-04-22 21:57:38 +01:00
parent 2da730209f
commit d340df2153
@@ -559,9 +559,9 @@ instance MsgStoreClass (JournalMsgStore s) where
getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg
where
readMsg (rs, h) = do
(msg, len) <- hGetMsgAt h $ bytePos rs
(msg_, len) <- hGetMsgAt h $ bytePos rs
updateReadPos q' q drainMsgs len hs
(msg :) <$> run msgs
maybe id (fmap . (:)) msg_ $ run msgs
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
@@ -620,15 +620,20 @@ instance MsgStoreClass (JournalMsgStore s) where
getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state
tryPeekMsg_ :: JournalQueue s -> JournalMsgQueue s -> StoreIO s (Maybe Message)
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
StoreIO $ (readTVarIO handles $>>= chooseReadJournal q mq True $>>= peekMsg)
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} = StoreIO go
where
peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
go = readTVarIO handles $>>= \hs -> chooseReadJournal q mq True hs $>>= peekMsg hs
peekMsg hs (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
where
readMsg = do
ml@(msg, _) <- hGetMsgAt h $ bytePos rs
atomically $ writeTVar tipMsg $ Just (Just ml)
pure $ Just msg
(msg_, len) <- hGetMsgAt h $ bytePos rs
case msg_ of
Just msg -> do
atomically $ writeTVar tipMsg $ Just (Just (msg, len))
pure $ Just msg
Nothing -> do
updateReadPos q mq True len hs
go
tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
@@ -984,15 +989,18 @@ hAppend h pos s = do
IO.hSeek h SeekFromEnd 0
B.hPutStr h s
hGetMsgAt :: Handle -> Int64 -> IO (Message, Int64)
hGetMsgAt :: Handle -> Int64 -> IO (Maybe Message, Int64)
hGetMsgAt h pos = do
IO.hSeek h AbsoluteSeek $ fromIntegral pos
s <- B.hGetLine h
case strDecode s of
Right !msg ->
let !len = fromIntegral (B.length s) + 1
in pure (msg, len)
Left e -> E.throwIO $ userError $ "hGetMsgAt invalid message: " <> e
let !len = fromIntegral (B.length s) + 1
msg_ <- case strDecode s of
Right !msg -> pure $ Just msg
Left e -> do
name <- IO.hShow h
logError $ "STORE: hGetMsgAt, " <> T.pack name <> ", invalid message at pos " <> tshow pos <> ": " <> tshow e
pure Nothing
pure (msg_, len)
openFile :: FilePath -> IOMode -> IO Handle
openFile f mode = do