mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-15 11:45:12 +00:00
agent: function to get multiple work items (#1330)
This commit is contained in:
@@ -115,6 +115,7 @@ module Simplex.Messaging.Agent.Client
|
||||
hasWorkToDo,
|
||||
hasWorkToDo',
|
||||
withWork,
|
||||
withWorkItems,
|
||||
agentOperations,
|
||||
agentOperationBracket,
|
||||
waitUntilActive,
|
||||
@@ -185,7 +186,7 @@ import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either (isRight, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (deleteFirstsBy, foldl', partition, (\\))
|
||||
import Data.List (deleteFirstsBy, find, foldl', partition, (\\))
|
||||
import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
@@ -1813,12 +1814,37 @@ withWork c doWork getWork action =
|
||||
withStore' c getWork >>= \case
|
||||
Right (Just r) -> action r
|
||||
Right Nothing -> noWork
|
||||
-- worker is stopped here (noWork) because the next iteration is likely to produce the same result
|
||||
Left e@SEWorkItemError {} -> noWork >> notifyErr (CRITICAL False) e
|
||||
Left e -> notifyErr INTERNAL e
|
||||
where
|
||||
noWork = liftIO $ noWorkToDo doWork
|
||||
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
|
||||
|
||||
withWorkItems :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError [Either StoreError a])) -> (NonEmpty a -> AM ()) -> AM ()
|
||||
withWorkItems c doWork getWork action = do
|
||||
withStore' c getWork >>= \case
|
||||
Right rs -> do
|
||||
let (errs, items) = partitionEithers rs
|
||||
case L.nonEmpty items of
|
||||
Just items' -> action items'
|
||||
Nothing -> do
|
||||
let criticalErr = find workItemError errs
|
||||
forM_ criticalErr $ \ err -> do
|
||||
notifyErr (CRITICAL False) err
|
||||
when (all workItemError errs) noWork
|
||||
unless (null errs) $ atomically $
|
||||
writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs)
|
||||
Left e
|
||||
| workItemError e -> noWork >> notifyErr (CRITICAL False) e
|
||||
| otherwise -> notifyErr INTERNAL e
|
||||
where
|
||||
workItemError = \case
|
||||
SEWorkItemError {} -> True
|
||||
_ -> False
|
||||
noWork = liftIO $ noWorkToDo doWork
|
||||
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
|
||||
|
||||
noWorkToDo :: TMVar () -> IO ()
|
||||
noWorkToDo = void . atomically . tryTakeTMVar
|
||||
{-# INLINE noWorkToDo #-}
|
||||
|
||||
@@ -1063,17 +1063,26 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} =
|
||||
|
||||
getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a))
|
||||
getWorkItem itemName getId getItem markFailed =
|
||||
runExceptT $ handleErr "getId" getId >>= mapM tryGetItem
|
||||
runExceptT $ handleWrkErr itemName "getId" getId >>= mapM (tryGetItem itemName getItem markFailed)
|
||||
|
||||
getWorkItems :: Show i => ByteString -> IO [i] -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError [Either StoreError a])
|
||||
getWorkItems itemName getIds getItem markFailed =
|
||||
runExceptT $ handleWrkErr itemName "getIds" getIds >>= mapM (tryE . tryGetItem itemName getItem markFailed)
|
||||
|
||||
tryGetItem :: Show i => ByteString -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> i -> ExceptT StoreError IO a
|
||||
tryGetItem itemName getItem markFailed itemId = ExceptT (getItem itemId) `catchStoreError` \e -> mark >> throwE e
|
||||
where
|
||||
tryGetItem itemId = ExceptT (getItem itemId) `catchStoreErrors` \e -> mark itemId >> throwE e
|
||||
mark itemId = handleErr ("markFailed ID " <> bshow itemId) $ markFailed itemId
|
||||
catchStoreErrors = catchAllErrors (SEInternal . bshow)
|
||||
-- Errors caught by this function will suspend worker as if there is no more work,
|
||||
handleErr :: ByteString -> IO a -> ExceptT StoreError IO a
|
||||
handleErr opName action = ExceptT $ first mkError <$> E.try action
|
||||
where
|
||||
mkError :: E.SomeException -> StoreError
|
||||
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e
|
||||
mark = handleWrkErr itemName ("markFailed ID " <> bshow itemId) $ markFailed itemId
|
||||
|
||||
catchStoreError :: ExceptT StoreError IO a -> (StoreError -> ExceptT StoreError IO a) -> ExceptT StoreError IO a
|
||||
catchStoreError = catchAllErrors (SEInternal . bshow)
|
||||
|
||||
-- Errors caught by this function will suspend worker as if there is no more work,
|
||||
handleWrkErr :: ByteString -> ByteString -> IO a -> ExceptT StoreError IO a
|
||||
handleWrkErr itemName opName action = ExceptT $ first mkError <$> E.try action
|
||||
where
|
||||
mkError :: E.SomeException -> StoreError
|
||||
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e
|
||||
|
||||
updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
|
||||
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
|
||||
|
||||
Reference in New Issue
Block a user