From 65dd693c830b688b8943efb702a4ceeb3bd003ac Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 18 Jan 2021 20:53:02 +0000 Subject: [PATCH] parse agent messages with attoparsec (#18) * parse agent messages with attoparsec (WIP) * agent: refactor parsers * agent: parse commands and responses with attoparsec * refactor UTCTime parser * fix: updateRcvQueueStatus args * remove outdated comment * message parsing: PR feedback Co-authored-by: Efim Poberezkin --- package.yaml | 2 +- src/Simplex/Messaging/Agent.hs | 25 +-- src/Simplex/Messaging/Agent/Transmission.hs | 177 ++++++++++++-------- tests/AgentTests.hs | 11 +- tests/SMPAgentClient.hs | 2 +- 5 files changed, 124 insertions(+), 93 deletions(-) diff --git a/package.yaml b/package.yaml index ea587f3b6..8e74c8eea 100644 --- a/package.yaml +++ b/package.yaml @@ -13,6 +13,7 @@ extra-source-files: dependencies: - async == 2.2.* + - attoparsec == 0.13.* - base >= 4.7 && < 5 - base64-bytestring >= 1.0 && < 1.3 - bytestring == 0.10.* @@ -21,7 +22,6 @@ dependencies: - iso8601-time == 0.1.* - mtl - network == 3.1.* - - split == 0.2.* - sqlite-simple == 0.4.* - stm - template-haskell == 2.15.* diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 2802f3d18..cc885eeac 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -113,23 +113,24 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = respond $ INV qInfo joinConnection :: SMPQueueInfo -> ReplyMode -> m () - joinConnection qInfo replyMode = do + joinConnection qInfo@(SMPQueueInfo srv _ _) replyMode = do -- TODO create connection alias if not passed -- make connAlias Maybe? (sndQueue, senderKey) <- newSendQueue qInfo withStore $ \st -> createSndConn st connAlias sndQueue sendConfirmation c sndQueue senderKey sendHello c sndQueue - sendReplyQueue sndQueue replyMode - respond OK + case replyMode of + ReplyOn -> sendReplyQInfo srv sndQueue + ReplyVia srv' -> sendReplyQInfo srv' sndQueue + ReplyOff -> return () + respond CON - sendReplyQueue :: SendQueue -> ReplyMode -> m () - sendReplyQueue sndQueue = \case - ReplyOff -> return () - ReplyOn srv -> do - (rcvQueue, qInfo) <- newReceiveQueue srv - withStore $ \st -> addRcvQueue st connAlias rcvQueue - sendAgentMessage sndQueue $ REPLY qInfo + sendReplyQInfo :: SMPServer -> SendQueue -> m () + sendReplyQInfo srv sndQueue = do + (rcvQueue, qInfo) <- newReceiveQueue srv + withStore $ \st -> addRcvQueue st connAlias rcvQueue + sendAgentMessage sndQueue $ REPLY qInfo newReceiveQueue :: SMPServer -> m (ReceiveQueue, SMPQueueInfo) newReceiveQueue server = do @@ -197,7 +198,9 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do liftIO . putStrLn $ "unexpected SMP confirmation, queue status " <> show s SMPMessage {agentMessage} -> case agentMessage of - HELLO _verifyKey _ -> return () + HELLO _verifyKey _ -> do + -- TODO send status update to the user? + withStore $ \st -> updateRcvQueueStatus st rcvQueue Active REPLY qInfo -> do (sndQueue, senderKey) <- newSendQueue qInfo withStore $ \st -> addSndQueue st connAlias sndQueue diff --git a/src/Simplex/Messaging/Agent/Transmission.hs b/src/Simplex/Messaging/Agent/Transmission.hs index 2d45efd74..7eaa49456 100644 --- a/src/Simplex/Messaging/Agent/Transmission.hs +++ b/src/Simplex/Messaging/Agent/Transmission.hs @@ -12,13 +12,17 @@ module Simplex.Messaging.Agent.Transmission where -import Control.Monad +import Control.Applicative ((<|>)) import Control.Monad.IO.Class +import Data.Attoparsec.ByteString.Char8 (Parser) +import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.Bifunctor (first) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.Char (isAlphaNum) +import Data.Functor import Data.Kind -import Data.List.Split (splitOn) import Data.Time.Clock (UTCTime) import Data.Time.ISO8601 import Data.Type.Equality @@ -103,60 +107,83 @@ data SMPMessage previousMsgHash :: ByteString, agentMessage :: AMessage } + deriving (Show) data AMessage where HELLO :: VerificationKey -> AckMode -> AMessage REPLY :: SMPQueueInfo -> AMessage A_MSG :: MsgBody -> AMessage + deriving (Show) parseSMPMessage :: ByteString -> Either ErrorType SMPMessage -parseSMPMessage _ = Left INTERNAL +parseSMPMessage = parse (smpMessageP <* A.endOfLine) $ SYNTAX errBadMessage + where + smpMessageP :: Parser SMPMessage + smpMessageP = + smpConfirmationP <* A.endOfLine + <|> A.endOfLine *> smpClientMessageP + + smpConfirmationP :: Parser SMPMessage + smpConfirmationP = SMPConfirmation <$> ("KEY " *> base64P <* A.endOfLine) + + smpClientMessageP :: Parser SMPMessage + smpClientMessageP = + SMPMessage + <$> A.decimal <* A.space + <*> tsIso8601P <* A.space + <*> base64P <* A.endOfLine + <*> agentMessageP + + tsIso8601P :: Parser UTCTime + tsIso8601P = maybe (fail "timestamp") pure . parseISO8601 . B.unpack =<< A.takeTill (== ' ') serializeSMPMessage :: SMPMessage -> ByteString serializeSMPMessage = \case - SMPConfirmation sKey -> "KEY " <> sKey <> "\r\n\r\n" + SMPConfirmation sKey -> smpMessage ("KEY " <> encode sKey) "" "" SMPMessage {agentMsgId, agentTimestamp, previousMsgHash, agentMessage} -> - "\r\n" <> messageHeader agentMsgId agentTimestamp previousMsgHash <> "\r\n" <> serializeAgentMessage agentMessage + let header = messageHeader agentMsgId agentTimestamp previousMsgHash + body = serializeAgentMessage agentMessage + in smpMessage "" header body where - messageHeader agentMsgId agentTimestamp previousMsgHash = - B.unwords [B.pack $ show agentMsgId, B.pack (formatISO8601Millis agentTimestamp), encode previousMsgHash] + messageHeader msgId ts prevMsgHash = + B.unwords [B.pack $ show msgId, B.pack $ formatISO8601Millis ts, encode prevMsgHash] + smpMessage smpHeader aHeader aBody = B.intercalate "\n" [smpHeader, aHeader, aBody, ""] + +agentMessageP :: Parser AMessage +agentMessageP = + "HELLO " *> hello + <|> "REPLY " *> reply + <|> "MSG " *> a_msg + where + hello = HELLO <$> base64P <*> ackMode + reply = REPLY <$> smpQueueInfoP + a_msg = do + size :: Int <- A.decimal + A_MSG <$> (A.endOfLine *> A.take size <* A.endOfLine) + ackMode = " NO_ACK" $> AckMode Off <|> pure (AckMode On) + +smpQueueInfoP :: Parser SMPQueueInfo +smpQueueInfoP = + SMPQueueInfo <$> ("smp::" *> smpServerP <* "::") <*> (base64P <* "::") <*> base64P + +smpServerP :: Parser SMPServer +smpServerP = SMPServer <$> server <*> port <*> msgHash + where + server = B.unpack <$> A.takeTill (A.inClass ":# ") + port = A.char ':' *> (Just . show <$> (A.decimal :: Parser Int)) <|> pure Nothing + msgHash = A.char '#' *> (Just <$> base64P) <|> pure Nothing + +base64P :: Parser ByteString +base64P = do + str <- A.takeWhile1 (\c -> isAlphaNum c || c == '+' || c == '/') + pad <- A.takeWhile (== '=') + either fail pure $ decode (str <> pad) parseAgentMessage :: ByteString -> Either ErrorType AMessage -parseAgentMessage msg = case B.words msg of - ["HELLO", key, ackMode] -> HELLO key <$> parseAckMode ackMode - ["REPLY", qInfo] -> REPLY <$> parseSmpQueueInfo qInfo - ["A_MSG", msgBody] -> Right $ A_MSG msgBody - _ -> Left UNKNOWN +parseAgentMessage = parse agentMessageP $ SYNTAX errBadMessage -parseSmpQueueInfo :: ByteString -> Either ErrorType SMPQueueInfo -parseSmpQueueInfo qInfo = case splitOn "::" $ B.unpack qInfo of - ["smp", srv, qId, ek] -> liftM3 SMPQueueInfo (parseSmpServer $ B.pack srv) (parseDec64 qId) (parseDec64 ek) - _ -> Left $ SYNTAX errBadInvitation - -parseSmpServer :: ByteString -> Either ErrorType SMPServer -parseSmpServer srv = - let (s, kf) = span (/= '#') $ B.unpack srv - (h, p) = span (/= ':') s - in SMPServer h (parseSrvPart p) <$> traverse parseDec64 (parseSrvPart kf) - -parseDec64 :: String -> Either ErrorType ByteString -parseDec64 s = case decode $ B.pack s of - Left _ -> Left $ SYNTAX errBadEncoding - Right b -> Right b - -parseSrvPart :: String -> Maybe String -parseSrvPart s = if length s > 1 then Just $ tail s else Nothing - -parseAckMode :: ByteString -> Either ErrorType AckMode -parseAckMode am = case B.split '=' am of - ["ACK", mode] -> AckMode <$> getMode mode - _ -> errParams - -getMode :: ByteString -> Either ErrorType Mode -getMode mode = case mode of - "ON" -> Right On - "OFF" -> Right Off - _ -> errParams +parse :: Parser a -> e -> (ByteString -> Either e a) +parse parser err = first (const err) . A.parseOnly (parser <* A.endOfInput) errParams :: Either ErrorType a errParams = Left $ SYNTAX errBadParameters @@ -165,7 +192,7 @@ serializeAgentMessage :: AMessage -> ByteString serializeAgentMessage = \case HELLO _verKey _ackMode -> "HELLO" -- TODO REPLY qInfo -> "REPLY " <> serializeSmpQueueInfo qInfo - A_MSG msgBody -> "A_MSG " <> msgBody -- ? whitespaces missing + A_MSG msgBody -> "A_MSG " <> msgBody serializeSmpQueueInfo :: SMPQueueInfo -> ByteString serializeSmpQueueInfo (SMPQueueInfo srv qId ek) = "smp::" <> serializeServer srv <> "::" <> encode qId <> "::" <> encode ek @@ -195,7 +222,7 @@ newtype SubMode = SubMode Mode deriving (Show) data SMPQueueInfo = SMPQueueInfo SMPServer SenderId EncryptionKey deriving (Show) -data ReplyMode = ReplyOn SMPServer | ReplyOff deriving (Show) +data ReplyMode = ReplyOff | ReplyOn | ReplyVia SMPServer deriving (Show) type EncryptionKey = PublicKey @@ -235,12 +262,21 @@ data AckErrorType = AckUnknown | AckProhibited | AckSyntax Int -- etc. errBadEncoding :: Int errBadEncoding = 10 +errBadCommand :: Int +errBadCommand = 11 + errBadInvitation :: Int errBadInvitation = 12 errNoConnAlias :: Int errNoConnAlias = 13 +errBadMessage :: Int +errBadMessage = 14 + +errBadServer :: Int +errBadServer = 15 + smpErrTCPConnection :: Natural smpErrTCPConnection = 1 @@ -250,47 +286,40 @@ smpErrCorrelationId = 2 smpUnexpectedResponse :: Natural smpUnexpectedResponse = 3 -parseCommand :: ByteString -> Either ErrorType ACmd -parseCommand command = case B.words command of - ["NEW", srv] -> newConn srv -- . Right $ AckMode On - -- ["NEW", srv, am] -> newConn srv $ parseAckMode am - ["INV", qInfo] -> ACmd SAgent . INV <$> parseSmpQueueInfo qInfo - "JOIN" : qInfo : ws -> joinConn qInfo ws - ["CON"] -> Right . ACmd SAgent $ CON - "NEW" : _ -> errParams - "INV" : _ -> errParams - "JOIN" : _ -> errParams - "CON" : _ -> errParams - _ -> Left UNKNOWN +parseCommandP :: Parser ACmd +parseCommandP = + "NEW " *> newCmd + <|> "INV " *> invResp + <|> "JOIN " *> joinCmd + <|> "CON" $> ACmd SAgent CON + <|> "OK" $> ACmd SAgent OK where - newConn :: ByteString -> Either ErrorType ACmd - newConn srv = ACmd SClient . NEW <$> parseSmpServer srv + newCmd = ACmd SClient . NEW <$> smpServerP + invResp = ACmd SAgent . INV <$> smpQueueInfoP + joinCmd = ACmd SClient <$> (JOIN <$> smpQueueInfoP <*> replyMode) + replyMode = + " NO_REPLY" $> ReplyOff + <|> A.space *> (ReplyVia <$> smpServerP) + <|> pure ReplyOn - joinConn :: ByteString -> [ByteString] -> Either ErrorType ACmd - joinConn qInfo ws = do - q <- parseSmpQueueInfo qInfo - case ws of - [] -> let SMPQueueInfo srv _ _ = q in joinCmd q $ ReplyOn srv - ["NO_REPLY"] -> joinCmd q ReplyOff - [srv] -> do - s <- parseSmpServer srv - joinCmd q $ ReplyOn s - _ -> errParams - where - joinCmd q r = return $ ACmd SClient $ JOIN q r +parseCommand :: ByteString -> Either ErrorType ACmd +parseCommand = parse parseCommandP $ SYNTAX errBadCommand serializeCommand :: ACommand p -> ByteString serializeCommand = \case NEW srv -> "NEW " <> serializeServer srv INV qInfo -> "INV " <> serializeSmpQueueInfo qInfo - JOIN qInfo rMode -> - "JOIN " <> serializeSmpQueueInfo qInfo <> " " - <> case rMode of - ReplyOff -> "NO_REPLY" - ReplyOn srv -> serializeServer srv + JOIN qInfo rMode -> "JOIN " <> serializeSmpQueueInfo qInfo <> replyMode rMode CON -> "CON" ERR e -> "ERR " <> B.pack (show e) + OK -> "OK" c -> B.pack $ show c + where + replyMode :: ReplyMode -> ByteString + replyMode = \case + ReplyOff -> " NO_REPLY" + ReplyVia srv -> " " <> serializeServer srv + ReplyOn -> "" tPutRaw :: MonadIO m => Handle -> ARawTransmission -> m () tPutRaw h (corrId, connAlias, command) = do diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index eb2fdb10b..1dfab7bbe 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -24,7 +24,7 @@ command >#>= p = smpAgentTest command >>= (`shouldSatisfy` p . \(cId, cAlias, cm syntaxTests :: Spec syntaxTests = do - it "unknown command" $ ("1", "5678", "HELLO") >#> ("1", "5678", "ERR UNKNOWN") + it "unknown command" $ ("1", "5678", "HELLO") >#> ("1", "5678", "ERR SYNTAX 11") describe "NEW" do describe "valid" do -- TODO: ERROR no connection alias in the response (it does not generate it yet if not provided) @@ -35,17 +35,16 @@ syntaxTests = do it "with port and keyHash" $ ("214", "", "NEW localhost:5000#1234") >#>= \case ("214", "", "INV" : _) -> True; _ -> False describe "invalid" do -- TODO: add tests with defined connection alias - it "no parameters" $ ("221", "", "NEW") >#> ("221", "", "ERR SYNTAX 2") - it "many parameters" $ ("222", "", "NEW localhost:5000 hi") >#> ("222", "", "ERR SYNTAX 2") - it "invalid server keyHash" $ ("223", "", "NEW localhost:5000#1") >#> ("223", "", "ERR SYNTAX 10") + it "no parameters" $ ("221", "", "NEW") >#> ("221", "", "ERR SYNTAX 11") + it "many parameters" $ ("222", "", "NEW localhost:5000 hi") >#> ("222", "", "ERR SYNTAX 11") + it "invalid server keyHash" $ ("223", "", "NEW localhost:5000#1") >#> ("223", "", "ERR SYNTAX 11") describe "JOIN" do describe "valid" do -- TODO: ERROR no connection alias in the response (it does not generate it yet if not provided) -- TODO: add tests with defined connection alias - -- TODO: JOIN is not merged yet - to be added it "using same server as in invitation" $ ("311", "", "JOIN smp::localhost:5000::1234::5678") >#> ("311", "", "ERR SMP AUTH") describe "invalid" do -- TODO: JOIN is not merged yet - to be added - it "no parameters" $ ("321", "", "JOIN") >#> ("321", "", "ERR SYNTAX 2") + it "no parameters" $ ("321", "", "JOIN") >#> ("321", "", "ERR SYNTAX 11") diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index 9fd3536ff..d94fd785a 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -57,7 +57,7 @@ withSmpAgent = testSMPAgentClient :: MonadUnliftIO m => (Handle -> m a) -> m a testSMPAgentClient client = do - threadDelay 100_000 -- TODO hack: thread delay for SMP agent to start + threadDelay 50_000 -- TODO hack: thread delay for SMP agent to start runTCPClient agentTestHost agentTestPort $ \h -> do line <- getLn h if line == "Welcome to SMP v0.2.0 agent"