diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 5b376008c..df2ce5ebc 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -502,7 +502,7 @@ deleteCurrRcvQueue c ConnData {connId} rq _sq rq'@RcvQueue {server, rcvId} = do withStore' c $ \db -> switchCurrRcvQueue db rq rq' atomically $ TM.lookupDelete (connId, server, rcvId) (nextRcvQueueMsgs c) - >>= mapM_ (mapM_ . writeTBQueue $ msgQ c) + >>= mapM_ ((mapM_ . writeTBQueue $ msgQ c) . reverse) subscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ())) subscribeConnections' _ [] = pure M.empty @@ -644,8 +644,9 @@ enqueueMessage c cData@ConnData {connId, connAgentVersion} sq msgFlags aMessage resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> m () resumeMsgDelivery c cData@ConnData {connId} sq@SndQueue {server, sndId} = do let qKey = (connId, server, sndId) - unlessM (queueDelivering qKey) $ - async (runSmpQueueMsgDelivery c cData sq) + unlessM (queueDelivering qKey) $ do + mq <- atomically $ getPendingMsgQ c connId sq + async (runSmpQueueMsgDelivery c cData mq) >>= \a -> atomically (TM.insert qKey a $ smpQueueMsgDeliveries c) unlessM connQueued $ withStore' c (`getPendingMsgs` connId) @@ -672,9 +673,8 @@ getPendingMsgQ c connId SndQueue {server, sndId} = do TM.insert qKey mq $ smpQueueMsgQueues c pure mq -runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> m () -runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandshake} sq = do - mq <- atomically $ getPendingMsgQ c connId sq +runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> TQueue InternalId -> m () +runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandshake} mq = do ri <- asks $ messageRetryInterval . config forever $ do atomically $ endAgentOperation c AOSndNetwork @@ -686,7 +686,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case Left (e :: E.SomeException) -> notify $ MERR mId (INTERNAL $ show e) - Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) -> + Right (rq_, sq, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) -> withRetryInterval ri $ \loop -> do resp <- tryError $ case msgType of AM_CONN_INFO -> sendConfirmation c sq msgBody @@ -1094,7 +1094,7 @@ subscriber c@AgentClient {msgQ} = forever $ do Right _ -> return () processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m () -processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) = do +processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, sessId, rId, cmd) = do (rq, SomeConn _ conn) <- withStore c $ \db -> getRcvConn db srv rId processSMP conn (connData conn) rq where @@ -1109,6 +1109,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm unless (phVer `isCompatible` clientVRange) . throwError $ AGENT A_VERSION case (e2eDhSecret, e2ePubKey_) of (Nothing, Just e2ePubKey) -> do + unless (currRcvQueue) . throwError $ INTERNAL "can only be sent to the current queue" let e2eDh = C.dh' e2ePubKey e2ePrivKey decryptClientMessage e2eDh clientMsg >>= \case (SMP.PHConfirmation senderKey, AgentConfirmation {e2eEncryption, encConnInfo, agentVersion}) -> @@ -1124,9 +1125,13 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm HELLO -> helloMsg >> ackDelete msgId REPLY cReq -> replyMsg cReq >> ackDelete msgId -- note that there is no ACK sent for A_MSG, it is sent with agent's user ACK command - A_MSG body -> do - logServer "<--" c srv rId "MSG " - notify $ MSG msgMeta msgFlags body + A_MSG body + | currRcvQueue -> do + logServer "<--" c srv rId "MSG " + notify $ MSG msgMeta msgFlags body + | otherwise -> atomically $ TM.alter addTransmission (connId, srv, rId) (nextRcvQueueMsgs c) + where + addTransmission = Just . maybe [transmission] (transmission :) QNEW currAddr nextQUri -> rqNewMsg currAddr nextQUri >> ackDelete msgId QKEYS sKey nextQInfo -> rqKeys sKey nextQInfo $ ackDelete msgId QREADY addr -> rqReady addr >> ackDelete msgId @@ -1141,9 +1146,18 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm withStore' c $ \db -> deleteMsg db connId internalId | otherwise -> do liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case - AgentMessage _ (A_MSG body) -> do - logServer "<--" c srv rId "MSG " - notify $ MSG msgMeta msgFlags body + AgentMessage _ (A_MSG body) + | currRcvQueue -> do + logServer "<--" c srv rId "MSG " + notify $ MSG msgMeta msgFlags body + | otherwise -> atomically $ TM.alter addTransmission (connId, srv, rId) (nextRcvQueueMsgs c) + where + addTransmission = Just . maybe [transmission] prependIfDifferent + prependIfDifferent = \case + [] -> [transmission] + ts@((_, _, _, _, cmd') : _) + | cmd == cmd' -> ts + | otherwise -> transmission : ts _ -> pure () _ -> throwError e Left e -> throwError e @@ -1266,6 +1280,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm helloMsg :: m () helloMsg = do + unless currRcvQueue . throwError $ INTERNAL "can only be sent to the current queue" logServer "<--" c srv rId "MSG " case status of Active -> prohibited @@ -1287,6 +1302,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm replyMsg :: L.NonEmpty SMPQueueInfo -> m () replyMsg smpQueues = do + unless currRcvQueue . throwError $ INTERNAL "can only be sent to the current queue" logServer "<--" c srv rId "MSG " case duplexHandshake of Just True -> prohibited @@ -1298,70 +1314,61 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm -- processed by queue sender rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueUri -> m () - rqNewMsg (smpServer, senderId) nextQUri - | currRcvQueue = case conn of - DuplexConnection _ _ sq@SndQueue {server, sndId} - | smpServer == server && senderId == sndId -> do - clientVRange <- asks $ smpClientVRange . config - case (nextQUri `compatibleVersion` clientVRange) of - Just qInfo@(Compatible nextQInfo) -> do - sq'@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue qInfo False - withStore' c $ \db -> dbCreateNextSndQueue db sq sq' - case (sndPublicKey, e2ePubKey) of - (Just nextSenderKey, Just dhPublicKey) -> do - let qAddr = (queueAddress (nextQInfo :: SMPQueueInfo)) {dhPublicKey} - nextQueueInfo = (nextQInfo :: SMPQueueInfo) {queueAddress = qAddr} - void . enqueueMessage c cData sq SMP.noMsgFlags $ QKEYS {nextSenderKey, nextQueueInfo} - rq' <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) - notify . SWITCH SPStarted $ connectionStats conn rq' (Just sq') - _ -> throwError $ INTERNAL "absent sender keys" - _ -> throwError $ AGENT A_VERSION - | otherwise -> throwError $ INTERNAL "incorrect queue address" - _ -> throwError $ INTERNAL "message can only be sent to duplex connection" - | otherwise = throwError $ INTERNAL "message can only be sent to current queue" + rqNewMsg (smpServer, senderId) nextQUri = case conn of + DuplexConnection _ _ sq@SndQueue {server, sndId} -> do + unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address" + clientVRange <- asks $ smpClientVRange . config + case (nextQUri `compatibleVersion` clientVRange) of + Just qInfo@(Compatible nextQInfo) -> do + sq'@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue qInfo False + withStore' c $ \db -> dbCreateNextSndQueue db sq sq' + case (sndPublicKey, e2ePubKey) of + (Just nextSenderKey, Just dhPublicKey) -> do + let qAddr = (queueAddress (nextQInfo :: SMPQueueInfo)) {dhPublicKey} + nextQueueInfo = (nextQInfo :: SMPQueueInfo) {queueAddress = qAddr} + void . enqueueMessage c cData sq SMP.noMsgFlags $ QKEYS {nextSenderKey, nextQueueInfo} + rq' <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) + notify . SWITCH SPStarted $ connectionStats conn rq' (Just sq') + _ -> throwError $ INTERNAL "absent sender keys" + _ -> throwError $ AGENT A_VERSION + _ -> throwError $ INTERNAL "message can only be sent to duplex connection" -- processed by queue recipient rqKeys :: SndPublicVerifyKey -> SMPQueueInfo -> m () -> m () - rqKeys senderKey qInfo ackDelete - | currRcvQueue = case conn of + rqKeys senderKey qInfo@(SMPQueueInfo clntVer' SMPQueueAddress {smpServer, senderId, dhPublicKey}) ackDelete = do + unless currRcvQueue . throwError $ INTERNAL "message can only be sent to current queue" + case conn of DuplexConnection _ _ sq -> do clientVRange <- asks $ smpClientVRange . config - case qInfo `proveCompatible` clientVRange of - Just (Compatible (SMPQueueInfo clntVer' SMPQueueAddress {smpServer, senderId, dhPublicKey})) -> do - withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case - Just rq'@RcvQueue {server, sndId, e2ePrivKey = dhPrivKey, smpClientVersion = clntVer} - | server == smpServer && sndId == senderId -> do - let dhSecret = C.dh' dhPublicKey dhPrivKey - withStore' c $ \db -> do - setRcvQueueConfirmedE2E db rq' senderKey dhSecret $ min clntVer clntVer' - setRcvQueueAction db rq $ Just RQASecureNextQueue - ackDelete - secureNextRcvQueue c cData rq sq rq' - | otherwise -> throwError $ INTERNAL "incorrect queue address" - _ -> throwError $ INTERNAL "message can only be sent during rotation" - _ -> throwError $ AGENT A_VERSION + unless (qInfo `isCompatible` clientVRange) . throwError $ AGENT A_VERSION + withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case + Just rq'@RcvQueue {server, sndId, e2ePrivKey = dhPrivKey, smpClientVersion = clntVer} -> do + unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address" + let dhSecret = C.dh' dhPublicKey dhPrivKey + withStore' c $ \db -> do + setRcvQueueConfirmedE2E db rq' senderKey dhSecret $ min clntVer clntVer' + setRcvQueueAction db rq $ Just RQASecureNextQueue + ackDelete + secureNextRcvQueue c cData rq sq rq' + _ -> throwError $ INTERNAL "message can only be sent during rotation" _ -> throwError $ INTERNAL "message can only be sent to duplex connection" - | otherwise = throwError $ INTERNAL "message can only be sent to current queue" -- processed by queue sender rqReady :: (SMPServer, SMP.SenderId) -> m () - rqReady (smpServer, senderId) - | currRcvQueue = case conn of - DuplexConnection _ _ sq -> - withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case - Just sq'@SndQueue {server, sndId} - | server == smpServer && sndId == senderId -> - void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO - | otherwise -> throwError $ INTERNAL "incorrect queue address" - _ -> throwError $ INTERNAL "message can only be sent during rotation" - _ -> throwError $ INTERNAL "message can only be sent to duplex connection" - | otherwise = throwError $ INTERNAL "message can only be sent to current queue" + rqReady (smpServer, senderId) = case conn of + DuplexConnection _ _ sq -> + withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case + Just sq'@SndQueue {server, sndId} -> do + unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address" + void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO + _ -> throwError $ INTERNAL "message can only be sent during rotation" + _ -> throwError $ INTERNAL "message can only be sent to duplex connection" -- processed by queue recipient, received from the new queue rqHello :: m () -> m () - rqHello ackDelete - | currRcvQueue = throwError $ INTERNAL "message can only be sent to the next queue" - | otherwise = case conn of + rqHello ackDelete = do + when currRcvQueue . throwError $ INTERNAL "message can only be sent to the next queue" + case conn of DuplexConnection _ currRq sq -> do let RcvQueue {server, sndId} = rq void . enqueueMessage c cData sq SMP.noMsgFlags $ QSWITCH (server, sndId) @@ -1374,11 +1381,15 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm -- processed by queue sender rqSwitch :: (SMPServer, SMP.SenderId) -> m () - rqSwitch _addr = do - -- make new queue active - -- delete old queue from the database - -- update unsent messages? or just restart message deliveries? - pure () + rqSwitch (smpServer, senderId) = case conn of + DuplexConnection _ _ sq -> do + withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case + Just sq'@SndQueue {server, sndId} -> do + unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address" + withStore' c $ \db -> switchCurrSndQueue db sq sq' + -- update unsent messages? or just restart message deliveries? + _ -> throwError $ INTERNAL "message can only be sent during rotation" + _ -> throwError $ INTERNAL "message can only be sent to duplex connection" smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m () smpInvitation connReq@(CRInvitationUri crData _) cInfo = do diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index c49b289d2..2da08521f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -44,6 +44,7 @@ module Simplex.Messaging.Agent.Store.SQLite dbCreateNextSndQueue, setRcvQueueAction, switchCurrRcvQueue, + switchCurrSndQueue, -- Confirmations createConfirmation, acceptConfirmation, @@ -429,6 +430,13 @@ setRcvQueueAction _db _rq _rqAction_ = pure () switchCurrRcvQueue :: DB.Connection -> RcvQueue -> RcvQueue -> IO () switchCurrRcvQueue _db _rq _nextRq = do -- make a new queue a main one + -- delete old queue from the database + pure () + +switchCurrSndQueue :: DB.Connection -> SndQueue -> SndQueue -> IO () +switchCurrSndQueue _db _sq _nextSq = do + -- make new queue active + -- delete old queue from the database pure () type SMPConfirmationRow = (SndPublicVerifyKey, C.PublicKeyX25519, ConnInfo, Maybe [SMPQueueInfo], Maybe Version) @@ -601,10 +609,11 @@ createSndMsg db connId sndMsgData = do insertSndMsgDetails_ db connId sndMsgData updateHashSnd_ db connId sndMsgData -getPendingMsgData :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (Maybe RcvQueue, PendingMsgData)) -getPendingMsgData db connId msgId = do - rq_ <- getRcvQueueByConnId_ db connId - (rq_,) <$$> firstRow pendingMsgData SEMsgNotFound getMsgData_ +getPendingMsgData :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (Maybe RcvQueue, SndQueue, PendingMsgData)) +getPendingMsgData db connId msgId = runExceptT $ do + rq_ <- liftIO $ getRcvQueueByConnId_ db connId + sq <- ExceptT $ maybe (Left SEConnNotFound) Right <$> getSndQueueByConnId_ db connId + ExceptT $ (rq_,sq,) <$$> firstRow pendingMsgData SEMsgNotFound getMsgData_ where getMsgData_ = DB.query