mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-19 02:25:13 +00:00
parse agent messages with attoparsec (#18)
* parse agent messages with attoparsec (WIP) * agent: refactor parsers * agent: parse commands and responses with attoparsec * refactor UTCTime parser * fix: updateRcvQueueStatus args * remove outdated comment * message parsing: PR feedback Co-authored-by: Efim Poberezkin <efim.poberezkin@gmail.com>
This commit is contained in:
committed by
Efim Poberezkin
parent
d47b57ac87
commit
65dd693c83
+1
-1
@@ -13,6 +13,7 @@ extra-source-files:
|
||||
|
||||
dependencies:
|
||||
- async == 2.2.*
|
||||
- attoparsec == 0.13.*
|
||||
- base >= 4.7 && < 5
|
||||
- base64-bytestring >= 1.0 && < 1.3
|
||||
- bytestring == 0.10.*
|
||||
@@ -21,7 +22,6 @@ dependencies:
|
||||
- iso8601-time == 0.1.*
|
||||
- mtl
|
||||
- network == 3.1.*
|
||||
- split == 0.2.*
|
||||
- sqlite-simple == 0.4.*
|
||||
- stm
|
||||
- template-haskell == 2.15.*
|
||||
|
||||
@@ -113,23 +113,24 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
respond $ INV qInfo
|
||||
|
||||
joinConnection :: SMPQueueInfo -> ReplyMode -> m ()
|
||||
joinConnection qInfo replyMode = do
|
||||
joinConnection qInfo@(SMPQueueInfo srv _ _) replyMode = do
|
||||
-- TODO create connection alias if not passed
|
||||
-- make connAlias Maybe?
|
||||
(sndQueue, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> createSndConn st connAlias sndQueue
|
||||
sendConfirmation c sndQueue senderKey
|
||||
sendHello c sndQueue
|
||||
sendReplyQueue sndQueue replyMode
|
||||
respond OK
|
||||
case replyMode of
|
||||
ReplyOn -> sendReplyQInfo srv sndQueue
|
||||
ReplyVia srv' -> sendReplyQInfo srv' sndQueue
|
||||
ReplyOff -> return ()
|
||||
respond CON
|
||||
|
||||
sendReplyQueue :: SendQueue -> ReplyMode -> m ()
|
||||
sendReplyQueue sndQueue = \case
|
||||
ReplyOff -> return ()
|
||||
ReplyOn srv -> do
|
||||
(rcvQueue, qInfo) <- newReceiveQueue srv
|
||||
withStore $ \st -> addRcvQueue st connAlias rcvQueue
|
||||
sendAgentMessage sndQueue $ REPLY qInfo
|
||||
sendReplyQInfo :: SMPServer -> SendQueue -> m ()
|
||||
sendReplyQInfo srv sndQueue = do
|
||||
(rcvQueue, qInfo) <- newReceiveQueue srv
|
||||
withStore $ \st -> addRcvQueue st connAlias rcvQueue
|
||||
sendAgentMessage sndQueue $ REPLY qInfo
|
||||
|
||||
newReceiveQueue :: SMPServer -> m (ReceiveQueue, SMPQueueInfo)
|
||||
newReceiveQueue server = do
|
||||
@@ -197,7 +198,9 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
liftIO . putStrLn $ "unexpected SMP confirmation, queue status " <> show s
|
||||
SMPMessage {agentMessage} ->
|
||||
case agentMessage of
|
||||
HELLO _verifyKey _ -> return ()
|
||||
HELLO _verifyKey _ -> do
|
||||
-- TODO send status update to the user?
|
||||
withStore $ \st -> updateRcvQueueStatus st rcvQueue Active
|
||||
REPLY qInfo -> do
|
||||
(sndQueue, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> addSndQueue st connAlias sndQueue
|
||||
|
||||
@@ -12,13 +12,17 @@
|
||||
|
||||
module Simplex.Messaging.Agent.Transmission where
|
||||
|
||||
import Control.Monad
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Monad.IO.Class
|
||||
import Data.Attoparsec.ByteString.Char8 (Parser)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Base64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Char (isAlphaNum)
|
||||
import Data.Functor
|
||||
import Data.Kind
|
||||
import Data.List.Split (splitOn)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Data.Time.ISO8601
|
||||
import Data.Type.Equality
|
||||
@@ -103,60 +107,83 @@ data SMPMessage
|
||||
previousMsgHash :: ByteString,
|
||||
agentMessage :: AMessage
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data AMessage where
|
||||
HELLO :: VerificationKey -> AckMode -> AMessage
|
||||
REPLY :: SMPQueueInfo -> AMessage
|
||||
A_MSG :: MsgBody -> AMessage
|
||||
deriving (Show)
|
||||
|
||||
parseSMPMessage :: ByteString -> Either ErrorType SMPMessage
|
||||
parseSMPMessage _ = Left INTERNAL
|
||||
parseSMPMessage = parse (smpMessageP <* A.endOfLine) $ SYNTAX errBadMessage
|
||||
where
|
||||
smpMessageP :: Parser SMPMessage
|
||||
smpMessageP =
|
||||
smpConfirmationP <* A.endOfLine
|
||||
<|> A.endOfLine *> smpClientMessageP
|
||||
|
||||
smpConfirmationP :: Parser SMPMessage
|
||||
smpConfirmationP = SMPConfirmation <$> ("KEY " *> base64P <* A.endOfLine)
|
||||
|
||||
smpClientMessageP :: Parser SMPMessage
|
||||
smpClientMessageP =
|
||||
SMPMessage
|
||||
<$> A.decimal <* A.space
|
||||
<*> tsIso8601P <* A.space
|
||||
<*> base64P <* A.endOfLine
|
||||
<*> agentMessageP
|
||||
|
||||
tsIso8601P :: Parser UTCTime
|
||||
tsIso8601P = maybe (fail "timestamp") pure . parseISO8601 . B.unpack =<< A.takeTill (== ' ')
|
||||
|
||||
serializeSMPMessage :: SMPMessage -> ByteString
|
||||
serializeSMPMessage = \case
|
||||
SMPConfirmation sKey -> "KEY " <> sKey <> "\r\n\r\n"
|
||||
SMPConfirmation sKey -> smpMessage ("KEY " <> encode sKey) "" ""
|
||||
SMPMessage {agentMsgId, agentTimestamp, previousMsgHash, agentMessage} ->
|
||||
"\r\n" <> messageHeader agentMsgId agentTimestamp previousMsgHash <> "\r\n" <> serializeAgentMessage agentMessage
|
||||
let header = messageHeader agentMsgId agentTimestamp previousMsgHash
|
||||
body = serializeAgentMessage agentMessage
|
||||
in smpMessage "" header body
|
||||
where
|
||||
messageHeader agentMsgId agentTimestamp previousMsgHash =
|
||||
B.unwords [B.pack $ show agentMsgId, B.pack (formatISO8601Millis agentTimestamp), encode previousMsgHash]
|
||||
messageHeader msgId ts prevMsgHash =
|
||||
B.unwords [B.pack $ show msgId, B.pack $ formatISO8601Millis ts, encode prevMsgHash]
|
||||
smpMessage smpHeader aHeader aBody = B.intercalate "\n" [smpHeader, aHeader, aBody, ""]
|
||||
|
||||
agentMessageP :: Parser AMessage
|
||||
agentMessageP =
|
||||
"HELLO " *> hello
|
||||
<|> "REPLY " *> reply
|
||||
<|> "MSG " *> a_msg
|
||||
where
|
||||
hello = HELLO <$> base64P <*> ackMode
|
||||
reply = REPLY <$> smpQueueInfoP
|
||||
a_msg = do
|
||||
size :: Int <- A.decimal
|
||||
A_MSG <$> (A.endOfLine *> A.take size <* A.endOfLine)
|
||||
ackMode = " NO_ACK" $> AckMode Off <|> pure (AckMode On)
|
||||
|
||||
smpQueueInfoP :: Parser SMPQueueInfo
|
||||
smpQueueInfoP =
|
||||
SMPQueueInfo <$> ("smp::" *> smpServerP <* "::") <*> (base64P <* "::") <*> base64P
|
||||
|
||||
smpServerP :: Parser SMPServer
|
||||
smpServerP = SMPServer <$> server <*> port <*> msgHash
|
||||
where
|
||||
server = B.unpack <$> A.takeTill (A.inClass ":# ")
|
||||
port = A.char ':' *> (Just . show <$> (A.decimal :: Parser Int)) <|> pure Nothing
|
||||
msgHash = A.char '#' *> (Just <$> base64P) <|> pure Nothing
|
||||
|
||||
base64P :: Parser ByteString
|
||||
base64P = do
|
||||
str <- A.takeWhile1 (\c -> isAlphaNum c || c == '+' || c == '/')
|
||||
pad <- A.takeWhile (== '=')
|
||||
either fail pure $ decode (str <> pad)
|
||||
|
||||
parseAgentMessage :: ByteString -> Either ErrorType AMessage
|
||||
parseAgentMessage msg = case B.words msg of
|
||||
["HELLO", key, ackMode] -> HELLO key <$> parseAckMode ackMode
|
||||
["REPLY", qInfo] -> REPLY <$> parseSmpQueueInfo qInfo
|
||||
["A_MSG", msgBody] -> Right $ A_MSG msgBody
|
||||
_ -> Left UNKNOWN
|
||||
parseAgentMessage = parse agentMessageP $ SYNTAX errBadMessage
|
||||
|
||||
parseSmpQueueInfo :: ByteString -> Either ErrorType SMPQueueInfo
|
||||
parseSmpQueueInfo qInfo = case splitOn "::" $ B.unpack qInfo of
|
||||
["smp", srv, qId, ek] -> liftM3 SMPQueueInfo (parseSmpServer $ B.pack srv) (parseDec64 qId) (parseDec64 ek)
|
||||
_ -> Left $ SYNTAX errBadInvitation
|
||||
|
||||
parseSmpServer :: ByteString -> Either ErrorType SMPServer
|
||||
parseSmpServer srv =
|
||||
let (s, kf) = span (/= '#') $ B.unpack srv
|
||||
(h, p) = span (/= ':') s
|
||||
in SMPServer h (parseSrvPart p) <$> traverse parseDec64 (parseSrvPart kf)
|
||||
|
||||
parseDec64 :: String -> Either ErrorType ByteString
|
||||
parseDec64 s = case decode $ B.pack s of
|
||||
Left _ -> Left $ SYNTAX errBadEncoding
|
||||
Right b -> Right b
|
||||
|
||||
parseSrvPart :: String -> Maybe String
|
||||
parseSrvPart s = if length s > 1 then Just $ tail s else Nothing
|
||||
|
||||
parseAckMode :: ByteString -> Either ErrorType AckMode
|
||||
parseAckMode am = case B.split '=' am of
|
||||
["ACK", mode] -> AckMode <$> getMode mode
|
||||
_ -> errParams
|
||||
|
||||
getMode :: ByteString -> Either ErrorType Mode
|
||||
getMode mode = case mode of
|
||||
"ON" -> Right On
|
||||
"OFF" -> Right Off
|
||||
_ -> errParams
|
||||
parse :: Parser a -> e -> (ByteString -> Either e a)
|
||||
parse parser err = first (const err) . A.parseOnly (parser <* A.endOfInput)
|
||||
|
||||
errParams :: Either ErrorType a
|
||||
errParams = Left $ SYNTAX errBadParameters
|
||||
@@ -165,7 +192,7 @@ serializeAgentMessage :: AMessage -> ByteString
|
||||
serializeAgentMessage = \case
|
||||
HELLO _verKey _ackMode -> "HELLO" -- TODO
|
||||
REPLY qInfo -> "REPLY " <> serializeSmpQueueInfo qInfo
|
||||
A_MSG msgBody -> "A_MSG " <> msgBody -- ? whitespaces missing
|
||||
A_MSG msgBody -> "A_MSG " <> msgBody
|
||||
|
||||
serializeSmpQueueInfo :: SMPQueueInfo -> ByteString
|
||||
serializeSmpQueueInfo (SMPQueueInfo srv qId ek) = "smp::" <> serializeServer srv <> "::" <> encode qId <> "::" <> encode ek
|
||||
@@ -195,7 +222,7 @@ newtype SubMode = SubMode Mode deriving (Show)
|
||||
data SMPQueueInfo = SMPQueueInfo SMPServer SenderId EncryptionKey
|
||||
deriving (Show)
|
||||
|
||||
data ReplyMode = ReplyOn SMPServer | ReplyOff deriving (Show)
|
||||
data ReplyMode = ReplyOff | ReplyOn | ReplyVia SMPServer deriving (Show)
|
||||
|
||||
type EncryptionKey = PublicKey
|
||||
|
||||
@@ -235,12 +262,21 @@ data AckErrorType = AckUnknown | AckProhibited | AckSyntax Int -- etc.
|
||||
errBadEncoding :: Int
|
||||
errBadEncoding = 10
|
||||
|
||||
errBadCommand :: Int
|
||||
errBadCommand = 11
|
||||
|
||||
errBadInvitation :: Int
|
||||
errBadInvitation = 12
|
||||
|
||||
errNoConnAlias :: Int
|
||||
errNoConnAlias = 13
|
||||
|
||||
errBadMessage :: Int
|
||||
errBadMessage = 14
|
||||
|
||||
errBadServer :: Int
|
||||
errBadServer = 15
|
||||
|
||||
smpErrTCPConnection :: Natural
|
||||
smpErrTCPConnection = 1
|
||||
|
||||
@@ -250,47 +286,40 @@ smpErrCorrelationId = 2
|
||||
smpUnexpectedResponse :: Natural
|
||||
smpUnexpectedResponse = 3
|
||||
|
||||
parseCommand :: ByteString -> Either ErrorType ACmd
|
||||
parseCommand command = case B.words command of
|
||||
["NEW", srv] -> newConn srv -- . Right $ AckMode On
|
||||
-- ["NEW", srv, am] -> newConn srv $ parseAckMode am
|
||||
["INV", qInfo] -> ACmd SAgent . INV <$> parseSmpQueueInfo qInfo
|
||||
"JOIN" : qInfo : ws -> joinConn qInfo ws
|
||||
["CON"] -> Right . ACmd SAgent $ CON
|
||||
"NEW" : _ -> errParams
|
||||
"INV" : _ -> errParams
|
||||
"JOIN" : _ -> errParams
|
||||
"CON" : _ -> errParams
|
||||
_ -> Left UNKNOWN
|
||||
parseCommandP :: Parser ACmd
|
||||
parseCommandP =
|
||||
"NEW " *> newCmd
|
||||
<|> "INV " *> invResp
|
||||
<|> "JOIN " *> joinCmd
|
||||
<|> "CON" $> ACmd SAgent CON
|
||||
<|> "OK" $> ACmd SAgent OK
|
||||
where
|
||||
newConn :: ByteString -> Either ErrorType ACmd
|
||||
newConn srv = ACmd SClient . NEW <$> parseSmpServer srv
|
||||
newCmd = ACmd SClient . NEW <$> smpServerP
|
||||
invResp = ACmd SAgent . INV <$> smpQueueInfoP
|
||||
joinCmd = ACmd SClient <$> (JOIN <$> smpQueueInfoP <*> replyMode)
|
||||
replyMode =
|
||||
" NO_REPLY" $> ReplyOff
|
||||
<|> A.space *> (ReplyVia <$> smpServerP)
|
||||
<|> pure ReplyOn
|
||||
|
||||
joinConn :: ByteString -> [ByteString] -> Either ErrorType ACmd
|
||||
joinConn qInfo ws = do
|
||||
q <- parseSmpQueueInfo qInfo
|
||||
case ws of
|
||||
[] -> let SMPQueueInfo srv _ _ = q in joinCmd q $ ReplyOn srv
|
||||
["NO_REPLY"] -> joinCmd q ReplyOff
|
||||
[srv] -> do
|
||||
s <- parseSmpServer srv
|
||||
joinCmd q $ ReplyOn s
|
||||
_ -> errParams
|
||||
where
|
||||
joinCmd q r = return $ ACmd SClient $ JOIN q r
|
||||
parseCommand :: ByteString -> Either ErrorType ACmd
|
||||
parseCommand = parse parseCommandP $ SYNTAX errBadCommand
|
||||
|
||||
serializeCommand :: ACommand p -> ByteString
|
||||
serializeCommand = \case
|
||||
NEW srv -> "NEW " <> serializeServer srv
|
||||
INV qInfo -> "INV " <> serializeSmpQueueInfo qInfo
|
||||
JOIN qInfo rMode ->
|
||||
"JOIN " <> serializeSmpQueueInfo qInfo <> " "
|
||||
<> case rMode of
|
||||
ReplyOff -> "NO_REPLY"
|
||||
ReplyOn srv -> serializeServer srv
|
||||
JOIN qInfo rMode -> "JOIN " <> serializeSmpQueueInfo qInfo <> replyMode rMode
|
||||
CON -> "CON"
|
||||
ERR e -> "ERR " <> B.pack (show e)
|
||||
OK -> "OK"
|
||||
c -> B.pack $ show c
|
||||
where
|
||||
replyMode :: ReplyMode -> ByteString
|
||||
replyMode = \case
|
||||
ReplyOff -> " NO_REPLY"
|
||||
ReplyVia srv -> " " <> serializeServer srv
|
||||
ReplyOn -> ""
|
||||
|
||||
tPutRaw :: MonadIO m => Handle -> ARawTransmission -> m ()
|
||||
tPutRaw h (corrId, connAlias, command) = do
|
||||
|
||||
+5
-6
@@ -24,7 +24,7 @@ command >#>= p = smpAgentTest command >>= (`shouldSatisfy` p . \(cId, cAlias, cm
|
||||
|
||||
syntaxTests :: Spec
|
||||
syntaxTests = do
|
||||
it "unknown command" $ ("1", "5678", "HELLO") >#> ("1", "5678", "ERR UNKNOWN")
|
||||
it "unknown command" $ ("1", "5678", "HELLO") >#> ("1", "5678", "ERR SYNTAX 11")
|
||||
describe "NEW" do
|
||||
describe "valid" do
|
||||
-- TODO: ERROR no connection alias in the response (it does not generate it yet if not provided)
|
||||
@@ -35,17 +35,16 @@ syntaxTests = do
|
||||
it "with port and keyHash" $ ("214", "", "NEW localhost:5000#1234") >#>= \case ("214", "", "INV" : _) -> True; _ -> False
|
||||
describe "invalid" do
|
||||
-- TODO: add tests with defined connection alias
|
||||
it "no parameters" $ ("221", "", "NEW") >#> ("221", "", "ERR SYNTAX 2")
|
||||
it "many parameters" $ ("222", "", "NEW localhost:5000 hi") >#> ("222", "", "ERR SYNTAX 2")
|
||||
it "invalid server keyHash" $ ("223", "", "NEW localhost:5000#1") >#> ("223", "", "ERR SYNTAX 10")
|
||||
it "no parameters" $ ("221", "", "NEW") >#> ("221", "", "ERR SYNTAX 11")
|
||||
it "many parameters" $ ("222", "", "NEW localhost:5000 hi") >#> ("222", "", "ERR SYNTAX 11")
|
||||
it "invalid server keyHash" $ ("223", "", "NEW localhost:5000#1") >#> ("223", "", "ERR SYNTAX 11")
|
||||
|
||||
describe "JOIN" do
|
||||
describe "valid" do
|
||||
-- TODO: ERROR no connection alias in the response (it does not generate it yet if not provided)
|
||||
-- TODO: add tests with defined connection alias
|
||||
-- TODO: JOIN is not merged yet - to be added
|
||||
it "using same server as in invitation" $
|
||||
("311", "", "JOIN smp::localhost:5000::1234::5678") >#> ("311", "", "ERR SMP AUTH")
|
||||
describe "invalid" do
|
||||
-- TODO: JOIN is not merged yet - to be added
|
||||
it "no parameters" $ ("321", "", "JOIN") >#> ("321", "", "ERR SYNTAX 2")
|
||||
it "no parameters" $ ("321", "", "JOIN") >#> ("321", "", "ERR SYNTAX 11")
|
||||
|
||||
@@ -57,7 +57,7 @@ withSmpAgent =
|
||||
|
||||
testSMPAgentClient :: MonadUnliftIO m => (Handle -> m a) -> m a
|
||||
testSMPAgentClient client = do
|
||||
threadDelay 100_000 -- TODO hack: thread delay for SMP agent to start
|
||||
threadDelay 50_000 -- TODO hack: thread delay for SMP agent to start
|
||||
runTCPClient agentTestHost agentTestPort $ \h -> do
|
||||
line <- getLn h
|
||||
if line == "Welcome to SMP v0.2.0 agent"
|
||||
|
||||
Reference in New Issue
Block a user