mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-28 04:05:17 +00:00
messages for queue rotation
This commit is contained in:
@@ -636,6 +636,10 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
_ -> connError msgId NOT_ACCEPTED
|
||||
AM_REPLY_ -> notifyDel msgId $ ERR e
|
||||
AM_A_MSG_ -> notifyDel msgId $ MERR mId e
|
||||
AM_A_ADD_ -> pure ()
|
||||
AM_A_KEY_ -> pure ()
|
||||
AM_A_USE_ -> pure ()
|
||||
AM_A_DEL_ -> pure ()
|
||||
_
|
||||
-- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server),
|
||||
-- the message sending would be retried
|
||||
@@ -1024,6 +1028,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
A_MSG body -> do
|
||||
logServer "<--" c srv rId "MSG <MSG>"
|
||||
notify $ MSG msgMeta msgFlags body
|
||||
A_ADD _ -> pure ()
|
||||
A_KEY _ _ -> pure ()
|
||||
A_USE _ _ -> pure ()
|
||||
A_DEL _ _ -> pure ()
|
||||
Right _ -> prohibited >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
|
||||
@@ -427,7 +427,16 @@ instance Encoding AgentMessage where
|
||||
'M' -> AgentMessage <$> smpP <*> smpP
|
||||
_ -> fail "bad AgentMessage"
|
||||
|
||||
data AgentMessageType = AM_CONN_INFO | AM_CONN_INFO_REPLY | AM_HELLO_ | AM_REPLY_ | AM_A_MSG_
|
||||
data AgentMessageType
|
||||
= AM_CONN_INFO
|
||||
| AM_CONN_INFO_REPLY
|
||||
| AM_HELLO_
|
||||
| AM_REPLY_
|
||||
| AM_A_MSG_
|
||||
| AM_A_ADD_
|
||||
| AM_A_KEY_
|
||||
| AM_A_USE_
|
||||
| AM_A_DEL_
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance Encoding AgentMessageType where
|
||||
@@ -437,6 +446,10 @@ instance Encoding AgentMessageType where
|
||||
AM_HELLO_ -> "H"
|
||||
AM_REPLY_ -> "R"
|
||||
AM_A_MSG_ -> "M"
|
||||
AM_A_ADD_ -> "A"
|
||||
AM_A_KEY_ -> "K"
|
||||
AM_A_USE_ -> "U"
|
||||
AM_A_DEL_ -> "X"
|
||||
smpP =
|
||||
A.anyChar >>= \case
|
||||
'C' -> pure AM_CONN_INFO
|
||||
@@ -444,6 +457,10 @@ instance Encoding AgentMessageType where
|
||||
'H' -> pure AM_HELLO_
|
||||
'R' -> pure AM_REPLY_
|
||||
'M' -> pure AM_A_MSG_
|
||||
'A' -> pure AM_A_ADD_
|
||||
'K' -> pure AM_A_KEY_
|
||||
'U' -> pure AM_A_USE_
|
||||
'X' -> pure AM_A_DEL_
|
||||
_ -> fail "bad AgentMessageType"
|
||||
|
||||
agentMessageType :: AgentMessage -> AgentMessageType
|
||||
@@ -459,6 +476,10 @@ agentMessageType = \case
|
||||
-- REPLY is only used in v1
|
||||
REPLY _ -> AM_REPLY_
|
||||
A_MSG _ -> AM_A_MSG_
|
||||
A_ADD _ -> AM_A_ADD_
|
||||
A_KEY {} -> AM_A_KEY_
|
||||
A_USE {} -> AM_A_USE_
|
||||
A_DEL {} -> AM_A_DEL_
|
||||
|
||||
data APrivHeader = APrivHeader
|
||||
{ -- | sequential ID assigned by the sending agent
|
||||
@@ -473,7 +494,14 @@ instance Encoding APrivHeader where
|
||||
smpEncode (sndMsgId, prevMsgHash)
|
||||
smpP = APrivHeader <$> smpP <*> smpP
|
||||
|
||||
data AMsgType = HELLO_ | REPLY_ | A_MSG_
|
||||
data AMsgType
|
||||
= HELLO_
|
||||
| REPLY_
|
||||
| A_MSG_
|
||||
| A_ADD_
|
||||
| A_KEY_
|
||||
| A_USE_
|
||||
| A_DEL_
|
||||
deriving (Eq)
|
||||
|
||||
instance Encoding AMsgType where
|
||||
@@ -481,11 +509,19 @@ instance Encoding AMsgType where
|
||||
HELLO_ -> "H"
|
||||
REPLY_ -> "R"
|
||||
A_MSG_ -> "M"
|
||||
A_ADD_ -> "A"
|
||||
A_KEY_ -> "K"
|
||||
A_USE_ -> "U"
|
||||
A_DEL_ -> "X"
|
||||
smpP =
|
||||
smpP >>= \case
|
||||
'H' -> pure HELLO_
|
||||
'R' -> pure REPLY_
|
||||
'M' -> pure A_MSG_
|
||||
'A' -> pure A_ADD_
|
||||
'K' -> pure A_KEY_
|
||||
'U' -> pure A_USE_
|
||||
'X' -> pure A_DEL_
|
||||
_ -> fail "bad AMsgType"
|
||||
|
||||
-- | Messages sent between SMP agents once SMP queue is secured.
|
||||
@@ -498,6 +534,14 @@ data AMessage
|
||||
REPLY (L.NonEmpty SMPQueueInfo)
|
||||
| -- | agent envelope for the client message
|
||||
A_MSG MsgBody
|
||||
| -- add queue to connection
|
||||
A_ADD SMPQueueInfo
|
||||
| -- key to secure the added queue and agree e2e encryption key
|
||||
A_KEY SMPQueueInfo SndPublicVerifyKey
|
||||
| -- inform that the queue is ready to use
|
||||
A_USE SMPServer SMP.SenderId
|
||||
| -- inform that the queue will be deleted
|
||||
A_DEL SMPServer SMP.SenderId
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding AMessage where
|
||||
@@ -505,12 +549,20 @@ instance Encoding AMessage where
|
||||
HELLO -> smpEncode HELLO_
|
||||
REPLY smpQueues -> smpEncode (REPLY_, smpQueues)
|
||||
A_MSG body -> smpEncode (A_MSG_, Tail body)
|
||||
A_ADD qInfo -> smpEncode (A_ADD_, qInfo)
|
||||
A_KEY qInfo sKey -> smpEncode (A_KEY_, qInfo, sKey)
|
||||
A_USE srv sndId -> smpEncode (A_USE_, srv, sndId)
|
||||
A_DEL srv sndId -> smpEncode (A_DEL_, srv, sndId)
|
||||
smpP =
|
||||
smpP
|
||||
>>= \case
|
||||
HELLO_ -> pure HELLO
|
||||
REPLY_ -> REPLY <$> smpP
|
||||
A_MSG_ -> A_MSG . unTail <$> smpP
|
||||
A_ADD_ -> A_ADD <$> smpP
|
||||
A_KEY_ -> A_KEY <$> smpP <*> smpP
|
||||
A_USE_ -> A_USE <$> smpP <*> smpP
|
||||
A_DEL_ -> A_DEL <$> smpP <*> smpP
|
||||
|
||||
instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
|
||||
strEncode = \case
|
||||
|
||||
Reference in New Issue
Block a user