mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-17 11:16:17 +00:00
update sub-protocol for queue rotation
This commit is contained in:
@@ -16,11 +16,13 @@ The proposed approach separates queue rotation from any queue redundancy that ma
|
||||
|
||||
Additional agent messages required for the protocol:
|
||||
|
||||
`SWITCH`: notify the sender that the queue has to be rotated to the new one, includes the address (server, sender ID and DH key) of the new queue. Encoded as `S`.
|
||||
`QNEW`: notify the sender that the queue has to be rotated to the new one, includes the address (server, sender ID and DH key) of the new queue. Encoded as `QN`.
|
||||
|
||||
`KEYS`: pass sender's server key and DH key via existing connection (SMP confirmation message will not be used, to avoid the same "race" of the initial key exchange that would create the risk of intercepting the queue for the attacker), encoded as `K`.
|
||||
`QKEYS`: pass sender's server key and DH key via existing connection (SMP confirmation message will not be used, to avoid the same "race" of the initial key exchange that would create the risk of intercepting the queue for the attacker), encoded as `QK`.
|
||||
|
||||
`USE`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `U`.
|
||||
`QREADY`: instruct the sender that the new is ready to use with sender's queue address as parameter, encoded as `QR` - sender will send HELLO to the new queue.
|
||||
|
||||
`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QR` - sent after receiving HELLO.
|
||||
|
||||
### Protocol
|
||||
|
||||
@@ -32,12 +34,13 @@ participant S as Server that has A's send queue (B's receive queue)
|
||||
participant R' as Server that hosts the new A's receive queue
|
||||
|
||||
A ->> R': create new queue
|
||||
A ->> S ->> B: SWITCH (R'): address of the new queue
|
||||
B ->> R ->> A: KEYS (R'): sender's key for the new queue (to avoid the race of SMP confirmation for the initial exchange)
|
||||
A ->> S ->> B: QNEW (R'): address of the new queue
|
||||
B ->> R ->> A: QKEYS (R'): sender's key for the new queue (to avoid the race of SMP confirmation for the initial exchange)
|
||||
B ->> R ->> A: continue sending new messages to the old queue
|
||||
A ->> R': secure queue
|
||||
A ->> S ->> B: USE (R'): instruction to use new queue
|
||||
A ->> S ->> B: QREADY (R'): instruction to use new queue
|
||||
B ->> R' ->> A: HELLO: to validate that the sender can send messages to the new queue before switching to it
|
||||
A ->> S ->> B: QSWITCH (R'): instruction to start using the new queue
|
||||
B ->> R' ->> A: the first message received to the new queue before the old one is drained and deleted should not be processed, it should be stored in the agent memory (and not acknowledged) and only processed once the old queue is drained.
|
||||
A ->> R: suspend queue, receive all messages
|
||||
A ->> R: delete queue
|
||||
|
||||
@@ -642,9 +642,10 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
_ -> connError msgId NOT_ACCEPTED
|
||||
AM_REPLY_ -> notifyDel msgId $ ERR e
|
||||
AM_A_MSG_ -> notifyDel msgId $ MERR mId e
|
||||
AM_SWITCH_ -> pure ()
|
||||
AM_KEYS_ -> pure ()
|
||||
AM_USE_ -> pure ()
|
||||
AM_QNEW_ -> pure ()
|
||||
AM_QKEYS_ -> pure ()
|
||||
AM_QREADY_ -> pure ()
|
||||
AM_QSWITCH_ -> pure ()
|
||||
_
|
||||
-- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server),
|
||||
-- the message sending would be retried
|
||||
@@ -721,15 +722,24 @@ ackMessage' c connId msgId = do
|
||||
withStore' c $ \db -> deleteMsg db connId mId
|
||||
|
||||
-- | Switch connection to the new receive queue
|
||||
switchConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
switchConnection' c connId =
|
||||
withStore c (`getConn` connId) >>= \case
|
||||
SomeConn _ (DuplexConnection _ rq _) -> switchQueue rq
|
||||
SomeConn _ (DuplexConnection _ _rq _) -> do
|
||||
-- rcv_queue_action = RQACreateNewQueue
|
||||
-- createNewRcvQueue rq
|
||||
pure ()
|
||||
SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
where
|
||||
switchQueue :: RcvQueue -> m ()
|
||||
switchQueue _rq = pure ()
|
||||
|
||||
createNewRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
|
||||
createNewRcvQueue _c _rq = do
|
||||
-- unless new rcv queue exists
|
||||
-- then newRcvQueue
|
||||
-- store to the database
|
||||
-- enqueueSwitchMessage
|
||||
-- rcv_queue_action = null
|
||||
pure ()
|
||||
|
||||
-- | Suspend SMP agent connection (OFF command) in Reader monad
|
||||
suspendConnection' :: AgentMonad m => AgentClient -> ConnId -> m Word16
|
||||
@@ -1046,9 +1056,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
A_MSG body -> do
|
||||
logServer "<--" c srv rId "MSG <MSG>"
|
||||
notify $ MSG msgMeta msgFlags body
|
||||
SWITCH _ _ -> pure ()
|
||||
KEYS _ _ -> pure ()
|
||||
USE _ -> pure ()
|
||||
QNEW _ _ -> pure ()
|
||||
QKEYS _ _ -> pure ()
|
||||
QREADY _ -> pure ()
|
||||
QSWITCH _ -> pure ()
|
||||
Right _ -> prohibited >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
|
||||
@@ -433,9 +433,10 @@ data AgentMessageType
|
||||
| AM_HELLO_
|
||||
| AM_REPLY_
|
||||
| AM_A_MSG_
|
||||
| AM_SWITCH_
|
||||
| AM_KEYS_
|
||||
| AM_USE_
|
||||
| AM_QNEW_
|
||||
| AM_QKEYS_
|
||||
| AM_QREADY_
|
||||
| AM_QSWITCH_
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance Encoding AgentMessageType where
|
||||
@@ -445,9 +446,10 @@ instance Encoding AgentMessageType where
|
||||
AM_HELLO_ -> "H"
|
||||
AM_REPLY_ -> "R"
|
||||
AM_A_MSG_ -> "M"
|
||||
AM_SWITCH_ -> "S"
|
||||
AM_KEYS_ -> "K"
|
||||
AM_USE_ -> "U"
|
||||
AM_QNEW_ -> "QN"
|
||||
AM_QKEYS_ -> "QK"
|
||||
AM_QREADY_ -> "QR"
|
||||
AM_QSWITCH_ -> "QS"
|
||||
smpP =
|
||||
A.anyChar >>= \case
|
||||
'C' -> pure AM_CONN_INFO
|
||||
@@ -455,9 +457,13 @@ instance Encoding AgentMessageType where
|
||||
'H' -> pure AM_HELLO_
|
||||
'R' -> pure AM_REPLY_
|
||||
'M' -> pure AM_A_MSG_
|
||||
'S' -> pure AM_SWITCH_
|
||||
'K' -> pure AM_KEYS_
|
||||
'U' -> pure AM_USE_
|
||||
'Q' ->
|
||||
A.anyChar >>= \case
|
||||
'N' -> pure AM_QNEW_
|
||||
'K' -> pure AM_QKEYS_
|
||||
'R' -> pure AM_QREADY_
|
||||
'S' -> pure AM_QSWITCH_
|
||||
_ -> fail "bad AgentMessageType"
|
||||
_ -> fail "bad AgentMessageType"
|
||||
|
||||
agentMessageType :: AgentMessage -> AgentMessageType
|
||||
@@ -473,9 +479,10 @@ agentMessageType = \case
|
||||
-- REPLY is only used in v1
|
||||
REPLY _ -> AM_REPLY_
|
||||
A_MSG _ -> AM_A_MSG_
|
||||
SWITCH {} -> AM_SWITCH_
|
||||
KEYS {} -> AM_KEYS_
|
||||
USE {} -> AM_USE_
|
||||
QNEW {} -> AM_QNEW_
|
||||
QKEYS {} -> AM_QKEYS_
|
||||
QREADY {} -> AM_QREADY_
|
||||
QSWITCH {} -> AM_QSWITCH_
|
||||
|
||||
data APrivHeader = APrivHeader
|
||||
{ -- | sequential ID assigned by the sending agent
|
||||
@@ -494,9 +501,10 @@ data AMsgType
|
||||
= HELLO_
|
||||
| REPLY_
|
||||
| A_MSG_
|
||||
| SWITCH_
|
||||
| KEYS_
|
||||
| USE_
|
||||
| QNEW_
|
||||
| QKEYS_
|
||||
| QREADY_
|
||||
| QSWITCH_
|
||||
deriving (Eq)
|
||||
|
||||
instance Encoding AMsgType where
|
||||
@@ -504,17 +512,22 @@ instance Encoding AMsgType where
|
||||
HELLO_ -> "H"
|
||||
REPLY_ -> "R"
|
||||
A_MSG_ -> "M"
|
||||
SWITCH_ -> "S"
|
||||
KEYS_ -> "K"
|
||||
USE_ -> "U"
|
||||
QNEW_ -> "QN"
|
||||
QKEYS_ -> "QK"
|
||||
QREADY_ -> "QR"
|
||||
QSWITCH_ -> "QS"
|
||||
smpP =
|
||||
smpP >>= \case
|
||||
'H' -> pure HELLO_
|
||||
'R' -> pure REPLY_
|
||||
'M' -> pure A_MSG_
|
||||
'S' -> pure SWITCH_
|
||||
'K' -> pure KEYS_
|
||||
'U' -> pure USE_
|
||||
'Q' ->
|
||||
smpP >>= \case
|
||||
'N' -> pure QNEW_
|
||||
'K' -> pure QKEYS_
|
||||
'R' -> pure QREADY_
|
||||
'S' -> pure QSWITCH_
|
||||
_ -> fail "bad AMsgType"
|
||||
_ -> fail "bad AMsgType"
|
||||
|
||||
-- | Messages sent between SMP agents once SMP queue is secured.
|
||||
@@ -527,12 +540,14 @@ data AMessage
|
||||
REPLY (L.NonEmpty SMPQueueInfo)
|
||||
| -- | agent envelope for the client message
|
||||
A_MSG MsgBody
|
||||
| -- switch the queue to another
|
||||
SWITCH (SMPServer, SMP.SenderId) SMPQueueInfo
|
||||
| -- server key to secure the new queue and DH key to agree e2e encryption
|
||||
KEYS SndPublicVerifyKey SMPQueueInfo
|
||||
| -- instruct sender to switch the queue to another
|
||||
QNEW (SMPServer, SMP.SenderId) SMPQueueInfo
|
||||
| -- send to the recipient server key to secure the new queue and DH key to agree e2e encryption
|
||||
QKEYS SndPublicVerifyKey SMPQueueInfo
|
||||
| -- inform the sender that the queue is ready to use
|
||||
USE (SMPServer, SMP.SenderId)
|
||||
QREADY (SMPServer, SMP.SenderId)
|
||||
| -- inform the sender that the queue is ready to use
|
||||
QSWITCH (SMPServer, SMP.SenderId)
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding AMessage where
|
||||
@@ -540,18 +555,20 @@ instance Encoding AMessage where
|
||||
HELLO -> smpEncode HELLO_
|
||||
REPLY smpQueues -> smpEncode (REPLY_, smpQueues)
|
||||
A_MSG body -> smpEncode (A_MSG_, Tail body)
|
||||
SWITCH addr qInfo -> smpEncode (SWITCH_, addr, qInfo)
|
||||
KEYS qInfo sKey -> smpEncode (KEYS_, qInfo, sKey)
|
||||
USE addr -> smpEncode (USE_, addr)
|
||||
QNEW addr qInfo -> smpEncode (QNEW_, addr, qInfo)
|
||||
QKEYS qInfo sKey -> smpEncode (QKEYS_, qInfo, sKey)
|
||||
QREADY addr -> smpEncode (QREADY_, addr)
|
||||
QSWITCH addr -> smpEncode (QSWITCH_, addr)
|
||||
smpP =
|
||||
smpP
|
||||
>>= \case
|
||||
HELLO_ -> pure HELLO
|
||||
REPLY_ -> REPLY <$> smpP
|
||||
A_MSG_ -> A_MSG . unTail <$> smpP
|
||||
SWITCH_ -> SWITCH <$> smpP <*> smpP
|
||||
KEYS_ -> KEYS <$> smpP <*> smpP
|
||||
USE_ -> USE <$> smpP
|
||||
QNEW_ -> QNEW <$> smpP <*> smpP
|
||||
QKEYS_ -> QKEYS <$> smpP <*> smpP
|
||||
QREADY_ -> QREADY <$> smpP
|
||||
QSWITCH_ -> QSWITCH <$> smpP
|
||||
|
||||
instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
|
||||
strEncode = \case
|
||||
|
||||
Reference in New Issue
Block a user