agent: parameterize withWork, getWorkItem with StoreError; parameterized Binary for SQLite (#1617)

* agent: parameterize withWork StoreError

* getWorkItem

* export

* binary

* remove handleWrkErr AnyStoreError constraint

* put AnyError in AnyStoreError constraint

* move typeclass

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2025-09-08 15:38:08 +00:00
committed by GitHub
parent 8fea15245a
commit a4f049d8da
4 changed files with 29 additions and 19 deletions
+8 -10
View File
@@ -1974,19 +1974,20 @@ withWork :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError (
withWork c doWork = withWork_ c doWork . withStore' c
{-# INLINE withWork #-}
withWork_ :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m ()
withWork_ :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m ()
withWork_ c doWork getWork action =
getWork >>= \case
Right (Just r) -> action r
Right Nothing -> noWork
-- worker is stopped here (noWork) because the next iteration is likely to produce the same result
Left e@SEWorkItemError {} -> noWork >> notifyErr (CRITICAL False) e
Left e -> notifyErr INTERNAL e
Left e
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
| otherwise -> notifyErr INTERNAL e
where
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
withWorkItems :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError [Either StoreError a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
withWorkItems :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' [Either e' a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
withWorkItems c doWork getWork action = do
getWork >>= \case
Right [] -> noWork
@@ -1995,20 +1996,17 @@ withWorkItems c doWork getWork action = do
case L.nonEmpty items of
Just items' -> action items'
Nothing -> do
let criticalErr = find workItemError errs
let criticalErr = find isWorkItemError errs
forM_ criticalErr $ \err -> do
notifyErr (CRITICAL False) err
when (all workItemError errs) noWork
when (all isWorkItemError errs) noWork
unless (null errs) $
atomically $
writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs)
Left e
| workItemError e -> noWork >> notifyErr (CRITICAL False) e
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
| otherwise -> notifyErr INTERNAL e
where
workItemError = \case
SEWorkItemError {} -> True
_ -> False
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
+11 -1
View File
@@ -693,10 +693,20 @@ data StoreError
| -- | XFTP Deleted snd chunk replica not found.
SEDeletedSndChunkReplicaNotFound
| -- | Error when reading work item that suspends worker - do not use!
SEWorkItemError ByteString
SEWorkItemError {errContext :: String}
| -- | Servers stats not found.
SEServersStatsNotFound
deriving (Eq, Show, Exception)
instance AnyError StoreError where
fromSomeException = SEInternal . bshow
class (Show e, AnyError e) => AnyStoreError e where
isWorkItemError :: e -> Bool
mkWorkItemError :: String -> e
instance AnyStoreError StoreError where
isWorkItemError = \case
SEWorkItemError {} -> True
_ -> False
mkWorkItemError errContext = SEWorkItemError {errContext}
@@ -237,6 +237,8 @@ module Simplex.Messaging.Agent.Store.AgentStore
firstRow',
maybeFirstRow,
fromOnlyBI,
getWorkItem,
getWorkItems,
)
where
@@ -966,25 +968,25 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} =
_ -> Left $ SEInternal "unexpected snd msg data"
markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a))
getWorkItem :: (Show i, AnyStoreError e) => String -> IO (Maybe i) -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e (Maybe a))
getWorkItem itemName getId getItem markFailed =
runExceptT $ handleWrkErr itemName "getId" getId >>= mapM (tryGetItem itemName getItem markFailed)
getWorkItems :: Show i => ByteString -> IO [i] -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError [Either StoreError a])
getWorkItems :: (Show i, AnyStoreError e) => String -> IO [i] -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e [Either e a])
getWorkItems itemName getIds getItem markFailed =
runExceptT $ handleWrkErr itemName "getIds" getIds >>= mapM (tryE . tryGetItem itemName getItem markFailed)
tryGetItem :: Show i => ByteString -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> i -> ExceptT StoreError IO a
tryGetItem :: (Show i, AnyStoreError e) => String -> (i -> IO (Either e a)) -> (i -> IO ()) -> i -> ExceptT e IO a
tryGetItem itemName getItem markFailed itemId = ExceptT (getItem itemId) `catchAllErrors` \e -> mark >> throwE e
where
mark = handleWrkErr itemName ("markFailed ID " <> bshow itemId) $ markFailed itemId
mark = handleWrkErr itemName ("markFailed ID " <> show itemId) $ markFailed itemId
-- Errors caught by this function will suspend worker as if there is no more work,
handleWrkErr :: ByteString -> ByteString -> IO a -> ExceptT StoreError IO a
handleWrkErr :: forall e a. AnyStoreError e => String -> String -> IO a -> ExceptT e IO a
handleWrkErr itemName opName action = ExceptT $ first mkError <$> E.try action
where
mkError :: E.SomeException -> StoreError
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e
mkError :: E.SomeException -> e
mkError e = mkWorkItemError $ itemName <> " " <> opName <> " error: " <> show e
updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
@@ -52,7 +52,7 @@ import Simplex.Messaging.Util (diffToMicroseconds, tshow)
newtype BoolInt = BI {unBI :: Bool}
deriving newtype (FromField, ToField)
newtype Binary = Binary {fromBinary :: ByteString}
newtype Binary a = Binary {fromBinary :: a}
deriving newtype (FromField, ToField)
data Connection = Connection