From a4f049d8da1021ee76fbd1b25635d16aef24154a Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:38:08 +0000 Subject: [PATCH] agent: parameterize withWork, getWorkItem with StoreError; parameterized Binary for SQLite (#1617) * agent: parameterize withWork StoreError * getWorkItem * export * binary * remove handleWrkErr AnyStoreError constraint * put AnyError in AnyStoreError constraint * move typeclass --------- Co-authored-by: Evgeny Poberezkin --- src/Simplex/Messaging/Agent/Client.hs | 18 ++++++++---------- src/Simplex/Messaging/Agent/Store.hs | 12 +++++++++++- .../Messaging/Agent/Store/AgentStore.hs | 16 +++++++++------- src/Simplex/Messaging/Agent/Store/SQLite/DB.hs | 2 +- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 4a1a1c40e..21c0436ee 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1974,19 +1974,20 @@ withWork :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError ( withWork c doWork = withWork_ c doWork . withStore' c {-# INLINE withWork #-} -withWork_ :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m () +withWork_ :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m () withWork_ c doWork getWork action = getWork >>= \case Right (Just r) -> action r Right Nothing -> noWork -- worker is stopped here (noWork) because the next iteration is likely to produce the same result - Left e@SEWorkItemError {} -> noWork >> notifyErr (CRITICAL False) e - Left e -> notifyErr INTERNAL e + Left e + | isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e + | otherwise -> notifyErr INTERNAL e where noWork = liftIO $ noWorkToDo doWork notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e) -withWorkItems :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError [Either StoreError a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m () +withWorkItems :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' [Either e' a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m () withWorkItems c doWork getWork action = do getWork >>= \case Right [] -> noWork @@ -1995,20 +1996,17 @@ withWorkItems c doWork getWork action = do case L.nonEmpty items of Just items' -> action items' Nothing -> do - let criticalErr = find workItemError errs + let criticalErr = find isWorkItemError errs forM_ criticalErr $ \err -> do notifyErr (CRITICAL False) err - when (all workItemError errs) noWork + when (all isWorkItemError errs) noWork unless (null errs) $ atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs) Left e - | workItemError e -> noWork >> notifyErr (CRITICAL False) e + | isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e | otherwise -> notifyErr INTERNAL e where - workItemError = \case - SEWorkItemError {} -> True - _ -> False noWork = liftIO $ noWorkToDo doWork notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e) diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 8dee07037..7a1fc391d 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -693,10 +693,20 @@ data StoreError | -- | XFTP Deleted snd chunk replica not found. SEDeletedSndChunkReplicaNotFound | -- | Error when reading work item that suspends worker - do not use! - SEWorkItemError ByteString + SEWorkItemError {errContext :: String} | -- | Servers stats not found. SEServersStatsNotFound deriving (Eq, Show, Exception) instance AnyError StoreError where fromSomeException = SEInternal . bshow + +class (Show e, AnyError e) => AnyStoreError e where + isWorkItemError :: e -> Bool + mkWorkItemError :: String -> e + +instance AnyStoreError StoreError where + isWorkItemError = \case + SEWorkItemError {} -> True + _ -> False + mkWorkItemError errContext = SEWorkItemError {errContext} diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 59175e942..fef829c66 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -237,6 +237,8 @@ module Simplex.Messaging.Agent.Store.AgentStore firstRow', maybeFirstRow, fromOnlyBI, + getWorkItem, + getWorkItems, ) where @@ -966,25 +968,25 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} = _ -> Left $ SEInternal "unexpected snd msg data" markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId) -getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a)) +getWorkItem :: (Show i, AnyStoreError e) => String -> IO (Maybe i) -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e (Maybe a)) getWorkItem itemName getId getItem markFailed = runExceptT $ handleWrkErr itemName "getId" getId >>= mapM (tryGetItem itemName getItem markFailed) -getWorkItems :: Show i => ByteString -> IO [i] -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError [Either StoreError a]) +getWorkItems :: (Show i, AnyStoreError e) => String -> IO [i] -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e [Either e a]) getWorkItems itemName getIds getItem markFailed = runExceptT $ handleWrkErr itemName "getIds" getIds >>= mapM (tryE . tryGetItem itemName getItem markFailed) -tryGetItem :: Show i => ByteString -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> i -> ExceptT StoreError IO a +tryGetItem :: (Show i, AnyStoreError e) => String -> (i -> IO (Either e a)) -> (i -> IO ()) -> i -> ExceptT e IO a tryGetItem itemName getItem markFailed itemId = ExceptT (getItem itemId) `catchAllErrors` \e -> mark >> throwE e where - mark = handleWrkErr itemName ("markFailed ID " <> bshow itemId) $ markFailed itemId + mark = handleWrkErr itemName ("markFailed ID " <> show itemId) $ markFailed itemId -- Errors caught by this function will suspend worker as if there is no more work, -handleWrkErr :: ByteString -> ByteString -> IO a -> ExceptT StoreError IO a +handleWrkErr :: forall e a. AnyStoreError e => String -> String -> IO a -> ExceptT e IO a handleWrkErr itemName opName action = ExceptT $ first mkError <$> E.try action where - mkError :: E.SomeException -> StoreError - mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e + mkError :: E.SomeException -> e + mkError e = mkWorkItemError $ itemName <> " " <> opName <> " error: " <> show e updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO () updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} = diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs b/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs index 7da6b2ca2..2620e561b 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs @@ -52,7 +52,7 @@ import Simplex.Messaging.Util (diffToMicroseconds, tshow) newtype BoolInt = BI {unBI :: Bool} deriving newtype (FromField, ToField) -newtype Binary = Binary {fromBinary :: ByteString} +newtype Binary a = Binary {fromBinary :: a} deriving newtype (FromField, ToField) data Connection = Connection