diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 56432b947..5b77fcac0 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1962,6 +1962,8 @@ cleanupManager c@AgentClient {subQ} = do notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m () notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd) +data ACKd = ACKd | ACKPending + -- | make sure to ACK or throw in each message processing branch -- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission SMPVersion BrokerMsg -> m () @@ -1976,13 +1978,14 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} = withConnLock c connId "processSMP" $ case cmd of SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> - handleNotifyAck $ do + void . handleNotifyAck $ do msg' <- decryptSMPMessage rq msg - handleNotifyAck $ case msg' of + ack' <- handleNotifyAck $ case msg' of SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody SMP.ClientRcvMsgQuota {} -> queueDrained >> ack whenM (atomically $ hasGetLock c rq) $ notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg') + pure ack' where queueDrained = case conn of DuplexConnection _ _ sqs -> void $ enqueueMessages c cData sqs SMP.noMsgFlags $ QCONT (sndAddress rq) @@ -2005,8 +2008,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, decryptClientMessage e2eDh clientMsg >>= \case (SMP.PHEmpty, AgentRatchetKey {agentVersion, e2eEncryption}) -> do conn' <- updateConnVersion conn cData agentVersion - qDuplex conn' "AgentRatchetKey" $ newRatchetKey e2eEncryption - ack + qDuplex conn' "AgentRatchetKey" $ \a -> newRatchetKey e2eEncryption a >> ack (SMP.PHEmpty, AgentMsgEnvelope {agentVersion, encAgentMessage}) -> do conn' <- updateConnVersion conn cData agentVersion -- primary queue is set as Active in helloMsg, below is to set additional queues Active @@ -2033,6 +2035,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, A_MSG body -> do logServer "<--" c srv rId $ "MSG :" <> logSecret srvMsgId notify $ MSG msgMeta msgFlags body + pure ACKPending A_RCVD rcpts -> qDuplex conn'' "RCVD" $ messagesRcvd rcpts msgMeta QCONT addr -> qDuplexAckDel conn'' "QCONT" $ continueSending srvMsgId addr QADD qs -> qDuplexAckDel conn'' "QADD" $ qAddMsg srvMsgId qs @@ -2043,7 +2046,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, QTEST _ -> logServer "<--" c srv rId ("MSG :" <> logSecret srvMsgId) >> ackDel msgId EREADY _ -> qDuplexAckDel conn'' "EREADY" $ ereadyMsg rcPrev where - qDuplexAckDel :: Connection c -> String -> (Connection 'CDuplex -> m ()) -> m () + qDuplexAckDel :: Connection c -> String -> (Connection 'CDuplex -> m ()) -> m ACKd qDuplexAckDel conn'' name a = qDuplex conn'' name a >> ackDel msgId resetRatchetSync :: m (Connection c) resetRatchetSync @@ -2064,7 +2067,8 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, AgentMessage _ (A_MSG body) -> do logServer "<--" c srv rId $ "MSG :" <> logSecret srvMsgId notify $ MSG msgMeta msgFlags body - _ -> pure () + pure ACKPending + _ -> ack _ -> checkDuplicateHash e encryptedMsgHash >> ack Left (AGENT (A_CRYPTO e)) -> do exists <- withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash @@ -2117,11 +2121,11 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, pure $ updateConnection cData'' conn' | otherwise -> pure conn' Nothing -> pure conn' - ack :: m () - ack = enqueueCmd $ ICAck rId srvMsgId - ackDel :: InternalId -> m () - ackDel = enqueueCmd . ICAckDel rId srvMsgId - handleNotifyAck :: m () -> m () + ack :: m ACKd + ack = enqueueCmd (ICAck rId srvMsgId) $> ACKd + ackDel :: InternalId -> m ACKd + ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd + handleNotifyAck :: m ACKd -> m ACKd handleNotifyAck m = m `catchAgentError` \e -> notify (ERR e) >> ack SMP.END -> atomically (TM.lookup tSess smpClients $>>= (tryReadTMVar . sessionVar) >>= processEND) @@ -2249,14 +2253,16 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, >>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ()) Nothing -> qError "QCONT: queue address not found" - messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m () + messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m ACKd messagesRcvd rcpts msgMeta@MsgMeta {broker = (srvMsgId, _)} _ = do logServer "<--" c srv rId $ "MSG :" <> logSecret srvMsgId rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing case L.nonEmpty . catMaybes $ L.toList rs of - Just rs' -> notify $ RCVD msgMeta rs' -- client must ACK once processed - Nothing -> enqueueCmd $ ICAck rId srvMsgId + Just rs' -> notify (RCVD msgMeta rs') $> ACKPending + Nothing -> ack where + ack :: m ACKd + ack = enqueueCmd (ICAck rId srvMsgId) $> ACKd clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt) clientReceipt AMessageReceipt {agentMsgId, msgHash} = do let sndMsgId = InternalSndId agentMsgId @@ -2347,7 +2353,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, _ -> qError "QUSE: switching SndQueue not found in connection" _ -> qError "QUSE: switched queue address not found in connection" - qError :: String -> m () + qError :: String -> m a qError = throwError . AGENT . A_QUEUE ereadyMsg :: CR.RatchetX448 -> Connection 'CDuplex -> m () @@ -2375,7 +2381,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, pqSupported (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible agentVersion) = PQSupportOn `CR.pqSupportAnd` versionPQSupport_ agentVersion (Just v) - qDuplex :: Connection c -> String -> (Connection 'CDuplex -> m ()) -> m () + qDuplex :: Connection c -> String -> (Connection 'CDuplex -> m a) -> m a qDuplex conn' name action = case conn' of DuplexConnection {} -> action conn' _ -> qError $ name <> ": message must be sent to duplex connection" diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs index 9ba6cd08f..18c16cc8b 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs @@ -71,7 +71,8 @@ dbBusyLoop action = loop 500 3000000 loop :: Int -> Int -> IO a loop t tLim = action `E.catch` \(e :: SQLError) -> - if tLim > t && SQL.sqlError e == SQL.ErrorBusy + let se = SQL.sqlError e in + if tLim > t && (se == SQL.ErrorBusy || se == SQL.ErrorLocked) then do threadDelay t loop (t * 9 `div` 8) (tLim - t)