From 4b8f6417f85826a270d5b58d1ec0a636fcc9b800 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 28 Dec 2020 13:55:53 +0000 Subject: [PATCH 1/4] add correlationId to SMP transmissions, fix tests --- src/Simplex/Messaging/Server.hs | 38 ++++++------- src/Simplex/Messaging/Server/Transmission.hs | 8 +-- src/Simplex/Messaging/Transport.hs | 33 +++++++----- tests/SMPClient.hs | 2 +- tests/Test.hs | 57 +++++++++++--------- 5 files changed, 76 insertions(+), 62 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 85e1b3395..5508081c3 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -51,7 +51,7 @@ runSMPServer cfg@ServerConfig {tcpPort} = do (rId, clnt) <- readTBQueue subscribedQ cs <- readTVar subscribers case M.lookup rId cs of - Just Client {rcvQ} -> writeTBQueue rcvQ (rId, Cmd SBroker END) + Just Client {rcvQ} -> writeTBQueue rcvQ (B.empty, rId, Cmd SBroker END) Nothing -> return () writeTVar subscribers $ M.insert rId clnt cs @@ -75,10 +75,10 @@ cancelSub = \case receive :: (MonadUnliftIO m, MonadReader Env m) => Handle -> Client -> m () receive h Client {rcvQ} = forever $ do - (signature, (queueId, cmdOrError)) <- tGet fromClient h + (signature, (corrId, queueId, cmdOrError)) <- tGet fromClient h signed <- case cmdOrError of - Left e -> return . mkResp queueId $ ERR e - Right cmd -> verifyTransmission signature queueId cmd + Left e -> return . mkResp corrId queueId $ ERR e + Right cmd -> verifyTransmission (signature, (corrId, queueId, cmd)) atomically $ writeTBQueue rcvQ signed send :: MonadUnliftIO m => Handle -> Client -> m () @@ -86,12 +86,12 @@ send h Client {sndQ} = forever $ do signed <- atomically $ readTBQueue sndQ tPut h (B.empty, signed) -mkResp :: QueueId -> Command 'Broker -> Signed -mkResp queueId command = (queueId, Cmd SBroker command) +mkResp :: CorrelationId -> QueueId -> Command 'Broker -> Signed +mkResp corrId queueId command = (corrId, queueId, Cmd SBroker command) -verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => Signature -> QueueId -> Cmd -> m Signed -verifyTransmission signature queueId cmd = do - (queueId,) <$> case cmd of +verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => Transmission -> m Signed +verifyTransmission (signature, (corrId, queueId, cmd)) = do + (corrId,queueId,) <$> case cmd of Cmd SBroker _ -> return $ smpErr INTERNAL -- it can only be client command, because `fromClient` was used Cmd SRecipient (NEW _) -> return cmd Cmd SRecipient _ -> withQueueRec SRecipient $ verifySignature . recipientKey @@ -121,11 +121,11 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = >>= atomically . writeTBQueue sndQ where processCommand :: Signed -> m Signed - processCommand (queueId, cmd) = do + processCommand (corrId, queueId, cmd) = do st <- asks queueStore case cmd of - Cmd SBroker END -> unsubscribeQueue $> (queueId, cmd) - Cmd SBroker _ -> return (queueId, cmd) + Cmd SBroker END -> unsubscribeQueue $> (corrId, queueId, cmd) + Cmd SBroker _ -> return (corrId, queueId, cmd) Cmd SSender (SEND msgBody) -> sendMessage st msgBody Cmd SRecipient command -> case command of NEW rKey -> createQueue st rKey @@ -136,7 +136,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = DEL -> delQueueAndMsgs st where createQueue :: QueueStore -> RecipientKey -> m Signed - createQueue st rKey = mkResp B.empty <$> addSubscribe + createQueue st rKey = mkResp corrId B.empty <$> addSubscribe where addSubscribe = addQueueRetry 3 >>= \case @@ -217,7 +217,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = q <- atomically $ getMsgQueue ms rId atomically (tryPeek q) >>= \case Nothing -> forkSub q $> ok - Just msg -> atomically setDelivered $> msgResp rId msg + Just msg -> atomically setDelivered $> msgResp corrId rId msg _ -> return ok where forkSub :: MsgQueue -> m () @@ -231,7 +231,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = subscriber :: MsgQueue -> m () subscriber q = atomically $ do msg <- peekMsg q - writeTBQueue sndQ $ msgResp rId msg + writeTBQueue sndQ $ msgResp B.empty rId msg setSub (\s -> s {subThread = NoSub}) void setDelivered @@ -250,16 +250,16 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = Right _ -> delMsgQueue ms queueId $> ok ok :: Signed - ok = mkResp queueId OK + ok = mkResp corrId queueId OK err :: ErrorType -> Signed - err = mkResp queueId . ERR + err = mkResp corrId queueId . ERR okResp :: Either ErrorType () -> Signed okResp = either err $ const ok - msgResp :: RecipientId -> Message -> Signed - msgResp rId Message {msgId, ts, msgBody} = mkResp rId $ MSG msgId ts msgBody + msgResp :: CorrelationId -> RecipientId -> Message -> Signed + msgResp _corrId rId Message {msgId, ts, msgBody} = mkResp _corrId rId $ MSG msgId ts msgBody randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m Encoded randomId n = do diff --git a/src/Simplex/Messaging/Server/Transmission.hs b/src/Simplex/Messaging/Server/Transmission.hs index 11e5b250f..9287e9254 100644 --- a/src/Simplex/Messaging/Server/Transmission.hs +++ b/src/Simplex/Messaging/Server/Transmission.hs @@ -33,15 +33,15 @@ data Cmd where deriving instance Show Cmd -type Signed = (QueueId, Cmd) +type Signed = (CorrelationId, QueueId, Cmd) type Transmission = (Signature, Signed) -type SignedOrError = (QueueId, Either ErrorType Cmd) +type SignedOrError = (CorrelationId, QueueId, Either ErrorType Cmd) type TransmissionOrError = (Signature, SignedOrError) -type RawTransmission = (ByteString, ByteString, ByteString) +type RawTransmission = (ByteString, ByteString, ByteString, ByteString) data Command (a :: Party) where NEW :: RecipientKey -> Command Recipient @@ -137,6 +137,8 @@ serializeCommand = \case type Encoded = ByteString +type CorrelationId = ByteString + type PublicKey = Encoded type Signature = Encoded diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index c0a8d2a44..193ec99d4 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -80,20 +80,22 @@ getBytes :: MonadIO m => Handle -> Int -> m ByteString getBytes h = liftIO . B.hGet h tPutRaw :: MonadIO m => Handle -> RawTransmission -> m () -tPutRaw h (signature, queueId, command) = do - putLn h (encode signature) - putLn h (encode queueId) +tPutRaw h (signature, corrId, queueId, command) = do + putLn h signature + putLn h corrId + putLn h queueId putLn h command -tGetRaw :: MonadIO m => Handle -> m (Either String RawTransmission) +tGetRaw :: MonadIO m => Handle -> m RawTransmission tGetRaw h = do - signature <- decode <$> getLn h - queueId <- decode <$> getLn h + signature <- getLn h + corrId <- getLn h + queueId <- getLn h command <- getLn h - return $ liftM2 (,,command) signature queueId + return (signature, corrId, queueId, command) tPut :: MonadIO m => Handle -> Transmission -> m () -tPut h (signature, (queueId, command)) = tPutRaw h (signature, queueId, serializeCommand command) +tPut h (signature, (corrId, queueId, command)) = tPutRaw h (encode signature, corrId, encode queueId, serializeCommand command) fromClient :: Cmd -> Either ErrorType Cmd fromClient = \case @@ -108,19 +110,22 @@ fromServer = \case -- | get client and server transmissions -- `fromParty` is used to limit allowed senders - `fromClient` or `fromServer` should be used tGet :: forall m. MonadIO m => (Cmd -> Either ErrorType Cmd) -> Handle -> m TransmissionOrError -tGet fromParty h = tGetRaw h >>= either (const tError) tParseLoadBody +tGet fromParty h = do + (signature, corrId, queueId, command) <- tGetRaw h + let decodedTransmission = liftM2 (,corrId,,command) (decode signature) (decode queueId) + either (const $ tError corrId) tParseLoadBody decodedTransmission where - tError :: m TransmissionOrError - tError = return (B.empty, (B.empty, Left $ SYNTAX errBadTransmission)) + tError :: ByteString -> m TransmissionOrError + tError corrId = return (B.empty, (corrId, B.empty, Left $ SYNTAX errBadTransmission)) tParseLoadBody :: RawTransmission -> m TransmissionOrError - tParseLoadBody t@(signature, queueId, command) = do + tParseLoadBody t@(signature, corrId, queueId, command) = do let cmd = parseCommand command >>= fromParty >>= tCredentials t fullCmd <- either (return . Left) cmdWithMsgBody cmd - return (signature, (queueId, fullCmd)) + return (signature, (corrId, queueId, fullCmd)) tCredentials :: RawTransmission -> Cmd -> Either ErrorType Cmd - tCredentials (signature, queueId, _) cmd = case cmd of + tCredentials (signature, _, queueId, _) cmd = case cmd of -- IDS response should not have queue ID Cmd SBroker (IDS _ _) -> Right cmd -- ERROR response does not always have queue ID diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 9a122e035..fccfce8fb 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -57,7 +57,7 @@ smpServerTest :: [RawTransmission] -> IO [RawTransmission] smpServerTest commands = runSmpTest \h -> mapM (sendReceive h) commands where sendReceive :: Handle -> RawTransmission -> IO RawTransmission - sendReceive h t = tPutRaw h t >> either (error "bad transmission") id <$> tGetRaw h + sendReceive h t = tPutRaw h t >> tGetRaw h smpTest :: (Handle -> IO ()) -> Expectation smpTest test' = runSmpTest test' `shouldReturn` () diff --git a/tests/Test.hs b/tests/Test.hs index 1bdd88077..4d5f43732 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -5,6 +5,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} +import Crypto.Random import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B @@ -28,10 +29,16 @@ main = hspec do describe "switch subscription to another SMP queue" testSwitchSub pattern Resp :: QueueId -> Command 'Broker -> TransmissionOrError -pattern Resp queueId command = ("", (queueId, Right (Cmd SBroker command))) +pattern Resp queueId command <- ("", (_, queueId, Right (Cmd SBroker command))) -sendRecv :: Handle -> RawTransmission -> IO TransmissionOrError -sendRecv h (sgn, qId, cmd) = tPutRaw h (fromRight "" $ decode sgn, qId, cmd) >> tGet fromServer h +sendRecv :: Handle -> (ByteString, ByteString, ByteString) -> IO TransmissionOrError +sendRecv h (sgn, qId, cmd) = do + corrId <- encode <$> getRandomBytes 3 + tPutRaw h (sgn, corrId, encode qId, cmd) + t@(_, (corrId', _, _)) <- tGet fromServer h + if corrId == corrId' + then return t + else error "response correlation ID does not match request" (>#>) :: [RawTransmission] -> [RawTransmission] -> Expectation commands >#> responses = smpServerTest commands `shouldReturn` responses @@ -238,36 +245,36 @@ testSwitchSub = syntaxTests :: SpecWith () syntaxTests = do - it "unknown command" $ [("", "1234", "HELLO")] >#> [("", "1234", "ERR UNKNOWN")] + it "unknown command" $ [("", "abcd", "1234", "HELLO")] >#> [("", "abcd", "1234", "ERR UNKNOWN")] describe "NEW" do - it "no parameters" $ [("", "", "NEW")] >#> [("", "", "ERR SYNTAX 2")] - it "many parameters" $ [("", "", "NEW 1 2")] >#> [("", "", "ERR SYNTAX 2")] - it "has signature" $ [("1234", "", "NEW 1234")] >#> [("", "", "ERR SYNTAX 4")] - it "queue ID" $ [("", "1", "NEW 1234")] >#> [("", "1", "ERR SYNTAX 4")] + it "no parameters" $ [("", "bcda", "", "NEW")] >#> [("", "bcda", "", "ERR SYNTAX 2")] + it "many parameters" $ [("", "cdab", "", "NEW 1 2")] >#> [("", "cdab", "", "ERR SYNTAX 2")] + it "has signature" $ [("1234", "dabc", "", "NEW 1234")] >#> [("", "dabc", "", "ERR SYNTAX 4")] + it "queue ID" $ [("", "abcd", "12345678", "NEW 1234")] >#> [("", "abcd", "12345678", "ERR SYNTAX 4")] describe "KEY" do - it "valid syntax" $ [("1234", "1", "KEY 4567")] >#> [("", "1", "ERR AUTH")] - it "no parameters" $ [("1234", "1", "KEY")] >#> [("", "1", "ERR SYNTAX 2")] - it "many parameters" $ [("1234", "1", "KEY 1 2")] >#> [("", "1", "ERR SYNTAX 2")] - it "no signature" $ [("", "1", "KEY 4567")] >#> [("", "1", "ERR SYNTAX 3")] - it "no queue ID" $ [("1234", "", "KEY 4567")] >#> [("", "", "ERR SYNTAX 3")] + it "valid syntax" $ [("1234", "bcda", "12345678", "KEY 4567")] >#> [("", "bcda", "12345678", "ERR AUTH")] + it "no parameters" $ [("1234", "cdab", "12345678", "KEY")] >#> [("", "cdab", "12345678", "ERR SYNTAX 2")] + it "many parameters" $ [("1234", "dabc", "12345678", "KEY 1 2")] >#> [("", "dabc", "12345678", "ERR SYNTAX 2")] + it "no signature" $ [("", "abcd", "12345678", "KEY 4567")] >#> [("", "abcd", "12345678", "ERR SYNTAX 3")] + it "no queue ID" $ [("1234", "bcda", "", "KEY 4567")] >#> [("", "bcda", "", "ERR SYNTAX 3")] noParamsSyntaxTest "SUB" noParamsSyntaxTest "ACK" noParamsSyntaxTest "OFF" noParamsSyntaxTest "DEL" describe "SEND" do - it "valid syntax 1" $ [("1234", "1", "SEND :hello")] >#> [("", "1", "ERR AUTH")] - it "valid syntax 2" $ [("1234", "1", "SEND 11\nhello there\n")] >#> [("", "1", "ERR AUTH")] - it "no parameters" $ [("1234", "1", "SEND")] >#> [("", "1", "ERR SYNTAX 2")] - it "no queue ID" $ [("1234", "", "SEND :hello")] >#> [("", "", "ERR SYNTAX 5")] - it "bad message body 1" $ [("1234", "1", "SEND 11 hello")] >#> [("", "1", "ERR SYNTAX 6")] - it "bad message body 2" $ [("1234", "1", "SEND hello")] >#> [("", "1", "ERR SYNTAX 6")] - it "bigger body" $ [("1234", "1", "SEND 4\nhello\n")] >#> [("", "1", "ERR SIZE")] + it "valid syntax 1" $ [("1234", "cdab", "12345678", "SEND :hello")] >#> [("", "cdab", "12345678", "ERR AUTH")] + it "valid syntax 2" $ [("1234", "dabc", "12345678", "SEND 11\nhello there\n")] >#> [("", "dabc", "12345678", "ERR AUTH")] + it "no parameters" $ [("1234", "abcd", "12345678", "SEND")] >#> [("", "abcd", "12345678", "ERR SYNTAX 2")] + it "no queue ID" $ [("1234", "bcda", "", "SEND :hello")] >#> [("", "bcda", "", "ERR SYNTAX 5")] + it "bad message body 1" $ [("1234", "cdab", "12345678", "SEND 11 hello")] >#> [("", "cdab", "12345678", "ERR SYNTAX 6")] + it "bad message body 2" $ [("1234", "dabc", "12345678", "SEND hello")] >#> [("", "dabc", "12345678", "ERR SYNTAX 6")] + it "bigger body" $ [("1234", "abcd", "12345678", "SEND 4\nhello\n")] >#> [("", "abcd", "12345678", "ERR SIZE")] describe "broker response not allowed" do - it "OK" $ [("1234", "1", "OK")] >#> [("", "1", "ERR PROHIBITED")] + it "OK" $ [("1234", "bcda", "12345678", "OK")] >#> [("", "bcda", "12345678", "ERR PROHIBITED")] where noParamsSyntaxTest :: ByteString -> SpecWith () noParamsSyntaxTest cmd = describe (B.unpack cmd) do - it "valid syntax" $ [("1234", "1", cmd)] >#> [("", "1", "ERR AUTH")] - it "parameters" $ [("1234", "1", cmd <> " 1")] >#> [("", "1", "ERR SYNTAX 2")] - it "no signature" $ [("", "1", cmd)] >#> [("", "1", "ERR SYNTAX 3")] - it "no queue ID" $ [("1234", "", cmd)] >#> [("", "", "ERR SYNTAX 3")] + it "valid syntax" $ [("1234", "abcd", "12345678", cmd)] >#> [("", "abcd", "12345678", "ERR AUTH")] + it "parameters" $ [("1234", "bcda", "12345678", cmd <> " 1")] >#> [("", "bcda", "12345678", "ERR SYNTAX 2")] + it "no signature" $ [("", "cdab", "12345678", cmd)] >#> [("", "cdab", "12345678", "ERR SYNTAX 3")] + it "no queue ID" $ [("1234", "dabc", "", cmd)] >#> [("", "dabc", "", "ERR SYNTAX 3")] From e7581a91a8cfabdfd69839ef531447cd8c35be98 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 28 Dec 2020 15:39:28 +0000 Subject: [PATCH 2/4] test: update tests to include correclation ID in the tests themselves --- tests/Test.hs | 146 ++++++++++++++++++++++++-------------------------- 1 file changed, 69 insertions(+), 77 deletions(-) diff --git a/tests/Test.hs b/tests/Test.hs index 4d5f43732..ca27be2bf 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -5,11 +5,9 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} -import Crypto.Random import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Either import SMPClient import Simplex.Messaging.Server.Transmission import Simplex.Messaging.Transport @@ -28,17 +26,11 @@ main = hspec do describe "duplex communication over 2 SMP connections" testDuplex describe "switch subscription to another SMP queue" testSwitchSub -pattern Resp :: QueueId -> Command 'Broker -> TransmissionOrError -pattern Resp queueId command <- ("", (_, queueId, Right (Cmd SBroker command))) +pattern Resp :: CorrelationId -> QueueId -> Command 'Broker -> TransmissionOrError +pattern Resp corrId queueId command <- ("", (corrId, queueId, Right (Cmd SBroker command))) -sendRecv :: Handle -> (ByteString, ByteString, ByteString) -> IO TransmissionOrError -sendRecv h (sgn, qId, cmd) = do - corrId <- encode <$> getRandomBytes 3 - tPutRaw h (sgn, corrId, encode qId, cmd) - t@(_, (corrId', _, _)) <- tGet fromServer h - if corrId == corrId' - then return t - else error "response correlation ID does not match request" +sendRecv :: Handle -> (ByteString, ByteString, ByteString, ByteString) -> IO TransmissionOrError +sendRecv h (sgn, corrId, qId, cmd) = tPutRaw h (sgn, corrId, encode qId, cmd) >> tGet fromServer h (>#>) :: [RawTransmission] -> [RawTransmission] -> Expectation commands >#> responses = smpServerTest commands `shouldReturn` responses @@ -50,193 +42,193 @@ testCreateSecure :: SpecWith () testCreateSecure = it "should create (NEW) and secure (KEY) queue" $ smpTest \h -> do - Resp rId1 (IDS rId sId) <- sendRecv h ("", "", "NEW 1234") + Resp "abcd" rId1 (IDS rId sId) <- sendRecv h ("", "abcd", "", "NEW 1234") (rId1, "") #== "creates queue" - Resp sId1 ok1 <- sendRecv h ("", sId, "SEND :hello") + Resp "bcda" sId1 ok1 <- sendRecv h ("", "bcda", sId, "SEND :hello") (ok1, OK) #== "accepts unsigned SEND" (sId1, sId) #== "same queue ID in response 1" - Resp _ (MSG _ _ msg1) <- tGet fromServer h + Resp "" _ (MSG _ _ msg1) <- tGet fromServer h (msg1, "hello") #== "delivers message" - Resp _ ok4 <- sendRecv h ("1234", rId, "ACK") + Resp "cdab" _ ok4 <- sendRecv h ("1234", "cdab", rId, "ACK") (ok4, OK) #== "replies OK when message acknowledged if no more messages" - Resp _ err6 <- sendRecv h ("1234", rId, "ACK") + Resp "dabc" _ err6 <- sendRecv h ("1234", "dabc", rId, "ACK") (err6, ERR PROHIBITED) #== "replies ERR when message acknowledged without messages" - Resp sId2 err1 <- sendRecv h ("4567", sId, "SEND :hello") + Resp "abcd" sId2 err1 <- sendRecv h ("4567", "abcd", sId, "SEND :hello") (err1, ERR AUTH) #== "rejects signed SEND" (sId2, sId) #== "same queue ID in response 2" - Resp _ err2 <- sendRecv h ("12345678", rId, "KEY 4567") + Resp "bcda" _ err2 <- sendRecv h ("12345678", "bcda", rId, "KEY 4567") (err2, ERR AUTH) #== "rejects KEY with wrong signature (password atm)" - Resp _ err3 <- sendRecv h ("1234", sId, "KEY 4567") + Resp "cdab" _ err3 <- sendRecv h ("1234", "cdab", sId, "KEY 4567") (err3, ERR AUTH) #== "rejects KEY with sender's ID" - Resp rId2 ok2 <- sendRecv h ("1234", rId, "KEY 4567") + Resp "dabc" rId2 ok2 <- sendRecv h ("1234", "dabc", rId, "KEY 4567") (ok2, OK) #== "secures queue" (rId2, rId) #== "same queue ID in response 3" - Resp _ err4 <- sendRecv h ("1234", rId, "KEY 4567") + Resp "abcd" _ err4 <- sendRecv h ("1234", "abcd", rId, "KEY 4567") (err4, ERR AUTH) #== "rejects KEY if already secured" - Resp _ ok3 <- sendRecv h ("4567", sId, "SEND 11\nhello again") + Resp "bcda" _ ok3 <- sendRecv h ("4567", "bcda", sId, "SEND 11\nhello again") (ok3, OK) #== "accepts signed SEND" - Resp _ (MSG _ _ msg) <- tGet fromServer h + Resp "" _ (MSG _ _ msg) <- tGet fromServer h (msg, "hello again") #== "delivers message 2" - Resp _ ok5 <- sendRecv h ("1234", rId, "ACK") + Resp "cdab" _ ok5 <- sendRecv h ("1234", "cdab", rId, "ACK") (ok5, OK) #== "replies OK when message acknowledged 2" - Resp _ err5 <- sendRecv h ("", sId, "SEND :hello") + Resp "dabc" _ err5 <- sendRecv h ("", "dabc", sId, "SEND :hello") (err5, ERR AUTH) #== "rejects unsigned SEND" testCreateDelete :: SpecWith () testCreateDelete = it "should create (NEW), suspend (OFF) and delete (DEL) queue" $ smpTest2 \rh sh -> do - Resp rId1 (IDS rId sId) <- sendRecv rh ("", "", "NEW 1234") + Resp "abcd" rId1 (IDS rId sId) <- sendRecv rh ("", "abcd", "", "NEW 1234") (rId1, "") #== "creates queue" - Resp _ ok1 <- sendRecv rh ("1234", rId, "KEY 4567") + Resp "bcda" _ ok1 <- sendRecv rh ("1234", "bcda", rId, "KEY 4567") (ok1, OK) #== "secures queue" - Resp _ ok2 <- sendRecv sh ("4567", sId, "SEND :hello") + Resp "cdab" _ ok2 <- sendRecv sh ("4567", "cdab", sId, "SEND :hello") (ok2, OK) #== "accepts signed SEND" - Resp _ ok7 <- sendRecv sh ("4567", sId, "SEND :hello 2") + Resp "dabc" _ ok7 <- sendRecv sh ("4567", "dabc", sId, "SEND :hello 2") (ok7, OK) #== "accepts signed SEND 2 - this message is not delivered because the first is not ACKed" - Resp _ (MSG _ _ msg1) <- tGet fromServer rh + Resp "" _ (MSG _ _ msg1) <- tGet fromServer rh (msg1, "hello") #== "delivers message" - Resp _ err1 <- sendRecv rh ("12345678", rId, "OFF") + Resp "abcd" _ err1 <- sendRecv rh ("12345678", "abcd", rId, "OFF") (err1, ERR AUTH) #== "rejects OFF with wrong signature (password atm)" - Resp _ err2 <- sendRecv rh ("1234", sId, "OFF") + Resp "bcda" _ err2 <- sendRecv rh ("1234", "bcda", sId, "OFF") (err2, ERR AUTH) #== "rejects OFF with sender's ID" - Resp rId2 ok3 <- sendRecv rh ("1234", rId, "OFF") + Resp "cdab" rId2 ok3 <- sendRecv rh ("1234", "cdab", rId, "OFF") (ok3, OK) #== "suspends queue" (rId2, rId) #== "same queue ID in response 2" - Resp _ err3 <- sendRecv sh ("4567", sId, "SEND :hello") + Resp "dabc" _ err3 <- sendRecv sh ("4567", "dabc", sId, "SEND :hello") (err3, ERR AUTH) #== "rejects signed SEND" - Resp _ err4 <- sendRecv sh ("", sId, "SEND :hello") + Resp "abcd" _ err4 <- sendRecv sh ("", "abcd", sId, "SEND :hello") (err4, ERR AUTH) #== "reject unsigned SEND too" - Resp _ ok4 <- sendRecv rh ("1234", rId, "OFF") + Resp "bcda" _ ok4 <- sendRecv rh ("1234", "bcda", rId, "OFF") (ok4, OK) #== "accepts OFF when suspended" - Resp _ (MSG _ _ msg) <- sendRecv rh ("1234", rId, "SUB") + Resp "cdab" _ (MSG _ _ msg) <- sendRecv rh ("1234", "cdab", rId, "SUB") (msg, "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)" - Resp _ err5 <- sendRecv rh ("12345678", rId, "DEL") + Resp "dabc" _ err5 <- sendRecv rh ("12345678", "dabc", rId, "DEL") (err5, ERR AUTH) #== "rejects DEL with wrong signature (password atm)" - Resp _ err6 <- sendRecv rh ("1234", sId, "DEL") + Resp "abcd" _ err6 <- sendRecv rh ("1234", "abcd", sId, "DEL") (err6, ERR AUTH) #== "rejects DEL with sender's ID" - Resp rId3 ok6 <- sendRecv rh ("1234", rId, "DEL") + Resp "bcda" rId3 ok6 <- sendRecv rh ("1234", "bcda", rId, "DEL") (ok6, OK) #== "deletes queue" (rId3, rId) #== "same queue ID in response 3" - Resp _ err7 <- sendRecv sh ("4567", sId, "SEND :hello") + Resp "cdab" _ err7 <- sendRecv sh ("4567", "cdab", sId, "SEND :hello") (err7, ERR AUTH) #== "rejects signed SEND when deleted" - Resp _ err8 <- sendRecv sh ("", sId, "SEND :hello") + Resp "dabc" _ err8 <- sendRecv sh ("", "dabc", sId, "SEND :hello") (err8, ERR AUTH) #== "rejects unsigned SEND too when deleted" - Resp _ err11 <- sendRecv rh ("1234", rId, "ACK") + Resp "abcd" _ err11 <- sendRecv rh ("1234", "abcd", rId, "ACK") (err11, ERR AUTH) #== "rejects ACK when conn deleted - the second message is deleted" - Resp _ err9 <- sendRecv rh ("1234", rId, "OFF") + Resp "bcda" _ err9 <- sendRecv rh ("1234", "bcda", rId, "OFF") (err9, ERR AUTH) #== "rejects OFF when deleted" - Resp _ err10 <- sendRecv rh ("1234", rId, "SUB") + Resp "cdab" _ err10 <- sendRecv rh ("1234", "cdab", rId, "SUB") (err10, ERR AUTH) #== "rejects SUB when deleted" testDuplex :: SpecWith () testDuplex = it "should create 2 simplex connections and exchange messages" $ smpTest2 \alice bob -> do - Resp _ (IDS aRcv aSnd) <- sendRecv alice ("", "", "NEW 1234") + Resp "abcd" _ (IDS aRcv aSnd) <- sendRecv alice ("", "abcd", "", "NEW 1234") -- aSnd ID is passed to Bob out-of-band - Resp _ OK <- sendRecv bob ("", aSnd, "SEND :key efgh") + Resp "bcda" _ OK <- sendRecv bob ("", "bcda", aSnd, "SEND :key efgh") -- "key efgh" is ad-hoc, different from SMP protocol - Resp _ (MSG _ _ msg1) <- tGet fromServer alice - Resp _ OK <- sendRecv alice ("1234", aRcv, "ACK") + Resp "" _ (MSG _ _ msg1) <- tGet fromServer alice + Resp "cdab" _ OK <- sendRecv alice ("1234", "cdab", aRcv, "ACK") ["key", key1] <- return $ B.words msg1 (key1, "efgh") #== "key received from Bob" - Resp _ OK <- sendRecv alice ("1234", aRcv, "KEY " <> key1) + Resp "dabc" _ OK <- sendRecv alice ("1234", "dabc", aRcv, "KEY " <> key1) - Resp _ (IDS bRcv bSnd) <- sendRecv bob ("", "", "NEW abcd") - Resp _ OK <- sendRecv bob ("efgh", aSnd, "SEND :reply_id " <> encode bSnd) + Resp "abcd" _ (IDS bRcv bSnd) <- sendRecv bob ("", "abcd", "", "NEW abcd") + Resp "bcda" _ OK <- sendRecv bob ("efgh", "bcda", aSnd, "SEND :reply_id " <> encode bSnd) -- "reply_id ..." is ad-hoc, it is not a part of SMP protocol - Resp _ (MSG _ _ msg2) <- tGet fromServer alice - Resp _ OK <- sendRecv alice ("1234", aRcv, "ACK") + Resp "" _ (MSG _ _ msg2) <- tGet fromServer alice + Resp "cdab" _ OK <- sendRecv alice ("1234", "cdab", aRcv, "ACK") ["reply_id", bId] <- return $ B.words msg2 (bId, encode bSnd) #== "reply queue ID received from Bob" - Resp _ OK <- sendRecv alice ("", bSnd, "SEND :key 5678") + Resp "dabc" _ OK <- sendRecv alice ("", "dabc", bSnd, "SEND :key 5678") -- "key 5678" is ad-hoc, different from SMP protocol - Resp _ (MSG _ _ msg3) <- tGet fromServer bob - Resp _ OK <- sendRecv bob ("abcd", bRcv, "ACK") + Resp "" _ (MSG _ _ msg3) <- tGet fromServer bob + Resp "abcd" _ OK <- sendRecv bob ("abcd", "abcd", bRcv, "ACK") ["key", key2] <- return $ B.words msg3 (key2, "5678") #== "key received from Alice" - Resp _ OK <- sendRecv bob ("abcd", bRcv, "KEY " <> key2) + Resp "bcda" _ OK <- sendRecv bob ("abcd", "bcda", bRcv, "KEY " <> key2) - Resp _ OK <- sendRecv bob ("efgh", aSnd, "SEND :hi alice") + Resp "cdab" _ OK <- sendRecv bob ("efgh", "cdab", aSnd, "SEND :hi alice") - Resp _ (MSG _ _ msg4) <- tGet fromServer alice - Resp _ OK <- sendRecv alice ("1234", aRcv, "ACK") + Resp "" _ (MSG _ _ msg4) <- tGet fromServer alice + Resp "dabc" _ OK <- sendRecv alice ("1234", "dabc", aRcv, "ACK") (msg4, "hi alice") #== "message received from Bob" - Resp _ OK <- sendRecv alice ("5678", bSnd, "SEND :how are you bob") + Resp "abcd" _ OK <- sendRecv alice ("5678", "abcd", bSnd, "SEND :how are you bob") - Resp _ (MSG _ _ msg5) <- tGet fromServer bob - Resp _ OK <- sendRecv bob ("abcd", bRcv, "ACK") + Resp "" _ (MSG _ _ msg5) <- tGet fromServer bob + Resp "bcda" _ OK <- sendRecv bob ("abcd", "bcda", bRcv, "ACK") (msg5, "how are you bob") #== "message received from alice" testSwitchSub :: SpecWith () testSwitchSub = it "should create simplex connections and switch subscription to another TCP connection" $ smpTest3 \rh1 rh2 sh -> do - Resp _ (IDS rId sId) <- sendRecv rh1 ("", "", "NEW 1234") - Resp _ ok1 <- sendRecv sh ("", sId, "SEND :test1") + Resp "abcd" _ (IDS rId sId) <- sendRecv rh1 ("", "abcd", "", "NEW 1234") + Resp "bcda" _ ok1 <- sendRecv sh ("", "bcda", sId, "SEND :test1") (ok1, OK) #== "sent test message 1" - Resp _ ok2 <- sendRecv sh ("", sId, "SEND :test2, no ACK") + Resp "cdab" _ ok2 <- sendRecv sh ("", "cdab", sId, "SEND :test2, no ACK") (ok2, OK) #== "sent test message 2" - Resp _ (MSG _ _ msg1) <- tGet fromServer rh1 + Resp "" _ (MSG _ _ msg1) <- tGet fromServer rh1 (msg1, "test1") #== "test message 1 delivered to the 1st TCP connection" - Resp _ (MSG _ _ msg2) <- sendRecv rh1 ("1234", rId, "ACK") + Resp "abcd" _ (MSG _ _ msg2) <- sendRecv rh1 ("1234", "abcd", rId, "ACK") (msg2, "test2, no ACK") #== "test message 2 delivered, no ACK" - Resp _ (MSG _ _ msg2') <- sendRecv rh2 ("1234", rId, "SUB") + Resp "bcda" _ (MSG _ _ msg2') <- sendRecv rh2 ("1234", "bcda", rId, "SUB") (msg2', "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)" - Resp _ OK <- sendRecv rh2 ("1234", rId, "ACK") + Resp "cdab" _ OK <- sendRecv rh2 ("1234", "cdab", rId, "ACK") - Resp _ end <- tGet fromServer rh1 + Resp "" _ end <- tGet fromServer rh1 (end, END) #== "unsubscribed the 1st TCP connection" - Resp _ OK <- sendRecv sh ("", sId, "SEND :test3") + Resp "dabc" _ OK <- sendRecv sh ("", "dabc", sId, "SEND :test3") - Resp _ (MSG _ _ msg3) <- tGet fromServer rh2 + Resp "" _ (MSG _ _ msg3) <- tGet fromServer rh2 (msg3, "test3") #== "delivered to the 2nd TCP connection" - Resp _ err <- sendRecv rh1 ("1234", rId, "ACK") + Resp "abcd" _ err <- sendRecv rh1 ("1234", "abcd", rId, "ACK") (err, ERR PROHIBITED) #== "rejects ACK from the 1st TCP connection" - Resp _ ok3 <- sendRecv rh2 ("1234", rId, "ACK") + Resp "bcda" _ ok3 <- sendRecv rh2 ("1234", "bcda", rId, "ACK") (ok3, OK) #== "accepts ACK from the 2nd TCP connection" timeout 1000 (tGet fromServer rh1) >>= \case From 7f1d3da202af8790838c54b3aa50cf14a579f7b8 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 28 Dec 2020 16:24:58 +0000 Subject: [PATCH 3/4] refactor msgResp helper function --- src/Simplex/Messaging/Server.hs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 5508081c3..a54a5a62c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -217,7 +217,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = q <- atomically $ getMsgQueue ms rId atomically (tryPeek q) >>= \case Nothing -> forkSub q $> ok - Just msg -> atomically setDelivered $> msgResp corrId rId msg + Just msg -> atomically setDelivered $> mkResp corrId rId (msgCmd msg) _ -> return ok where forkSub :: MsgQueue -> m () @@ -231,7 +231,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = subscriber :: MsgQueue -> m () subscriber q = atomically $ do msg <- peekMsg q - writeTBQueue sndQ $ msgResp B.empty rId msg + writeTBQueue sndQ $ mkResp B.empty rId (msgCmd msg) setSub (\s -> s {subThread = NoSub}) void setDelivered @@ -258,8 +258,8 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = okResp :: Either ErrorType () -> Signed okResp = either err $ const ok - msgResp :: CorrelationId -> RecipientId -> Message -> Signed - msgResp _corrId rId Message {msgId, ts, msgBody} = mkResp _corrId rId $ MSG msgId ts msgBody + msgCmd :: Message -> Command 'Broker + msgCmd Message {msgId, ts, msgBody} = MSG msgId ts msgBody randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m Encoded randomId n = do From f61ad27fcd38e1f908824300a5e4120d5afe3c98 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 28 Dec 2020 16:28:57 +0000 Subject: [PATCH 4/4] docs: add correlation IDs to examples --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index ef15a71f2..09ed01cd7 100644 --- a/README.md +++ b/README.md @@ -33,9 +33,11 @@ Comments are prefixed with `--`, they are not part of transmissions. ```telnet > +> abcd -- correlation ID, any string > > NEW 1234 -- 1234 is recipient's key +abcd IDS QuCLU4YxgS7wcPFA YB4CCATREHkaQcEh -- recipient and sender IDs for the queue ``` @@ -44,9 +46,11 @@ IDS QuCLU4YxgS7wcPFA YB4CCATREHkaQcEh -- recipient and sender IDs for the queue ```telnet > -- no signature (just press enter) +> bcda -- correlation ID, any string > YB4CCATREHkaQcEh -- sender ID for the queue > SEND :key abcd +bcda YB4CCATREHkaQcEh OK ``` @@ -55,9 +59,11 @@ OK ```telnet > 1234 -- recipient's "signature" - same as "key" in the demo +> cdab > QuCLU4YxgS7wcPFA -- recipient ID > KEY abcd -- "key" provided by sender +cdab QuCLU4YxgS7wcPFA OK ``` @@ -66,9 +72,11 @@ OK ```telnet > abcd -- sender's "signature" - same as "key" in the demo +> dabc -- correlation ID > YB4CCATREHkaQcEh -- sender ID > SEND :hello +dabc YB4CCATREHkaQcEh OK ``` @@ -77,13 +85,16 @@ OK ```telnet +-- no correlation ID for messages delivered without client command QuCLU4YxgS7wcPFA MSG ECA3w3ID 2020-10-18T20:19:36.874Z 5 hello > 1234 +> abcd > QuCLU4YxgS7wcPFA > ACK +abcd QuCLU4YxgS7wcPFA OK ```