mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-06 15:31:55 +00:00
agent: partially fix message queries, debug message times (#810)
* agent: partially fix message queries, debug message times * fix more joins
This commit is contained in:
committed by
GitHub
parent
c54be32135
commit
6314bb1706
@@ -519,9 +519,9 @@ ackMessageAsync' c corrId connId msgId rcptInfo_ = do
|
||||
enqueueAck :: m ()
|
||||
enqueueAck = do
|
||||
let mId = InternalId msgId
|
||||
RcvMsg {msgType} <- withStore c $ \db -> getRcvMsg db connId mId
|
||||
RcvMsg {msgType} <- withStoreCtx "ackMessageAsync': getRcvMsg" c $ \db -> getRcvMsg db connId mId
|
||||
when (isJust rcptInfo_ && msgType /= AM_A_MSG_) $ throwError $ CMD PROHIBITED
|
||||
(RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId mId
|
||||
(RcvQueue {server}, _) <- withStoreCtx "ackMessageAsync': setMsgUserAck" c $ \db -> setMsgUserAck db connId mId
|
||||
enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId rcptInfo_
|
||||
|
||||
deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
@@ -1239,13 +1239,13 @@ ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do
|
||||
ack :: m ()
|
||||
ack = do
|
||||
-- the stored message was delivered via a specific queue, the rest failed to decrypt and were already acknowledged
|
||||
(rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
|
||||
(rq, srvMsgId) <- withStoreCtx "ackMessage': setMsgUserAck" c $ \db -> setMsgUserAck db connId $ InternalId msgId
|
||||
ackQueueMessage c rq srvMsgId
|
||||
del :: m ()
|
||||
del = withStore' c $ \db -> deleteMsg db connId $ InternalId msgId
|
||||
del = withStoreCtx' "ackMessage': deleteMsg" c $ \db -> deleteMsg db connId $ InternalId msgId
|
||||
sendRcpt :: Connection 'CDuplex -> m ()
|
||||
sendRcpt (DuplexConnection cData _ sqs) = do
|
||||
msg@RcvMsg {msgType, msgReceipt} <- withStore c $ \db -> getRcvMsg db connId $ InternalId msgId
|
||||
msg@RcvMsg {msgType, msgReceipt} <- withStoreCtx "ackMessage': getRcvMsg" c $ \db -> getRcvMsg db connId $ InternalId msgId
|
||||
case rcptInfo_ of
|
||||
Just rcptInfo -> do
|
||||
unless (msgType == AM_A_MSG_) $ throwError (CMD PROHIBITED)
|
||||
@@ -1256,7 +1256,7 @@ ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do
|
||||
Nothing -> case (msgType, msgReceipt) of
|
||||
-- only remove sent message if receipt hash was Ok, both to debug and for future redundancy
|
||||
(AM_A_RCVD_, Just MsgReceipt {agentMsgId = sndMsgId, msgRcptStatus = MROk}) ->
|
||||
withStore' c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId
|
||||
withStoreCtx' "ackMessage': deleteDeliveredSndMsg" c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId
|
||||
_ -> pure ()
|
||||
|
||||
switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats
|
||||
@@ -1900,7 +1900,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
|
||||
| otherwise = pure conn'
|
||||
Right _ -> prohibited >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
withStoreCtx' "processSMP: getLastMsg" c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
|
||||
| userAck -> ackDel internalId
|
||||
| otherwise -> do
|
||||
@@ -2112,7 +2112,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
|
||||
clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt)
|
||||
clientReceipt AMessageReceipt {agentMsgId, msgHash} = do
|
||||
let sndMsgId = InternalSndId agentMsgId
|
||||
SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStore c $ \db -> getSndMsgViaRcpt db connId sndMsgId
|
||||
SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStoreCtx "messagesRcvd: getSndMsgViaRcpt" c $ \db -> getSndMsgViaRcpt db connId sndMsgId
|
||||
if msgType /= AM_A_MSG_
|
||||
then notify (ERR $ AGENT A_PROHIBITED) $> Nothing -- unexpected message type for receipt
|
||||
else case msgReceipt of
|
||||
|
||||
@@ -90,6 +90,8 @@ module Simplex.Messaging.Agent.Client
|
||||
whenSuspending,
|
||||
withStore,
|
||||
withStore',
|
||||
withStoreCtx,
|
||||
withStoreCtx',
|
||||
storeError,
|
||||
userServers,
|
||||
pickServer,
|
||||
@@ -127,6 +129,7 @@ import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text.Encoding
|
||||
import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime)
|
||||
import Data.Time.Clock (diffUTCTime)
|
||||
import Data.Word (Word16)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import GHC.Generics (Generic)
|
||||
@@ -1236,16 +1239,37 @@ waitUntilForeground :: AgentClient -> STM ()
|
||||
waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry
|
||||
|
||||
withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a
|
||||
withStore' c action = withStore c $ fmap Right . action
|
||||
withStore' = withStoreCtx_' Nothing
|
||||
|
||||
withStore :: AgentMonad m => AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a
|
||||
withStore c action = do
|
||||
withStore = withStoreCtx_ Nothing
|
||||
|
||||
withStoreCtx' :: AgentMonad m => String -> AgentClient -> (DB.Connection -> IO a) -> m a
|
||||
withStoreCtx' = withStoreCtx_' . Just
|
||||
|
||||
withStoreCtx :: AgentMonad m => String -> AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a
|
||||
withStoreCtx = withStoreCtx_ . Just
|
||||
|
||||
withStoreCtx_' :: AgentMonad m => Maybe String -> AgentClient -> (DB.Connection -> IO a) -> m a
|
||||
withStoreCtx_' ctx_ c action = withStoreCtx_ ctx_ c $ fmap Right . action
|
||||
|
||||
withStoreCtx_ :: AgentMonad m => Maybe String -> AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a
|
||||
withStoreCtx_ ctx_ c action = do
|
||||
st <- asks store
|
||||
liftEitherError storeError . agentOperationBracket c AODatabase (\_ -> pure ()) $
|
||||
withTransaction st action `E.catch` handleInternal
|
||||
liftEitherError storeError . agentOperationBracket c AODatabase (\_ -> pure ()) $ case ctx_ of
|
||||
Nothing -> withTransaction st action `E.catch` handleInternal ""
|
||||
-- uncomment to debug store performance
|
||||
-- Just ctx -> do
|
||||
-- t1 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "agent withStoreCtx start :: " <> show t1 <> " :: " <> ctx
|
||||
-- r <- withTransaction st action `E.catch` handleInternal (" (" <> ctx <> ")")
|
||||
-- t2 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "agent withStoreCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1)
|
||||
-- pure r
|
||||
Just _ -> withTransaction st action `E.catch` handleInternal ""
|
||||
where
|
||||
handleInternal :: E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal = pure . Left . SEInternal . bshow
|
||||
handleInternal :: String -> E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal ctxStr e = pure . Left . SEInternal . B.pack $ show e <> ctxStr
|
||||
|
||||
storeError :: StoreError -> AgentErrorType
|
||||
storeError = \case
|
||||
|
||||
@@ -906,7 +906,7 @@ getSndMsgViaRcpt db connId sndMsgId =
|
||||
[sql|
|
||||
SELECT s.internal_id, m.msg_type, s.internal_hash, s.rcpt_internal_id, s.rcpt_status
|
||||
FROM snd_messages s
|
||||
JOIN messages m ON s.internal_id = m.internal_id
|
||||
JOIN messages m ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id
|
||||
WHERE s.conn_id = ? AND s.internal_snd_id = ?
|
||||
|]
|
||||
(connId, sndMsgId)
|
||||
@@ -976,8 +976,8 @@ getRcvMsg db connId agentMsgId =
|
||||
r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash,
|
||||
m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack
|
||||
FROM rcv_messages r
|
||||
JOIN messages m ON r.internal_id = m.internal_id
|
||||
LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id
|
||||
JOIN messages m ON r.conn_id = m.conn_id AND r.internal_id = m.internal_id
|
||||
LEFT JOIN snd_messages s ON s.conn_id = r.conn_id AND s.rcpt_internal_id = r.internal_id
|
||||
WHERE r.conn_id = ? AND r.internal_id = ?
|
||||
|]
|
||||
(connId, agentMsgId)
|
||||
@@ -992,9 +992,9 @@ getLastMsg db connId msgId =
|
||||
r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash,
|
||||
m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack
|
||||
FROM rcv_messages r
|
||||
JOIN messages m ON r.internal_id = m.internal_id
|
||||
JOIN messages m ON r.conn_id = m.conn_id AND r.internal_id = m.internal_id
|
||||
JOIN connections c ON r.conn_id = c.conn_id AND c.last_internal_msg_id = r.internal_id
|
||||
LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id
|
||||
LEFT JOIN snd_messages s ON s.conn_id = r.conn_id AND s.rcpt_internal_id = r.internal_id
|
||||
WHERE r.conn_id = ? AND r.broker_id = ?
|
||||
|]
|
||||
(connId, msgId)
|
||||
|
||||
Reference in New Issue
Block a user