mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
agent: remove withStoreCtx (#1044)
This commit is contained in:
@@ -612,9 +612,9 @@ ackMessageAsync' c corrId connId msgId rcptInfo_ = do
|
||||
enqueueAck :: m ()
|
||||
enqueueAck = do
|
||||
let mId = InternalId msgId
|
||||
RcvMsg {msgType} <- withStoreCtx "ackMessageAsync': getRcvMsg" c $ \db -> getRcvMsg db connId mId
|
||||
RcvMsg {msgType} <- withStore c $ \db -> getRcvMsg db connId mId
|
||||
when (isJust rcptInfo_ && msgType /= AM_A_MSG_) $ throwError $ CMD PROHIBITED
|
||||
(RcvQueue {server}, _) <- withStoreCtx "ackMessageAsync': setMsgUserAck" c $ \db -> setMsgUserAck db connId mId
|
||||
(RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId mId
|
||||
enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId rcptInfo_
|
||||
|
||||
deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> Bool -> ConnId -> m ()
|
||||
@@ -1367,13 +1367,13 @@ ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do
|
||||
ack :: m ()
|
||||
ack = do
|
||||
-- the stored message was delivered via a specific queue, the rest failed to decrypt and were already acknowledged
|
||||
(rq, srvMsgId) <- withStoreCtx "ackMessage': setMsgUserAck" c $ \db -> setMsgUserAck db connId $ InternalId msgId
|
||||
(rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
|
||||
ackQueueMessage c rq srvMsgId
|
||||
del :: m ()
|
||||
del = withStoreCtx' "ackMessage': deleteMsg" c $ \db -> deleteMsg db connId $ InternalId msgId
|
||||
del = withStore' c $ \db -> deleteMsg db connId $ InternalId msgId
|
||||
sendRcpt :: Connection 'CDuplex -> m ()
|
||||
sendRcpt (DuplexConnection cData@ConnData {connAgentVersion} _ sqs) = do
|
||||
msg@RcvMsg {msgType, msgReceipt} <- withStoreCtx "ackMessage': getRcvMsg" c $ \db -> getRcvMsg db connId $ InternalId msgId
|
||||
msg@RcvMsg {msgType, msgReceipt} <- withStore c $ \db -> getRcvMsg db connId $ InternalId msgId
|
||||
case rcptInfo_ of
|
||||
Just rcptInfo -> do
|
||||
unless (msgType == AM_A_MSG_) $ throwError (CMD PROHIBITED)
|
||||
@@ -1384,7 +1384,7 @@ ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do
|
||||
Nothing -> case (msgType, msgReceipt) of
|
||||
-- only remove sent message if receipt hash was Ok, both to debug and for future redundancy
|
||||
(AM_A_RCVD_, Just MsgReceipt {agentMsgId = sndMsgId, msgRcptStatus = MROk}) ->
|
||||
withStoreCtx' "ackMessage': deleteDeliveredSndMsg" c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId
|
||||
withStore' c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId
|
||||
_ -> pure ()
|
||||
|
||||
switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats
|
||||
@@ -2059,7 +2059,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
|
||||
| otherwise = pure conn'
|
||||
Right _ -> prohibited >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
withStoreCtx' "processSMP: getLastMsg" c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
|
||||
| userAck -> ackDel internalId
|
||||
| otherwise -> do
|
||||
@@ -2266,7 +2266,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
|
||||
clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt)
|
||||
clientReceipt AMessageReceipt {agentMsgId, msgHash} = do
|
||||
let sndMsgId = InternalSndId agentMsgId
|
||||
SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStoreCtx "messagesRcvd: getSndMsgViaRcpt" c $ \db -> getSndMsgViaRcpt db connId sndMsgId
|
||||
SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStore c $ \db -> getSndMsgViaRcpt db connId sndMsgId
|
||||
if msgType /= AM_A_MSG_
|
||||
then notify (ERR $ AGENT A_PROHIBITED) $> Nothing -- unexpected message type for receipt
|
||||
else case msgReceipt of
|
||||
|
||||
@@ -110,8 +110,6 @@ module Simplex.Messaging.Agent.Client
|
||||
whenSuspending,
|
||||
withStore,
|
||||
withStore',
|
||||
withStoreCtx,
|
||||
withStoreCtx',
|
||||
withStoreBatch,
|
||||
withStoreBatch',
|
||||
storeError,
|
||||
@@ -1457,34 +1455,13 @@ waitUntilForeground :: AgentClient -> STM ()
|
||||
waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry
|
||||
|
||||
withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a
|
||||
withStore' = withStoreCtx_' Nothing
|
||||
withStore' c action = withStore c $ fmap Right . action
|
||||
|
||||
withStore :: AgentMonad m => AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a
|
||||
withStore = withStoreCtx_ Nothing
|
||||
|
||||
withStoreCtx' :: AgentMonad m => String -> AgentClient -> (DB.Connection -> IO a) -> m a
|
||||
withStoreCtx' = withStoreCtx_' . Just
|
||||
|
||||
withStoreCtx :: AgentMonad m => String -> AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a
|
||||
withStoreCtx = withStoreCtx_ . Just
|
||||
|
||||
withStoreCtx_' :: AgentMonad m => Maybe String -> AgentClient -> (DB.Connection -> IO a) -> m a
|
||||
withStoreCtx_' ctx_ c action = withStoreCtx_ ctx_ c $ fmap Right . action
|
||||
|
||||
withStoreCtx_ :: AgentMonad m => Maybe String -> AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a
|
||||
withStoreCtx_ ctx_ c action = do
|
||||
withStore c action = do
|
||||
st <- asks store
|
||||
liftEitherError storeError . agentOperationBracket c AODatabase (\_ -> pure ()) $ case ctx_ of
|
||||
Nothing -> withTransaction st action `E.catch` handleInternal ""
|
||||
-- uncomment to debug store performance
|
||||
-- Just ctx -> do
|
||||
-- t1 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "agent withStoreCtx start :: " <> show t1 <> " :: " <> ctx
|
||||
-- r <- withTransaction st action `E.catch` handleInternal (" (" <> ctx <> ")")
|
||||
-- t2 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "agent withStoreCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1)
|
||||
-- pure r
|
||||
Just _ -> withTransaction st action `E.catch` handleInternal ""
|
||||
liftEitherError storeError . agentOperationBracket c AODatabase (\_ -> pure ()) $
|
||||
withTransaction st action `E.catch` handleInternal ""
|
||||
where
|
||||
handleInternal :: String -> E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal ctxStr e = pure . Left . SEInternal . B.pack $ show e <> ctxStr
|
||||
|
||||
Reference in New Issue
Block a user