From 1f67b403deff984c83231ac9e7d482adb66fe76b Mon Sep 17 00:00:00 2001 From: Evgeny Date: Wed, 25 Sep 2024 10:55:45 +0100 Subject: [PATCH] agent: function to get multiple work items (#1330) --- src/Simplex/Messaging/Agent/Client.hs | 28 +++++++++++++++++++- src/Simplex/Messaging/Agent/Store/SQLite.hs | 29 ++++++++++++++------- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 6737a3155..88d1eeac2 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -115,6 +115,7 @@ module Simplex.Messaging.Agent.Client hasWorkToDo, hasWorkToDo', withWork, + withWorkItems, agentOperations, agentOperationBracket, waitUntilActive, @@ -185,7 +186,7 @@ import qualified Data.ByteString.Char8 as B import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (deleteFirstsBy, foldl', partition, (\\)) +import Data.List (deleteFirstsBy, find, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) @@ -1813,12 +1814,37 @@ withWork c doWork getWork action = withStore' c getWork >>= \case Right (Just r) -> action r Right Nothing -> noWork + -- worker is stopped here (noWork) because the next iteration is likely to produce the same result Left e@SEWorkItemError {} -> noWork >> notifyErr (CRITICAL False) e Left e -> notifyErr INTERNAL e where noWork = liftIO $ noWorkToDo doWork notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e) +withWorkItems :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError [Either StoreError a])) -> (NonEmpty a -> AM ()) -> AM () +withWorkItems c doWork getWork action = do + withStore' c getWork >>= \case + Right rs -> do + let (errs, items) = partitionEithers rs + case L.nonEmpty items of + Just items' -> action items' + Nothing -> do + let criticalErr = find workItemError errs + forM_ criticalErr $ \ err -> do + notifyErr (CRITICAL False) err + when (all workItemError errs) noWork + unless (null errs) $ atomically $ + writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs) + Left e + | workItemError e -> noWork >> notifyErr (CRITICAL False) e + | otherwise -> notifyErr INTERNAL e + where + workItemError = \case + SEWorkItemError {} -> True + _ -> False + noWork = liftIO $ noWorkToDo doWork + notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e) + noWorkToDo :: TMVar () -> IO () noWorkToDo = void . atomically . tryTakeTMVar {-# INLINE noWorkToDo #-} diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 55f1ab85e..c331c4d44 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -1063,17 +1063,26 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} = getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a)) getWorkItem itemName getId getItem markFailed = - runExceptT $ handleErr "getId" getId >>= mapM tryGetItem + runExceptT $ handleWrkErr itemName "getId" getId >>= mapM (tryGetItem itemName getItem markFailed) + +getWorkItems :: Show i => ByteString -> IO [i] -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError [Either StoreError a]) +getWorkItems itemName getIds getItem markFailed = + runExceptT $ handleWrkErr itemName "getIds" getIds >>= mapM (tryE . tryGetItem itemName getItem markFailed) + +tryGetItem :: Show i => ByteString -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> i -> ExceptT StoreError IO a +tryGetItem itemName getItem markFailed itemId = ExceptT (getItem itemId) `catchStoreError` \e -> mark >> throwE e where - tryGetItem itemId = ExceptT (getItem itemId) `catchStoreErrors` \e -> mark itemId >> throwE e - mark itemId = handleErr ("markFailed ID " <> bshow itemId) $ markFailed itemId - catchStoreErrors = catchAllErrors (SEInternal . bshow) - -- Errors caught by this function will suspend worker as if there is no more work, - handleErr :: ByteString -> IO a -> ExceptT StoreError IO a - handleErr opName action = ExceptT $ first mkError <$> E.try action - where - mkError :: E.SomeException -> StoreError - mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e + mark = handleWrkErr itemName ("markFailed ID " <> bshow itemId) $ markFailed itemId + +catchStoreError :: ExceptT StoreError IO a -> (StoreError -> ExceptT StoreError IO a) -> ExceptT StoreError IO a +catchStoreError = catchAllErrors (SEInternal . bshow) + +-- Errors caught by this function will suspend worker as if there is no more work, +handleWrkErr :: ByteString -> ByteString -> IO a -> ExceptT StoreError IO a +handleWrkErr itemName opName action = ExceptT $ first mkError <$> E.try action + where + mkError :: E.SomeException -> StoreError + mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO () updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =