mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 03:14:53 +00:00
add QTEST to the rotation protocol
This commit is contained in:
@@ -22,9 +22,11 @@ Additional agent messages required for the protocol:
|
||||
|
||||
`QREADY`: instruct the sender that the new is ready to use with sender's queue address as parameter, encoded as `QR` - sender will send HELLO to the new queue.
|
||||
|
||||
`QHELLO`: sender sends to the new queue to confirm it's working, encoded as `QH`
|
||||
`QTEST`: sender sends to the new queue to confirm it's working, encoded as `QT`
|
||||
|
||||
`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QR` - sent after receiving `QHELLO`.
|
||||
`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QS` - sent after receiving `QTEST`.
|
||||
|
||||
`QHELLO`: sender sends to the new queue to confirm switch was successful - all new messages after this message will be sent to the new queue. Encoded as `QH`
|
||||
|
||||
### Protocol
|
||||
|
||||
@@ -41,12 +43,12 @@ B ->> R ->> A: QKEYS (R'): sender's key for the new queue (to avoid the race of
|
||||
B ->> R ->> A: continue sending new messages to the old queue
|
||||
A ->> R': secure queue
|
||||
A ->> S ->> B: QREADY (R'): notify sender that the queue is secured
|
||||
B ->> R' ->> A: QHELLO: to validate that the sender can send messages to the new queue before switching to it
|
||||
B ->> R' ->> A: QTEST: to validate that the sender can send messages to the new queue before switching to it
|
||||
A ->> S ->> B: QSWITCH (R'): instruction to start using the new queue
|
||||
B ->> R' ->> A: QHELLO: to confirm that the delivery is now switched to the new queue
|
||||
B ->> R' ->> A: the first message received to the new queue before the old one is drained and deleted should not be processed, it should be stored in the agent memory (and not acknowledged) and only processed once the old queue is drained.
|
||||
A ->> R: suspend queue, receive all messages
|
||||
A ->> R: delete queue
|
||||
B ->> R' ->> A: once sending fails with AUTH error, start sending new (and any undelivered) messages to the new queue
|
||||
```
|
||||
|
||||
It will also require extending SMP protocol:
|
||||
|
||||
@@ -699,6 +699,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
SMP SMP.QUOTA -> case msgType of
|
||||
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
|
||||
AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE
|
||||
AM_QTEST_ -> do
|
||||
-- cancel switching, delete new send queue
|
||||
pure ()
|
||||
AM_QHELLO_ -> do
|
||||
-- cancel switching, delete new send queue
|
||||
pure ()
|
||||
_ -> retrySending loop
|
||||
SMP SMP.AUTH -> case msgType of
|
||||
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
|
||||
@@ -722,10 +728,13 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
-- TODO new send queue status = Confirmed
|
||||
pure ()
|
||||
AM_QREADY_ -> pure ()
|
||||
AM_QHELLO_ -> do
|
||||
AM_QTEST_ -> do
|
||||
-- cancel switching, delete new send queue
|
||||
pure ()
|
||||
AM_QSWITCH_ -> pure ()
|
||||
AM_QHELLO_ -> do
|
||||
-- cancel switching, delete new send queue
|
||||
pure ()
|
||||
_
|
||||
-- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server),
|
||||
-- the message sending would be retried
|
||||
@@ -1136,8 +1145,9 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se
|
||||
QNEW currAddr nextQUri -> rqNewMsg currAddr nextQUri >> ackDelete msgId
|
||||
QKEYS sKey nextQInfo -> rqKeys sKey nextQInfo $ ackDelete msgId
|
||||
QREADY addr -> rqReady addr >> ackDelete msgId
|
||||
QHELLO -> rqHello $ ackDelete msgId
|
||||
QTEST -> rqTest >> ackDelete msgId
|
||||
QSWITCH addr -> rqSwitch addr >> ackDelete msgId
|
||||
QHELLO -> rqHello $ ackDelete msgId
|
||||
Right _ -> prohibited >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
@@ -1361,23 +1371,18 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se
|
||||
withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case
|
||||
Just sq'@SndQueue {server, sndId} -> do
|
||||
unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address"
|
||||
void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO
|
||||
void $ enqueueMessage c cData sq' SMP.noMsgFlags QTEST
|
||||
_ -> throwError $ INTERNAL "message can only be sent during rotation"
|
||||
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
|
||||
|
||||
-- processed by queue recipient, received from the new queue
|
||||
rqHello :: m () -> m ()
|
||||
rqHello ackDelete = do
|
||||
rqTest :: m ()
|
||||
rqTest = do
|
||||
when currRcvQueue . throwError $ INTERNAL "message can only be sent to the next queue"
|
||||
case conn of
|
||||
DuplexConnection _ currRq sq -> do
|
||||
DuplexConnection _ _ sq -> do
|
||||
let RcvQueue {server, sndId} = rq
|
||||
void . enqueueMessage c cData sq SMP.noMsgFlags $ QSWITCH (server, sndId)
|
||||
withStore' c $ \db -> do
|
||||
setRcvQueueStatus db rq Active
|
||||
setRcvQueueAction db currRq $ Just RQASuspendCurrQueue
|
||||
ackDelete
|
||||
suspendCurrRcvQueue c cData currRq sq rq
|
||||
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
|
||||
|
||||
-- processed by queue sender
|
||||
@@ -1394,6 +1399,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se
|
||||
atomically (switchDeliveries qKey' qKey)
|
||||
throwError e
|
||||
unless ok $ throwError $ INTERNAL "switching snd queue failed in STM"
|
||||
void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO
|
||||
where
|
||||
switchQueues :: MsgDeliveryKey -> MsgDeliveryKey -> m Bool
|
||||
switchQueues k k' = withStore' c $ \db -> do
|
||||
@@ -1412,6 +1418,19 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se
|
||||
_ -> throwError $ INTERNAL "message can only be sent during rotation"
|
||||
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
|
||||
|
||||
-- processed by queue recipient, received from the new queue
|
||||
rqHello :: m () -> m ()
|
||||
rqHello ackDelete = do
|
||||
when currRcvQueue . throwError $ INTERNAL "message can only be sent to the next queue"
|
||||
case conn of
|
||||
DuplexConnection _ currRq sq -> do
|
||||
withStore' c $ \db -> do
|
||||
setRcvQueueStatus db rq Active
|
||||
setRcvQueueAction db currRq $ Just RQASuspendCurrQueue
|
||||
ackDelete
|
||||
suspendCurrRcvQueue c cData currRq sq rq
|
||||
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
|
||||
|
||||
smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m ()
|
||||
smpInvitation connReq@(CRInvitationUri crData _) cInfo = do
|
||||
logServer "<--" c srv rId "MSG <KEY>"
|
||||
|
||||
@@ -460,8 +460,9 @@ data AgentMessageType
|
||||
| AM_QNEW_
|
||||
| AM_QKEYS_
|
||||
| AM_QREADY_
|
||||
| AM_QHELLO_
|
||||
| AM_QTEST_
|
||||
| AM_QSWITCH_
|
||||
| AM_QHELLO_
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance Encoding AgentMessageType where
|
||||
@@ -474,8 +475,9 @@ instance Encoding AgentMessageType where
|
||||
AM_QNEW_ -> "QN"
|
||||
AM_QKEYS_ -> "QK"
|
||||
AM_QREADY_ -> "QR"
|
||||
AM_QHELLO_ -> "QH"
|
||||
AM_QTEST_ -> "QT"
|
||||
AM_QSWITCH_ -> "QS"
|
||||
AM_QHELLO_ -> "QH"
|
||||
smpP =
|
||||
A.anyChar >>= \case
|
||||
'C' -> pure AM_CONN_INFO
|
||||
@@ -488,8 +490,9 @@ instance Encoding AgentMessageType where
|
||||
'N' -> pure AM_QNEW_
|
||||
'K' -> pure AM_QKEYS_
|
||||
'R' -> pure AM_QREADY_
|
||||
'H' -> pure AM_QHELLO_
|
||||
'T' -> pure AM_QTEST_
|
||||
'S' -> pure AM_QSWITCH_
|
||||
'H' -> pure AM_QHELLO_
|
||||
_ -> fail "bad AgentMessageType"
|
||||
_ -> fail "bad AgentMessageType"
|
||||
|
||||
@@ -509,8 +512,9 @@ agentMessageType = \case
|
||||
QNEW {} -> AM_QNEW_
|
||||
QKEYS {} -> AM_QKEYS_
|
||||
QREADY {} -> AM_QREADY_
|
||||
QHELLO -> AM_QHELLO_
|
||||
QTEST -> AM_QTEST_
|
||||
QSWITCH {} -> AM_QSWITCH_
|
||||
QHELLO -> AM_QHELLO_
|
||||
|
||||
data APrivHeader = APrivHeader
|
||||
{ -- | sequential ID assigned by the sending agent
|
||||
@@ -532,8 +536,9 @@ data AMsgType
|
||||
| QNEW_
|
||||
| QKEYS_
|
||||
| QREADY_
|
||||
| QHELLO_
|
||||
| QTEST_
|
||||
| QSWITCH_
|
||||
| QHELLO_
|
||||
deriving (Eq)
|
||||
|
||||
instance Encoding AMsgType where
|
||||
@@ -544,8 +549,9 @@ instance Encoding AMsgType where
|
||||
QNEW_ -> "QN"
|
||||
QKEYS_ -> "QK"
|
||||
QREADY_ -> "QR"
|
||||
QHELLO_ -> "QH"
|
||||
QTEST_ -> "QT"
|
||||
QSWITCH_ -> "QS"
|
||||
QHELLO_ -> "QH"
|
||||
smpP =
|
||||
A.anyChar >>= \case
|
||||
'H' -> pure HELLO_
|
||||
@@ -556,8 +562,9 @@ instance Encoding AMsgType where
|
||||
'N' -> pure QNEW_
|
||||
'K' -> pure QKEYS_
|
||||
'R' -> pure QREADY_
|
||||
'H' -> pure QHELLO_
|
||||
'T' -> pure QTEST_
|
||||
'S' -> pure QSWITCH_
|
||||
'H' -> pure QHELLO_
|
||||
_ -> fail "bad AMsgType"
|
||||
_ -> fail "bad AMsgType"
|
||||
|
||||
@@ -571,16 +578,18 @@ data AMessage
|
||||
REPLY (L.NonEmpty SMPQueueInfo)
|
||||
| -- | agent envelope for the client message
|
||||
A_MSG MsgBody
|
||||
| -- instruct sender to switch the queue to another
|
||||
| -- | instruct sender to switch the queue to another
|
||||
QNEW {currentAddress :: (SMPServer, SMP.SenderId), nextQueueUri :: SMPQueueUri}
|
||||
| -- send server key and queue e2e DH key to the recipient
|
||||
| -- | send server key and queue e2e DH key to the recipient
|
||||
QKEYS {nextSenderKey :: SndPublicVerifyKey, nextQueueInfo :: SMPQueueInfo}
|
||||
| -- inform the sender that the queue is ready to use - sender sends QHELLO to it
|
||||
| -- | inform the sender that the queue is ready to use - sender sends QHELLO to it
|
||||
QREADY {nextAddress :: (SMPServer, SMP.SenderId)}
|
||||
| -- the first message sent by the sender to the new queue
|
||||
QHELLO
|
||||
| -- instruct the sender to start sending messages to the new queue - after recipient receives HELLO
|
||||
| -- | the message sent by the sender to the new queue to test delivery
|
||||
QTEST
|
||||
| -- | instruct the sender to start sending messages to the new queue - after recipient receives HELLO
|
||||
QSWITCH {nextAddress :: (SMPServer, SMP.SenderId)}
|
||||
| -- | confirm that the delivery is switched, all new messages will be sent to the new queue
|
||||
QHELLO
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding AMessage where
|
||||
@@ -591,8 +600,9 @@ instance Encoding AMessage where
|
||||
QNEW currAddr nextQUri -> smpEncode (QNEW_, currAddr, strEncode nextQUri)
|
||||
QKEYS sKey nextQInfo -> smpEncode (QKEYS_, sKey, nextQInfo)
|
||||
QREADY addr -> smpEncode (QREADY_, addr)
|
||||
QHELLO -> smpEncode QHELLO_
|
||||
QTEST -> smpEncode QTEST_
|
||||
QSWITCH addr -> smpEncode (QSWITCH_, addr)
|
||||
QHELLO -> smpEncode QHELLO_
|
||||
smpP =
|
||||
smpP
|
||||
>>= \case
|
||||
@@ -605,8 +615,9 @@ instance Encoding AMessage where
|
||||
pure QNEW {currentAddress, nextQueueUri}
|
||||
QKEYS_ -> QKEYS <$> smpP <*> smpP
|
||||
QREADY_ -> QREADY <$> smpP
|
||||
QHELLO_ -> pure QHELLO
|
||||
QTEST_ -> pure QTEST
|
||||
QSWITCH_ -> QSWITCH <$> smpP
|
||||
QHELLO_ -> pure QHELLO
|
||||
|
||||
instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
|
||||
strEncode = \case
|
||||
|
||||
Reference in New Issue
Block a user