mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
refactor message meta to MsgMeta type (#164)
This commit is contained in:
committed by
GitHub
parent
7af7272635
commit
e4d9b481ec
@@ -534,34 +534,18 @@ processSMPTransmission c@AgentClient {subQ} (srv, rId, cmd) = do
|
||||
sendConMsg toConn reConn = atomically $ writeTBQueue subQ ("", toConn, ICON reConn)
|
||||
|
||||
agentClientMsg :: PrevRcvMsgHash -> (ExternalSndId, ExternalSndTs) -> (BrokerId, BrokerTs) -> MsgBody -> MsgHash -> m ()
|
||||
agentClientMsg receivedPrevMsgHash senderMeta brokerMeta msgBody msgHash = do
|
||||
agentClientMsg externalPrevSndHash sender broker msgBody internalHash = do
|
||||
logServer "<--" c srv rId "MSG <MSG>"
|
||||
case status of
|
||||
Active -> do
|
||||
internalTs <- liftIO getCurrentTime
|
||||
(internalId, internalRcvId, prevExtSndId, prevRcvMsgHash) <- withStore (`updateRcvIds` connId)
|
||||
let msgIntegrity = checkMsgIntegrity prevExtSndId (fst senderMeta) prevRcvMsgHash receivedPrevMsgHash
|
||||
withStore $ \st ->
|
||||
createRcvMsg st connId $
|
||||
RcvMsgData
|
||||
{ internalId,
|
||||
internalRcvId,
|
||||
internalTs,
|
||||
senderMeta,
|
||||
brokerMeta,
|
||||
msgBody,
|
||||
internalHash = msgHash,
|
||||
externalPrevSndHash = receivedPrevMsgHash,
|
||||
msgIntegrity
|
||||
}
|
||||
notify
|
||||
MSG
|
||||
{ recipientMeta = (unId internalId, internalTs),
|
||||
senderMeta,
|
||||
brokerMeta,
|
||||
msgBody,
|
||||
msgIntegrity
|
||||
}
|
||||
let integrity = checkMsgIntegrity prevExtSndId (fst sender) prevRcvMsgHash externalPrevSndHash
|
||||
recipient = (unId internalId, internalTs)
|
||||
msgMeta = MsgMeta {integrity, recipient, sender, broker}
|
||||
rcvMsg = RcvMsgData {msgMeta, msgBody, internalRcvId, internalHash, externalPrevSndHash}
|
||||
withStore $ \st -> createRcvMsg st connId rcvMsg
|
||||
notify $ MSG msgMeta msgBody
|
||||
_ -> prohibited
|
||||
|
||||
checkMsgIntegrity :: PrevExternalSndId -> ExternalSndId -> PrevRcvMsgHash -> ByteString -> MsgIntegrity
|
||||
|
||||
@@ -32,6 +32,7 @@ module Simplex.Messaging.Agent.Protocol
|
||||
ACommand (..),
|
||||
AParty (..),
|
||||
SAParty (..),
|
||||
MsgMeta (..),
|
||||
SMPMessage (..),
|
||||
AMessage (..),
|
||||
SMPServer (..),
|
||||
@@ -166,14 +167,7 @@ data ACommand (p :: AParty) where
|
||||
-- STAT :: QueueDirection -> Maybe QueueStatus -> Maybe SubMode -> ACommand Agent
|
||||
SEND :: MsgBody -> ACommand Client
|
||||
SENT :: AgentMsgId -> ACommand Agent
|
||||
MSG ::
|
||||
{ recipientMeta :: (AgentMsgId, UTCTime),
|
||||
brokerMeta :: (MsgId, UTCTime),
|
||||
senderMeta :: (AgentMsgId, UTCTime),
|
||||
msgIntegrity :: MsgIntegrity,
|
||||
msgBody :: MsgBody
|
||||
} ->
|
||||
ACommand Agent
|
||||
MSG :: MsgMeta -> MsgBody -> ACommand Agent
|
||||
-- ACK :: AgentMsgId -> ACommand Client
|
||||
-- RCVD :: AgentMsgId -> ACommand Agent
|
||||
OFF :: ACommand Client
|
||||
@@ -185,6 +179,15 @@ deriving instance Eq (ACommand p)
|
||||
|
||||
deriving instance Show (ACommand p)
|
||||
|
||||
-- | Agent message metadata sent to the client
|
||||
data MsgMeta = MsgMeta
|
||||
{ integrity :: MsgIntegrity,
|
||||
recipient :: (AgentMsgId, UTCTime),
|
||||
broker :: (MsgId, UTCTime),
|
||||
sender :: (AgentMsgId, UTCTime)
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- | SMP message formats.
|
||||
data SMPMessage
|
||||
= -- | SMP confirmation
|
||||
@@ -496,16 +499,16 @@ commandP =
|
||||
sendCmd = ACmd SClient . SEND <$> A.takeByteString
|
||||
sentResp = ACmd SAgent . SENT <$> A.decimal
|
||||
iconMsg = ACmd SAgent . ICON <$> A.takeTill wordEnd
|
||||
message = do
|
||||
msgIntegrity <- msgIntegrityP <* A.space
|
||||
recipientMeta <- "R=" *> partyMeta A.decimal
|
||||
brokerMeta <- "B=" *> partyMeta base64P
|
||||
senderMeta <- "S=" *> partyMeta A.decimal
|
||||
msgBody <- A.takeByteString
|
||||
return $ ACmd SAgent MSG {recipientMeta, brokerMeta, senderMeta, msgIntegrity, msgBody}
|
||||
message = ACmd SAgent <$> (MSG <$> msgMetaP <* A.space <*> A.takeByteString)
|
||||
msgMetaP = do
|
||||
integrity <- msgIntegrityP
|
||||
recipient <- " R=" *> partyMeta A.decimal
|
||||
broker <- " B=" *> partyMeta base64P
|
||||
sender <- " S=" *> partyMeta A.decimal
|
||||
pure MsgMeta {integrity, recipient, broker, sender}
|
||||
introP f = f <$> A.takeTill (== ' ') <* A.space <*> A.takeByteString
|
||||
replyMode = ReplyMode <$> (" NO_REPLY" $> Off <|> pure On)
|
||||
partyMeta idParser = (,) <$> idParser <* "," <*> tsISO8601P <* A.space
|
||||
partyMeta idParser = (,) <$> idParser <* "," <*> tsISO8601P
|
||||
agentError = ACmd SAgent . ERR <$> agentErrorTypeP
|
||||
|
||||
-- | Message integrity validation result parser.
|
||||
@@ -534,15 +537,8 @@ serializeCommand = \case
|
||||
END -> "END"
|
||||
SEND msgBody -> "SEND " <> serializeMsg msgBody
|
||||
SENT mId -> "SENT " <> bshow mId
|
||||
MSG {recipientMeta = (rmId, rTs), brokerMeta = (bmId, bTs), senderMeta = (smId, sTs), msgIntegrity, msgBody} ->
|
||||
B.unwords
|
||||
[ "MSG",
|
||||
serializeMsgIntegrity msgIntegrity,
|
||||
"R=" <> bshow rmId <> "," <> showTs rTs,
|
||||
"B=" <> encode bmId <> "," <> showTs bTs,
|
||||
"S=" <> bshow smId <> "," <> showTs sTs,
|
||||
serializeMsg msgBody
|
||||
]
|
||||
MSG msgMeta msgBody ->
|
||||
"MSG " <> serializeMsgMeta msgMeta <> " " <> serializeMsg msgBody
|
||||
OFF -> "OFF"
|
||||
DEL -> "DEL"
|
||||
CON -> "CON"
|
||||
@@ -556,6 +552,14 @@ serializeCommand = \case
|
||||
ReplyMode On -> ""
|
||||
showTs :: UTCTime -> ByteString
|
||||
showTs = B.pack . formatISO8601Millis
|
||||
serializeMsgMeta :: MsgMeta -> ByteString
|
||||
serializeMsgMeta MsgMeta {integrity, recipient = (rmId, rTs), broker = (bmId, bTs), sender = (smId, sTs)} =
|
||||
B.unwords
|
||||
[ serializeMsgIntegrity integrity,
|
||||
"R=" <> bshow rmId <> "," <> showTs rTs,
|
||||
"B=" <> encode bmId <> "," <> showTs bTs,
|
||||
"S=" <> bshow smId <> "," <> showTs sTs
|
||||
]
|
||||
|
||||
-- | Serialize message integrity validation result.
|
||||
serializeMsgIntegrity :: MsgIntegrity -> ByteString
|
||||
@@ -636,7 +640,7 @@ tGet party h = liftIO (tGetRaw h) >>= tParseLoadBody
|
||||
cmdWithMsgBody :: ACommand p -> m (Either AgentErrorType (ACommand p))
|
||||
cmdWithMsgBody = \case
|
||||
SEND body -> SEND <$$> getMsgBody body
|
||||
MSG agentMsgId srvTS agentTS integrity body -> MSG agentMsgId srvTS agentTS integrity <$$> getMsgBody body
|
||||
MSG msgMeta body -> MSG msgMeta <$$> getMsgBody body
|
||||
INTRO introId cInfo -> INTRO introId <$$> getMsgBody cInfo
|
||||
REQ introId cInfo -> REQ introId <$$> getMsgBody cInfo
|
||||
ACPT introId cInfo -> ACPT introId <$$> getMsgBody cInfo
|
||||
|
||||
@@ -170,15 +170,11 @@ type PrevSndMsgHash = MsgHash
|
||||
-- * Message data containers - used on Msg creation to reduce number of parameters
|
||||
|
||||
data RcvMsgData = RcvMsgData
|
||||
{ internalId :: InternalId,
|
||||
internalRcvId :: InternalRcvId,
|
||||
internalTs :: InternalTs,
|
||||
senderMeta :: (ExternalSndId, ExternalSndTs),
|
||||
brokerMeta :: (BrokerId, BrokerTs),
|
||||
{ msgMeta :: MsgMeta,
|
||||
msgBody :: MsgBody,
|
||||
internalRcvId :: InternalRcvId,
|
||||
internalHash :: MsgHash,
|
||||
externalPrevSndHash :: MsgHash,
|
||||
msgIntegrity :: MsgIntegrity
|
||||
externalPrevSndHash :: MsgHash
|
||||
}
|
||||
|
||||
data SndMsgData = SndMsgData
|
||||
|
||||
@@ -746,7 +746,8 @@ updateLastIdsRcv_ dbConn connId newInternalId newInternalRcvId =
|
||||
-- * createRcvMsg helpers
|
||||
|
||||
insertRcvMsgBase_ :: DB.Connection -> ConnId -> RcvMsgData -> IO ()
|
||||
insertRcvMsgBase_ dbConn connId RcvMsgData {..} = do
|
||||
insertRcvMsgBase_ dbConn connId RcvMsgData {msgMeta, msgBody, internalRcvId} = do
|
||||
let MsgMeta {recipient = (internalId, internalTs)} = msgMeta
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
[sql|
|
||||
@@ -763,7 +764,8 @@ insertRcvMsgBase_ dbConn connId RcvMsgData {..} = do
|
||||
]
|
||||
|
||||
insertRcvMsgDetails_ :: DB.Connection -> ConnId -> RcvMsgData -> IO ()
|
||||
insertRcvMsgDetails_ dbConn connId RcvMsgData {..} =
|
||||
insertRcvMsgDetails_ dbConn connId RcvMsgData {msgMeta, internalRcvId, internalHash, externalPrevSndHash} = do
|
||||
let MsgMeta {integrity, recipient, sender, broker} = msgMeta
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
[sql|
|
||||
@@ -778,19 +780,19 @@ insertRcvMsgDetails_ dbConn connId RcvMsgData {..} =
|
||||
|]
|
||||
[ ":conn_alias" := connId,
|
||||
":internal_rcv_id" := internalRcvId,
|
||||
":internal_id" := internalId,
|
||||
":external_snd_id" := fst senderMeta,
|
||||
":external_snd_ts" := snd senderMeta,
|
||||
":broker_id" := fst brokerMeta,
|
||||
":broker_ts" := snd brokerMeta,
|
||||
":internal_id" := fst recipient,
|
||||
":external_snd_id" := fst sender,
|
||||
":external_snd_ts" := snd sender,
|
||||
":broker_id" := fst broker,
|
||||
":broker_ts" := snd broker,
|
||||
":rcv_status" := Received,
|
||||
":internal_hash" := internalHash,
|
||||
":external_prev_snd_hash" := externalPrevSndHash,
|
||||
":integrity" := msgIntegrity
|
||||
":integrity" := integrity
|
||||
]
|
||||
|
||||
updateHashRcv_ :: DB.Connection -> ConnId -> RcvMsgData -> IO ()
|
||||
updateHashRcv_ dbConn connId RcvMsgData {..} =
|
||||
updateHashRcv_ dbConn connId RcvMsgData {msgMeta, internalHash, internalRcvId} =
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
-- last_internal_rcv_msg_id equality check prevents race condition in case next id was reserved
|
||||
@@ -801,7 +803,7 @@ updateHashRcv_ dbConn connId RcvMsgData {..} =
|
||||
WHERE conn_alias = :conn_alias
|
||||
AND last_internal_rcv_msg_id = :last_internal_rcv_msg_id;
|
||||
|]
|
||||
[ ":last_external_snd_msg_id" := fst senderMeta,
|
||||
[ ":last_external_snd_msg_id" := fst (sender msgMeta),
|
||||
":last_rcv_msg_hash" := internalHash,
|
||||
":conn_alias" := connId,
|
||||
":last_internal_rcv_msg_id" := internalRcvId
|
||||
|
||||
@@ -101,7 +101,7 @@ h #:# err = tryGet `shouldReturn` ()
|
||||
_ -> return ()
|
||||
|
||||
pattern Msg :: MsgBody -> ACommand 'Agent
|
||||
pattern Msg msgBody <- MSG {msgBody, msgIntegrity = MsgOk}
|
||||
pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} msgBody
|
||||
|
||||
testDuplexConnection :: Transport c => TProxy c -> c -> c -> IO ()
|
||||
testDuplexConnection _ alice bob = do
|
||||
|
||||
@@ -402,21 +402,24 @@ ts = UTCTime (fromGregorian 2021 02 24) (secondsToDiffTime 0)
|
||||
mkRcvMsgData :: InternalId -> InternalRcvId -> ExternalSndId -> BrokerId -> MsgHash -> RcvMsgData
|
||||
mkRcvMsgData internalId internalRcvId externalSndId brokerId internalHash =
|
||||
RcvMsgData
|
||||
{ internalId,
|
||||
internalRcvId,
|
||||
internalTs = ts,
|
||||
senderMeta = (externalSndId, ts),
|
||||
brokerMeta = (brokerId, ts),
|
||||
{ internalRcvId,
|
||||
msgMeta =
|
||||
MsgMeta
|
||||
{ integrity = MsgOk,
|
||||
recipient = (unId internalId, ts),
|
||||
sender = (externalSndId, ts),
|
||||
broker = (brokerId, ts)
|
||||
},
|
||||
msgBody = hw,
|
||||
internalHash,
|
||||
externalPrevSndHash = "hash_from_sender",
|
||||
msgIntegrity = MsgOk
|
||||
externalPrevSndHash = "hash_from_sender"
|
||||
}
|
||||
|
||||
testCreateRcvMsg' :: SQLiteStore -> PrevExternalSndId -> PrevRcvMsgHash -> ConnId -> RcvMsgData -> Expectation
|
||||
testCreateRcvMsg' st expectedPrevSndId expectedPrevHash connId rcvMsgData@RcvMsgData {..} = do
|
||||
let MsgMeta {recipient = (internalId, _)} = msgMeta
|
||||
updateRcvIds st connId
|
||||
`returnsResult` (internalId, internalRcvId, expectedPrevSndId, expectedPrevHash)
|
||||
`returnsResult` (InternalId internalId, internalRcvId, expectedPrevSndId, expectedPrevHash)
|
||||
createRcvMsg st connId rcvMsgData
|
||||
`returnsResult` ()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user