diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 9b2499c44..38d444b95 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -519,9 +519,9 @@ ackMessageAsync' c corrId connId msgId rcptInfo_ = do enqueueAck :: m () enqueueAck = do let mId = InternalId msgId - RcvMsg {msgType} <- withStore c $ \db -> getRcvMsg db connId mId + RcvMsg {msgType} <- withStoreCtx "ackMessageAsync': getRcvMsg" c $ \db -> getRcvMsg db connId mId when (isJust rcptInfo_ && msgType /= AM_A_MSG_) $ throwError $ CMD PROHIBITED - (RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId mId + (RcvQueue {server}, _) <- withStoreCtx "ackMessageAsync': setMsgUserAck" c $ \db -> setMsgUserAck db connId mId enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId rcptInfo_ deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () @@ -1239,13 +1239,13 @@ ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do ack :: m () ack = do -- the stored message was delivered via a specific queue, the rest failed to decrypt and were already acknowledged - (rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId + (rq, srvMsgId) <- withStoreCtx "ackMessage': setMsgUserAck" c $ \db -> setMsgUserAck db connId $ InternalId msgId ackQueueMessage c rq srvMsgId del :: m () - del = withStore' c $ \db -> deleteMsg db connId $ InternalId msgId + del = withStoreCtx' "ackMessage': deleteMsg" c $ \db -> deleteMsg db connId $ InternalId msgId sendRcpt :: Connection 'CDuplex -> m () sendRcpt (DuplexConnection cData _ sqs) = do - msg@RcvMsg {msgType, msgReceipt} <- withStore c $ \db -> getRcvMsg db connId $ InternalId msgId + msg@RcvMsg {msgType, msgReceipt} <- withStoreCtx "ackMessage': getRcvMsg" c $ \db -> getRcvMsg db connId $ InternalId msgId case rcptInfo_ of Just rcptInfo -> do unless (msgType == AM_A_MSG_) $ throwError (CMD PROHIBITED) @@ -1256,7 +1256,7 @@ ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do Nothing -> case (msgType, msgReceipt) of -- only remove sent message if receipt hash was Ok, both to debug and for future redundancy (AM_A_RCVD_, Just MsgReceipt {agentMsgId = sndMsgId, msgRcptStatus = MROk}) -> - withStore' c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId + withStoreCtx' "ackMessage': deleteDeliveredSndMsg" c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId _ -> pure () switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats @@ -1900,7 +1900,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s | otherwise = pure conn' Right _ -> prohibited >> ack Left e@(AGENT A_DUPLICATE) -> do - withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case + withStoreCtx' "processSMP: getLastMsg" c (\db -> getLastMsg db connId srvMsgId) >>= \case Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck} | userAck -> ackDel internalId | otherwise -> do @@ -2112,7 +2112,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt) clientReceipt AMessageReceipt {agentMsgId, msgHash} = do let sndMsgId = InternalSndId agentMsgId - SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStore c $ \db -> getSndMsgViaRcpt db connId sndMsgId + SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStoreCtx "messagesRcvd: getSndMsgViaRcpt" c $ \db -> getSndMsgViaRcpt db connId sndMsgId if msgType /= AM_A_MSG_ then notify (ERR $ AGENT A_PROHIBITED) $> Nothing -- unexpected message type for receipt else case msgReceipt of diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1d33fa1bc..bc65b4a96 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -90,6 +90,8 @@ module Simplex.Messaging.Agent.Client whenSuspending, withStore, withStore', + withStoreCtx, + withStoreCtx', storeError, userServers, pickServer, @@ -127,6 +129,7 @@ import Data.Set (Set) import qualified Data.Set as S import Data.Text.Encoding import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime) +import Data.Time.Clock (diffUTCTime) import Data.Word (Word16) import qualified Database.SQLite.Simple as DB import GHC.Generics (Generic) @@ -1236,16 +1239,37 @@ waitUntilForeground :: AgentClient -> STM () waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a -withStore' c action = withStore c $ fmap Right . action +withStore' = withStoreCtx_' Nothing withStore :: AgentMonad m => AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a -withStore c action = do +withStore = withStoreCtx_ Nothing + +withStoreCtx' :: AgentMonad m => String -> AgentClient -> (DB.Connection -> IO a) -> m a +withStoreCtx' = withStoreCtx_' . Just + +withStoreCtx :: AgentMonad m => String -> AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a +withStoreCtx = withStoreCtx_ . Just + +withStoreCtx_' :: AgentMonad m => Maybe String -> AgentClient -> (DB.Connection -> IO a) -> m a +withStoreCtx_' ctx_ c action = withStoreCtx_ ctx_ c $ fmap Right . action + +withStoreCtx_ :: AgentMonad m => Maybe String -> AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a +withStoreCtx_ ctx_ c action = do st <- asks store - liftEitherError storeError . agentOperationBracket c AODatabase (\_ -> pure ()) $ - withTransaction st action `E.catch` handleInternal + liftEitherError storeError . agentOperationBracket c AODatabase (\_ -> pure ()) $ case ctx_ of + Nothing -> withTransaction st action `E.catch` handleInternal "" + -- uncomment to debug store performance + -- Just ctx -> do + -- t1 <- liftIO getCurrentTime + -- putStrLn $ "agent withStoreCtx start :: " <> show t1 <> " :: " <> ctx + -- r <- withTransaction st action `E.catch` handleInternal (" (" <> ctx <> ")") + -- t2 <- liftIO getCurrentTime + -- putStrLn $ "agent withStoreCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1) + -- pure r + Just _ -> withTransaction st action `E.catch` handleInternal "" where - handleInternal :: E.SomeException -> IO (Either StoreError a) - handleInternal = pure . Left . SEInternal . bshow + handleInternal :: String -> E.SomeException -> IO (Either StoreError a) + handleInternal ctxStr e = pure . Left . SEInternal . B.pack $ show e <> ctxStr storeError :: StoreError -> AgentErrorType storeError = \case diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 87c342836..cfd572755 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -906,7 +906,7 @@ getSndMsgViaRcpt db connId sndMsgId = [sql| SELECT s.internal_id, m.msg_type, s.internal_hash, s.rcpt_internal_id, s.rcpt_status FROM snd_messages s - JOIN messages m ON s.internal_id = m.internal_id + JOIN messages m ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id WHERE s.conn_id = ? AND s.internal_snd_id = ? |] (connId, sndMsgId) @@ -976,8 +976,8 @@ getRcvMsg db connId agentMsgId = r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash, m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack FROM rcv_messages r - JOIN messages m ON r.internal_id = m.internal_id - LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id + JOIN messages m ON r.conn_id = m.conn_id AND r.internal_id = m.internal_id + LEFT JOIN snd_messages s ON s.conn_id = r.conn_id AND s.rcpt_internal_id = r.internal_id WHERE r.conn_id = ? AND r.internal_id = ? |] (connId, agentMsgId) @@ -992,9 +992,9 @@ getLastMsg db connId msgId = r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash, m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack FROM rcv_messages r - JOIN messages m ON r.internal_id = m.internal_id + JOIN messages m ON r.conn_id = m.conn_id AND r.internal_id = m.internal_id JOIN connections c ON r.conn_id = c.conn_id AND c.last_internal_msg_id = r.internal_id - LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id + LEFT JOIN snd_messages s ON s.conn_id = r.conn_id AND s.rcpt_internal_id = r.internal_id WHERE r.conn_id = ? AND r.broker_id = ? |] (connId, msgId)