core: check ACK handling with return type (#1041)

* core: check ACK handling with return type

* fix ratchet sync

* add SQL Locked to dbBusyLoop

* rename

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
Alexander Bondarenko
2024-03-11 21:06:53 +02:00
committed by GitHub
parent 78eb4f764f
commit 2cad0cb201
2 changed files with 24 additions and 17 deletions
+22 -16
View File
@@ -1962,6 +1962,8 @@ cleanupManager c@AgentClient {subQ} = do
notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m ()
notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd)
data ACKd = ACKd | ACKPending
-- | make sure to ACK or throw in each message processing branch
-- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission SMPVersion BrokerMsg -> m ()
@@ -1976,13 +1978,14 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} =
withConnLock c connId "processSMP" $ case cmd of
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} ->
handleNotifyAck $ do
void . handleNotifyAck $ do
msg' <- decryptSMPMessage rq msg
handleNotifyAck $ case msg' of
ack' <- handleNotifyAck $ case msg' of
SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody
SMP.ClientRcvMsgQuota {} -> queueDrained >> ack
whenM (atomically $ hasGetLock c rq) $
notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg')
pure ack'
where
queueDrained = case conn of
DuplexConnection _ _ sqs -> void $ enqueueMessages c cData sqs SMP.noMsgFlags $ QCONT (sndAddress rq)
@@ -2005,8 +2008,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
decryptClientMessage e2eDh clientMsg >>= \case
(SMP.PHEmpty, AgentRatchetKey {agentVersion, e2eEncryption}) -> do
conn' <- updateConnVersion conn cData agentVersion
qDuplex conn' "AgentRatchetKey" $ newRatchetKey e2eEncryption
ack
qDuplex conn' "AgentRatchetKey" $ \a -> newRatchetKey e2eEncryption a >> ack
(SMP.PHEmpty, AgentMsgEnvelope {agentVersion, encAgentMessage}) -> do
conn' <- updateConnVersion conn cData agentVersion
-- primary queue is set as Active in helloMsg, below is to set additional queues Active
@@ -2033,6 +2035,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
A_MSG body -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret srvMsgId
notify $ MSG msgMeta msgFlags body
pure ACKPending
A_RCVD rcpts -> qDuplex conn'' "RCVD" $ messagesRcvd rcpts msgMeta
QCONT addr -> qDuplexAckDel conn'' "QCONT" $ continueSending srvMsgId addr
QADD qs -> qDuplexAckDel conn'' "QADD" $ qAddMsg srvMsgId qs
@@ -2043,7 +2046,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
QTEST _ -> logServer "<--" c srv rId ("MSG <QTEST>:" <> logSecret srvMsgId) >> ackDel msgId
EREADY _ -> qDuplexAckDel conn'' "EREADY" $ ereadyMsg rcPrev
where
qDuplexAckDel :: Connection c -> String -> (Connection 'CDuplex -> m ()) -> m ()
qDuplexAckDel :: Connection c -> String -> (Connection 'CDuplex -> m ()) -> m ACKd
qDuplexAckDel conn'' name a = qDuplex conn'' name a >> ackDel msgId
resetRatchetSync :: m (Connection c)
resetRatchetSync
@@ -2064,7 +2067,8 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
AgentMessage _ (A_MSG body) -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret srvMsgId
notify $ MSG msgMeta msgFlags body
_ -> pure ()
pure ACKPending
_ -> ack
_ -> checkDuplicateHash e encryptedMsgHash >> ack
Left (AGENT (A_CRYPTO e)) -> do
exists <- withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash
@@ -2117,11 +2121,11 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
pure $ updateConnection cData'' conn'
| otherwise -> pure conn'
Nothing -> pure conn'
ack :: m ()
ack = enqueueCmd $ ICAck rId srvMsgId
ackDel :: InternalId -> m ()
ackDel = enqueueCmd . ICAckDel rId srvMsgId
handleNotifyAck :: m () -> m ()
ack :: m ACKd
ack = enqueueCmd (ICAck rId srvMsgId) $> ACKd
ackDel :: InternalId -> m ACKd
ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd
handleNotifyAck :: m ACKd -> m ACKd
handleNotifyAck m = m `catchAgentError` \e -> notify (ERR e) >> ack
SMP.END ->
atomically (TM.lookup tSess smpClients $>>= (tryReadTMVar . sessionVar) >>= processEND)
@@ -2249,14 +2253,16 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
>>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ())
Nothing -> qError "QCONT: queue address not found"
messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m ()
messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m ACKd
messagesRcvd rcpts msgMeta@MsgMeta {broker = (srvMsgId, _)} _ = do
logServer "<--" c srv rId $ "MSG <RCPT>:" <> logSecret srvMsgId
rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing
case L.nonEmpty . catMaybes $ L.toList rs of
Just rs' -> notify $ RCVD msgMeta rs' -- client must ACK once processed
Nothing -> enqueueCmd $ ICAck rId srvMsgId
Just rs' -> notify (RCVD msgMeta rs') $> ACKPending
Nothing -> ack
where
ack :: m ACKd
ack = enqueueCmd (ICAck rId srvMsgId) $> ACKd
clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt)
clientReceipt AMessageReceipt {agentMsgId, msgHash} = do
let sndMsgId = InternalSndId agentMsgId
@@ -2347,7 +2353,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
_ -> qError "QUSE: switching SndQueue not found in connection"
_ -> qError "QUSE: switched queue address not found in connection"
qError :: String -> m ()
qError :: String -> m a
qError = throwError . AGENT . A_QUEUE
ereadyMsg :: CR.RatchetX448 -> Connection 'CDuplex -> m ()
@@ -2375,7 +2381,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
pqSupported (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible agentVersion) =
PQSupportOn `CR.pqSupportAnd` versionPQSupport_ agentVersion (Just v)
qDuplex :: Connection c -> String -> (Connection 'CDuplex -> m ()) -> m ()
qDuplex :: Connection c -> String -> (Connection 'CDuplex -> m a) -> m a
qDuplex conn' name action = case conn' of
DuplexConnection {} -> action conn'
_ -> qError $ name <> ": message must be sent to duplex connection"
@@ -71,7 +71,8 @@ dbBusyLoop action = loop 500 3000000
loop :: Int -> Int -> IO a
loop t tLim =
action `E.catch` \(e :: SQLError) ->
if tLim > t && SQL.sqlError e == SQL.ErrorBusy
let se = SQL.sqlError e in
if tLim > t && (se == SQL.ErrorBusy || se == SQL.ErrorLocked)
then do
threadDelay t
loop (t * 9 `div` 8) (tLim - t)