diff --git a/rfcs/2023-06-08-resync-ratchets.md b/rfcs/2023-06-08-resync-ratchets.md new file mode 100644 index 000000000..fc4572eec --- /dev/null +++ b/rfcs/2023-06-08-resync-ratchets.md @@ -0,0 +1,366 @@ +# Re-sync encryption ratchets + +## Problem + +See https://github.com/simplex-chat/simplexmq/pull/743/files for problem and high-level solution. + +## Implementation + +### Diagnosing ratchet de-synchronization + +Message decryption happens in `agentClientMsg`, in `agentRatchetDecrypt`, which can return decryption result or error. Decryption error can be differentiated in `agentClientMsg` result pattern match, in Left cases, where we already differentiate duplicate error (`AGENT A_DUPLICATE`). + +Question: Which decryption errors can be diagnosed as ratchet de-synchronization? + +Possibly any `AGENT A_CRYPTO` error. Definitely on `RATCHET_HEADER`, TBC other. See `cryptoError :: C.CryptoError -> AgentErrorType` for conversion from decryption errors to `A_CRYPTO` or other agent errors. We're only interested in crypto errors, as other are either other client implementation errors, internal errors, or already processed duplicate error. + +Proposed classification of crypto errors, based on `AgentCryptoError`: + +`DECRYPT_AES` -> re-sync allowed (recommended/required?) + +`DECRYPT_CB` -> re-sync allowed (recommended/required?) + +`RATCHET_HEADER` -> **re-sync required** + +`RATCHET_EARLIER` -> re-sync allowed + +`RATCHET_SKIPPED` -> **re-sync required** + +Ratchet re-synchronization could be started automatically on diagnosing de-synchronization, based on these errors. As a potentially dangerous feature (e.g., implementation error could lead to infinite re-sync loop causing large traffic consumption), initially it will be available via agent functional api for client to call. Ratchet de-synchronization will instead produce an event prompting client to re-synchronize. + +Diagnosing possible ratchet de-synchronization also will be recorded as connection state - `ratchet_desync_state` field in `connections` table. Client should be prohibited to start ratchet re-synchronization unless `ratchet_desync_state` is set. + +Event should not be repeated for following received messages that can't be decrypted - based on `ratchet_desync_state`. If a received message can be decrypted, `ratchet_desync_state` should be set to NULL and a new event sent, indicating ratchet has healed. + +New event - `RDESYNC :: RatchetDesyncState -> ConnectionStats -> ACommand Agent AEConn` + +```haskell +data RatchetDesyncState + = RDResyncAllowed + | RDResyncRequired + | RDHealed +``` + +New field should be added to `ConnectionStats` - `ratchetDesyncState :: Maybe RatchetDesyncState`, based on `ratchet_desync_state`. + +> On `RDESYNC` events chat should create chat item, prompting ratchet re-synchronization or notifying it has healed. +> If connection has diagnosed ratchet de-sync, chat item should have a button to start ratchet re-sync. +> We'd have to get `ConnectionStats` on chat level for this instead of chat info. +> This wouldn't work for groups. One option is to add `ConnectionStats` to `GroupMember` type and update on events. +> Same could be done for `Contact` then. + +To consider - allow to start ratchet re-synchronization at any time regardless of this field as an experimental feature. In chat it could be behind "Developer tools" + additional "Experimental" toggle. Agent api would have `force :: Bool` as parameter, allowing to bypass `ratchet_desync_state`. Should `ratchet_resync_state` (see below) still be honored in this case? + +### Re-synchronization process + +\*\*\*\*\* + +Basic idea is the following: + +Both agents send new ratchet keys and compute a new shared secret. Agent that starts re-synchronization should record this fact in the connection state. Agent that receives a new key should respond with a key of its own, unless it has recorded that it itself started re-synchronization in the connection state. + +It can happen that both agents start re-synchronizing simultaneously. In this case they both would record it in the connection state and would not respond with a new message - instead they would use each other's already sent keys. + +Agent has both keys if: + +- It initiates with the first key, and then receives the second key; +- It receives the first key and then generates its own in response. + +After agent has both keys, it initiates new ratchet depending on keys ordering. The agent that sent the lower key should use `initRcvRatchet` function, the agent that sent the greater key should use `initSndRatchet` (or vice versa - but they should deterministically choose different sides). + +\*\*\*\*\* + +State whether the ratchet re-synchronization is in progress should be tracked in database via `connections` table new `ratchet_resync_state` field. + +New functional api: + +```haskell +resyncConnectionRatchet :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats +``` + +or if we want to allow re-synchronizing ratchet at any time even if de-synchronization wasn't diagnosed: + +```haskell +resyncConnectionRatchet :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m ConnectionStats +resyncConnectionRatchet c connId force = ... +``` + +Possibly client command? + +``` haskell +data ACommand (p :: AParty) (e :: AEntity) where + ... + RESYNC_RATCHET :: Bool -> ACommand Client AEConn +``` + +New event - `RRESYNC :: RatchetResyncState -> ConnectionStats -> ACommand Agent AEConn` + +```haskell +data RatchetResyncState + = RRStarted + | RRAgreedSnd + | RRAgreedRcv + | RRComplete +``` + +New `ConnectionStats` field - `ratchetResyncState :: Maybe RatchetResyncState`. + +When called, it should: + +- Generate new keys. +- Update database connection state. + - Set `ratchet_desync_state` to NULL. + - Set `ratchet_resync_state` to `RRStarted`. + - Delete old ratchet from `ratchets` (is it safe?), create new ratchet. +- Send `AgentRatchetKey` message. +- Return updated `ConnectionStats` to client. + +> On `RRESYNC` events chat should create chat item, and reset connection verification. +> Parameterized `RRESYNC` allows to distinguish: start and end of re-synchronization for initiating party; chat item direction - `RRESYNC RRStarted` is snd, `RRESYNC RRAgreedSnd/Rcv` and `RRESYNC RRComplete` are rcv (`RRESYNC RRAgreedSnd/Rcv` chat item could be omitted). + +AgentRatchetKey is a new message on the level of AgentMsgEnvelope - encrypted with queue level e2e encryption, but not with connection level e2e encryption (since ratchet de-synchronized). + +```haskell +data AgentMsgEnvelope + = ... + | AgentRatchetKey + { agentVersion :: Version, + e2eEncryption :: E2ERatchetParams 'C.X448, + info :: ByteString -- for extension + } +``` + +On receiving `AgentRatchetKey`, if the receiving client hasn't started the ratchet re-synchronization itself (check `ratchet_resync_state`), it should: + +- Generate new keys and compute new shared secret, initializing ratchet based on keys comparison. +- Update database connection state. + - Set `ratchet_resync_state` to `RRAgreedSnd/Rcv` (depending on whether ratchet was initialized as sending or receiving). + - Delete old ratchet from `ratchets`, create new ratchet. +- Reply with its own `AgentRatchetKey`. +- Notify client with `RRESYNC RRAgreedSnd/Rcv`. +- If ratchet was initialized as sending, send `EREADY` message, notifying other agent ratchet is re-synced. + +New agent message: + +```haskell +data AMessage + = ... + | -- ratchet re-synchronization is complete, with last decrypted sender message id + EREADY PrevExternalSndId +``` + +On receiving `AgentRatchetKey`, if the receiving client started re-sync: + +- Compute new shared secret, initializing ratchet based on keys comparison. +- Update database connection state. + - Set `ratchet_resync_state` to `RRAgreedSnd/Rcv` (depending on whether ratchet was initialized as sending or receiving). + - Update ratchet. +- Notify client with `RRESYNC RRAgreedSnd/Rcv`. +- If ratchet was initialized as sending, send `EREADY` message. + +After agent receives `EREADY` (or any other message that successfully decrypts): + +- Reset `ratchet_resync_state` to NULL. +- Notify client with `RRESYNC RRComplete`. +- If ratchet was initialized as receiving, send reply `EREADY` message. + +### State transitions + +For initiating party: + +``` + +------------+ + | Ratchet ok | + +------------+ + | + | message received, decryption error + * ---->|-----------------------------------+ + | | + V new (clarifying) V + +-----------------+ error +------------------+ + | Re-sync allowed |--------------->| Re-sync required | + +-----------------+ +------------------+ + | | + |-----------------------------------| + | | alternative - message received, + | re-sync started by client | successfully decrypted + V V + +-----------------+ +------------+ + | Re-sync started | | Ratchet ok | + +-----------------+ +------------+ + | + | other party replied with new ratchet key + V + +----------------+ + | Re-sync agreed |----> * message received, decryption error + | snd / rcv | (should remember agreed state for reply EREADY?) + +----------------+ + | + | message received, successfully decrypted + | (can be, but not necessarily, EREADY) + V + +------------+ + | Ratchet ok | + +------------+ +``` + +For replying party: + +``` + +------------+ + | Ratchet ok | + +------------+ + | + | other party sent new ratchet key + V + +----------------+ + | Re-sync agreed | + | snd / rcv | + +----------------+ + | + | message received, successfully decrypted + | (can be, but not necessarily, EREADY) + V + +------------+ + | Ratchet ok | + +------------+ +``` + +### Ratchet state model + +#### 2 state variables + +Above we considered model with separate de-sync and re-sync state. + +| Desync \ Resync | Nothing | RRStarted | RRAgreedSnd | RRAgreedRcv | +| --- | :---: | :---: | :---: | :---: | +| **Nothing** | 1 | 3 | 4 | 4 | +| **RDResyncAllowed** | 2 | | 5 | 5 | +| **RDResyncRequired** | 2 | | 5 | 5 | + +1: Ratchet is ok. + +2: Re-sync diagnosed, not in progress. + +3: Rs-sync started, diagnosing de-sync is prohibited. + +4: Re-sync agreed. + +5: Re-sync agreed, new de-sync is diagnosed. + +Combination 5 is possible in case de-sync was diagnosed before message that could be decrypted is received, for example if `EREADY` failed to deliver and no other decryptable message followed. We shouldn't prohibit diagnosing de-sync in this case, because agent may never exit "Agreed" state (if new decryptable message is never received). We also shouldn't overwrite/forget state of re-sync, even if we diagnose new possible de-sync, because if the decryptable `EREADY` is received and ratchet is in `RRAgreedRcv` state, it should respond with reply `EREADY`. + +Some combinations should be impossible: + + - `RDResyncAllowed` with `RRStarted`. + + - `RDResyncRequired` with `RRStarted`. + +`RDHealed` is equivalent to `Nothing` and only used for `RDESYNC` event, `Maybe RatchetDesyncState` can be replaced with `RatchetDesyncState`, with single new constructor `RDNoDesync` replacing `Nothing` and `RDHealed`. + +`RRComplete` is equivalent to `Nothing` and only used for `RRESYNC` event, `Maybe RatchetResyncState` can be replaced with `RatchetResyncState`, with single new constructor `RDNoResync` replacing `Nothing` and `RRComplete`. + +#### Single state variable + +Another option is two have a single state variable describing ratchet. + +```haskell +data RatchetState + = RSOk + | RSResyncAllowed + | RSResyncRequired + | RSResyncStarted + | RRResyncAgreedSnd + | RRResyncAgreedRcv + +-- When `resyncConnectionRatchet` is not prohibited. Can override with `force`. +-- Currently we check:`(isJust ratchetDesyncState || force) && ratchetResyncState /= Just RRStarted`. +resyncConnectionRatchetAllowed :: RatchetState -> Bool +resyncConnectionRatchetAllowed = \case + RSOk -> False + RSResyncAllowed -> True + RSResyncRequired -> True + RSResyncStarted -> False -- `force` shouldn't override + RRResyncAgreedSnd -> False + RRResyncAgreedRcv -> False + +-- When we register and notify about ratchet de-synchronization. +-- Currently we check: `(isNothing ratchetDesyncState && ratchetResyncState /= Just RRStarted)`. +-- We should also allow to update from Allowed to Required. +shouldNotifyRDESYNC :: RatchetState -> Bool +shouldNotifyRDESYNC = \case + RSOk -> True + RSResyncAllowed -> False -- only if new error implies Required + RSResyncRequired -> False + RSResyncStarted -> False + RRResyncAgreedSnd -> True + RRResyncAgreedRcv -> True + +-- When we prohibit connection switch, for `checkRatchetDesync`. +-- Currently we check: `(ratchetDesyncState == Just RDResyncRequired || ratchetResyncState == Just RRStarted)` +-- Also use in `runSmpQueueMsgDelivery` to pause delivery? +ratchetDesynced :: RatchetState -> Bool +ratchetDesynced = \case + RSOk -> False + RSResyncAllowed -> False + RSResyncRequired -> True + RSResyncStarted -> True + RRResyncAgreedSnd -> False + RRResyncAgreedRcv -> False +``` + +Having a single state variable limits differentiation described for combination 5 in matrix. It also limits possible differentiations in client between events when ratchet is healed on its own, and when ratchet re-sync is completed after agents negotiation. Overall, since matrix is not very sparse and allows for more fine-grained decision-making, having separate state variables for de-sync and re-sync seems preferred. + +#### Single state variable simplified (final version) + +```haskell +data RatchetSyncState + = RSOk + | RSAllowed + | RSRequired + | RSStarted + | RSAgreed + +-- event +RSYNC :: RatchetSyncState -> ConnectionStats -> ACommand Agent AEConn` + +-- ConnectionStats field +ratchetSyncState :: RatchetSyncState +``` + +Updated design decisions: + +1. Single constructor for "Agreed" state. Differentiating `RRResyncAgreedSnd` and `RRResyncAgreedRcv` allowed for easier processing of `EREADY` by helping to determine whether reply `EREADY` has to be sent. However, it duplicated information already present in ratchet's state, and can be instead worked around by remembering and analyzing ratchet state pre decryption. + +2. Prohibit transition from "Agreed" state to "Desync" states. This would make possible edge-cases that leave ratchet in de-synchronized state without ability to progress (e.g. failed delivery of `AgentRatchetKey`), but would simplify state machine by removing dedicated "Desync" variable. Besides, there's still a recovery way with a `force` option. + +3. Treat "Agreed" as unfinished state - prohibit new messages to be enqueued, etc. Reception of any decryptable message transitions ratchet to "Ok" state. + +Possible improvements: + +- Repeatedly triggering re-synchronization while in "Started"/"Agreed" state re-sends same keys and EREADY. +- Cooldown period, during which repeat re-synchronization is prohibited. + +### Skipped messages + +Options: + +1. Ignore skipped messages. +2. Stop sending new messages while connection re-synchronizes (can use `ratchet_resync_state`). + +- Initiator shouldn't send new messages until receives `AgentRatchetKey` from second party. +- Second party knows new shared secret immediately after processing first `AgentRatchetKey`, so it's not necessary to limit? + +3. 2 + Re-send skipped messages first. + +- Add `last_external_snd_msg_id` to `AgentRatchetKey`? + see link above + +4. Re-send only messages after the latest ratchet step. \* + +It may be okay to ignore skipped messages, or at most implement option 2, as ratchet de-synchronization is usually caused by misuse (human error) - the most common cause of ratchet de-sync seems to be sending and receiving messages after running agent with old database backup. In this case user has already seen most skipped messages, and it can be expected to not have them after switching to an old backup. So in this case the only "really skipped" messages are those that were sent during the latest ratchet step and failed to decrypt, triggering ratchet re-sync (\* another option is to only re-send those). + +Besides, depending on time of backup there may be an arbitrary large number of skipped messages, which may consume a lot of traffic and may halt delivery of up-to-date messages for some time. + +It may be better to have request for repeat delivery as a separate feature, that can be requested in necessary contexts - for example for group stability. + +Can servers delivery failure lead to de-sync? If message is lost on server and never delivered, ratchet wouldn't advance, so there's no room for de-sync? If yes, re-evaluate. diff --git a/simplexmq.cabal b/simplexmq.cabal index 601e0a2e5..326f7433c 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -83,6 +83,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230510_files_pending_replicas_indexes Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 9c1185ab2..f9f502ea6 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -64,6 +64,7 @@ module Simplex.Messaging.Agent ackMessage, switchConnection, abortConnectionSwitch, + synchronizeRatchet, suspendConnection, deleteConnection, deleteConnections, @@ -115,7 +116,7 @@ import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (isJust) +import Data.Maybe (isJust, isNothing) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock @@ -143,7 +144,7 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) -import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicVerifyKey, UserProtocol, XFTPServerWithAuth) +import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicVerifyKey, UserProtocol, XFTPServerWithAuth) import qualified Simplex.Messaging.Protocol as SMP import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util @@ -273,6 +274,10 @@ switchConnection c = withAgentEnv c . switchConnection' c abortConnectionSwitch :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats abortConnectionSwitch c = withAgentEnv c . abortConnectionSwitch' c +-- | Re-synchronize connection ratchet keys +synchronizeRatchet :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m ConnectionStats +synchronizeRatchet c = withAgentEnv c .: synchronizeRatchet' c + -- | Suspend SMP agent connection (OFF command) suspendConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () suspendConnection c = withAgentEnv c . suspendConnection' c @@ -461,7 +466,8 @@ newConnNoQueues :: AgentMonad m => AgentClient -> UserId -> ConnId -> Bool -> SC newConnNoQueues c userId connId enableNtfs cMode = do g <- asks idsDrg connAgentVersion <- asks $ maxVersion . smpAgentVRange . config - let cData = ConnData {userId, connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing, deleted = False} -- connection mode is determined by the accepting agent + -- connection mode is determined by the accepting agent + let cData = ConnData {userId, connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk} withStore c $ \db -> createNewConn db g cData cMode joinConnAsync :: AgentMonad m => AgentClient -> UserId -> ACorrId -> Bool -> ConnectionRequestUri c -> ConnInfo -> m ConnId @@ -471,7 +477,7 @@ joinConnAsync c userId corrId enableNtfs cReqUri@(CRInvitationUri ConnReqUriData Just (Compatible connAgentVersion) -> do g <- asks idsDrg let duplexHS = connAgentVersion /= 1 - cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, deleted = False} + cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk} connId <- withStore c $ \db -> createNewConn db g cData SCMInvitation enqueueCommand c corrId connId Nothing $ AClientCommand $ APC SAEConn $ JOIN enableNtfs (ACR sConnectionMode cReqUri) cInfo pure connId @@ -536,12 +542,18 @@ switchConnectionAsync' c corrId connId = SomeConn _ (DuplexConnection cData rqs@(rq :| _rqs) sqs) | isJust (switchingRQ rqs) -> throwError $ CMD PROHIBITED | otherwise -> do + checkRatchetSync cData $ CMD PROHIBITED rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSwitchStarted enqueueCommand c corrId connId Nothing $ AClientCommand $ APC SAEConn SWCH let rqs' = updatedQs rq1 rqs pure . connectionStats $ DuplexConnection cData rqs' sqs _ -> throwError $ CMD PROHIBITED +checkRatchetSync :: AgentMonad m => ConnData -> AgentErrorType -> m () +checkRatchetSync ConnData {ratchetSyncState} err = + when (ratchetSyncState `elem` ([RSRequired, RSStarted, RSAgreed] :: [RatchetSyncState])) $ + throwError err + newConn :: AgentMonad m => AgentClient -> UserId -> ConnId -> Bool -> SConnectionMode c -> Maybe CRClientData -> m (ConnId, ConnectionRequestUri c) newConn c userId connId enableNtfs cMode clientData = getSMPServer c userId >>= newConnSrv c userId connId enableNtfs cMode clientData @@ -589,7 +601,7 @@ joinConnSrv c userId connId asyncMode enableNtfs (CRInvitationUri ConnReqUriData let rc = CR.initSndRatchet e2eEncryptVRange rcDHRr rcDHRs $ CR.x3dhSnd pk1 pk2 e2eRcvParams q <- newSndQueue userId "" qInfo let duplexHS = connAgentVersion /= 1 - cData = ConnData {userId, connId, connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, deleted = False} + cData = ConnData {userId, connId, connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk} connId' <- setUpConn asyncMode cData q rc let sq = (q :: SndQueue) {connId = connId'} cData' = (cData :: ConnData) {connId = connId'} @@ -806,7 +818,9 @@ sendMessage' c connId msgFlags msg = withConnLock c connId "sendMessage" $ do _ -> throwError $ CONN SIMPLEX where enqueueMsgs :: ConnData -> NonEmpty SndQueue -> m AgentMsgId - enqueueMsgs cData sqs = enqueueMessages c cData sqs msgFlags $ A_MSG msg + enqueueMsgs cData sqs = do + checkRatchetSync cData $ CMD PROHIBITED + enqueueMessages c cData sqs msgFlags $ A_MSG msg -- / async command processing v v v @@ -980,7 +994,12 @@ runCommandProcessing c@AgentClient {subQ} server_ = do -- ^ ^ ^ async command processing / enqueueMessages :: AgentMonad m => AgentClient -> ConnData -> NonEmpty SndQueue -> MsgFlags -> AMessage -> m AgentMsgId -enqueueMessages c cData (sq :| sqs) msgFlags aMessage = do +enqueueMessages c cData sqs msgFlags aMessage = do + checkRatchetSync cData $ INTERNAL "enqueueMessages: ratchet is not synchronized" + enqueueMessages' c cData sqs msgFlags aMessage + +enqueueMessages' :: AgentMonad m => AgentClient -> ConnData -> NonEmpty SndQueue -> MsgFlags -> AMessage -> m AgentMsgId +enqueueMessages' c cData (sq :| sqs) msgFlags aMessage = do msgId <- enqueueMessage c cData sq msgFlags aMessage mapM_ (enqueueSavedMessage c cData msgId) $ filter (\SndQueue {status} -> status == Secured || status == Active) sqs @@ -1079,6 +1098,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl SMP SMP.AUTH -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE + AM_RATCHET_INFO -> connError msgId NOT_AVAILABLE AM_HELLO_ -- in duplexHandshake mode (v2) HELLO is only sent once, without retrying, -- because the queue must be secured by the time the confirmation or the first HELLO is received @@ -1098,6 +1118,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl AM_QKEY_ -> qError msgId "QKEY: AUTH" AM_QUSE_ -> qError msgId "QUSE: AUTH" AM_QTEST_ -> qError msgId "QTEST: AUTH" + AM_EREADY_ -> notifyDel msgId err _ -- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server), -- the message sending would be retried @@ -1121,6 +1142,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl when (isJust rq_) $ removeConfirmations db connId unless (duplexHandshake == Just True) . void $ enqueueMessage c cData sq SMP.noMsgFlags HELLO AM_CONN_INFO_REPLY -> pure () + AM_RATCHET_INFO -> pure () AM_REPLY_ -> pure () AM_HELLO_ -> do withStore' c $ \db -> setSndQueueStatus db sq Active @@ -1178,6 +1200,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl _ -> internalErr msgId "sent QTEST: there is only one queue in connection" _ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue" _ -> internalErr msgId "QTEST sent not in duplex connection" + AM_EREADY_ -> pure () delMsg msgId where delMsg :: InternalId -> m () @@ -1219,9 +1242,10 @@ switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats switchConnection' c connId = withConnLock c connId "switchConnection" $ withStore c (`getConn` connId) >>= \case - SomeConn _ conn@(DuplexConnection _ rqs@(rq :| _rqs) _) + SomeConn _ conn@(DuplexConnection cData rqs@(rq :| _rqs) _) | isJust (switchingRQ rqs) -> throwError $ CMD PROHIBITED | otherwise -> do + checkRatchetSync cData $ CMD PROHIBITED rq' <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSwitchStarted switchDuplexConnection c conn rq' _ -> throwError $ CMD PROHIBITED @@ -1249,6 +1273,7 @@ abortConnectionSwitch' c connId = SomeConn _ (DuplexConnection cData rqs sqs) -> case switchingRQ rqs of Just rq | canAbortRcvSwitch rq -> do + checkRatchetSync cData $ CMD PROHIBITED -- multiple queues to which the connections switches were possible when repeating switch was allowed let (delRqs, keepRqs) = L.partition ((Just (dbQId rq) ==) . dbReplaceQId) rqs case L.nonEmpty keepRqs of @@ -1265,6 +1290,24 @@ abortConnectionSwitch' c connId = _ -> throwError $ CMD PROHIBITED _ -> throwError $ CMD PROHIBITED +synchronizeRatchet' :: AgentMonad m => AgentClient -> ConnId -> Bool -> m ConnectionStats +synchronizeRatchet' c connId force = withConnLock c connId "synchronizeRatchet" $ do + withStore c (`getConn` connId) >>= \case + SomeConn _ (DuplexConnection cData@ConnData {ratchetSyncState} rqs sqs) + | ratchetSyncState `elem` ([RSAllowed, RSRequired] :: [RatchetSyncState]) || force -> do + -- check queues are not switching? + AgentConfig {e2eEncryptVRange} <- asks config + (pk1, pk2, e2eParams@(CR.E2ERatchetParams _ k1 k2)) <- liftIO . CR.generateE2EParams $ maxVersion e2eEncryptVRange + void $ enqueueRatchetKeyMsgs c cData sqs e2eParams + withStore' c $ \db -> do + setConnRatchetSync db connId RSStarted + setRatchetX3dhKeys db connId pk1 pk2 k1 k2 + let cData' = cData {ratchetSyncState = RSStarted} :: ConnData + conn' = DuplexConnection cData' rqs sqs + pure $ connectionStats conn' + | otherwise -> throwError $ CMD PROHIBITED + _ -> throwError $ CMD PROHIBITED + ackQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> SMP.MsgId -> m () ackQueueMessage c rq srvMsgId = sendAck c rq srvMsgId `catchError` \case @@ -1402,11 +1445,16 @@ getConnectionRatchetAdHash' c connId = do connectionStats :: Connection c -> ConnectionStats connectionStats = \case - RcvConnection _ rq -> ConnectionStats {rcvQueuesInfo = [rcvQueueInfo rq], sndQueuesInfo = []} - SndConnection _ sq -> ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [sndQueueInfo sq]} - DuplexConnection _ rqs sqs -> ConnectionStats {rcvQueuesInfo = map rcvQueueInfo $ L.toList rqs, sndQueuesInfo = map sndQueueInfo $ L.toList sqs} - ContactConnection _ rq -> ConnectionStats {rcvQueuesInfo = [rcvQueueInfo rq], sndQueuesInfo = []} - NewConnection _ -> ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = []} + RcvConnection ConnData {ratchetSyncState} rq -> + ConnectionStats {rcvQueuesInfo = [rcvQueueInfo rq], sndQueuesInfo = [], ratchetSyncState} + SndConnection ConnData {ratchetSyncState} sq -> + ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [sndQueueInfo sq], ratchetSyncState} + DuplexConnection ConnData {ratchetSyncState} rqs sqs -> + ConnectionStats {rcvQueuesInfo = map rcvQueueInfo $ L.toList rqs, sndQueuesInfo = map sndQueueInfo $ L.toList sqs, ratchetSyncState} + ContactConnection ConnData {ratchetSyncState} rq -> + ConnectionStats {rcvQueuesInfo = [rcvQueueInfo rq], sndQueuesInfo = [], ratchetSyncState} + NewConnection ConnData {ratchetSyncState} -> + ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [], ratchetSyncState} -- | Change servers to be used for creating new queues, in Reader monad setProtocolServers' :: forall p m. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> NonEmpty (ProtoServerWithAuth p) -> m () @@ -1673,6 +1721,7 @@ cleanupManager c@AgentClient {subQ} = do void . runExceptT $ do deleteConns `catchError` (notify "" . ERR) deleteRcvMsgHashes `catchError` (notify "" . ERR) + deleteProcessedRatchetKeyHashes `catchError` (notify "" . ERR) deleteRcvFilesExpired `catchError` (notify "" . RFERR) deleteRcvFilesDeleted `catchError` (notify "" . RFERR) deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR) @@ -1689,6 +1738,9 @@ cleanupManager c@AgentClient {subQ} = do deleteRcvMsgHashes = do rcvMsgHashesTTL <- asks $ rcvMsgHashesTTL . config withStore' c (`deleteRcvMsgHashesExpired` rcvMsgHashesTTL) + deleteProcessedRatchetKeyHashes = do + rkHashesTTL <- asks $ processedRatchetKeyHashesTTL . config + withStore' c (`deleteProcessedRatchetKeyHashesExpired` rkHashesTTL) deleteRcvFilesExpired = do rcvFilesTTL <- asks $ rcvFilesTTL . config rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL) @@ -1735,339 +1787,425 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s processSMP rq conn $ connData conn where processSMP :: RcvQueue -> Connection c -> ConnData -> m () - processSMP rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} conn cData@ConnData {userId, connId, duplexHandshake} = withConnLock c connId "processSMP" $ - case cmd of - SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> - handleNotifyAck $ - decryptSMPMessage v rq msg >>= \case - SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody - SMP.ClientRcvMsgQuota {} -> queueDrained >> ack - where - queueDrained = case conn of - DuplexConnection _ _ sqs -> void $ enqueueMessages c cData sqs SMP.noMsgFlags $ QCONT (sndAddress rq) - _ -> pure () - processClientMsg srvTs msgFlags msgBody = do - clientMsg@SMP.ClientMsgEnvelope {cmHeader = SMP.PubHeader phVer e2ePubKey_} <- - parseMessage msgBody - clientVRange <- asks $ smpClientVRange . config - unless (phVer `isCompatible` clientVRange) . throwError $ AGENT A_VERSION - case (e2eDhSecret, e2ePubKey_) of - (Nothing, Just e2ePubKey) -> do - let e2eDh = C.dh' e2ePubKey e2ePrivKey - decryptClientMessage e2eDh clientMsg >>= \case - (SMP.PHConfirmation senderKey, AgentConfirmation {e2eEncryption, encConnInfo, agentVersion}) -> - smpConfirmation senderKey e2ePubKey e2eEncryption encConnInfo phVer agentVersion >> ack - (SMP.PHEmpty, AgentInvitation {connReq, connInfo}) -> - smpInvitation connReq connInfo >> ack - _ -> prohibited >> ack - (Just e2eDh, Nothing) -> do - decryptClientMessage e2eDh clientMsg >>= \case - (SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) -> do - -- primary queue is set as Active in helloMsg, below is to set additional queues Active - let RcvQueue {primary, dbReplaceQueueId} = rq - unless (status == Active) . withStore' c $ \db -> setRcvQueueStatus db rq Active - case (conn, dbReplaceQueueId) of - (DuplexConnection _ rqs _, Just replacedId) -> do - when primary . withStore' c $ \db -> setRcvQueuePrimary db connId rq - case find ((replacedId ==) . dbQId) rqs of - Just rq'@RcvQueue {server, rcvId} -> do - checkRQSwchStatus rq' RSSendingQUSE - void $ withStore' c $ \db -> setRcvSwitchStatus db rq' $ Just RSReceivedMessage - enqueueCommand c "" connId (Just server) $ AInternalCommand $ ICQDelete rcvId - _ -> notify . ERR . AGENT $ A_QUEUE "replaced RcvQueue not found in connection" - _ -> pure () - let encryptedMsgHash = C.sha256Hash encAgentMsg - tryError (agentClientMsg encryptedMsgHash) >>= \case - Right (Just (msgId, msgMeta, aMessage)) -> case aMessage of - HELLO -> helloMsg >> ackDel msgId - REPLY cReq -> replyMsg cReq >> ackDel msgId - -- note that there is no ACK sent for A_MSG, it is sent with agent's user ACK command - A_MSG body -> do - logServer "<--" c srv rId "MSG " - notify $ MSG msgMeta msgFlags body - QCONT addr -> qDuplex "QCONT" $ continueSending addr - QADD qs -> qDuplex "QADD" $ qAddMsg qs - QKEY qs -> qDuplex "QKEY" $ qKeyMsg qs - QUSE qs -> qDuplex "QUSE" $ qUseMsg qs - -- no action needed for QTEST - -- any message in the new queue will mark it active and trigger deletion of the old queue - QTEST _ -> logServer "<--" c srv rId "MSG " >> ackDel msgId - where - qDuplex :: String -> (Connection 'CDuplex -> m ()) -> m () - qDuplex name a = case conn of - DuplexConnection {} -> a conn >> ackDel msgId - _ -> qError $ name <> ": message must be sent to duplex connection" - Right _ -> prohibited >> ack - Left e@(AGENT A_DUPLICATE) -> do - withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case - Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck} - | userAck -> ackDel internalId - | otherwise -> do - liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case - AgentMessage _ (A_MSG body) -> do - logServer "<--" c srv rId "MSG " - notify $ MSG msgMeta msgFlags body - _ -> pure () - _ -> checkDuplicateHash e encryptedMsgHash >> ack - Left e -> checkDuplicateHash e encryptedMsgHash >> ack - where - checkDuplicateHash :: AgentErrorType -> ByteString -> m () - checkDuplicateHash e encryptedMsgHash = - unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $ - throwError e - agentClientMsg :: ByteString -> m (Maybe (InternalId, MsgMeta, AMessage)) - agentClientMsg encryptedMsgHash = withStore c $ \db -> runExceptT $ do - agentMsgBody <- agentRatchetDecrypt db connId encAgentMsg - liftEither (parse smpP (SEAgentError $ AGENT A_MESSAGE) agentMsgBody) >>= \case - agentMsg@(AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do - let msgType = agentMessageType agentMsg - internalHash = C.sha256Hash agentMsgBody - internalTs <- liftIO getCurrentTime - (internalId, internalRcvId, prevExtSndId, prevRcvMsgHash) <- liftIO $ updateRcvIds db connId - let integrity = checkMsgIntegrity prevExtSndId sndMsgId prevRcvMsgHash prevMsgHash - recipient = (unId internalId, internalTs) - broker = (srvMsgId, systemToUTCTime srvTs) - msgMeta = MsgMeta {integrity, recipient, broker, sndMsgId} - rcvMsg = RcvMsgData {msgMeta, msgType, msgFlags, msgBody = agentMsgBody, internalRcvId, internalHash, externalPrevSndHash = prevMsgHash, encryptedMsgHash} - liftIO $ createRcvMsg db connId rq rcvMsg - pure $ Just (internalId, msgMeta, aMessage) - _ -> pure Nothing - _ -> prohibited >> ack - _ -> prohibited >> ack - ack :: m () - ack = enqueueCmd $ ICAck rId srvMsgId - ackDel :: InternalId -> m () - ackDel = enqueueCmd . ICAckDel rId srvMsgId - handleNotifyAck :: m () -> m () - handleNotifyAck m = m `catchError` \e -> notify (ERR e) >> ack - SMP.END -> - atomically (TM.lookup tSess smpClients $>>= tryReadTMVar >>= processEND) - >>= logServer "<--" c srv rId - where - processEND = \case - Just (Right clnt) - | sessId == sessionId clnt -> do - removeSubscription c connId - notify' END - pure "END" - | otherwise -> ignored - _ -> ignored - ignored = pure "END from disconnected client - ignored" - _ -> do - logServer "<--" c srv rId $ "unexpected: " <> bshow cmd - notify . ERR $ BROKER (B.unpack $ strEncode srv) UNEXPECTED - where - notify :: forall e. AEntityI e => ACommand 'Agent e -> m () - notify = atomically . notify' - - notify' :: forall e. AEntityI e => ACommand 'Agent e -> STM () - notify' msg = writeTBQueue subQ ("", connId, APC (sAEntity @e) msg) - - prohibited :: m () - prohibited = notify . ERR $ AGENT A_PROHIBITED - - enqueueCmd :: InternalCommand -> m () - enqueueCmd = enqueueCommand c "" connId (Just srv) . AInternalCommand - - decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> m (SMP.PrivHeader, AgentMsgEnvelope) - decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do - clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody - SMP.ClientMessage privHeader clientBody <- parseMessage clientMsg - agentEnvelope <- parseMessage clientBody - -- Version check is removed here, because when connecting via v1 contact address the agent still sends v2 message, - -- to allow duplexHandshake mode, in case the receiving agent was updated to v2 after the address was created. - -- aVRange <- asks $ smpAgentVRange . config - -- if agentVersion agentEnvelope `isCompatible` aVRange - -- then pure (privHeader, agentEnvelope) - -- else throwError $ AGENT A_VERSION - pure (privHeader, agentEnvelope) - - parseMessage :: Encoding a => ByteString -> m a - parseMessage = liftEither . parse smpP (AGENT A_MESSAGE) - - smpConfirmation :: C.APublicVerifyKey -> C.PublicKeyX25519 -> Maybe (CR.E2ERatchetParams 'C.X448) -> ByteString -> Version -> Version -> m () - smpConfirmation senderKey e2ePubKey e2eEncryption encConnInfo smpClientVersion agentVersion = do - logServer "<--" c srv rId "MSG " - AgentConfig {smpClientVRange, smpAgentVRange, e2eEncryptVRange} <- asks config - unless - (agentVersion `isCompatible` smpAgentVRange && smpClientVersion `isCompatible` smpClientVRange) - (throwError $ AGENT A_VERSION) - case status of - New -> case (conn, e2eEncryption) of - -- party initiating connection - (RcvConnection {}, Just e2eSndParams@(CR.E2ERatchetParams e2eVersion _ _)) -> do - unless (e2eVersion `isCompatible` e2eEncryptVRange) (throwError $ AGENT A_VERSION) - (pk1, rcDHRs) <- withStore c (`getRatchetX3dhKeys` connId) - let rc = CR.initRcvRatchet e2eEncryptVRange rcDHRs $ CR.x3dhRcv pk1 rcDHRs e2eSndParams - (agentMsgBody_, rc', skipped) <- liftError cryptoError $ CR.rcDecrypt rc M.empty encConnInfo - case (agentMsgBody_, skipped) of - (Right agentMsgBody, CR.SMDNoChange) -> - parseMessage agentMsgBody >>= \case - AgentConnInfo connInfo -> - processConf connInfo SMPConfirmation {senderKey, e2ePubKey, connInfo, smpReplyQueues = [], smpClientVersion} False - AgentConnInfoReply smpQueues connInfo -> - processConf connInfo SMPConfirmation {senderKey, e2ePubKey, connInfo, smpReplyQueues = L.toList smpQueues, smpClientVersion} True - _ -> prohibited - where - processConf connInfo senderConf duplexHS = do - let newConfirmation = NewConfirmation {connId, senderConf, ratchetState = rc'} - g <- asks idsDrg - confId <- withStore c $ \db -> do - setHandshakeVersion db connId agentVersion duplexHS - createConfirmation db g newConfirmation - let srvs = map qServer $ smpReplyQueues senderConf - notify $ CONF confId srvs connInfo - _ -> prohibited - -- party accepting connection - (DuplexConnection _ (RcvQueue {smpClientVersion = v'} :| _) _, Nothing) -> do - withStore c (\db -> runExceptT $ agentRatchetDecrypt db connId encConnInfo) >>= parseMessage >>= \case - AgentConnInfo connInfo -> do - notify $ INFO connInfo - let dhSecret = C.dh' e2ePubKey e2ePrivKey - withStore' c $ \db -> setRcvQueueConfirmedE2E db rq dhSecret $ min v' smpClientVersion - enqueueCmd $ ICDuplexSecure rId senderKey - _ -> prohibited - _ -> prohibited - _ -> prohibited - - helloMsg :: m () - helloMsg = do - logServer "<--" c srv rId "MSG " - case status of - Active -> prohibited - _ -> - case conn of - DuplexConnection _ _ (sq@SndQueue {status = sndStatus} :| _) - -- `sndStatus == Active` when HELLO was previously sent, and this is the reply HELLO - -- this branch is executed by the accepting party in duplexHandshake mode (v2) - -- and by the initiating party in v1 - -- Also see comment where HELLO is sent. - | sndStatus == Active -> notify CON - | duplexHandshake == Just True -> enqueueDuplexHello sq - | otherwise -> pure () + processSMP + rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} + conn + cData@ConnData {userId, connId, duplexHandshake, ratchetSyncState = rss, lastExternalSndId} = + withConnLock c connId "processSMP" $ case cmd of + SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> + handleNotifyAck $ + decryptSMPMessage v rq msg >>= \case + SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody + SMP.ClientRcvMsgQuota {} -> queueDrained >> ack + where + queueDrained = case conn of + DuplexConnection _ _ sqs -> void $ enqueueMessages c cData sqs SMP.noMsgFlags $ QCONT (sndAddress rq) _ -> pure () + processClientMsg srvTs msgFlags msgBody = do + clientMsg@SMP.ClientMsgEnvelope {cmHeader = SMP.PubHeader phVer e2ePubKey_} <- + parseMessage msgBody + clientVRange <- asks $ smpClientVRange . config + unless (phVer `isCompatible` clientVRange) . throwError $ AGENT A_VERSION + case (e2eDhSecret, e2ePubKey_) of + (Nothing, Just e2ePubKey) -> do + let e2eDh = C.dh' e2ePubKey e2ePrivKey + decryptClientMessage e2eDh clientMsg >>= \case + (SMP.PHConfirmation senderKey, AgentConfirmation {e2eEncryption_, encConnInfo, agentVersion}) -> + smpConfirmation senderKey e2ePubKey e2eEncryption_ encConnInfo phVer agentVersion >> ack + (SMP.PHEmpty, AgentInvitation {connReq, connInfo}) -> + smpInvitation connReq connInfo >> ack + _ -> prohibited >> ack + (Just e2eDh, Nothing) -> do + decryptClientMessage e2eDh clientMsg >>= \case + (SMP.PHEmpty, AgentRatchetKey {e2eEncryption}) -> + qDuplex "AgentRatchetKey" (newRatchetKey e2eEncryption) >> ack + (SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) -> do + -- primary queue is set as Active in helloMsg, below is to set additional queues Active + let RcvQueue {primary, dbReplaceQueueId} = rq + unless (status == Active) . withStore' c $ \db -> setRcvQueueStatus db rq Active + case (conn, dbReplaceQueueId) of + (DuplexConnection _ rqs _, Just replacedId) -> do + when primary . withStore' c $ \db -> setRcvQueuePrimary db connId rq + case find ((replacedId ==) . dbQId) rqs of + Just rq'@RcvQueue {server, rcvId} -> do + checkRQSwchStatus rq' RSSendingQUSE + void $ withStore' c $ \db -> setRcvSwitchStatus db rq' $ Just RSReceivedMessage + enqueueCommand c "" connId (Just server) $ AInternalCommand $ ICQDelete rcvId + _ -> notify . ERR . AGENT $ A_QUEUE "replaced RcvQueue not found in connection" + _ -> pure () + let encryptedMsgHash = C.sha256Hash encAgentMsg + tryError (agentClientMsg encryptedMsgHash) >>= \case + Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do + resetRatchetSync + case aMessage of + HELLO -> helloMsg >> ackDel msgId + REPLY cReq -> replyMsg cReq >> ackDel msgId + -- note that there is no ACK sent for A_MSG, it is sent with agent's user ACK command + A_MSG body -> do + logServer "<--" c srv rId "MSG " + notify $ MSG msgMeta msgFlags body + QCONT addr -> qDuplexAckDel "QCONT" $ continueSending addr + QADD qs -> qDuplexAckDel "QADD" $ qAddMsg qs + QKEY qs -> qDuplexAckDel "QKEY" $ qKeyMsg qs + QUSE qs -> qDuplexAckDel "QUSE" $ qUseMsg qs + -- no action needed for QTEST + -- any message in the new queue will mark it active and trigger deletion of the old queue + QTEST _ -> logServer "<--" c srv rId "MSG " >> ackDel msgId + EREADY _ -> qDuplexAckDel "EREADY" $ ereadyMsg rcPrev + where + qDuplexAckDel :: String -> (Connection 'CDuplex -> m ()) -> m () + qDuplexAckDel name a = qDuplex name a >> ackDel msgId + resetRatchetSync :: m () + resetRatchetSync = unless (rss `elem` ([RSOk, RSStarted] :: [RatchetSyncState])) $ + qDuplex "ratchet de-sync reset" $ \(DuplexConnection _ rqs sqs) -> do + let cData' = cData {ratchetSyncState = RSOk} :: ConnData + conn' = DuplexConnection cData' rqs sqs + notify . RSYNC RSOk $ connectionStats conn' + withStore' c $ \db -> setConnRatchetSync db connId RSOk + Right _ -> prohibited >> ack + Left e@(AGENT A_DUPLICATE) -> do + withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case + Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck} + | userAck -> ackDel internalId + | otherwise -> do + liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case + AgentMessage _ (A_MSG body) -> do + logServer "<--" c srv rId "MSG " + notify $ MSG msgMeta msgFlags body + _ -> pure () + _ -> checkDuplicateHash e encryptedMsgHash >> ack + Left (AGENT (A_CRYPTO e)) -> do + exists <- withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash + unless exists notifySync + ack + where + notifySync :: m () + notifySync = qDuplex "AGENT A_CRYPTO error" $ \(DuplexConnection _ rqs sqs) -> do + let rss' = cryptoErrToSyncState e + when (rss == RSOk || (rss == RSAllowed && rss' == RSRequired)) $ do + let cData' = cData {ratchetSyncState = rss'} :: ConnData + conn' = DuplexConnection cData' rqs sqs + notify . RSYNC rss' $ connectionStats conn' + withStore' c $ \db -> setConnRatchetSync db connId rss' + Left e -> checkDuplicateHash e encryptedMsgHash >> ack + where + checkDuplicateHash :: AgentErrorType -> ByteString -> m () + checkDuplicateHash e encryptedMsgHash = + unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $ + throwError e + agentClientMsg :: ByteString -> m (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448)) + agentClientMsg encryptedMsgHash = withStore c $ \db -> runExceptT $ do + rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY + agentMsgBody <- agentRatchetDecrypt' db connId rc encAgentMsg + liftEither (parse smpP (SEAgentError $ AGENT A_MESSAGE) agentMsgBody) >>= \case + agentMsg@(AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do + let msgType = agentMessageType agentMsg + internalHash = C.sha256Hash agentMsgBody + internalTs <- liftIO getCurrentTime + (internalId, internalRcvId, prevExtSndId, prevRcvMsgHash) <- liftIO $ updateRcvIds db connId + let integrity = checkMsgIntegrity prevExtSndId sndMsgId prevRcvMsgHash prevMsgHash + recipient = (unId internalId, internalTs) + broker = (srvMsgId, systemToUTCTime srvTs) + msgMeta = MsgMeta {integrity, recipient, broker, sndMsgId} + rcvMsg = RcvMsgData {msgMeta, msgType, msgFlags, msgBody = agentMsgBody, internalRcvId, internalHash, externalPrevSndHash = prevMsgHash, encryptedMsgHash} + liftIO $ createRcvMsg db connId rq rcvMsg + pure $ Just (internalId, msgMeta, aMessage, rc) + _ -> pure Nothing + _ -> prohibited >> ack + _ -> prohibited >> ack + ack :: m () + ack = enqueueCmd $ ICAck rId srvMsgId + ackDel :: InternalId -> m () + ackDel = enqueueCmd . ICAckDel rId srvMsgId + handleNotifyAck :: m () -> m () + handleNotifyAck m = m `catchError` \e -> notify (ERR e) >> ack + SMP.END -> + atomically (TM.lookup tSess smpClients $>>= tryReadTMVar >>= processEND) + >>= logServer "<--" c srv rId + where + processEND = \case + Just (Right clnt) + | sessId == sessionId clnt -> do + removeSubscription c connId + notify' END + pure "END" + | otherwise -> ignored + _ -> ignored + ignored = pure "END from disconnected client - ignored" + _ -> do + logServer "<--" c srv rId $ "unexpected: " <> bshow cmd + notify . ERR $ BROKER (B.unpack $ strEncode srv) UNEXPECTED + where + notify :: forall e. AEntityI e => ACommand 'Agent e -> m () + notify = atomically . notify' - enqueueDuplexHello :: SndQueue -> m () - enqueueDuplexHello sq = void $ enqueueMessage c cData sq SMP.MsgFlags {notification = True} HELLO + notify' :: forall e. AEntityI e => ACommand 'Agent e -> STM () + notify' msg = writeTBQueue subQ ("", connId, APC (sAEntity @e) msg) - replyMsg :: NonEmpty SMPQueueInfo -> m () - replyMsg smpQueues = do - logServer "<--" c srv rId "MSG " - case duplexHandshake of - Just True -> prohibited - _ -> case conn of - RcvConnection {} -> do - AcceptedConfirmation {ownConnInfo} <- withStore c (`getAcceptedConfirmation` connId) - connectReplyQueues c cData ownConnInfo smpQueues `catchError` (notify . ERR) + prohibited :: m () + prohibited = notify . ERR $ AGENT A_PROHIBITED + + enqueueCmd :: InternalCommand -> m () + enqueueCmd = enqueueCommand c "" connId (Just srv) . AInternalCommand + + decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> m (SMP.PrivHeader, AgentMsgEnvelope) + decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do + clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody + SMP.ClientMessage privHeader clientBody <- parseMessage clientMsg + agentEnvelope <- parseMessage clientBody + -- Version check is removed here, because when connecting via v1 contact address the agent still sends v2 message, + -- to allow duplexHandshake mode, in case the receiving agent was updated to v2 after the address was created. + -- aVRange <- asks $ smpAgentVRange . config + -- if agentVersion agentEnvelope `isCompatible` aVRange + -- then pure (privHeader, agentEnvelope) + -- else throwError $ AGENT A_VERSION + pure (privHeader, agentEnvelope) + + parseMessage :: Encoding a => ByteString -> m a + parseMessage = liftEither . parse smpP (AGENT A_MESSAGE) + + smpConfirmation :: C.APublicVerifyKey -> C.PublicKeyX25519 -> Maybe (CR.E2ERatchetParams 'C.X448) -> ByteString -> Version -> Version -> m () + smpConfirmation senderKey e2ePubKey e2eEncryption encConnInfo smpClientVersion agentVersion = do + logServer "<--" c srv rId "MSG " + AgentConfig {smpClientVRange, smpAgentVRange, e2eEncryptVRange} <- asks config + unless + (agentVersion `isCompatible` smpAgentVRange && smpClientVersion `isCompatible` smpClientVRange) + (throwError $ AGENT A_VERSION) + case status of + New -> case (conn, e2eEncryption) of + -- party initiating connection + (RcvConnection {}, Just e2eSndParams@(CR.E2ERatchetParams e2eVersion _ _)) -> do + unless (e2eVersion `isCompatible` e2eEncryptVRange) (throwError $ AGENT A_VERSION) + (pk1, rcDHRs) <- withStore c (`getRatchetX3dhKeys` connId) + let rc = CR.initRcvRatchet e2eEncryptVRange rcDHRs $ CR.x3dhRcv pk1 rcDHRs e2eSndParams + (agentMsgBody_, rc', skipped) <- liftError cryptoError $ CR.rcDecrypt rc M.empty encConnInfo + case (agentMsgBody_, skipped) of + (Right agentMsgBody, CR.SMDNoChange) -> + parseMessage agentMsgBody >>= \case + AgentConnInfo connInfo -> + processConf connInfo SMPConfirmation {senderKey, e2ePubKey, connInfo, smpReplyQueues = [], smpClientVersion} False + AgentConnInfoReply smpQueues connInfo -> + processConf connInfo SMPConfirmation {senderKey, e2ePubKey, connInfo, smpReplyQueues = L.toList smpQueues, smpClientVersion} True + _ -> prohibited + where + processConf connInfo senderConf duplexHS = do + let newConfirmation = NewConfirmation {connId, senderConf, ratchetState = rc'} + g <- asks idsDrg + confId <- withStore c $ \db -> do + setHandshakeVersion db connId agentVersion duplexHS + createConfirmation db g newConfirmation + let srvs = map qServer $ smpReplyQueues senderConf + notify $ CONF confId srvs connInfo + _ -> prohibited + -- party accepting connection + (DuplexConnection _ (RcvQueue {smpClientVersion = v'} :| _) _, Nothing) -> do + withStore c (\db -> runExceptT $ agentRatchetDecrypt db connId encConnInfo) >>= parseMessage >>= \case + AgentConnInfo connInfo -> do + notify $ INFO connInfo + let dhSecret = C.dh' e2ePubKey e2ePrivKey + withStore' c $ \db -> setRcvQueueConfirmedE2E db rq dhSecret $ min v' smpClientVersion + enqueueCmd $ ICDuplexSecure rId senderKey + _ -> prohibited + _ -> prohibited _ -> prohibited - continueSending :: (SMPServer, SMP.SenderId) -> Connection 'CDuplex -> m () - continueSending addr (DuplexConnection _ _ sqs) = - case findQ addr sqs of - Just sq -> do - logServer "<--" c srv rId "MSG " - atomically $ do - (_, qLock) <- getPendingMsgQ c sq - void $ tryPutTMVar qLock () - Nothing -> qError "QCONT: queue address not found" + helloMsg :: m () + helloMsg = do + logServer "<--" c srv rId "MSG " + case status of + Active -> prohibited + _ -> + case conn of + DuplexConnection _ _ (sq@SndQueue {status = sndStatus} :| _) + -- `sndStatus == Active` when HELLO was previously sent, and this is the reply HELLO + -- this branch is executed by the accepting party in duplexHandshake mode (v2) + -- and by the initiating party in v1 + -- Also see comment where HELLO is sent. + | sndStatus == Active -> notify CON + | duplexHandshake == Just True -> enqueueDuplexHello sq + | otherwise -> pure () + _ -> pure () - -- processed by queue sender - qAddMsg :: NonEmpty (SMPQueueUri, Maybe SndQAddr) -> Connection 'CDuplex -> m () - qAddMsg ((_, Nothing) :| _) _ = qError "adding queue without switching is not supported" - qAddMsg ((qUri, Just addr) :| _) (DuplexConnection _ rqs sqs) = do - clientVRange <- asks $ smpClientVRange . config - case qUri `compatibleVersion` clientVRange of - Just qInfo@(Compatible sqInfo@SMPQueueInfo {queueAddress}) -> - case (findQ (qAddress sqInfo) sqs, findQ addr sqs) of - (Just _, _) -> qError "QADD: queue address is already used in connection" - (_, Just sq@SndQueue {dbQueueId}) -> do - let (delSqs, keepSqs) = L.partition ((Just dbQueueId ==) . dbReplaceQId) sqs - case L.nonEmpty keepSqs of - Just sqs' -> do - -- move inside case? - withStore' c $ \db -> mapM_ (deleteConnSndQueue db connId) delSqs - sq_@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue userId connId qInfo - let sq'' = (sq_ :: SndQueue) {primary = True, dbQueueId, dbReplaceQueueId = Just dbQueueId} - dbId <- withStore c $ \db -> addConnSndQueue db connId sq'' - let sq2 = (sq'' :: SndQueue) {dbQueueId = dbId} - case (sndPublicKey, e2ePubKey) of - (Just sndPubKey, Just dhPublicKey) -> do - logServer "<--" c srv rId $ "MSG " <> logSecret (senderId queueAddress) - let sqInfo' = (sqInfo :: SMPQueueInfo) {queueAddress = queueAddress {dhPublicKey}} - void . enqueueMessages c cData sqs SMP.noMsgFlags $ QKEY [(sqInfo', sndPubKey)] - sq1 <- withStore' c $ \db -> setSndSwitchStatus db sq $ Just SSSendingQKEY - let sqs'' = updatedQs sq1 sqs' <> [sq2] - conn' = DuplexConnection cData rqs sqs'' - notify . SWITCH QDSnd SPStarted $ connectionStats conn' - _ -> qError "absent sender keys" - _ -> qError "QADD: won't delete all snd queues in connection" - _ -> qError "QADD: replaced queue address is not found in connection" - _ -> throwError $ AGENT A_VERSION + enqueueDuplexHello :: SndQueue -> m () + enqueueDuplexHello sq = void $ enqueueMessage c cData sq SMP.MsgFlags {notification = True} HELLO - -- processed by queue recipient - qKeyMsg :: NonEmpty (SMPQueueInfo, SndPublicVerifyKey) -> Connection 'CDuplex -> m () - qKeyMsg ((qInfo, senderKey) :| _) (DuplexConnection _ rqs _) = do - clientVRange <- asks $ smpClientVRange . config - unless (qInfo `isCompatible` clientVRange) . throwError $ AGENT A_VERSION - case findRQ (smpServer, senderId) rqs of - Just rq'@RcvQueue {rcvId, e2ePrivKey = dhPrivKey, smpClientVersion = cVer, status = status'} - | status' == New || status' == Confirmed -> do - checkRQSwchStatus rq RSSendingQADD - logServer "<--" c srv rId $ "MSG " <> logSecret senderId - let dhSecret = C.dh' dhPublicKey dhPrivKey - withStore' c $ \db -> setRcvQueueConfirmedE2E db rq' dhSecret $ min cVer cVer' - enqueueCommand c "" connId (Just smpServer) $ AInternalCommand $ ICQSecure rcvId senderKey - notify . SWITCH QDRcv SPConfirmed $ connectionStats conn - | otherwise -> qError "QKEY: queue already secured" - _ -> qError "QKEY: queue address not found in connection" - where - SMPQueueInfo cVer' SMPQueueAddress {smpServer, senderId, dhPublicKey} = qInfo + replyMsg :: NonEmpty SMPQueueInfo -> m () + replyMsg smpQueues = do + logServer "<--" c srv rId "MSG " + case duplexHandshake of + Just True -> prohibited + _ -> case conn of + RcvConnection {} -> do + AcceptedConfirmation {ownConnInfo} <- withStore c (`getAcceptedConfirmation` connId) + connectReplyQueues c cData ownConnInfo smpQueues `catchError` (notify . ERR) + _ -> prohibited - -- processed by queue sender - -- mark queue as Secured and to start sending messages to it - qUseMsg :: NonEmpty ((SMPServer, SMP.SenderId), Bool) -> Connection 'CDuplex -> m () - -- NOTE: does not yet support the change of the primary status during the rotation - qUseMsg ((addr, _primary) :| _) (DuplexConnection _ rqs sqs) = - case findQ addr sqs of - Just sq'@SndQueue {dbReplaceQueueId = Just replaceQId} -> do - case find ((replaceQId ==) . dbQId) sqs of - Just sq1 -> do - checkSQSwchStatus sq1 SSSendingQKEY - logServer "<--" c srv rId $ "MSG " <> logSecret (snd addr) - withStore' c $ \db -> setSndQueueStatus db sq' Secured - let sq'' = (sq' :: SndQueue) {status = Secured} - -- sending QTEST to the new queue only, the old one will be removed if sent successfully - void $ enqueueMessages c cData [sq''] SMP.noMsgFlags $ QTEST [addr] - sq1' <- withStore' c $ \db -> setSndSwitchStatus db sq1 $ Just SSSendingQTEST - let sqs' = updatedQs sq1' sqs - conn' = DuplexConnection cData rqs sqs' - notify . SWITCH QDSnd SPSecured $ connectionStats conn' - _ -> qError "QUSE: switching SndQueue not found in connection" - _ -> qError "QUSE: switched queue address not found in connection" + continueSending :: (SMPServer, SMP.SenderId) -> Connection 'CDuplex -> m () + continueSending addr (DuplexConnection _ _ sqs) = + case findQ addr sqs of + Just sq -> do + logServer "<--" c srv rId "MSG " + atomically $ do + (_, qLock) <- getPendingMsgQ c sq + void $ tryPutTMVar qLock () + Nothing -> qError "QCONT: queue address not found" - qError :: String -> m () - qError = throwError . AGENT . A_QUEUE + -- processed by queue sender + qAddMsg :: NonEmpty (SMPQueueUri, Maybe SndQAddr) -> Connection 'CDuplex -> m () + qAddMsg ((_, Nothing) :| _) _ = qError "adding queue without switching is not supported" + qAddMsg ((qUri, Just addr) :| _) (DuplexConnection _ rqs sqs) = do + checkRatchetSync cData $ AGENT (A_QUEUE "ratchet is not synchronized") + clientVRange <- asks $ smpClientVRange . config + case qUri `compatibleVersion` clientVRange of + Just qInfo@(Compatible sqInfo@SMPQueueInfo {queueAddress}) -> + case (findQ (qAddress sqInfo) sqs, findQ addr sqs) of + (Just _, _) -> qError "QADD: queue address is already used in connection" + (_, Just sq@SndQueue {dbQueueId}) -> do + let (delSqs, keepSqs) = L.partition ((Just dbQueueId ==) . dbReplaceQId) sqs + case L.nonEmpty keepSqs of + Just sqs' -> do + -- move inside case? + withStore' c $ \db -> mapM_ (deleteConnSndQueue db connId) delSqs + sq_@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue userId connId qInfo + let sq'' = (sq_ :: SndQueue) {primary = True, dbQueueId, dbReplaceQueueId = Just dbQueueId} + dbId <- withStore c $ \db -> addConnSndQueue db connId sq'' + let sq2 = (sq'' :: SndQueue) {dbQueueId = dbId} + case (sndPublicKey, e2ePubKey) of + (Just sndPubKey, Just dhPublicKey) -> do + logServer "<--" c srv rId $ "MSG " <> logSecret (senderId queueAddress) + let sqInfo' = (sqInfo :: SMPQueueInfo) {queueAddress = queueAddress {dhPublicKey}} + void . enqueueMessages c cData sqs SMP.noMsgFlags $ QKEY [(sqInfo', sndPubKey)] + sq1 <- withStore' c $ \db -> setSndSwitchStatus db sq $ Just SSSendingQKEY + let sqs'' = updatedQs sq1 sqs' <> [sq2] + conn' = DuplexConnection cData rqs sqs'' + notify . SWITCH QDSnd SPStarted $ connectionStats conn' + _ -> qError "absent sender keys" + _ -> qError "QADD: won't delete all snd queues in connection" + _ -> qError "QADD: replaced queue address is not found in connection" + _ -> throwError $ AGENT A_VERSION - smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m () - smpInvitation connReq@(CRInvitationUri crData _) cInfo = do - logServer "<--" c srv rId "MSG " - case conn of - ContactConnection {} -> do - g <- asks idsDrg - let newInv = NewInvitation {contactConnId = connId, connReq, recipientConnInfo = cInfo} - invId <- withStore c $ \db -> createInvitation db g newInv - let srvs = L.map qServer $ crSmpQueues crData - notify $ REQ invId srvs cInfo - _ -> prohibited + -- processed by queue recipient + qKeyMsg :: NonEmpty (SMPQueueInfo, SndPublicVerifyKey) -> Connection 'CDuplex -> m () + qKeyMsg ((qInfo, senderKey) :| _) (DuplexConnection _ rqs _) = do + checkRatchetSync cData $ AGENT (A_QUEUE "ratchet is not synchronized") + clientVRange <- asks $ smpClientVRange . config + unless (qInfo `isCompatible` clientVRange) . throwError $ AGENT A_VERSION + case findRQ (smpServer, senderId) rqs of + Just rq'@RcvQueue {rcvId, e2ePrivKey = dhPrivKey, smpClientVersion = cVer, status = status'} + | status' == New || status' == Confirmed -> do + checkRQSwchStatus rq RSSendingQADD + logServer "<--" c srv rId $ "MSG " <> logSecret senderId + let dhSecret = C.dh' dhPublicKey dhPrivKey + withStore' c $ \db -> setRcvQueueConfirmedE2E db rq' dhSecret $ min cVer cVer' + enqueueCommand c "" connId (Just smpServer) $ AInternalCommand $ ICQSecure rcvId senderKey + notify . SWITCH QDRcv SPConfirmed $ connectionStats conn + | otherwise -> qError "QKEY: queue already secured" + _ -> qError "QKEY: queue address not found in connection" + where + SMPQueueInfo cVer' SMPQueueAddress {smpServer, senderId, dhPublicKey} = qInfo - checkMsgIntegrity :: PrevExternalSndId -> ExternalSndId -> PrevRcvMsgHash -> ByteString -> MsgIntegrity - checkMsgIntegrity prevExtSndId extSndId internalPrevMsgHash receivedPrevMsgHash - | extSndId == prevExtSndId + 1 && internalPrevMsgHash == receivedPrevMsgHash = MsgOk - | extSndId < prevExtSndId = MsgError $ MsgBadId extSndId - | extSndId == prevExtSndId = MsgError MsgDuplicate -- ? deduplicate - | extSndId > prevExtSndId + 1 = MsgError $ MsgSkipped (prevExtSndId + 1) (extSndId - 1) - | internalPrevMsgHash /= receivedPrevMsgHash = MsgError MsgBadHash - | otherwise = MsgError MsgDuplicate -- this case is not possible + -- processed by queue sender + -- mark queue as Secured and to start sending messages to it + qUseMsg :: NonEmpty ((SMPServer, SMP.SenderId), Bool) -> Connection 'CDuplex -> m () + -- NOTE: does not yet support the change of the primary status during the rotation + qUseMsg ((addr, _primary) :| _) (DuplexConnection _ rqs sqs) = do + checkRatchetSync cData $ AGENT (A_QUEUE "ratchet is not synchronized") + case findQ addr sqs of + Just sq'@SndQueue {dbReplaceQueueId = Just replaceQId} -> do + case find ((replaceQId ==) . dbQId) sqs of + Just sq1 -> do + checkSQSwchStatus sq1 SSSendingQKEY + logServer "<--" c srv rId $ "MSG " <> logSecret (snd addr) + withStore' c $ \db -> setSndQueueStatus db sq' Secured + let sq'' = (sq' :: SndQueue) {status = Secured} + -- sending QTEST to the new queue only, the old one will be removed if sent successfully + void $ enqueueMessages c cData [sq''] SMP.noMsgFlags $ QTEST [addr] + sq1' <- withStore' c $ \db -> setSndSwitchStatus db sq1 $ Just SSSendingQTEST + let sqs' = updatedQs sq1' sqs + conn' = DuplexConnection cData rqs sqs' + notify . SWITCH QDSnd SPSecured $ connectionStats conn' + _ -> qError "QUSE: switching SndQueue not found in connection" + _ -> qError "QUSE: switched queue address not found in connection" + + qError :: String -> m () + qError = throwError . AGENT . A_QUEUE + + ereadyMsg :: CR.RatchetX448 -> Connection 'CDuplex -> m () + ereadyMsg rcPrev (DuplexConnection cData' _ sqs) = do + let CR.Ratchet {rcSnd} = rcPrev + -- if ratchet was initialized as receiving, it means EREADY wasn't sent on key negotiation + when (isNothing rcSnd) $ + void . enqueueMessages' c cData' sqs SMP.MsgFlags {notification = True} $ EREADY lastExternalSndId + + smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m () + smpInvitation connReq@(CRInvitationUri crData _) cInfo = do + logServer "<--" c srv rId "MSG " + case conn of + ContactConnection {} -> do + g <- asks idsDrg + let newInv = NewInvitation {contactConnId = connId, connReq, recipientConnInfo = cInfo} + invId <- withStore c $ \db -> createInvitation db g newInv + let srvs = L.map qServer $ crSmpQueues crData + notify $ REQ invId srvs cInfo + _ -> prohibited + + qDuplex :: String -> (Connection 'CDuplex -> m ()) -> m () + qDuplex name a = case conn of + DuplexConnection {} -> a conn + _ -> qError $ name <> ": message must be sent to duplex connection" + + newRatchetKey :: CR.E2ERatchetParams 'C.X448 -> Connection 'CDuplex -> m () + newRatchetKey e2eOtherPartyParams@(CR.E2ERatchetParams e2eVersion k1Rcv k2Rcv) (DuplexConnection cData' rqs sqs) = + unlessM ratchetExists $ do + AgentConfig {e2eEncryptVRange} <- asks config + unless (e2eVersion `isCompatible` e2eEncryptVRange) (throwError $ AGENT A_VERSION) + keys <- getSendRatchetKeys + initRatchet e2eEncryptVRange keys + notifyAgreed + where + rkHashRcv = rkHash k1Rcv k2Rcv + rkHash k1 k2 = C.sha256Hash $ C.pubKeyBytes k1 <> C.pubKeyBytes k2 + ratchetExists :: m Bool + ratchetExists = withStore' c $ \db -> do + exists <- checkProcessedRatchetKeyHashExists db connId rkHashRcv + unless exists $ addProcessedRatchetKeyHash db connId rkHashRcv + pure exists + getSendRatchetKeys :: m (C.PrivateKeyX448, C.PrivateKeyX448, C.PublicKeyX448, C.PublicKeyX448) + getSendRatchetKeys + | rss == RSStarted = withStore c (`getRatchetX3dhKeys'` connId) + | otherwise = do + (pk1, pk2, e2eParams@(CR.E2ERatchetParams _ k1 k2)) <- liftIO . CR.generateE2EParams $ version e2eOtherPartyParams + void $ enqueueRatchetKeyMsgs c cData' sqs e2eParams + pure (pk1, pk2, k1, k2) + notifyAgreed :: m () + notifyAgreed = do + let cData'' = cData' {ratchetSyncState = RSAgreed} :: ConnData + conn' = DuplexConnection cData'' rqs sqs + notify . RSYNC RSAgreed $ connectionStats conn' + recreateRatchet :: CR.Ratchet 'C.X448 -> m () + recreateRatchet rc = withStore' c $ \db -> do + setConnRatchetSync db connId RSAgreed + deleteRatchet db connId + createRatchet db connId rc + -- compare public keys `k1` in AgentRatchetKey messages sent by self and other party + -- to determine ratchet initilization ordering + initRatchet :: VersionRange -> (C.PrivateKeyX448, C.PrivateKeyX448, C.PublicKeyX448, C.PublicKeyX448) -> m () + initRatchet e2eEncryptVRange (pk1, pk2, k1, k2) + | rkHash k1 k2 <= rkHashRcv = do + recreateRatchet $ CR.initRcvRatchet e2eEncryptVRange pk2 $ CR.x3dhRcv pk1 pk2 e2eOtherPartyParams + | otherwise = do + (_, rcDHRs) <- liftIO C.generateKeyPair' + recreateRatchet $ CR.initSndRatchet e2eEncryptVRange k2Rcv rcDHRs $ CR.x3dhSnd pk1 pk2 e2eOtherPartyParams + void . enqueueMessages' c cData' sqs SMP.MsgFlags {notification = True} $ EREADY lastExternalSndId + + checkMsgIntegrity :: PrevExternalSndId -> ExternalSndId -> PrevRcvMsgHash -> ByteString -> MsgIntegrity + checkMsgIntegrity prevExtSndId extSndId internalPrevMsgHash receivedPrevMsgHash + | extSndId == prevExtSndId + 1 && internalPrevMsgHash == receivedPrevMsgHash = MsgOk + | extSndId < prevExtSndId = MsgError $ MsgBadId extSndId + | extSndId == prevExtSndId = MsgError MsgDuplicate -- ? deduplicate + | extSndId > prevExtSndId + 1 = MsgError $ MsgSkipped (prevExtSndId + 1) (extSndId - 1) + | internalPrevMsgHash /= receivedPrevMsgHash = MsgError MsgBadHash + | otherwise = MsgError MsgDuplicate -- this case is not possible checkRQSwchStatus :: AgentMonad m => RcvQueue -> RcvSwitchStatus -> m () checkRQSwchStatus rq@RcvQueue {rcvSwchStatus} expected = @@ -2095,7 +2233,7 @@ connectReplyQueues c cData@ConnData {userId, connId} ownConnInfo (qInfo :| _) = enqueueConfirmation c cData sq {dbQueueId} ownConnInfo Nothing confirmQueue :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m () -confirmQueue (Compatible agentVersion) c cData@ConnData {connId} sq srv connInfo e2eEncryption = do +confirmQueue (Compatible agentVersion) c cData@ConnData {connId} sq srv connInfo e2eEncryption_ = do aMessage <- mkAgentMessage agentVersion msg <- mkConfirmation aMessage sendConfirmation c sq msg @@ -2105,7 +2243,7 @@ confirmQueue (Compatible agentVersion) c cData@ConnData {connId} sq srv connInfo mkConfirmation aMessage = withStore c $ \db -> runExceptT $ do void . liftIO $ updateSndIds db connId encConnInfo <- agentRatchetEncrypt db connId (smpEncode aMessage) e2eEncConnInfoLength - pure . smpEncode $ AgentConfirmation {agentVersion, e2eEncryption, encConnInfo} + pure . smpEncode $ AgentConfirmation {agentVersion, e2eEncryption_, encConnInfo} mkAgentMessage :: Version -> m AgentMessage mkAgentMessage 1 = pure $ AgentConnInfo connInfo mkAgentMessage _ = do @@ -2113,7 +2251,7 @@ confirmQueue (Compatible agentVersion) c cData@ConnData {connId} sq srv connInfo pure $ AgentConnInfoReply (qInfo :| []) connInfo enqueueConfirmation :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m () -enqueueConfirmation c cData@ConnData {connId, connAgentVersion} sq connInfo e2eEncryption = do +enqueueConfirmation c cData@ConnData {connId, connAgentVersion} sq connInfo e2eEncryption_ = do resumeMsgDelivery c cData sq msgId <- storeConfirmation queuePendingMsgs c sq [msgId] @@ -2126,7 +2264,35 @@ enqueueConfirmation c cData@ConnData {connId, connAgentVersion} sq connInfo e2eE agentMsgStr = smpEncode agentMsg internalHash = C.sha256Hash agentMsgStr encConnInfo <- agentRatchetEncrypt db connId agentMsgStr e2eEncConnInfoLength - let msgBody = smpEncode $ AgentConfirmation {agentVersion = connAgentVersion, e2eEncryption, encConnInfo} + let msgBody = smpEncode $ AgentConfirmation {agentVersion = connAgentVersion, e2eEncryption_, encConnInfo} + msgType = agentMessageType agentMsg + msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, msgFlags = SMP.MsgFlags {notification = True}, internalHash, prevMsgHash} + liftIO $ createSndMsg db connId msgData + liftIO $ createSndMsgDelivery db connId sq internalId + pure internalId + +enqueueRatchetKeyMsgs :: forall m. AgentMonad m => AgentClient -> ConnData -> NonEmpty SndQueue -> CR.E2ERatchetParams 'C.X448 -> m AgentMsgId +enqueueRatchetKeyMsgs c cData (sq :| sqs) e2eEncryption = do + msgId <- enqueueRatchetKey c cData sq e2eEncryption + mapM_ (enqueueSavedMessage c cData msgId) $ + filter (\SndQueue {status} -> status == Secured || status == Active) sqs + pure msgId + +enqueueRatchetKey :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> CR.E2ERatchetParams 'C.X448 -> m AgentMsgId +enqueueRatchetKey c cData@ConnData {connId, connAgentVersion} sq e2eEncryption = do + resumeMsgDelivery c cData sq + msgId <- storeRatchetKey + queuePendingMsgs c sq [msgId] + pure $ unId msgId + where + storeRatchetKey :: m InternalId + storeRatchetKey = withStore c $ \db -> runExceptT $ do + internalTs <- liftIO getCurrentTime + (internalId, internalSndId, prevMsgHash) <- liftIO $ updateSndIds db connId + let agentMsg = AgentRatchetInfo "" + agentMsgStr = smpEncode agentMsg + internalHash = C.sha256Hash agentMsgStr + let msgBody = smpEncode $ AgentRatchetKey {agentVersion = connAgentVersion, e2eEncryption, info = agentMsgStr} msgType = agentMessageType agentMsg msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, msgFlags = SMP.MsgFlags {notification = True}, internalHash, prevMsgHash} liftIO $ createSndMsg db connId msgData @@ -2145,6 +2311,10 @@ agentRatchetEncrypt db connId msg paddedLen = do agentRatchetDecrypt :: DB.Connection -> ConnId -> ByteString -> ExceptT StoreError IO ByteString agentRatchetDecrypt db connId encAgentMsg = do rc <- ExceptT $ getRatchet db connId + agentRatchetDecrypt' db connId rc encAgentMsg + +agentRatchetDecrypt' :: DB.Connection -> ConnId -> CR.RatchetX448 -> ByteString -> ExceptT StoreError IO ByteString +agentRatchetDecrypt' db connId rc encAgentMsg = do skipped <- liftIO $ getSkippedMsgKeys db connId (agentMsgBody_, rc', skippedDiff) <- liftE (SEAgentError . cryptoError) $ CR.rcDecrypt rc skipped encAgentMsg liftIO $ updateRatchet db connId rc' skippedDiff diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 3664cbb5b..e199f0b79 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -83,6 +83,7 @@ data AgentConfig = AgentConfig initialCleanupDelay :: Int64, cleanupInterval :: Int64, rcvMsgHashesTTL :: NominalDiffTime, + processedRatchetKeyHashesTTL :: NominalDiffTime, rcvFilesTTL :: NominalDiffTime, sndFilesTTL :: NominalDiffTime, xftpNotifyErrsOnRetry :: Bool, @@ -147,6 +148,7 @@ defaultAgentConfig = initialCleanupDelay = 30 * 1000000, -- 30 seconds cleanupInterval = 30 * 60 * 1000000, -- 30 minutes rcvMsgHashesTTL = 30 * nominalDay, + processedRatchetKeyHashesTTL = 30 * nominalDay, rcvFilesTTL = 2 * nominalDay, sndFilesTTL = nominalDay, xftpNotifyErrsOnRetry = True, diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 1a4f1f83f..98a4cad5b 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -62,6 +62,7 @@ module Simplex.Messaging.Agent.Protocol RcvSwitchStatus (..), SndSwitchStatus (..), QueueDirection (..), + RatchetSyncState (..), SMPConfirmation (..), AgentMsgEnvelope (..), AgentMessage (..), @@ -98,6 +99,7 @@ module Simplex.Messaging.Agent.Protocol BrokerErrorType (..), SMPAgentError (..), AgentCryptoError (..), + cryptoErrToSyncState, ATransmission, ATransmissionOrError, ARawTransmission, @@ -324,6 +326,7 @@ data ACommand (p :: AParty) (e :: AEntity) where DOWN :: SMPServer -> [ConnId] -> ACommand Agent AENone UP :: SMPServer -> [ConnId] -> ACommand Agent AENone SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> ACommand Agent AEConn + RSYNC :: RatchetSyncState -> ConnectionStats -> ACommand Agent AEConn SEND :: MsgFlags -> MsgBody -> ACommand Client AEConn MID :: AgentMsgId -> ACommand Agent AEConn SENT :: AgentMsgId -> ACommand Agent AEConn @@ -382,6 +385,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where DOWN_ :: ACommandTag Agent AENone UP_ :: ACommandTag Agent AENone SWITCH_ :: ACommandTag Agent AEConn + RSYNC_ :: ACommandTag Agent AEConn SEND_ :: ACommandTag Client AEConn MID_ :: ACommandTag Agent AEConn SENT_ :: ACommandTag Agent AEConn @@ -433,6 +437,7 @@ aCommandTag = \case DOWN {} -> DOWN_ UP {} -> UP_ SWITCH {} -> SWITCH_ + RSYNC {} -> RSYNC_ SEND {} -> SEND_ MID _ -> MID_ SENT _ -> SENT_ @@ -559,6 +564,41 @@ instance ToJSON SndSwitchStatus where instance FromJSON SndSwitchStatus where parseJSON = strParseJSON "SndSwitchStatus" +data RatchetSyncState + = RSOk + | RSAllowed + | RSRequired + | RSStarted + | RSAgreed + deriving (Eq, Show) + +instance StrEncoding RatchetSyncState where + strEncode = \case + RSOk -> "ok" + RSAllowed -> "allowed" + RSRequired -> "required" + RSStarted -> "started" + RSAgreed -> "agreed" + strP = + A.takeTill (== ' ') >>= \case + "ok" -> pure RSOk + "allowed" -> pure RSAllowed + "required" -> pure RSRequired + "started" -> pure RSStarted + "agreed" -> pure RSAgreed + _ -> fail "bad RatchetSyncState" + +instance FromField RatchetSyncState where fromField = fromTextField_ $ eitherToMaybe . strDecode . encodeUtf8 + +instance ToField RatchetSyncState where toField = toField . decodeLatin1 . strEncode + +instance ToJSON RatchetSyncState where + toEncoding = strToJEncoding + toJSON = strToJSON + +instance FromJSON RatchetSyncState where + parseJSON = strParseJSON "RatchetSyncState" + data RcvQueueInfo = RcvQueueInfo { rcvServer :: SMPServer, rcvSwitchStatus :: Maybe RcvSwitchStatus, @@ -597,17 +637,21 @@ instance StrEncoding SndQueueInfo where data ConnectionStats = ConnectionStats { rcvQueuesInfo :: [RcvQueueInfo], - sndQueuesInfo :: [SndQueueInfo] + sndQueuesInfo :: [SndQueueInfo], + ratchetSyncState :: RatchetSyncState } deriving (Eq, Show, Generic) instance StrEncoding ConnectionStats where - strEncode ConnectionStats {rcvQueuesInfo, sndQueuesInfo} = - "rcv=" <> strEncodeList rcvQueuesInfo <> " snd=" <> strEncodeList sndQueuesInfo + strEncode ConnectionStats {rcvQueuesInfo, sndQueuesInfo, ratchetSyncState} = + "rcv=" <> strEncodeList rcvQueuesInfo + <> (" snd=" <> strEncodeList sndQueuesInfo) + <> (" sync=" <> strEncode ratchetSyncState) strP = do rcvQueuesInfo <- "rcv=" *> strListP sndQueuesInfo <- " snd=" *> strListP - pure ConnectionStats {rcvQueuesInfo, sndQueuesInfo} + ratchetSyncState <- " sync=" *> strP + pure ConnectionStats {rcvQueuesInfo, sndQueuesInfo, ratchetSyncState} instance ToJSON ConnectionStats where toEncoding = J.genericToEncoding J.defaultOptions @@ -710,7 +754,7 @@ data SMPConfirmation = SMPConfirmation data AgentMsgEnvelope = AgentConfirmation { agentVersion :: Version, - e2eEncryption :: Maybe (E2ERatchetParams 'C.X448), + e2eEncryption_ :: Maybe (E2ERatchetParams 'C.X448), encConnInfo :: ByteString } | AgentMsgEnvelope @@ -722,22 +766,29 @@ data AgentMsgEnvelope connReq :: ConnectionRequestUri 'CMInvitation, connInfo :: ByteString -- this message is only encrypted with per-queue E2E, not with double ratchet, } + | AgentRatchetKey + { agentVersion :: Version, + e2eEncryption :: E2ERatchetParams 'C.X448, + info :: ByteString + } deriving (Show) instance Encoding AgentMsgEnvelope where smpEncode = \case - AgentConfirmation {agentVersion, e2eEncryption, encConnInfo} -> - smpEncode (agentVersion, 'C', e2eEncryption, Tail encConnInfo) + AgentConfirmation {agentVersion, e2eEncryption_, encConnInfo} -> + smpEncode (agentVersion, 'C', e2eEncryption_, Tail encConnInfo) AgentMsgEnvelope {agentVersion, encAgentMessage} -> smpEncode (agentVersion, 'M', Tail encAgentMessage) AgentInvitation {agentVersion, connReq, connInfo} -> smpEncode (agentVersion, 'I', Large $ strEncode connReq, Tail connInfo) + AgentRatchetKey {agentVersion, e2eEncryption, info} -> + smpEncode (agentVersion, 'R', e2eEncryption, Tail info) smpP = do agentVersion <- smpP smpP >>= \case 'C' -> do - (e2eEncryption, Tail encConnInfo) <- smpP - pure AgentConfirmation {agentVersion, e2eEncryption, encConnInfo} + (e2eEncryption_, Tail encConnInfo) <- smpP + pure AgentConfirmation {agentVersion, e2eEncryption_, encConnInfo} 'M' -> do Tail encAgentMessage <- smpP pure AgentMsgEnvelope {agentVersion, encAgentMessage} @@ -745,15 +796,21 @@ instance Encoding AgentMsgEnvelope where connReq <- strDecode . unLarge <$?> smpP Tail connInfo <- smpP pure AgentInvitation {agentVersion, connReq, connInfo} + 'R' -> do + e2eEncryption <- smpP + Tail info <- smpP + pure AgentRatchetKey {agentVersion, e2eEncryption, info} _ -> fail "bad AgentMsgEnvelope" -- SMP agent message formats (after double ratchet decryption, -- or in case of AgentInvitation - in plain text body) +-- AgentRatchetInfo is not encrypted with double ratchet, but with per-queue E2E encryption data AgentMessage = AgentConnInfo ConnInfo | -- AgentConnInfoReply is only used in duplexHandshake mode (v2), allowing to include reply queue(s) in the initial confirmation. -- It makes REPLY message unnecessary. AgentConnInfoReply (L.NonEmpty SMPQueueInfo) ConnInfo + | AgentRatchetInfo ByteString | AgentMessage APrivHeader AMessage deriving (Show) @@ -761,17 +818,20 @@ instance Encoding AgentMessage where smpEncode = \case AgentConnInfo cInfo -> smpEncode ('I', Tail cInfo) AgentConnInfoReply smpQueues cInfo -> smpEncode ('D', smpQueues, Tail cInfo) -- 'D' stands for "duplex" + AgentRatchetInfo info -> smpEncode ('R', Tail info) AgentMessage hdr aMsg -> smpEncode ('M', hdr, aMsg) smpP = smpP >>= \case 'I' -> AgentConnInfo . unTail <$> smpP 'D' -> AgentConnInfoReply <$> smpP <*> (unTail <$> smpP) + 'R' -> AgentRatchetInfo . unTail <$> smpP 'M' -> AgentMessage <$> smpP <*> smpP _ -> fail "bad AgentMessage" data AgentMessageType = AM_CONN_INFO | AM_CONN_INFO_REPLY + | AM_RATCHET_INFO | AM_HELLO_ | AM_REPLY_ | AM_A_MSG_ @@ -780,12 +840,14 @@ data AgentMessageType | AM_QKEY_ | AM_QUSE_ | AM_QTEST_ + | AM_EREADY_ deriving (Eq, Show) instance Encoding AgentMessageType where smpEncode = \case AM_CONN_INFO -> "C" AM_CONN_INFO_REPLY -> "D" + AM_RATCHET_INFO -> "S" AM_HELLO_ -> "H" AM_REPLY_ -> "R" AM_A_MSG_ -> "M" @@ -794,10 +856,12 @@ instance Encoding AgentMessageType where AM_QKEY_ -> "QK" AM_QUSE_ -> "QU" AM_QTEST_ -> "QT" + AM_EREADY_ -> "E" smpP = A.anyChar >>= \case 'C' -> pure AM_CONN_INFO 'D' -> pure AM_CONN_INFO_REPLY + 'S' -> pure AM_RATCHET_INFO 'H' -> pure AM_HELLO_ 'R' -> pure AM_REPLY_ 'M' -> pure AM_A_MSG_ @@ -809,12 +873,14 @@ instance Encoding AgentMessageType where 'U' -> pure AM_QUSE_ 'T' -> pure AM_QTEST_ _ -> fail "bad AgentMessageType" + 'E' -> pure AM_EREADY_ _ -> fail "bad AgentMessageType" agentMessageType :: AgentMessage -> AgentMessageType agentMessageType = \case AgentConnInfo _ -> AM_CONN_INFO AgentConnInfoReply {} -> AM_CONN_INFO_REPLY + AgentRatchetInfo _ -> AM_RATCHET_INFO AgentMessage _ aMsg -> case aMsg of -- HELLO is used both in v1 and in v2, but differently. -- - in v1 (and, possibly, in v2 for simplex connections) can be sent multiple times, @@ -829,6 +895,7 @@ agentMessageType = \case QKEY _ -> AM_QKEY_ QUSE _ -> AM_QUSE_ QTEST _ -> AM_QTEST_ + EREADY _ -> AM_EREADY_ data APrivHeader = APrivHeader { -- | sequential ID assigned by the sending agent @@ -852,6 +919,7 @@ data AMsgType | QKEY_ | QUSE_ | QTEST_ + | EREADY_ deriving (Eq) instance Encoding AMsgType where @@ -864,6 +932,7 @@ instance Encoding AMsgType where QKEY_ -> "QK" QUSE_ -> "QU" QTEST_ -> "QT" + EREADY_ -> "E" smpP = A.anyChar >>= \case 'H' -> pure HELLO_ @@ -877,6 +946,7 @@ instance Encoding AMsgType where 'U' -> pure QUSE_ 'T' -> pure QTEST_ _ -> fail "bad AMsgType" + 'E' -> pure EREADY_ _ -> fail "bad AMsgType" -- | Messages sent between SMP agents once SMP queue is secured. @@ -899,6 +969,8 @@ data AMessage QUSE (L.NonEmpty (SndQAddr, Bool)) | -- sent by the sender to test new queues and to complete switching QTEST (L.NonEmpty SndQAddr) + | -- ratchet re-synchronization is complete, with last decrypted sender message id (recipient's `last_external_snd_msg_id`) + EREADY Int64 deriving (Show) type SndQAddr = (SMPServer, SMP.SenderId) @@ -913,6 +985,7 @@ instance Encoding AMessage where QKEY qs -> smpEncode (QKEY_, qs) QUSE qs -> smpEncode (QUSE_, qs) QTEST qs -> smpEncode (QTEST_, qs) + EREADY lastDecryptedMsgId -> smpEncode (EREADY_, lastDecryptedMsgId) smpP = smpP >>= \case @@ -924,6 +997,7 @@ instance Encoding AMessage where QKEY_ -> QKEY <$> smpP QUSE_ -> QUSE <$> smpP QTEST_ -> QTEST <$> smpP + EREADY_ -> EREADY <$> smpP instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where strEncode = \case @@ -1419,6 +1493,14 @@ instance Arbitrary SMPAgentError where arbitrary = genericArbitraryU instance Arbitrary AgentCryptoError where arbitrary = genericArbitraryU +cryptoErrToSyncState :: AgentCryptoError -> RatchetSyncState +cryptoErrToSyncState = \case + DECRYPT_AES -> RSAllowed + DECRYPT_CB -> RSAllowed + RATCHET_HEADER -> RSRequired + RATCHET_EARLIER _ -> RSAllowed + RATCHET_SKIPPED _ -> RSRequired + -- | SMP agent command and response parser for commands passed via network (only parses binary length) networkCommandP :: Parser ACmd networkCommandP = commandP A.takeByteString @@ -1448,6 +1530,7 @@ instance StrEncoding ACmdTag where "DOWN" -> nt DOWN_ "UP" -> nt UP_ "SWITCH" -> ct SWITCH_ + "RSYNC" -> ct RSYNC_ "SEND" -> t SEND_ "MID" -> ct MID_ "SENT" -> ct SENT_ @@ -1501,6 +1584,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where DOWN_ -> "DOWN" UP_ -> "UP" SWITCH_ -> "SWITCH" + RSYNC_ -> "RSYNC" SEND_ -> "SEND" MID_ -> "MID" SENT_ -> "SENT" @@ -1568,6 +1652,7 @@ commandP binaryP = DOWN_ -> s (DOWN <$> strP_ <*> connections) UP_ -> s (UP <$> strP_ <*> connections) SWITCH_ -> s (SWITCH <$> strP_ <*> strP_ <*> strP) + RSYNC_ -> s (RSYNC <$> strP_ <*> strP) MID_ -> s (MID <$> A.decimal) SENT_ -> s (SENT <$> A.decimal) MERR_ -> s (MERR <$> A.decimal <* A.space <*> strP) @@ -1626,6 +1711,7 @@ serializeCommand = \case DOWN srv conns -> B.unwords [s DOWN_, s srv, connections conns] UP srv conns -> B.unwords [s UP_, s srv, connections conns] SWITCH dir phase srvs -> s (SWITCH_, dir, phase, srvs) + RSYNC rrState cstats -> s (RSYNC_, rrState, cstats) SEND msgFlags msgBody -> B.unwords [s SEND_, smpEncode msgFlags, serializeBinary msgBody] MID mId -> s (MID_, Str $ bshow mId) SENT mId -> s (SENT_, Str $ bshow mId) diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 7fbe9dfe2..871a82563 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -293,7 +293,9 @@ data ConnData = ConnData connAgentVersion :: Version, enableNtfs :: Bool, duplexHandshake :: Maybe Bool, -- added in agent protocol v2 - deleted :: Bool + lastExternalSndId :: PrevExternalSndId, + deleted :: Bool, + ratchetSyncState :: RatchetSyncState } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 6a9341b74..eec4169dd 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -53,6 +53,10 @@ module Simplex.Messaging.Agent.Store.SQLite getConnData, setConnDeleted, getDeletedConnIds, + setConnRatchetSync, + addProcessedRatchetKeyHash, + checkProcessedRatchetKeyHashExists, + deleteProcessedRatchetKeyHashesExpired, getRcvConn, getRcvQueueById, getSndQueueById, @@ -107,7 +111,11 @@ module Simplex.Messaging.Agent.Store.SQLite -- Double ratchet persistence createRatchetX3dhKeys, getRatchetX3dhKeys, + createRatchetX3dhKeys', + getRatchetX3dhKeys', + setRatchetX3dhKeys, createRatchet, + deleteRatchet, getRatchet, getSkippedMsgKeys, updateRatchet, @@ -1029,6 +1037,35 @@ getRatchetX3dhKeys db connId = Right (Just k1, Just k2) -> Right (k1, k2) _ -> Left SEX3dhKeysNotFound +createRatchetX3dhKeys' :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> C.PublicKeyX448 -> C.PublicKeyX448 -> IO () +createRatchetX3dhKeys' db connId x3dhPrivKey1 x3dhPrivKey2 x3dhPubKey1 x3dhPubKey2 = + DB.execute + db + "INSERT INTO ratchets (conn_id, x3dh_priv_key_1, x3dh_priv_key_2, x3dh_pub_key_1, x3dh_pub_key_2) VALUES (?,?,?,?,?)" + (connId, x3dhPrivKey1, x3dhPrivKey2, x3dhPubKey1, x3dhPubKey2) + +getRatchetX3dhKeys' :: DB.Connection -> ConnId -> IO (Either StoreError (C.PrivateKeyX448, C.PrivateKeyX448, C.PublicKeyX448, C.PublicKeyX448)) +getRatchetX3dhKeys' db connId = + fmap hasKeys $ + firstRow id SEX3dhKeysNotFound $ + DB.query db "SELECT x3dh_priv_key_1, x3dh_priv_key_2, x3dh_pub_key_1, x3dh_pub_key_2 FROM ratchets WHERE conn_id = ?" (Only connId) + where + hasKeys = \case + Right (Just pk1, Just pk2, Just k1, Just k2) -> Right (pk1, pk2, k1, k2) + _ -> Left SEX3dhKeysNotFound + +-- used to remember new keys when starting ratchet re-synchronization +setRatchetX3dhKeys :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> C.PublicKeyX448 -> C.PublicKeyX448 -> IO () +setRatchetX3dhKeys db connId x3dhPrivKey1 x3dhPrivKey2 x3dhPubKey1 x3dhPubKey2 = + DB.execute + db + [sql| + UPDATE ratchets + SET x3dh_priv_key_1 = ?, x3dh_priv_key_2 = ?, x3dh_pub_key_1 = ?, x3dh_pub_key_2 = ? + WHERE conn_id = ? + |] + (x3dhPrivKey1, x3dhPrivKey2, x3dhPubKey1, x3dhPubKey2, connId) + createRatchet :: DB.Connection -> ConnId -> RatchetX448 -> IO () createRatchet db connId rc = DB.executeNamed @@ -1039,10 +1076,16 @@ createRatchet db connId rc = ON CONFLICT (conn_id) DO UPDATE SET ratchet_state = :ratchet_state, x3dh_priv_key_1 = NULL, - x3dh_priv_key_2 = NULL + x3dh_priv_key_2 = NULL, + x3dh_pub_key_1 = NULL, + x3dh_pub_key_2 = NULL |] [":conn_id" := connId, ":ratchet_state" := rc] +deleteRatchet :: DB.Connection -> ConnId -> IO () +deleteRatchet db connId = + DB.execute db "DELETE FROM ratchets WHERE conn_id = ?" (Only connId) + getRatchet :: DB.Connection -> ConnId -> IO (Either StoreError RatchetX448) getRatchet db connId = firstRow' ratchet SERatchetNotFound $ DB.query db "SELECT ratchet_state FROM ratchets WHERE conn_id = ?" (Only connId) @@ -1643,10 +1686,21 @@ getAnyConns_ deleted' db connIds = forM connIds $ E.handle handleDBError . getAn handleDBError = pure . Left . SEInternal . bshow getConnData :: DB.Connection -> ConnId -> IO (Maybe (ConnData, ConnectionMode)) -getConnData dbConn connId' = - maybeFirstRow cData $ DB.query dbConn "SELECT user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake, deleted FROM connections WHERE conn_id = ?;" (Only connId') +getConnData db connId' = + maybeFirstRow cData $ + DB.query + db + [sql| + SELECT + user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake, + last_external_snd_msg_id, deleted, ratchet_sync_state + FROM connections + WHERE conn_id = ? + |] + (Only connId') where - cData (userId, connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake, deleted) = (ConnData {userId, connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake, deleted}, cMode) + cData (userId, connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake, lastExternalSndId, deleted, ratchetSyncState) = + (ConnData {userId, connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake, lastExternalSndId, deleted, ratchetSyncState}, cMode) setConnDeleted :: DB.Connection -> ConnId -> IO () setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId) @@ -1654,6 +1708,30 @@ setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHE getDeletedConnIds :: DB.Connection -> IO [ConnId] getDeletedConnIds db = map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE deleted = ?" (Only True) +setConnRatchetSync :: DB.Connection -> ConnId -> RatchetSyncState -> IO () +setConnRatchetSync db connId ratchetSyncState = + DB.execute db "UPDATE connections SET ratchet_sync_state = ? WHERE conn_id = ?" (ratchetSyncState, connId) + +addProcessedRatchetKeyHash :: DB.Connection -> ConnId -> ByteString -> IO () +addProcessedRatchetKeyHash db connId hash = + DB.execute db "INSERT INTO processed_ratchet_key_hashes (conn_id, hash) VALUES (?,?)" (connId, hash) + +checkProcessedRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool +checkProcessedRatchetKeyHashExists db connId hash = do + fromMaybe False + <$> maybeFirstRow + fromOnly + ( DB.query + db + "SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1" + (connId, hash) + ) + +deleteProcessedRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO () +deleteProcessedRatchetKeyHashesExpired db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + DB.execute db "DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs) + -- | returns all connection queues, the first queue is the primary one getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue)) getRcvQueuesByConnId_ db connId = diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 4e03c4e1a..bc91cd8f0 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -61,6 +61,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230510_files_pending_replicas_indexes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -88,7 +89,8 @@ schemaMigrations = ("m20230401_snd_files", m20230401_snd_files, Just down_m20230401_snd_files), ("m20230510_files_pending_replicas_indexes", m20230510_files_pending_replicas_indexes, Just down_m20230510_files_pending_replicas_indexes), ("m20230516_encrypted_rcv_message_hashes", m20230516_encrypted_rcv_message_hashes, Just down_m20230516_encrypted_rcv_message_hashes), - ("m20230531_switch_status", m20230531_switch_status, Just down_m20230531_switch_status) + ("m20230531_switch_status", m20230531_switch_status, Just down_m20230531_switch_status), + ("m20230615_ratchet_sync", m20230615_ratchet_sync, Just down_m20230615_ratchet_sync) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230615_ratchet_sync.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230615_ratchet_sync.hs new file mode 100644 index 000000000..38db93060 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230615_ratchet_sync.hs @@ -0,0 +1,41 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +-- Ratchet public keys are saved when ratchet re-synchronization is started - upon receiving other party's public keys, +-- keys are compared to determine ratchet initialization ordering for both parties. +-- This solves a possible race when both parties start ratchet re-synchronization at the same time. +m20230615_ratchet_sync :: Query +m20230615_ratchet_sync = + [sql| +ALTER TABLE connections ADD COLUMN ratchet_sync_state TEXT NOT NULL DEFAULT 'ok'; + +ALTER TABLE ratchets ADD COLUMN x3dh_pub_key_1 BLOB; +ALTER TABLE ratchets ADD COLUMN x3dh_pub_key_2 BLOB; + +CREATE TABLE processed_ratchet_key_hashes( + processed_ratchet_key_hash_id INTEGER PRIMARY KEY, + conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE, + hash BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX idx_processed_ratchet_key_hashes_hash ON processed_ratchet_key_hashes(conn_id, hash); +|] + +down_m20230615_ratchet_sync :: Query +down_m20230615_ratchet_sync = + [sql| +DROP INDEX idx_processed_ratchet_key_hashes_hash; + +DROP TABLE processed_ratchet_key_hashes; + +ALTER TABLE ratchets DROP COLUMN x3dh_pub_key_2; +ALTER TABLE ratchets DROP COLUMN x3dh_pub_key_1; + +ALTER TABLE connections DROP COLUMN ratchet_sync_state; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index c391e484c..ca07c8244 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -25,7 +25,8 @@ CREATE TABLE connections( enable_ntfs INTEGER, deleted INTEGER DEFAULT 0 CHECK(deleted NOT NULL), user_id INTEGER CHECK(user_id NOT NULL) - REFERENCES users ON DELETE CASCADE + REFERENCES users ON DELETE CASCADE, + ratchet_sync_state TEXT NOT NULL DEFAULT 'ok' ) WITHOUT ROWID; CREATE TABLE rcv_queues( host TEXT NOT NULL, @@ -154,6 +155,9 @@ CREATE TABLE ratchets( -- ratchet is initially empty on the receiving side(the side offering the connection) ratchet_state BLOB, e2e_version INTEGER NOT NULL DEFAULT 1 + , + x3dh_pub_key_1 BLOB, + x3dh_pub_key_2 BLOB ) WITHOUT ROWID; CREATE TABLE skipped_messages( skipped_message_id INTEGER PRIMARY KEY, @@ -356,6 +360,13 @@ CREATE TABLE encrypted_rcv_message_hashes( created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) ); +CREATE TABLE processed_ratchet_key_hashes( + processed_ratchet_key_hash_id INTEGER PRIMARY KEY, + conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE, + hash BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id); CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id); CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id); @@ -446,3 +457,7 @@ CREATE INDEX idx_encrypted_rcv_message_hashes_hash ON encrypted_rcv_message_hash conn_id, hash ); +CREATE INDEX idx_processed_ratchet_key_hashes_hash ON processed_ratchet_key_hashes( + conn_id, + hash +); diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index c95d8f304..c1a70dec8 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -125,16 +125,16 @@ runRight action = Right x -> pure x Left e -> error $ "Unexpected error: " <> show e -getInAnyOrder :: HasCallStack => AgentClient -> [AEntityTransmission 'AEConn -> Bool] -> Expectation +getInAnyOrder :: HasCallStack => AgentClient -> [ATransmission 'Agent -> Bool] -> Expectation getInAnyOrder _ [] = pure () getInAnyOrder c rs = do - r <- get c + r <- pGet c let rest = filter (not . expected r) rs if length rest < length rs then getInAnyOrder c rest else error $ "unexpected event: " <> show r where - expected :: AEntityTransmission 'AEConn -> (AEntityTransmission 'AEConn -> Bool) -> Bool + expected :: ATransmission 'Agent -> (ATransmission 'Agent -> Bool) -> Bool expected r rp = rp r functionalAPITests :: ATransport -> Spec @@ -169,8 +169,17 @@ functionalAPITests t = do testDuplicateMessage t it "should report error via msg integrity on skipped messages" $ testSkippedMessages t - it "should report decryption error on ratchet becoming out of sync" $ - testDecryptionError t + describe "Ratchet synchronization" $ do + it "should report ratchet de-synchronization, synchronize ratchets" $ + testRatchetSync t + it "should synchronize ratchets after server being offline" $ + testRatchetSyncServerOffline t + it "should synchronize ratchets after client restart" $ + testRatchetSyncClientRestart t + it "should synchronize ratchets after suspend/foreground" $ + testRatchetSyncSuspendForeground t + it "should synchronize ratchets when clients start synchronization simultaneously" $ + testRatchetSyncSimultaneous t describe "Inactive client disconnection" $ do it "should disconnect clients if it was inactive longer than TTL" $ testInactiveClientDisconnected t @@ -639,53 +648,222 @@ testSkippedMessages t = do get bob2 =##> \case ("", c, Msg "hello 6") -> c == aliceId; _ -> False ackMessage bob2 aliceId 6 -testDecryptionError :: HasCallStack => ATransport -> IO () -testDecryptionError t = do +testRatchetSync :: HasCallStack => ATransport -> IO () +testRatchetSync t = do alice <- getSMPAgentClient' agentCfg initAgentServers testDB bob <- getSMPAgentClient' agentCfg initAgentServers testDB2 withSmpServerStoreMsgLogOn t testPort $ \_ -> do - (aliceId, bobId) <- runRight $ makeConnection alice bob + (aliceId, bobId, bob2) <- setupDesynchronizedRatchet alice bob + runRight $ do + ConnectionStats {ratchetSyncState} <- synchronizeRatchet bob2 aliceId False + liftIO $ ratchetSyncState `shouldBe` RSStarted + + get alice =##> ratchetSyncP bobId RSAgreed + + get bob2 =##> ratchetSyncP aliceId RSAgreed + + get alice =##> ratchetSyncP bobId RSOk + + get bob2 =##> ratchetSyncP aliceId RSOk + + exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 + +setupDesynchronizedRatchet :: HasCallStack => AgentClient -> AgentClient -> IO (ConnId, ConnId, AgentClient) +setupDesynchronizedRatchet alice bob = do + (aliceId, bobId) <- runRight $ makeConnection alice bob + runRight_ $ do + 4 <- sendMessage alice bobId SMP.noMsgFlags "hello" + get alice ##> ("", bobId, SENT 4) + get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False + ackMessage bob aliceId 4 + + 5 <- sendMessage bob aliceId SMP.noMsgFlags "hello 2" + get bob ##> ("", aliceId, SENT 5) + get alice =##> \case ("", c, Msg "hello 2") -> c == bobId; _ -> False + ackMessage alice bobId 5 + + liftIO $ copyFile testDB2 (testDB2 <> ".bak") + + 6 <- sendMessage alice bobId SMP.noMsgFlags "hello 3" + get alice ##> ("", bobId, SENT 6) + get bob =##> \case ("", c, Msg "hello 3") -> c == aliceId; _ -> False + ackMessage bob aliceId 6 + + 7 <- sendMessage bob aliceId SMP.noMsgFlags "hello 4" + get bob ##> ("", aliceId, SENT 7) + get alice =##> \case ("", c, Msg "hello 4") -> c == bobId; _ -> False + ackMessage alice bobId 7 + + disconnectAgentClient bob + + -- importing database backup after progressing ratchet de-synchronizes ratchet + liftIO $ renameFile (testDB2 <> ".bak") testDB2 + + bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2 + + runRight_ $ do + subscribeConnection bob2 aliceId + + Left Agent.CMD {cmdErr = PROHIBITED} <- runExceptT $ synchronizeRatchet bob2 aliceId False + + 8 <- sendMessage alice bobId SMP.noMsgFlags "hello 5" + get alice ##> ("", bobId, SENT 8) + get bob2 =##> ratchetSyncP aliceId RSRequired + + Left Agent.CMD {cmdErr = PROHIBITED} <- runExceptT $ sendMessage bob2 aliceId SMP.noMsgFlags "hello 6" + pure () + + pure (aliceId, bobId, bob2) + +ratchetSyncP :: ConnId -> RatchetSyncState -> AEntityTransmission 'AEConn -> Bool +ratchetSyncP cId rrs = \case + (_, cId', RSYNC rrs' ConnectionStats {ratchetSyncState}) -> + cId' == cId && rrs' == rrs && ratchetSyncState == rrs + _ -> False + +ratchetSyncP' :: ConnId -> RatchetSyncState -> ATransmission 'Agent -> Bool +ratchetSyncP' cId rrs = \case + (_, cId', APC SAEConn (RSYNC rrs' ConnectionStats {ratchetSyncState})) -> + cId' == cId && rrs' == rrs && ratchetSyncState == rrs + _ -> False + +testRatchetSyncServerOffline :: HasCallStack => ATransport -> IO () +testRatchetSyncServerOffline t = do + alice <- getSMPAgentClient' agentCfg initAgentServers testDB + bob <- getSMPAgentClient' agentCfg initAgentServers testDB2 + (aliceId, bobId, bob2) <- withSmpServerStoreMsgLogOn t testPort $ \_ -> + setupDesynchronizedRatchet alice bob + + ("", "", DOWN _ _) <- nGet alice + ("", "", DOWN _ _) <- nGet bob2 + + ConnectionStats {ratchetSyncState} <- runRight $ synchronizeRatchet bob2 aliceId False + liftIO $ ratchetSyncState `shouldBe` RSStarted + + withSmpServerStoreMsgLogOn t testPort $ \_ -> do runRight_ $ do - 4 <- sendMessage alice bobId SMP.noMsgFlags "hello" - get alice ##> ("", bobId, SENT 4) - get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId 4 + liftIO . getInAnyOrder alice $ + [ ratchetSyncP' bobId RSAgreed, + serverUpP + ] - 5 <- sendMessage bob aliceId SMP.noMsgFlags "hello 2" - get bob ##> ("", aliceId, SENT 5) - get alice =##> \case ("", c, Msg "hello 2") -> c == bobId; _ -> False - ackMessage alice bobId 5 + liftIO . getInAnyOrder bob2 $ + [ ratchetSyncP' aliceId RSAgreed, + serverUpP + ] - liftIO $ copyFile testDB2 (testDB2 <> ".bak") + get alice =##> ratchetSyncP bobId RSOk - 6 <- sendMessage alice bobId SMP.noMsgFlags "hello 3" - get alice ##> ("", bobId, SENT 6) - get bob =##> \case ("", c, Msg "hello 3") -> c == aliceId; _ -> False - ackMessage bob aliceId 6 + get bob2 =##> ratchetSyncP aliceId RSOk - 7 <- sendMessage bob aliceId SMP.noMsgFlags "hello 4" - get bob ##> ("", aliceId, SENT 7) - get alice =##> \case ("", c, Msg "hello 4") -> c == bobId; _ -> False - ackMessage alice bobId 7 + exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 - disconnectAgentClient bob +serverUpP :: ATransmission 'Agent -> Bool +serverUpP = \case + ("", "", APC SAENone (UP _ _)) -> True + _ -> False - -- importing database backup after progressing ratchet de-synchronizes ratchet, - -- this will be fixed by ratchet re-negotiation - liftIO $ renameFile (testDB2 <> ".bak") testDB2 +testRatchetSyncClientRestart :: HasCallStack => ATransport -> IO () +testRatchetSyncClientRestart t = do + alice <- getSMPAgentClient' agentCfg initAgentServers testDB + bob <- getSMPAgentClient' agentCfg initAgentServers testDB2 + (aliceId, bobId, bob2) <- withSmpServerStoreMsgLogOn t testPort $ \_ -> + setupDesynchronizedRatchet alice bob - bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2 + ("", "", DOWN _ _) <- nGet alice + ("", "", DOWN _ _) <- nGet bob2 + ConnectionStats {ratchetSyncState} <- runRight $ synchronizeRatchet bob2 aliceId False + liftIO $ ratchetSyncState `shouldBe` RSStarted + + disconnectAgentClient bob2 + + bob3 <- getSMPAgentClient' agentCfg initAgentServers testDB2 + + withSmpServerStoreMsgLogOn t testPort $ \_ -> do runRight_ $ do - subscribeConnection bob2 aliceId + ("", "", UP _ _) <- nGet alice - 8 <- sendMessage alice bobId SMP.noMsgFlags "hello 5" - get alice ##> ("", bobId, SENT 8) - get bob2 =##> \case ("", c, ERR AGENT {agentErr = A_CRYPTO {cryptoErr = RATCHET_HEADER}}) -> c == aliceId; _ -> False + subscribeConnection bob3 aliceId - 6 <- sendMessage bob2 aliceId SMP.noMsgFlags "hello 6" - get bob2 ##> ("", aliceId, SENT 6) - get alice =##> \case ("", c, ERR AGENT {agentErr = A_CRYPTO {cryptoErr = RATCHET_HEADER}}) -> c == bobId; _ -> False + get alice =##> ratchetSyncP bobId RSAgreed + + get bob3 =##> ratchetSyncP aliceId RSAgreed + + get alice =##> ratchetSyncP bobId RSOk + + get bob3 =##> ratchetSyncP aliceId RSOk + + exchangeGreetingsMsgIds alice bobId 12 bob3 aliceId 9 + +testRatchetSyncSuspendForeground :: HasCallStack => ATransport -> IO () +testRatchetSyncSuspendForeground t = do + alice <- getSMPAgentClient' agentCfg initAgentServers testDB + bob <- getSMPAgentClient' agentCfg initAgentServers testDB2 + (aliceId, bobId, bob2) <- withSmpServerStoreMsgLogOn t testPort $ \_ -> + setupDesynchronizedRatchet alice bob + + ("", "", DOWN _ _) <- nGet alice + ("", "", DOWN _ _) <- nGet bob2 + + ConnectionStats {ratchetSyncState} <- runRight $ synchronizeRatchet bob2 aliceId False + liftIO $ ratchetSyncState `shouldBe` RSStarted + + suspendAgent bob2 0 + threadDelay 100000 + foregroundAgent bob2 + + withSmpServerStoreMsgLogOn t testPort $ \_ -> do + runRight_ $ do + liftIO . getInAnyOrder alice $ + [ ratchetSyncP' bobId RSAgreed, + serverUpP + ] + + liftIO . getInAnyOrder bob2 $ + [ ratchetSyncP' aliceId RSAgreed, + serverUpP + ] + + get alice =##> ratchetSyncP bobId RSOk + + get bob2 =##> ratchetSyncP aliceId RSOk + + exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 + +testRatchetSyncSimultaneous :: HasCallStack => ATransport -> IO () +testRatchetSyncSimultaneous t = do + alice <- getSMPAgentClient' agentCfg initAgentServers testDB + bob <- getSMPAgentClient' agentCfg initAgentServers testDB2 + (aliceId, bobId, bob2) <- withSmpServerStoreMsgLogOn t testPort $ \_ -> + setupDesynchronizedRatchet alice bob + + ("", "", DOWN _ _) <- nGet alice + ("", "", DOWN _ _) <- nGet bob2 + + ConnectionStats {ratchetSyncState = bRSS} <- runRight $ synchronizeRatchet bob2 aliceId False + liftIO $ bRSS `shouldBe` RSStarted + + ConnectionStats {ratchetSyncState = aRSS} <- runRight $ synchronizeRatchet alice bobId True + liftIO $ aRSS `shouldBe` RSStarted + + withSmpServerStoreMsgLogOn t testPort $ \_ -> do + runRight_ $ do + liftIO . getInAnyOrder alice $ + [ ratchetSyncP' bobId RSAgreed, + serverUpP + ] + + liftIO . getInAnyOrder bob2 $ + [ ratchetSyncP' aliceId RSAgreed, + serverUpP + ] + + get alice =##> ratchetSyncP bobId RSOk + + get bob2 =##> ratchetSyncP aliceId RSOk + + exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 makeConnection :: AgentClient -> AgentClient -> ExceptT AgentErrorType IO (ConnId, ConnId) makeConnection alice bob = makeConnectionForUsers alice 1 bob 1 @@ -1265,20 +1443,20 @@ testAbortSwitchStartedReinitiate servers = do withB :: (AgentClient -> IO a) -> IO a withB = withAgent agentCfg {initialClientId = 1} servers testDB2 -switchPhaseRcvP :: ConnId -> SwitchPhase -> [Maybe RcvSwitchStatus] -> AEntityTransmission 'AEConn -> Bool +switchPhaseRcvP :: ConnId -> SwitchPhase -> [Maybe RcvSwitchStatus] -> ATransmission 'Agent -> Bool switchPhaseRcvP cId sphase swchStatuses = switchPhaseP cId QDRcv sphase (\stats -> rcvSwchStatuses' stats == swchStatuses) -switchPhaseSndP :: ConnId -> SwitchPhase -> [Maybe SndSwitchStatus] -> AEntityTransmission 'AEConn -> Bool +switchPhaseSndP :: ConnId -> SwitchPhase -> [Maybe SndSwitchStatus] -> ATransmission 'Agent -> Bool switchPhaseSndP cId sphase swchStatuses = switchPhaseP cId QDSnd sphase (\stats -> sndSwchStatuses' stats == swchStatuses) -switchPhaseP :: ConnId -> QueueDirection -> SwitchPhase -> (ConnectionStats -> Bool) -> AEntityTransmission 'AEConn -> Bool +switchPhaseP :: ConnId -> QueueDirection -> SwitchPhase -> (ConnectionStats -> Bool) -> ATransmission 'Agent -> Bool switchPhaseP cId qd sphase statsP = \case - (_, cId', SWITCH qd' sphase' stats) -> cId' == cId && qd' == qd && sphase' == sphase && statsP stats + (_, cId', APC SAEConn (SWITCH qd' sphase' stats)) -> cId' == cId && qd' == qd && sphase' == sphase && statsP stats _ -> False -errQueueNotFoundP :: ConnId -> AEntityTransmission 'AEConn -> Bool +errQueueNotFoundP :: ConnId -> ATransmission 'Agent -> Bool errQueueNotFoundP cId = \case - (_, cId', ERR AGENT {agentErr = A_QUEUE {queueErr = "QKEY: queue address not found in connection"}}) -> cId' == cId + (_, cId', APC SAEConn (ERR AGENT {agentErr = A_QUEUE {queueErr = "QKEY: queue address not found in connection"}})) -> cId' == cId _ -> False testCannotAbortSwitchSecured :: HasCallStack => InitialAgentServers -> IO () @@ -1327,43 +1505,44 @@ testSwitch2Connections servers = do (aId2, bId2) <- makeConnection a b exchangeGreetingsMsgId 4 a bId2 b aId2 pure (aId1, bId1, aId2, bId2) - withA $ \a -> runRight_ $ do - void $ subscribeConnections a [bId1, bId2] + let withA' = sessionSubscribe withA [bId1, bId2] + withB' = sessionSubscribe withB [aId1, aId2] + withA' $ \a -> do stats1 <- switchConnectionAsync a "" bId1 liftIO $ rcvSwchStatuses' stats1 `shouldMatchList` [Just RSSwitchStarted] phaseRcv a bId1 SPStarted [Just RSSendingQADD, Nothing] stats2 <- switchConnectionAsync a "" bId2 liftIO $ rcvSwchStatuses' stats2 `shouldMatchList` [Just RSSwitchStarted] phaseRcv a bId2 SPStarted [Just RSSendingQADD, Nothing] - withA $ \a -> withB $ \b -> runRight_ $ do - void $ subscribeConnections a [bId1, bId2] - void $ subscribeConnections b [aId1, aId2] - + withB' $ \b -> do liftIO . getInAnyOrder b $ [ switchPhaseSndP aId1 SPStarted [Just SSSendingQKEY, Nothing], switchPhaseSndP aId1 SPConfirmed [Just SSSendingQKEY, Nothing], switchPhaseSndP aId2 SPStarted [Just SSSendingQKEY, Nothing], switchPhaseSndP aId2 SPConfirmed [Just SSSendingQKEY, Nothing] ] - + withA' $ \a -> do liftIO . getInAnyOrder a $ [ switchPhaseRcvP bId1 SPConfirmed [Just RSSendingQADD, Nothing], switchPhaseRcvP bId1 SPSecured [Just RSSendingQUSE, Nothing], switchPhaseRcvP bId2 SPConfirmed [Just RSSendingQADD, Nothing], switchPhaseRcvP bId2 SPSecured [Just RSSendingQUSE, Nothing] ] - + withB' $ \b -> do liftIO . getInAnyOrder b $ [ switchPhaseSndP aId1 SPSecured [Just SSSendingQTEST, Nothing], switchPhaseSndP aId1 SPCompleted [Nothing], switchPhaseSndP aId2 SPSecured [Just SSSendingQTEST, Nothing], switchPhaseSndP aId2 SPCompleted [Nothing] ] - + withA' $ \a -> do liftIO . getInAnyOrder a $ [ switchPhaseRcvP bId1 SPCompleted [Nothing], switchPhaseRcvP bId2 SPCompleted [Nothing] ] + withA $ \a -> withB $ \b -> runRight_ $ do + void $ subscribeConnections a [bId1, bId2] + void $ subscribeConnections b [aId1, aId2] exchangeGreetingsMsgId 10 a bId1 b aId1 exchangeGreetingsMsgId 10 a bId2 b aId2 @@ -1594,3 +1773,18 @@ exchangeGreetingsMsgId msgId alice bobId bob aliceId = do get bob ##> ("", aliceId, SENT msgId') get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False ackMessage alice bobId msgId' + +exchangeGreetingsMsgIds :: HasCallStack => AgentClient -> ConnId -> Int64 -> AgentClient -> ConnId -> Int64 -> ExceptT AgentErrorType IO () +exchangeGreetingsMsgIds alice bobId aliceMsgId bob aliceId bobMsgId = do + msgId1 <- sendMessage alice bobId SMP.noMsgFlags "hello" + liftIO $ msgId1 `shouldBe` aliceMsgId + get alice ##> ("", bobId, SENT aliceMsgId) + get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False + ackMessage bob aliceId bobMsgId + msgId2 <- sendMessage bob aliceId SMP.noMsgFlags "hello too" + let aliceMsgId' = aliceMsgId + 1 + bobMsgId' = bobMsgId + 1 + liftIO $ msgId2 `shouldBe` bobMsgId' + get bob ##> ("", aliceId, SENT bobMsgId') + get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False + ackMessage alice bobId aliceMsgId' diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 89b54face..3aa0721d4 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -140,7 +140,7 @@ testForeignKeysEnabled = `shouldThrow` (\e -> DB.sqlError e == DB.ErrorConstraint) cData1 :: ConnData -cData1 = ConnData {userId = 1, connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing, deleted = False} +cData1 = ConnData {userId = 1, connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk} testPrivateSignKey :: C.APrivateSignKey testPrivateSignKey = C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe"