From 70fe7616f2fe29e09467684bbbc781669f4f8db6 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 20 Jan 2021 21:01:54 +0000 Subject: [PATCH] Sending messages end to end (#21) * duplex connection end-to-end (working, debug logs) * agent: send, receive, acknowledge messages * logging proposal * logging: client/server (dis)connections * agent scenario testing framework * add tests, remove logs * clean up --- docs/logging.md | 70 ++++++++++++++++++++ src/Simplex/Messaging/Agent.hs | 59 ++++++++++++++--- src/Simplex/Messaging/Agent/Transmission.hs | 62 ++++++++++++----- src/Simplex/Messaging/Client.hs | 15 +++-- src/Simplex/Messaging/Server.hs | 2 +- src/Simplex/Messaging/Server/Transmission.hs | 13 ++-- src/Simplex/Messaging/Transport.hs | 34 +++++----- tests/AgentTests.hs | 27 ++++++++ tests/SMPAgentClient.hs | 63 +++++++++++++++--- tests/SMPClient.hs | 19 ++++-- 10 files changed, 292 insertions(+), 72 deletions(-) create mode 100644 docs/logging.md diff --git a/docs/logging.md b/docs/logging.md new file mode 100644 index 000000000..04d2784aa --- /dev/null +++ b/docs/logging.md @@ -0,0 +1,70 @@ +# SMP agent logging + +## Problem and proposed solution. + +SMP agent performs multiple actions in response to the client commands and to the messages received from other SMP agents (wrapped in SMP protocol messages). + +Customary approach for the network protocol clients is to have a debug/verbose mode that enables logging of all sent and received messages and any other actions that the client performs. + +This document proposes a logging format for SMP agent that would be enabled if the agent is run with `--verbose` / `-v` command line option. + +We can also consider logging the database operations that change the data. + +## Types of actions and the associated log line format. + +### Client connected / disconnected + +``` +client n connected to Agent +client n disconnected from Agent +``` + +where `n` is a sequential number of a connected agent client, starting from 1 (over the agent run-time). + +### Server connected / disconnected + +``` +Agent connected to host:port +Agent disconnected from host:port +``` + +### Received command from the client + +``` +n --> A : corrId connAlias parsed_command // raw_command +``` + +`raw_command` is added only in case of parsing failure. + +### Sent command to SMP server + +``` +A --> host:port : corrId queueId parsed_command +``` + +### Received response / message from the SMP server + +``` +A <-- host:port : corrId queueId parsed_command // raw_command +``` + +In case the response is a message or notification, corrId should be replaced with `_` + +### Interpreted ("unwrapped") SMP message as agent message + +``` +Agent msg : connAlias parsed_message // raw_message +``` + +### Sent response / message to the client + +``` +n <-- A : corrId connAlias parsed_command // raw_command +``` + +### Database changes + +``` +DB : insert/delete/update table key +DB : insert/delete/update table key +``` \ No newline at end of file diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index cc885eeac..a20ba3817 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -42,7 +42,7 @@ runSMPAgent cfg@AgentConfig {tcpPort} = do where smpAgent :: (MonadUnliftIO m', MonadReader Env m') => m' () smpAgent = runTCPServer tcpPort $ \h -> do - putLn h "Welcome to SMP v0.2.0 agent" + liftIO $ putLn h "Welcome to SMP v0.2.0 agent" q <- asks $ tbqSize . config c <- atomically $ newAgentClient q race_ (connectClient h c) (runClient c) @@ -102,6 +102,8 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = case cmd of NEW smpServer -> createNewConnection smpServer JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode + SEND msgBody -> sendMessage msgBody + ACK aMsgId -> ackMessage aMsgId _ -> throwError PROHIBITED where createNewConnection :: SMPServer -> m () @@ -126,11 +128,35 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = ReplyOff -> return () respond CON + sendMessage :: SMP.MsgBody -> m () + sendMessage msgBody = + withStore (`getConn` connAlias) >>= \case + SomeConn _ (DuplexConnection _ _ sq) -> sendMsg sq + SomeConn _ (SendConnection _ sq) -> sendMsg sq + -- TODO possibly there should be a separate error type trying to send the message to the connection without SendQueue + _ -> throwError PROHIBITED -- NOT_READY ? + where + sendMsg sq = do + sendAgentMessage sq $ A_MSG msgBody + -- TODO respond $ SENT aMsgId + respond OK + + ackMessage :: AgentMsgId -> m () + ackMessage _aMsgId = + withStore (`getConn` connAlias) >>= \case + SomeConn _ (DuplexConnection _ rq _) -> ackMsg rq + SomeConn _ (ReceiveConnection _ rq) -> ackMsg rq + -- TODO possibly there should be a separate error type trying to send the message to the connection without SendQueue + -- NOT_READY ? + _ -> throwError PROHIBITED + where + ackMsg rq = sendAck c rq >> respond OK + sendReplyQInfo :: SMPServer -> SendQueue -> m () - sendReplyQInfo srv sndQueue = do - (rcvQueue, qInfo) <- newReceiveQueue srv - withStore $ \st -> addRcvQueue st connAlias rcvQueue - sendAgentMessage sndQueue $ REPLY qInfo + sendReplyQInfo srv sq = do + (rq, qInfo) <- newReceiveQueue srv + withStore $ \st -> addRcvQueue st connAlias rq + sendAgentMessage sq $ REPLY qInfo newReceiveQueue :: SMPServer -> m (ReceiveQueue, SMPQueueInfo) newReceiveQueue server = do @@ -180,34 +206,40 @@ processSMPTransmission :: m () processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do case cmd of - SMP.MSG _msgId _ts msgBody -> do + SMP.MSG _ srvTs msgBody -> do -- TODO deduplicate with previously received (connAlias, rcvQueue@ReceiveQueue {decryptKey, status}) <- withStore $ \st -> getReceiveQueue st srv rId agentMsg <- liftEither . parseSMPMessage =<< decryptMessage decryptKey msgBody case agentMsg of SMPConfirmation senderKey -> do case status of - New -> + New -> do -- TODO currently it automatically allows whoever sends the confirmation -- Commands CONF and LET are not implemented yet -- They are probably not needed in v0.2? -- TODO notification that connection confirmed? secureQueue rcvQueue senderKey + sendAck c rcvQueue s -> -- TODO maybe send notification to the user liftIO . putStrLn $ "unexpected SMP confirmation, queue status " <> show s - SMPMessage {agentMessage} -> + SMPMessage {agentMessage, agentMsgId, agentTimestamp} -> case agentMessage of HELLO _verifyKey _ -> do -- TODO send status update to the user? withStore $ \st -> updateRcvQueueStatus st rcvQueue Active + sendAck c rcvQueue REPLY qInfo -> do (sndQueue, senderKey) <- newSendQueue qInfo withStore $ \st -> addSndQueue st connAlias sndQueue sendConfirmation c sndQueue senderKey sendHello c sndQueue atomically $ writeTBQueue sndQ ("", connAlias, CON) - A_MSG _msgBody -> return () + sendAck c rcvQueue + A_MSG body -> do + -- TODO check message status + let msg = MSG agentMsgId agentTimestamp srvTs MsgOk body + atomically $ writeTBQueue sndQ ("", connAlias, msg) return () SMP.END -> return () _ -> liftIO $ do @@ -296,7 +328,7 @@ sendHello :: m () sendHello c sq@SendQueue {server, sndId, sndPrivateKey, encryptKey} = do smp <- getSMPServerClient c server - msg <- mkHello "" $ AckMode On -- TODO verifyKey + msg <- mkHello "5678" $ AckMode On -- TODO verifyKey _send smp 20 msg withStore $ \st -> updateSndQueueStatus st sq Active where @@ -315,6 +347,11 @@ sendHello c sq@SendQueue {server, sndId, sndPrivateKey, encryptKey} = do _ -> throwError INTERNAL -- TODO wrap client error in some constructor ) +sendAck :: (MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) => AgentClient -> ReceiveQueue -> m () +sendAck c ReceiveQueue {server, rcvId, rcvPrivateKey} = do + smp <- getSMPServerClient c server + liftSMP $ ackSMPMessage smp rcvPrivateKey rcvId + mkAgentMessage :: (MonadUnliftIO m) => PrivateKey -> AMessage -> m ByteString mkAgentMessage _encKey agentMessage = do agentTimestamp <- liftIO getCurrentTime @@ -323,7 +360,7 @@ mkAgentMessage _encKey agentMessage = do SMPMessage { agentMsgId = 0, agentTimestamp, - previousMsgHash = "", + previousMsgHash = "1234", -- TODO hash of the previous message agentMessage } -- TODO encryption diff --git a/src/Simplex/Messaging/Agent/Transmission.hs b/src/Simplex/Messaging/Agent/Transmission.hs index 7eaa49456..bb42d5584 100644 --- a/src/Simplex/Messaging/Agent/Transmission.hs +++ b/src/Simplex/Messaging/Agent/Transmission.hs @@ -130,12 +130,12 @@ parseSMPMessage = parse (smpMessageP <* A.endOfLine) $ SYNTAX errBadMessage smpClientMessageP = SMPMessage <$> A.decimal <* A.space - <*> tsIso8601P <* A.space + <*> tsISO8601P <* A.space <*> base64P <* A.endOfLine <*> agentMessageP - tsIso8601P :: Parser UTCTime - tsIso8601P = maybe (fail "timestamp") pure . parseISO8601 . B.unpack =<< A.takeTill (== ' ') +tsISO8601P :: Parser UTCTime +tsISO8601P = maybe (fail "timestamp") pure . parseISO8601 . B.unpack =<< A.takeTill (== ' ') serializeSMPMessage :: SMPMessage -> ByteString serializeSMPMessage = \case @@ -164,7 +164,7 @@ agentMessageP = smpQueueInfoP :: Parser SMPQueueInfo smpQueueInfoP = - SMPQueueInfo <$> ("smp::" *> smpServerP <* "::") <*> (base64P <* "::") <*> base64P + "smp::" *> (SMPQueueInfo <$> smpServerP <* "::" <*> base64P <* "::" <*> base64P) smpServerP :: Parser SMPServer smpServerP = SMPServer <$> server <*> port <*> msgHash @@ -190,12 +190,12 @@ errParams = Left $ SYNTAX errBadParameters serializeAgentMessage :: AMessage -> ByteString serializeAgentMessage = \case - HELLO _verKey _ackMode -> "HELLO" -- TODO + HELLO verifyKey ackMode -> "HELLO " <> encode verifyKey <> if ackMode == AckMode Off then " NO_ACK" else "" REPLY qInfo -> "REPLY " <> serializeSmpQueueInfo qInfo - A_MSG msgBody -> "A_MSG " <> msgBody + A_MSG body -> "MSG " <> serializeMsg body <> "\n" serializeSmpQueueInfo :: SMPQueueInfo -> ByteString -serializeSmpQueueInfo (SMPQueueInfo srv qId ek) = "smp::" <> serializeServer srv <> "::" <> encode qId <> "::" <> encode ek +serializeSmpQueueInfo (SMPQueueInfo srv qId ek) = B.intercalate "::" ["smp", serializeServer srv, encode qId, encode ek] serializeServer :: SMPServer -> ByteString serializeServer SMPServer {host, port, keyHash} = B.pack $ host <> maybe "" (':' :) port <> maybe "" (('#' :) . B.unpack) keyHash @@ -233,7 +233,7 @@ data QueueDirection = SND | RCV deriving (Show) data QueueStatus = New | Confirmed | Secured | Active | Disabled deriving (Eq, Show, Read) -type AgentMsgId = Int +type AgentMsgId = Integer data MsgStatus = MsgOk | MsgError MsgErrorType deriving (Show) @@ -243,7 +243,6 @@ data MsgErrorType = MsgSkipped AgentMsgId AgentMsgId | MsgBadId AgentMsgId | Msg data ErrorType = UNKNOWN - | UNSUPPORTED -- TODO remove once all commands implemented | PROHIBITED | SYNTAX Int | BROKER Natural @@ -291,16 +290,30 @@ parseCommandP = "NEW " *> newCmd <|> "INV " *> invResp <|> "JOIN " *> joinCmd + <|> "SEND " *> sendCmd + <|> "MSG " *> message + <|> "ACK " *> acknowledge + -- <|> "ERR " *> agentError - TODO <|> "CON" $> ACmd SAgent CON <|> "OK" $> ACmd SAgent OK where newCmd = ACmd SClient . NEW <$> smpServerP invResp = ACmd SAgent . INV <$> smpQueueInfoP joinCmd = ACmd SClient <$> (JOIN <$> smpQueueInfoP <*> replyMode) + sendCmd = ACmd SClient <$> (SEND <$> A.takeByteString) + message = + let sp = A.space; msgId = A.decimal <* sp; ts = tsISO8601P <* sp; body = A.takeByteString + in ACmd SAgent <$> (MSG <$> msgId <*> ts <*> ts <*> status <* sp <*> body) + acknowledge = ACmd SClient <$> (ACK <$> A.decimal) replyMode = " NO_REPLY" $> ReplyOff <|> A.space *> (ReplyVia <$> smpServerP) <|> pure ReplyOn + status = "OK" $> MsgOk <|> "ERR " *> (MsgError <$> msgErrorType) + msgErrorType = + "ID " *> (MsgBadId <$> A.decimal) + <|> "NO_ID " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal) + <|> "HASH" $> MsgBadHash parseCommand :: ByteString -> Either ErrorType ACmd parseCommand = parse parseCommandP $ SYNTAX errBadCommand @@ -310,6 +323,10 @@ serializeCommand = \case NEW srv -> "NEW " <> serializeServer srv INV qInfo -> "INV " <> serializeSmpQueueInfo qInfo JOIN qInfo rMode -> "JOIN " <> serializeSmpQueueInfo qInfo <> replyMode rMode + SEND msgBody -> "SEND " <> serializeMsg msgBody + MSG aMsgId aTs ts st body -> + B.unwords ["MSG", B.pack $ show aMsgId, B.pack $ formatISO8601Millis aTs, B.pack $ formatISO8601Millis ts, msgStatus st, serializeMsg body] + ACK aMsgId -> "ACK " <> B.pack (show aMsgId) CON -> "CON" ERR e -> "ERR " <> B.pack (show e) OK -> "OK" @@ -320,14 +337,27 @@ serializeCommand = \case ReplyOff -> " NO_REPLY" ReplyVia srv -> " " <> serializeServer srv ReplyOn -> "" + msgStatus :: MsgStatus -> ByteString + msgStatus = \case + MsgOk -> "OK" + MsgError e -> + "ERR" <> case e of + MsgSkipped fromMsgId toMsgId -> + B.unwords ["NO_ID", B.pack $ show fromMsgId, B.pack $ show toMsgId] + MsgBadId aMsgId -> "ID " <> B.pack (show aMsgId) + MsgBadHash -> "HASH" -tPutRaw :: MonadIO m => Handle -> ARawTransmission -> m () +-- TODO - save function as in the server Transmission - re-use? +serializeMsg :: ByteString -> ByteString +serializeMsg body = B.pack (show $ B.length body) <> "\n" <> body + +tPutRaw :: Handle -> ARawTransmission -> IO () tPutRaw h (corrId, connAlias, command) = do putLn h corrId putLn h connAlias putLn h command -tGetRaw :: MonadIO m => Handle -> m ARawTransmission +tGetRaw :: Handle -> IO ARawTransmission tGetRaw h = do corrId <- getLn h connAlias <- getLn h @@ -335,11 +365,12 @@ tGetRaw h = do return (corrId, connAlias, command) tPut :: MonadIO m => Handle -> ATransmission p -> m () -tPut h (corrId, connAlias, command) = tPutRaw h (bs corrId, connAlias, serializeCommand command) +tPut h (corrId, connAlias, command) = + liftIO $ tPutRaw h (bs corrId, connAlias, serializeCommand command) -- | get client and agent transmissions tGet :: forall m p. MonadIO m => SAParty p -> Handle -> m (ATransmissionOrError p) -tGet party h = tGetRaw h >>= tParseLoadBody +tGet party h = liftIO (tGetRaw h) >>= tParseLoadBody where tParseLoadBody :: ARawTransmission -> m (ATransmissionOrError p) tParseLoadBody t@(corrId, connAlias, command) = do @@ -370,13 +401,14 @@ tGet party h = tGetRaw h >>= tParseLoadBody MSG agentMsgId srvTS agentTS status body -> MSG agentMsgId srvTS agentTS status <$$> getMsgBody body cmd -> return $ Right cmd + -- TODO refactor with server getMsgBody :: MsgBody -> m (Either ErrorType MsgBody) getMsgBody msgBody = case B.unpack msgBody of ':' : body -> return . Right $ B.pack body str -> case readMaybe str :: Maybe Int of - Just size -> do - body <- getBytes h size + Just size -> liftIO $ do + body <- B.hGet h size s <- getLn h return $ if B.null s then Right body else Left SIZE Nothing -> return . Left $ SYNTAX errMessageBody diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index c6ef8d61a..d8ef4885f 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -94,14 +94,15 @@ getSMPClient smpServer@SMPServer {host, port} SMPClientConfig {qSize, defaultPor receive SMPClient {rcvQ} h = forever $ tGet fromServer h >>= atomically . writeTBQueue rcvQ process :: SMPClient -> IO () - process SMPClient {rcvQ, sentCommands} = forever . atomically $ do - (_, (corrId, qId, respOrErr)) <- readTBQueue rcvQ - cs <- readTVar sentCommands + process SMPClient {rcvQ, sentCommands} = forever $ do + (_, (corrId, qId, respOrErr)) <- atomically $ readTBQueue rcvQ + cs <- readTVarIO sentCommands case M.lookup corrId cs of - Nothing -> case respOrErr of - Right (Cmd SBroker cmd) -> writeTBQueue msgQ (smpServer, qId, cmd) - _ -> return () - Just Request {queueId, responseVar} -> do + Nothing -> do + case respOrErr of + Right (Cmd SBroker cmd) -> atomically $ writeTBQueue msgQ (smpServer, qId, cmd) + _ -> return () + Just Request {queueId, responseVar} -> atomically $ do modifyTVar sentCommands $ M.delete corrId putTMVar responseVar $ if queueId == qId diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 5cca687a3..371a2d10c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -58,7 +58,7 @@ runSMPServer cfg@ServerConfig {tcpPort} = do runClient :: (MonadUnliftIO m, MonadReader Env m) => Handle -> m () runClient h = do - putLn h "Welcome to SMP v0.2.0" + liftIO $ putLn h "Welcome to SMP v0.2.0" q <- asks $ tbqSize . config c <- atomically $ newClient q s <- asks server diff --git a/src/Simplex/Messaging/Server/Transmission.hs b/src/Simplex/Messaging/Server/Transmission.hs index 54dce5066..c37cb92ea 100644 --- a/src/Simplex/Messaging/Server/Transmission.hs +++ b/src/Simplex/Messaging/Server/Transmission.hs @@ -191,14 +191,14 @@ errNoQueueId = 5 errMessageBody :: Int errMessageBody = 6 -tPutRaw :: MonadIO m => Handle -> RawTransmission -> m () +tPutRaw :: Handle -> RawTransmission -> IO () tPutRaw h (signature, corrId, queueId, command) = do putLn h signature putLn h corrId putLn h queueId putLn h command -tGetRaw :: MonadIO m => Handle -> m RawTransmission +tGetRaw :: Handle -> IO RawTransmission tGetRaw h = do signature <- getLn h corrId <- getLn h @@ -207,7 +207,8 @@ tGetRaw h = do return (signature, corrId, queueId, command) tPut :: MonadIO m => Handle -> Transmission -> m () -tPut h (signature, (corrId, queueId, command)) = tPutRaw h (encode signature, bs corrId, encode queueId, serializeCommand command) +tPut h (signature, (corrId, queueId, command)) = + liftIO $ tPutRaw h (encode signature, bs corrId, encode queueId, serializeCommand command) fromClient :: Cmd -> Either ErrorType Cmd fromClient = \case @@ -223,7 +224,7 @@ fromServer = \case -- `fromParty` is used to limit allowed senders - `fromClient` or `fromServer` should be used tGet :: forall m. MonadIO m => (Cmd -> Either ErrorType Cmd) -> Handle -> m TransmissionOrError tGet fromParty h = do - (signature, corrId, queueId, command) <- tGetRaw h + (signature, corrId, queueId, command) <- liftIO $ tGetRaw h let decodedTransmission = liftM2 (,corrId,,command) (decode signature) (decode queueId) either (const $ tError corrId) tParseLoadBody decodedTransmission where @@ -272,8 +273,8 @@ tGet fromParty h = do case B.unpack msgBody of ':' : body -> return . Right $ B.pack body str -> case readMaybe str :: Maybe Int of - Just size -> do - body <- getBytes h size + Just size -> liftIO $ do + body <- B.hGet h size s <- getLn h return $ if B.null s then Right body else Left SIZE Nothing -> return . Left $ SYNTAX errMessageBody diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 4dd31c016..48764164f 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -2,6 +2,7 @@ {-# LANGUAGE BlockArguments #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Transport where @@ -20,8 +21,8 @@ import UnliftIO.Exception (Exception, IOException) import qualified UnliftIO.Exception as E import qualified UnliftIO.IO as IO -startTCPServer :: MonadIO m => ServiceName -> m Socket -startTCPServer port = liftIO . withSocketsDo $ resolve >>= open +startTCPServer :: ServiceName -> IO Socket +startTCPServer port = withSocketsDo $ resolve >>= open where resolve = do let hints = defaultHints {addrFlags = [AI_PASSIVE], addrSocketType = Stream} @@ -36,18 +37,18 @@ startTCPServer port = liftIO . withSocketsDo $ resolve >>= open runTCPServer :: MonadUnliftIO m => ServiceName -> (Handle -> m ()) -> m () runTCPServer port server = - E.bracket (startTCPServer port) (liftIO . close) $ \sock -> forever $ do - h <- acceptTCPConn sock + E.bracket (liftIO $ startTCPServer port) (liftIO . close) $ \sock -> forever $ do + h <- liftIO $ acceptTCPConn sock forkFinally (server h) (const $ IO.hClose h) -acceptTCPConn :: MonadIO m => Socket -> m Handle -acceptTCPConn sock = liftIO $ do +acceptTCPConn :: Socket -> IO Handle +acceptTCPConn sock = do (conn, _) <- accept sock getSocketHandle conn -startTCPClient :: MonadUnliftIO m => HostName -> ServiceName -> m Handle +startTCPClient :: HostName -> ServiceName -> IO Handle startTCPClient host port = - liftIO . withSocketsDo $ + withSocketsDo $ resolve >>= foldM tryOpen (Left err) >>= either E.throwIO return where err :: IOException @@ -70,22 +71,19 @@ startTCPClient host port = runTCPClient :: MonadUnliftIO m => HostName -> ServiceName -> (Handle -> m a) -> m a runTCPClient host port client = do - h <- startTCPClient host port + h <- liftIO $ startTCPClient host port client h `E.finally` IO.hClose h -getSocketHandle :: MonadIO m => Socket -> m Handle -getSocketHandle conn = liftIO $ do +getSocketHandle :: Socket -> IO Handle +getSocketHandle conn = do h <- socketToHandle conn ReadWriteMode hSetBinaryMode h True hSetNewlineMode h NewlineMode {inputNL = CRLF, outputNL = CRLF} hSetBuffering h LineBuffering return h -putLn :: MonadIO m => Handle -> ByteString -> m () -putLn h = liftIO . hPutStrLn h . B.unpack +putLn :: Handle -> ByteString -> IO () +putLn h = B.hPut h . (<> "\r\n") -getLn :: MonadIO m => Handle -> m ByteString -getLn h = B.pack <$> liftIO (hGetLine h) - -getBytes :: MonadIO m => Handle -> Int -> m ByteString -getBytes h = liftIO . B.hGet h +getLn :: Handle -> IO ByteString +getLn h = B.pack <$> hGetLine h diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 1dfab7bbe..8255fa631 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -1,4 +1,6 @@ {-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} @@ -9,12 +11,21 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import SMPAgentClient import Simplex.Messaging.Agent.Transmission +import System.IO (Handle) import Test.Hspec agentTests :: Spec agentTests = do describe "SQLite store" storeTests describe "SMP agent protocol syntax" syntaxTests + describe "Establishing duplex connection" do + it "should connect via one server and one agent" $ + smpAgentTest2_1 testDuplexConnection1 + it "should connect via one server and two agents" $ + smpAgentTest2 testDuplexConnection1 + +sendRecv :: Handle -> (ByteString, ByteString, ByteString) -> IO (ATransmissionOrError 'Agent) +sendRecv h (corrId, connAlias, cmd) = tPutRaw h (corrId, connAlias, cmd) >> tGet SAgent h (>#>) :: ARawTransmission -> ARawTransmission -> Expectation command >#> response = smpAgentTest command `shouldReturn` response @@ -22,6 +33,22 @@ command >#> response = smpAgentTest command `shouldReturn` response (>#>=) :: ARawTransmission -> ((ByteString, ByteString, [ByteString]) -> Bool) -> Expectation command >#>= p = smpAgentTest command >>= (`shouldSatisfy` p . \(cId, cAlias, cmd) -> (cId, cAlias, B.words cmd)) +testDuplexConnection1 :: Handle -> Handle -> IO () +testDuplexConnection1 alice bob = do + ("1", "bob", Right (INV qInfo)) <- sendRecv alice ("1", "bob", "NEW localhost:5000") + ("11", "alice", Right CON) <- sendRecv bob ("11", "alice", "JOIN " <> serializeSmpQueueInfo qInfo) + ("", "bob", Right CON) <- tGet SAgent alice + ("2", "bob", Right OK) <- sendRecv alice ("2", "bob", "SEND :hello") + ("3", "bob", Right OK) <- sendRecv alice ("3", "bob", "SEND :how are you?") + ("", "alice", Right (MSG _ _ _ _ "hello")) <- tGet SAgent bob + ("12", "alice", Right OK) <- sendRecv bob ("12", "alice", "ACK 0") + ("", "alice", Right (MSG _ _ _ _ "how are you?")) <- tGet SAgent bob + ("13", "alice", Right OK) <- sendRecv bob ("13", "alice", "ACK 0") + ("14", "alice", Right OK) <- sendRecv bob ("14", "alice", "SEND 9\nhello too") + ("", "bob", Right (MSG _ _ _ _ "hello too")) <- tGet SAgent alice + ("4", "bob", Right OK) <- sendRecv alice ("4", "bob", "ACK 0") + return () + syntaxTests :: Spec syntaxTests = do it "unknown command" $ ("1", "5678", "HELLO") >#> ("1", "5678", "ERR SYNTAX 11") diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index d94fd785a..7399c8695 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -1,6 +1,7 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} module SMPAgentClient where @@ -14,6 +15,7 @@ import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Transmission import Simplex.Messaging.Client (SMPClientConfig (..)) import Simplex.Messaging.Transport +import Test.Hspec import UnliftIO.Concurrent import UnliftIO.Directory import qualified UnliftIO.Exception as E @@ -25,15 +27,54 @@ agentTestHost = "localhost" agentTestPort :: ServiceName agentTestPort = "5001" +agentTestPort2 :: ServiceName +agentTestPort2 = "5011" + testDB :: String testDB = "smp-agent.test.protocol.db" +testDB2 :: String +testDB2 = "smp-agent2.test.protocol.db" + smpAgentTest :: ARawTransmission -> IO ARawTransmission smpAgentTest cmd = runSmpAgentTest $ \h -> tPutRaw h cmd >> tGetRaw h runSmpAgentTest :: (MonadUnliftIO m, MonadRandom m) => (Handle -> m a) -> m a runSmpAgentTest test = withSmpServer . withSmpAgent $ testSMPAgentClient test +runSmpAgentTestN :: forall m a. (MonadUnliftIO m, MonadRandom m) => [(ServiceName, String)] -> ([Handle] -> m a) -> m a +runSmpAgentTestN agents test = withSmpServer $ run agents [] + where + run :: [(ServiceName, String)] -> [Handle] -> m a + run [] hs = test hs + run (a@(p, _) : as) hs = withSmpAgentOn a $ testSMPAgentClientOn p $ \h -> run as (h : hs) + +runSmpAgentTestN_1 :: forall m a. (MonadUnliftIO m, MonadRandom m) => Int -> ([Handle] -> m a) -> m a +runSmpAgentTestN_1 nClients test = withSmpServer . withSmpAgent $ run nClients [] + where + run :: Int -> [Handle] -> m a + run 0 hs = test hs + run n hs = testSMPAgentClient $ \h -> run (n - 1) (h : hs) + +smpAgentTestN :: [(ServiceName, String)] -> ([Handle] -> IO ()) -> Expectation +smpAgentTestN agents test' = runSmpAgentTestN agents test' `shouldReturn` () + +smpAgentTestN_1 :: Int -> ([Handle] -> IO ()) -> Expectation +smpAgentTestN_1 n test' = runSmpAgentTestN_1 n test' `shouldReturn` () + +smpAgentTest2 :: (Handle -> Handle -> IO ()) -> Expectation +smpAgentTest2 test' = + smpAgentTestN [(agentTestPort, testDB), (agentTestPort2, testDB2)] _test + where + _test [h1, h2] = test' h1 h2 + _test _ = error "expected 2 handles" + +smpAgentTest2_1 :: (Handle -> Handle -> IO ()) -> Expectation +smpAgentTest2_1 test' = smpAgentTestN_1 2 _test + where + _test [h1, h2] = test' h1 h2 + _test _ = error "expected 2 handles" + cfg :: AgentConfig cfg = AgentConfig @@ -48,18 +89,24 @@ cfg = } } -withSmpAgent :: (MonadUnliftIO m, MonadRandom m) => m a -> m a -withSmpAgent = +withSmpAgentOn :: (MonadUnliftIO m, MonadRandom m) => (ServiceName, String) -> m a -> m a +withSmpAgentOn (port', db') = E.bracket - (forkIO $ runSMPAgent cfg) - (liftIO . killThread >=> const (removeFile testDB)) + (forkIO $ runSMPAgent cfg {tcpPort = port', dbFile = db'}) + (liftIO . killThread >=> const (removeFile db')) . const -testSMPAgentClient :: MonadUnliftIO m => (Handle -> m a) -> m a -testSMPAgentClient client = do +withSmpAgent :: (MonadUnliftIO m, MonadRandom m) => m a -> m a +withSmpAgent = withSmpAgentOn (agentTestPort, testDB) + +testSMPAgentClientOn :: MonadUnliftIO m => ServiceName -> (Handle -> m a) -> m a +testSMPAgentClientOn port' client = do threadDelay 50_000 -- TODO hack: thread delay for SMP agent to start - runTCPClient agentTestHost agentTestPort $ \h -> do - line <- getLn h + runTCPClient agentTestHost port' $ \h -> do + line <- liftIO $ getLn h if line == "Welcome to SMP v0.2.0 agent" then client h else error "not connected" + +testSMPAgentClient :: MonadUnliftIO m => (Handle -> m a) -> m a +testSMPAgentClient = testSMPAgentClientOn agentTestPort diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 926da68e4..1386014ea 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -26,7 +26,7 @@ testSMPClient :: MonadUnliftIO m => (Handle -> m a) -> m a testSMPClient client = do threadDelay 5000 -- TODO hack: thread delay for SMP server to start runTCPClient testHost testPort $ \h -> do - line <- getLn h + line <- liftIO $ getLn h if line == "Welcome to SMP v0.2.0" then client h else error "not connected" @@ -40,18 +40,25 @@ cfg = msgIdBytes = 6 } +withSmpServerOn :: (MonadUnliftIO m, MonadRandom m) => ServiceName -> m a -> m a +withSmpServerOn port = + E.bracket + (forkIO $ runSMPServer cfg {tcpPort = port}) + (liftIO . killThread) + . const + withSmpServer :: (MonadUnliftIO m, MonadRandom m) => m a -> m a -withSmpServer = E.bracket (forkIO $ runSMPServer cfg) (liftIO . killThread) . const +withSmpServer = withSmpServerOn testPort runSmpTest :: (MonadUnliftIO m, MonadRandom m) => (Handle -> m a) -> m a runSmpTest test = withSmpServer $ testSMPClient test runSmpTestN :: forall m a. (MonadUnliftIO m, MonadRandom m) => Int -> ([Handle] -> m a) -> m a -runSmpTestN nClients test = withSmpServer $ run [] nClients +runSmpTestN nClients test = withSmpServer $ run nClients [] where - run :: [Handle] -> Int -> m a - run hs 0 = test hs - run hs n = testSMPClient $ \h -> run (h : hs) (n - 1) + run :: Int -> [Handle] -> m a + run 0 hs = test hs + run n hs = testSMPClient $ \h -> run (n - 1) (h : hs) smpServerTest :: RawTransmission -> IO RawTransmission smpServerTest cmd = runSmpTest $ \h -> tPutRaw h cmd >> tGetRaw h