diff --git a/rfcs/2022-08-14-queue-rotation.md b/rfcs/2022-08-14-queue-rotation.md index e794ec5c4..787bb9f9a 100644 --- a/rfcs/2022-08-14-queue-rotation.md +++ b/rfcs/2022-08-14-queue-rotation.md @@ -22,9 +22,11 @@ Additional agent messages required for the protocol: `QREADY`: instruct the sender that the new is ready to use with sender's queue address as parameter, encoded as `QR` - sender will send HELLO to the new queue. -`QHELLO`: sender sends to the new queue to confirm it's working, encoded as `QH` +`QTEST`: sender sends to the new queue to confirm it's working, encoded as `QT` -`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QR` - sent after receiving `QHELLO`. +`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QS` - sent after receiving `QTEST`. + +`QHELLO`: sender sends to the new queue to confirm switch was successful - all new messages after this message will be sent to the new queue. Encoded as `QH` ### Protocol @@ -41,12 +43,12 @@ B ->> R ->> A: QKEYS (R'): sender's key for the new queue (to avoid the race of B ->> R ->> A: continue sending new messages to the old queue A ->> R': secure queue A ->> S ->> B: QREADY (R'): notify sender that the queue is secured -B ->> R' ->> A: QHELLO: to validate that the sender can send messages to the new queue before switching to it +B ->> R' ->> A: QTEST: to validate that the sender can send messages to the new queue before switching to it A ->> S ->> B: QSWITCH (R'): instruction to start using the new queue +B ->> R' ->> A: QHELLO: to confirm that the delivery is now switched to the new queue B ->> R' ->> A: the first message received to the new queue before the old one is drained and deleted should not be processed, it should be stored in the agent memory (and not acknowledged) and only processed once the old queue is drained. A ->> R: suspend queue, receive all messages A ->> R: delete queue -B ->> R' ->> A: once sending fails with AUTH error, start sending new (and any undelivered) messages to the new queue ``` It will also require extending SMP protocol: diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 44fbb6fa3..f10e15206 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -699,6 +699,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh SMP SMP.QUOTA -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE + AM_QTEST_ -> do + -- cancel switching, delete new send queue + pure () + AM_QHELLO_ -> do + -- cancel switching, delete new send queue + pure () _ -> retrySending loop SMP SMP.AUTH -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE @@ -722,10 +728,13 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh -- TODO new send queue status = Confirmed pure () AM_QREADY_ -> pure () - AM_QHELLO_ -> do + AM_QTEST_ -> do -- cancel switching, delete new send queue pure () AM_QSWITCH_ -> pure () + AM_QHELLO_ -> do + -- cancel switching, delete new send queue + pure () _ -- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server), -- the message sending would be retried @@ -1136,8 +1145,9 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se QNEW currAddr nextQUri -> rqNewMsg currAddr nextQUri >> ackDelete msgId QKEYS sKey nextQInfo -> rqKeys sKey nextQInfo $ ackDelete msgId QREADY addr -> rqReady addr >> ackDelete msgId - QHELLO -> rqHello $ ackDelete msgId + QTEST -> rqTest >> ackDelete msgId QSWITCH addr -> rqSwitch addr >> ackDelete msgId + QHELLO -> rqHello $ ackDelete msgId Right _ -> prohibited >> ack Left e@(AGENT A_DUPLICATE) -> do withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case @@ -1361,23 +1371,18 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case Just sq'@SndQueue {server, sndId} -> do unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address" - void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO + void $ enqueueMessage c cData sq' SMP.noMsgFlags QTEST _ -> throwError $ INTERNAL "message can only be sent during rotation" _ -> throwError $ INTERNAL "message can only be sent to duplex connection" -- processed by queue recipient, received from the new queue - rqHello :: m () -> m () - rqHello ackDelete = do + rqTest :: m () + rqTest = do when currRcvQueue . throwError $ INTERNAL "message can only be sent to the next queue" case conn of - DuplexConnection _ currRq sq -> do + DuplexConnection _ _ sq -> do let RcvQueue {server, sndId} = rq void . enqueueMessage c cData sq SMP.noMsgFlags $ QSWITCH (server, sndId) - withStore' c $ \db -> do - setRcvQueueStatus db rq Active - setRcvQueueAction db currRq $ Just RQASuspendCurrQueue - ackDelete - suspendCurrRcvQueue c cData currRq sq rq _ -> throwError $ INTERNAL "message can only be sent to duplex connection" -- processed by queue sender @@ -1394,6 +1399,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se atomically (switchDeliveries qKey' qKey) throwError e unless ok $ throwError $ INTERNAL "switching snd queue failed in STM" + void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO where switchQueues :: MsgDeliveryKey -> MsgDeliveryKey -> m Bool switchQueues k k' = withStore' c $ \db -> do @@ -1412,6 +1418,19 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se _ -> throwError $ INTERNAL "message can only be sent during rotation" _ -> throwError $ INTERNAL "message can only be sent to duplex connection" + -- processed by queue recipient, received from the new queue + rqHello :: m () -> m () + rqHello ackDelete = do + when currRcvQueue . throwError $ INTERNAL "message can only be sent to the next queue" + case conn of + DuplexConnection _ currRq sq -> do + withStore' c $ \db -> do + setRcvQueueStatus db rq Active + setRcvQueueAction db currRq $ Just RQASuspendCurrQueue + ackDelete + suspendCurrRcvQueue c cData currRq sq rq + _ -> throwError $ INTERNAL "message can only be sent to duplex connection" + smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m () smpInvitation connReq@(CRInvitationUri crData _) cInfo = do logServer "<--" c srv rId "MSG " diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index f7a6b893d..5679f3a78 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -460,8 +460,9 @@ data AgentMessageType | AM_QNEW_ | AM_QKEYS_ | AM_QREADY_ - | AM_QHELLO_ + | AM_QTEST_ | AM_QSWITCH_ + | AM_QHELLO_ deriving (Eq, Show) instance Encoding AgentMessageType where @@ -474,8 +475,9 @@ instance Encoding AgentMessageType where AM_QNEW_ -> "QN" AM_QKEYS_ -> "QK" AM_QREADY_ -> "QR" - AM_QHELLO_ -> "QH" + AM_QTEST_ -> "QT" AM_QSWITCH_ -> "QS" + AM_QHELLO_ -> "QH" smpP = A.anyChar >>= \case 'C' -> pure AM_CONN_INFO @@ -488,8 +490,9 @@ instance Encoding AgentMessageType where 'N' -> pure AM_QNEW_ 'K' -> pure AM_QKEYS_ 'R' -> pure AM_QREADY_ - 'H' -> pure AM_QHELLO_ + 'T' -> pure AM_QTEST_ 'S' -> pure AM_QSWITCH_ + 'H' -> pure AM_QHELLO_ _ -> fail "bad AgentMessageType" _ -> fail "bad AgentMessageType" @@ -509,8 +512,9 @@ agentMessageType = \case QNEW {} -> AM_QNEW_ QKEYS {} -> AM_QKEYS_ QREADY {} -> AM_QREADY_ - QHELLO -> AM_QHELLO_ + QTEST -> AM_QTEST_ QSWITCH {} -> AM_QSWITCH_ + QHELLO -> AM_QHELLO_ data APrivHeader = APrivHeader { -- | sequential ID assigned by the sending agent @@ -532,8 +536,9 @@ data AMsgType | QNEW_ | QKEYS_ | QREADY_ - | QHELLO_ + | QTEST_ | QSWITCH_ + | QHELLO_ deriving (Eq) instance Encoding AMsgType where @@ -544,8 +549,9 @@ instance Encoding AMsgType where QNEW_ -> "QN" QKEYS_ -> "QK" QREADY_ -> "QR" - QHELLO_ -> "QH" + QTEST_ -> "QT" QSWITCH_ -> "QS" + QHELLO_ -> "QH" smpP = A.anyChar >>= \case 'H' -> pure HELLO_ @@ -556,8 +562,9 @@ instance Encoding AMsgType where 'N' -> pure QNEW_ 'K' -> pure QKEYS_ 'R' -> pure QREADY_ - 'H' -> pure QHELLO_ + 'T' -> pure QTEST_ 'S' -> pure QSWITCH_ + 'H' -> pure QHELLO_ _ -> fail "bad AMsgType" _ -> fail "bad AMsgType" @@ -571,16 +578,18 @@ data AMessage REPLY (L.NonEmpty SMPQueueInfo) | -- | agent envelope for the client message A_MSG MsgBody - | -- instruct sender to switch the queue to another + | -- | instruct sender to switch the queue to another QNEW {currentAddress :: (SMPServer, SMP.SenderId), nextQueueUri :: SMPQueueUri} - | -- send server key and queue e2e DH key to the recipient + | -- | send server key and queue e2e DH key to the recipient QKEYS {nextSenderKey :: SndPublicVerifyKey, nextQueueInfo :: SMPQueueInfo} - | -- inform the sender that the queue is ready to use - sender sends QHELLO to it + | -- | inform the sender that the queue is ready to use - sender sends QHELLO to it QREADY {nextAddress :: (SMPServer, SMP.SenderId)} - | -- the first message sent by the sender to the new queue - QHELLO - | -- instruct the sender to start sending messages to the new queue - after recipient receives HELLO + | -- | the message sent by the sender to the new queue to test delivery + QTEST + | -- | instruct the sender to start sending messages to the new queue - after recipient receives HELLO QSWITCH {nextAddress :: (SMPServer, SMP.SenderId)} + | -- | confirm that the delivery is switched, all new messages will be sent to the new queue + QHELLO deriving (Show) instance Encoding AMessage where @@ -591,8 +600,9 @@ instance Encoding AMessage where QNEW currAddr nextQUri -> smpEncode (QNEW_, currAddr, strEncode nextQUri) QKEYS sKey nextQInfo -> smpEncode (QKEYS_, sKey, nextQInfo) QREADY addr -> smpEncode (QREADY_, addr) - QHELLO -> smpEncode QHELLO_ + QTEST -> smpEncode QTEST_ QSWITCH addr -> smpEncode (QSWITCH_, addr) + QHELLO -> smpEncode QHELLO_ smpP = smpP >>= \case @@ -605,8 +615,9 @@ instance Encoding AMessage where pure QNEW {currentAddress, nextQueueUri} QKEYS_ -> QKEYS <$> smpP <*> smpP QREADY_ -> QREADY <$> smpP - QHELLO_ -> pure QHELLO + QTEST_ -> pure QTEST QSWITCH_ -> QSWITCH <$> smpP + QHELLO_ -> pure QHELLO instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where strEncode = \case