mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-24 14:35:22 +00:00
Merge pull request #2 from simplex-chat/correlation-id
add corellationId to SMP protocol server (WIP)
This commit is contained in:
@@ -33,9 +33,11 @@ Comments are prefixed with `--`, they are not part of transmissions.
|
||||
|
||||
```telnet
|
||||
>
|
||||
> abcd -- correlation ID, any string
|
||||
>
|
||||
> NEW 1234 -- 1234 is recipient's key
|
||||
|
||||
abcd
|
||||
|
||||
IDS QuCLU4YxgS7wcPFA YB4CCATREHkaQcEh -- recipient and sender IDs for the queue
|
||||
```
|
||||
@@ -44,9 +46,11 @@ IDS QuCLU4YxgS7wcPFA YB4CCATREHkaQcEh -- recipient and sender IDs for the queue
|
||||
|
||||
```telnet
|
||||
> -- no signature (just press enter)
|
||||
> bcda -- correlation ID, any string
|
||||
> YB4CCATREHkaQcEh -- sender ID for the queue
|
||||
> SEND :key abcd
|
||||
|
||||
bcda
|
||||
YB4CCATREHkaQcEh
|
||||
OK
|
||||
```
|
||||
@@ -55,9 +59,11 @@ OK
|
||||
|
||||
```telnet
|
||||
> 1234 -- recipient's "signature" - same as "key" in the demo
|
||||
> cdab
|
||||
> QuCLU4YxgS7wcPFA -- recipient ID
|
||||
> KEY abcd -- "key" provided by sender
|
||||
|
||||
cdab
|
||||
QuCLU4YxgS7wcPFA
|
||||
OK
|
||||
```
|
||||
@@ -66,9 +72,11 @@ OK
|
||||
|
||||
```telnet
|
||||
> abcd -- sender's "signature" - same as "key" in the demo
|
||||
> dabc -- correlation ID
|
||||
> YB4CCATREHkaQcEh -- sender ID
|
||||
> SEND :hello
|
||||
|
||||
dabc
|
||||
YB4CCATREHkaQcEh
|
||||
OK
|
||||
```
|
||||
@@ -77,13 +85,16 @@ OK
|
||||
|
||||
```telnet
|
||||
|
||||
-- no correlation ID for messages delivered without client command
|
||||
QuCLU4YxgS7wcPFA
|
||||
MSG ECA3w3ID 2020-10-18T20:19:36.874Z 5
|
||||
hello
|
||||
> 1234
|
||||
> abcd
|
||||
> QuCLU4YxgS7wcPFA
|
||||
> ACK
|
||||
|
||||
abcd
|
||||
QuCLU4YxgS7wcPFA
|
||||
OK
|
||||
```
|
||||
|
||||
@@ -51,7 +51,7 @@ runSMPServer cfg@ServerConfig {tcpPort} = do
|
||||
(rId, clnt) <- readTBQueue subscribedQ
|
||||
cs <- readTVar subscribers
|
||||
case M.lookup rId cs of
|
||||
Just Client {rcvQ} -> writeTBQueue rcvQ (rId, Cmd SBroker END)
|
||||
Just Client {rcvQ} -> writeTBQueue rcvQ (B.empty, rId, Cmd SBroker END)
|
||||
Nothing -> return ()
|
||||
writeTVar subscribers $ M.insert rId clnt cs
|
||||
|
||||
@@ -75,10 +75,10 @@ cancelSub = \case
|
||||
|
||||
receive :: (MonadUnliftIO m, MonadReader Env m) => Handle -> Client -> m ()
|
||||
receive h Client {rcvQ} = forever $ do
|
||||
(signature, (queueId, cmdOrError)) <- tGet fromClient h
|
||||
(signature, (corrId, queueId, cmdOrError)) <- tGet fromClient h
|
||||
signed <- case cmdOrError of
|
||||
Left e -> return . mkResp queueId $ ERR e
|
||||
Right cmd -> verifyTransmission signature queueId cmd
|
||||
Left e -> return . mkResp corrId queueId $ ERR e
|
||||
Right cmd -> verifyTransmission (signature, (corrId, queueId, cmd))
|
||||
atomically $ writeTBQueue rcvQ signed
|
||||
|
||||
send :: MonadUnliftIO m => Handle -> Client -> m ()
|
||||
@@ -86,12 +86,12 @@ send h Client {sndQ} = forever $ do
|
||||
signed <- atomically $ readTBQueue sndQ
|
||||
tPut h (B.empty, signed)
|
||||
|
||||
mkResp :: QueueId -> Command 'Broker -> Signed
|
||||
mkResp queueId command = (queueId, Cmd SBroker command)
|
||||
mkResp :: CorrelationId -> QueueId -> Command 'Broker -> Signed
|
||||
mkResp corrId queueId command = (corrId, queueId, Cmd SBroker command)
|
||||
|
||||
verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => Signature -> QueueId -> Cmd -> m Signed
|
||||
verifyTransmission signature queueId cmd = do
|
||||
(queueId,) <$> case cmd of
|
||||
verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => Transmission -> m Signed
|
||||
verifyTransmission (signature, (corrId, queueId, cmd)) = do
|
||||
(corrId,queueId,) <$> case cmd of
|
||||
Cmd SBroker _ -> return $ smpErr INTERNAL -- it can only be client command, because `fromClient` was used
|
||||
Cmd SRecipient (NEW _) -> return cmd
|
||||
Cmd SRecipient _ -> withQueueRec SRecipient $ verifySignature . recipientKey
|
||||
@@ -121,11 +121,11 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
>>= atomically . writeTBQueue sndQ
|
||||
where
|
||||
processCommand :: Signed -> m Signed
|
||||
processCommand (queueId, cmd) = do
|
||||
processCommand (corrId, queueId, cmd) = do
|
||||
st <- asks queueStore
|
||||
case cmd of
|
||||
Cmd SBroker END -> unsubscribeQueue $> (queueId, cmd)
|
||||
Cmd SBroker _ -> return (queueId, cmd)
|
||||
Cmd SBroker END -> unsubscribeQueue $> (corrId, queueId, cmd)
|
||||
Cmd SBroker _ -> return (corrId, queueId, cmd)
|
||||
Cmd SSender (SEND msgBody) -> sendMessage st msgBody
|
||||
Cmd SRecipient command -> case command of
|
||||
NEW rKey -> createQueue st rKey
|
||||
@@ -136,7 +136,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
DEL -> delQueueAndMsgs st
|
||||
where
|
||||
createQueue :: QueueStore -> RecipientKey -> m Signed
|
||||
createQueue st rKey = mkResp B.empty <$> addSubscribe
|
||||
createQueue st rKey = mkResp corrId B.empty <$> addSubscribe
|
||||
where
|
||||
addSubscribe =
|
||||
addQueueRetry 3 >>= \case
|
||||
@@ -217,7 +217,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
q <- atomically $ getMsgQueue ms rId
|
||||
atomically (tryPeek q) >>= \case
|
||||
Nothing -> forkSub q $> ok
|
||||
Just msg -> atomically setDelivered $> msgResp rId msg
|
||||
Just msg -> atomically setDelivered $> mkResp corrId rId (msgCmd msg)
|
||||
_ -> return ok
|
||||
where
|
||||
forkSub :: MsgQueue -> m ()
|
||||
@@ -231,7 +231,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
subscriber :: MsgQueue -> m ()
|
||||
subscriber q = atomically $ do
|
||||
msg <- peekMsg q
|
||||
writeTBQueue sndQ $ msgResp rId msg
|
||||
writeTBQueue sndQ $ mkResp B.empty rId (msgCmd msg)
|
||||
setSub (\s -> s {subThread = NoSub})
|
||||
void setDelivered
|
||||
|
||||
@@ -250,16 +250,16 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
Right _ -> delMsgQueue ms queueId $> ok
|
||||
|
||||
ok :: Signed
|
||||
ok = mkResp queueId OK
|
||||
ok = mkResp corrId queueId OK
|
||||
|
||||
err :: ErrorType -> Signed
|
||||
err = mkResp queueId . ERR
|
||||
err = mkResp corrId queueId . ERR
|
||||
|
||||
okResp :: Either ErrorType () -> Signed
|
||||
okResp = either err $ const ok
|
||||
|
||||
msgResp :: RecipientId -> Message -> Signed
|
||||
msgResp rId Message {msgId, ts, msgBody} = mkResp rId $ MSG msgId ts msgBody
|
||||
msgCmd :: Message -> Command 'Broker
|
||||
msgCmd Message {msgId, ts, msgBody} = MSG msgId ts msgBody
|
||||
|
||||
randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m Encoded
|
||||
randomId n = do
|
||||
|
||||
@@ -33,15 +33,15 @@ data Cmd where
|
||||
|
||||
deriving instance Show Cmd
|
||||
|
||||
type Signed = (QueueId, Cmd)
|
||||
type Signed = (CorrelationId, QueueId, Cmd)
|
||||
|
||||
type Transmission = (Signature, Signed)
|
||||
|
||||
type SignedOrError = (QueueId, Either ErrorType Cmd)
|
||||
type SignedOrError = (CorrelationId, QueueId, Either ErrorType Cmd)
|
||||
|
||||
type TransmissionOrError = (Signature, SignedOrError)
|
||||
|
||||
type RawTransmission = (ByteString, ByteString, ByteString)
|
||||
type RawTransmission = (ByteString, ByteString, ByteString, ByteString)
|
||||
|
||||
data Command (a :: Party) where
|
||||
NEW :: RecipientKey -> Command Recipient
|
||||
@@ -137,6 +137,8 @@ serializeCommand = \case
|
||||
|
||||
type Encoded = ByteString
|
||||
|
||||
type CorrelationId = ByteString
|
||||
|
||||
type PublicKey = Encoded
|
||||
|
||||
type Signature = Encoded
|
||||
|
||||
@@ -80,20 +80,22 @@ getBytes :: MonadIO m => Handle -> Int -> m ByteString
|
||||
getBytes h = liftIO . B.hGet h
|
||||
|
||||
tPutRaw :: MonadIO m => Handle -> RawTransmission -> m ()
|
||||
tPutRaw h (signature, queueId, command) = do
|
||||
putLn h (encode signature)
|
||||
putLn h (encode queueId)
|
||||
tPutRaw h (signature, corrId, queueId, command) = do
|
||||
putLn h signature
|
||||
putLn h corrId
|
||||
putLn h queueId
|
||||
putLn h command
|
||||
|
||||
tGetRaw :: MonadIO m => Handle -> m (Either String RawTransmission)
|
||||
tGetRaw :: MonadIO m => Handle -> m RawTransmission
|
||||
tGetRaw h = do
|
||||
signature <- decode <$> getLn h
|
||||
queueId <- decode <$> getLn h
|
||||
signature <- getLn h
|
||||
corrId <- getLn h
|
||||
queueId <- getLn h
|
||||
command <- getLn h
|
||||
return $ liftM2 (,,command) signature queueId
|
||||
return (signature, corrId, queueId, command)
|
||||
|
||||
tPut :: MonadIO m => Handle -> Transmission -> m ()
|
||||
tPut h (signature, (queueId, command)) = tPutRaw h (signature, queueId, serializeCommand command)
|
||||
tPut h (signature, (corrId, queueId, command)) = tPutRaw h (encode signature, corrId, encode queueId, serializeCommand command)
|
||||
|
||||
fromClient :: Cmd -> Either ErrorType Cmd
|
||||
fromClient = \case
|
||||
@@ -108,19 +110,22 @@ fromServer = \case
|
||||
-- | get client and server transmissions
|
||||
-- `fromParty` is used to limit allowed senders - `fromClient` or `fromServer` should be used
|
||||
tGet :: forall m. MonadIO m => (Cmd -> Either ErrorType Cmd) -> Handle -> m TransmissionOrError
|
||||
tGet fromParty h = tGetRaw h >>= either (const tError) tParseLoadBody
|
||||
tGet fromParty h = do
|
||||
(signature, corrId, queueId, command) <- tGetRaw h
|
||||
let decodedTransmission = liftM2 (,corrId,,command) (decode signature) (decode queueId)
|
||||
either (const $ tError corrId) tParseLoadBody decodedTransmission
|
||||
where
|
||||
tError :: m TransmissionOrError
|
||||
tError = return (B.empty, (B.empty, Left $ SYNTAX errBadTransmission))
|
||||
tError :: ByteString -> m TransmissionOrError
|
||||
tError corrId = return (B.empty, (corrId, B.empty, Left $ SYNTAX errBadTransmission))
|
||||
|
||||
tParseLoadBody :: RawTransmission -> m TransmissionOrError
|
||||
tParseLoadBody t@(signature, queueId, command) = do
|
||||
tParseLoadBody t@(signature, corrId, queueId, command) = do
|
||||
let cmd = parseCommand command >>= fromParty >>= tCredentials t
|
||||
fullCmd <- either (return . Left) cmdWithMsgBody cmd
|
||||
return (signature, (queueId, fullCmd))
|
||||
return (signature, (corrId, queueId, fullCmd))
|
||||
|
||||
tCredentials :: RawTransmission -> Cmd -> Either ErrorType Cmd
|
||||
tCredentials (signature, queueId, _) cmd = case cmd of
|
||||
tCredentials (signature, _, queueId, _) cmd = case cmd of
|
||||
-- IDS response should not have queue ID
|
||||
Cmd SBroker (IDS _ _) -> Right cmd
|
||||
-- ERROR response does not always have queue ID
|
||||
|
||||
+1
-1
@@ -57,7 +57,7 @@ smpServerTest :: [RawTransmission] -> IO [RawTransmission]
|
||||
smpServerTest commands = runSmpTest \h -> mapM (sendReceive h) commands
|
||||
where
|
||||
sendReceive :: Handle -> RawTransmission -> IO RawTransmission
|
||||
sendReceive h t = tPutRaw h t >> either (error "bad transmission") id <$> tGetRaw h
|
||||
sendReceive h t = tPutRaw h t >> tGetRaw h
|
||||
|
||||
smpTest :: (Handle -> IO ()) -> Expectation
|
||||
smpTest test' = runSmpTest test' `shouldReturn` ()
|
||||
|
||||
+91
-92
@@ -8,7 +8,6 @@
|
||||
import Data.ByteString.Base64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either
|
||||
import SMPClient
|
||||
import Simplex.Messaging.Server.Transmission
|
||||
import Simplex.Messaging.Transport
|
||||
@@ -27,11 +26,11 @@ main = hspec do
|
||||
describe "duplex communication over 2 SMP connections" testDuplex
|
||||
describe "switch subscription to another SMP queue" testSwitchSub
|
||||
|
||||
pattern Resp :: QueueId -> Command 'Broker -> TransmissionOrError
|
||||
pattern Resp queueId command = ("", (queueId, Right (Cmd SBroker command)))
|
||||
pattern Resp :: CorrelationId -> QueueId -> Command 'Broker -> TransmissionOrError
|
||||
pattern Resp corrId queueId command <- ("", (corrId, queueId, Right (Cmd SBroker command)))
|
||||
|
||||
sendRecv :: Handle -> RawTransmission -> IO TransmissionOrError
|
||||
sendRecv h (sgn, qId, cmd) = tPutRaw h (fromRight "" $ decode sgn, qId, cmd) >> tGet fromServer h
|
||||
sendRecv :: Handle -> (ByteString, ByteString, ByteString, ByteString) -> IO TransmissionOrError
|
||||
sendRecv h (sgn, corrId, qId, cmd) = tPutRaw h (sgn, corrId, encode qId, cmd) >> tGet fromServer h
|
||||
|
||||
(>#>) :: [RawTransmission] -> [RawTransmission] -> Expectation
|
||||
commands >#> responses = smpServerTest commands `shouldReturn` responses
|
||||
@@ -43,193 +42,193 @@ testCreateSecure :: SpecWith ()
|
||||
testCreateSecure =
|
||||
it "should create (NEW) and secure (KEY) queue" $
|
||||
smpTest \h -> do
|
||||
Resp rId1 (IDS rId sId) <- sendRecv h ("", "", "NEW 1234")
|
||||
Resp "abcd" rId1 (IDS rId sId) <- sendRecv h ("", "abcd", "", "NEW 1234")
|
||||
(rId1, "") #== "creates queue"
|
||||
|
||||
Resp sId1 ok1 <- sendRecv h ("", sId, "SEND :hello")
|
||||
Resp "bcda" sId1 ok1 <- sendRecv h ("", "bcda", sId, "SEND :hello")
|
||||
(ok1, OK) #== "accepts unsigned SEND"
|
||||
(sId1, sId) #== "same queue ID in response 1"
|
||||
|
||||
Resp _ (MSG _ _ msg1) <- tGet fromServer h
|
||||
Resp "" _ (MSG _ _ msg1) <- tGet fromServer h
|
||||
(msg1, "hello") #== "delivers message"
|
||||
|
||||
Resp _ ok4 <- sendRecv h ("1234", rId, "ACK")
|
||||
Resp "cdab" _ ok4 <- sendRecv h ("1234", "cdab", rId, "ACK")
|
||||
(ok4, OK) #== "replies OK when message acknowledged if no more messages"
|
||||
|
||||
Resp _ err6 <- sendRecv h ("1234", rId, "ACK")
|
||||
Resp "dabc" _ err6 <- sendRecv h ("1234", "dabc", rId, "ACK")
|
||||
(err6, ERR PROHIBITED) #== "replies ERR when message acknowledged without messages"
|
||||
|
||||
Resp sId2 err1 <- sendRecv h ("4567", sId, "SEND :hello")
|
||||
Resp "abcd" sId2 err1 <- sendRecv h ("4567", "abcd", sId, "SEND :hello")
|
||||
(err1, ERR AUTH) #== "rejects signed SEND"
|
||||
(sId2, sId) #== "same queue ID in response 2"
|
||||
|
||||
Resp _ err2 <- sendRecv h ("12345678", rId, "KEY 4567")
|
||||
Resp "bcda" _ err2 <- sendRecv h ("12345678", "bcda", rId, "KEY 4567")
|
||||
(err2, ERR AUTH) #== "rejects KEY with wrong signature (password atm)"
|
||||
|
||||
Resp _ err3 <- sendRecv h ("1234", sId, "KEY 4567")
|
||||
Resp "cdab" _ err3 <- sendRecv h ("1234", "cdab", sId, "KEY 4567")
|
||||
(err3, ERR AUTH) #== "rejects KEY with sender's ID"
|
||||
|
||||
Resp rId2 ok2 <- sendRecv h ("1234", rId, "KEY 4567")
|
||||
Resp "dabc" rId2 ok2 <- sendRecv h ("1234", "dabc", rId, "KEY 4567")
|
||||
(ok2, OK) #== "secures queue"
|
||||
(rId2, rId) #== "same queue ID in response 3"
|
||||
|
||||
Resp _ err4 <- sendRecv h ("1234", rId, "KEY 4567")
|
||||
Resp "abcd" _ err4 <- sendRecv h ("1234", "abcd", rId, "KEY 4567")
|
||||
(err4, ERR AUTH) #== "rejects KEY if already secured"
|
||||
|
||||
Resp _ ok3 <- sendRecv h ("4567", sId, "SEND 11\nhello again")
|
||||
Resp "bcda" _ ok3 <- sendRecv h ("4567", "bcda", sId, "SEND 11\nhello again")
|
||||
(ok3, OK) #== "accepts signed SEND"
|
||||
|
||||
Resp _ (MSG _ _ msg) <- tGet fromServer h
|
||||
Resp "" _ (MSG _ _ msg) <- tGet fromServer h
|
||||
(msg, "hello again") #== "delivers message 2"
|
||||
|
||||
Resp _ ok5 <- sendRecv h ("1234", rId, "ACK")
|
||||
Resp "cdab" _ ok5 <- sendRecv h ("1234", "cdab", rId, "ACK")
|
||||
(ok5, OK) #== "replies OK when message acknowledged 2"
|
||||
|
||||
Resp _ err5 <- sendRecv h ("", sId, "SEND :hello")
|
||||
Resp "dabc" _ err5 <- sendRecv h ("", "dabc", sId, "SEND :hello")
|
||||
(err5, ERR AUTH) #== "rejects unsigned SEND"
|
||||
|
||||
testCreateDelete :: SpecWith ()
|
||||
testCreateDelete =
|
||||
it "should create (NEW), suspend (OFF) and delete (DEL) queue" $
|
||||
smpTest2 \rh sh -> do
|
||||
Resp rId1 (IDS rId sId) <- sendRecv rh ("", "", "NEW 1234")
|
||||
Resp "abcd" rId1 (IDS rId sId) <- sendRecv rh ("", "abcd", "", "NEW 1234")
|
||||
(rId1, "") #== "creates queue"
|
||||
|
||||
Resp _ ok1 <- sendRecv rh ("1234", rId, "KEY 4567")
|
||||
Resp "bcda" _ ok1 <- sendRecv rh ("1234", "bcda", rId, "KEY 4567")
|
||||
(ok1, OK) #== "secures queue"
|
||||
|
||||
Resp _ ok2 <- sendRecv sh ("4567", sId, "SEND :hello")
|
||||
Resp "cdab" _ ok2 <- sendRecv sh ("4567", "cdab", sId, "SEND :hello")
|
||||
(ok2, OK) #== "accepts signed SEND"
|
||||
|
||||
Resp _ ok7 <- sendRecv sh ("4567", sId, "SEND :hello 2")
|
||||
Resp "dabc" _ ok7 <- sendRecv sh ("4567", "dabc", sId, "SEND :hello 2")
|
||||
(ok7, OK) #== "accepts signed SEND 2 - this message is not delivered because the first is not ACKed"
|
||||
|
||||
Resp _ (MSG _ _ msg1) <- tGet fromServer rh
|
||||
Resp "" _ (MSG _ _ msg1) <- tGet fromServer rh
|
||||
(msg1, "hello") #== "delivers message"
|
||||
|
||||
Resp _ err1 <- sendRecv rh ("12345678", rId, "OFF")
|
||||
Resp "abcd" _ err1 <- sendRecv rh ("12345678", "abcd", rId, "OFF")
|
||||
(err1, ERR AUTH) #== "rejects OFF with wrong signature (password atm)"
|
||||
|
||||
Resp _ err2 <- sendRecv rh ("1234", sId, "OFF")
|
||||
Resp "bcda" _ err2 <- sendRecv rh ("1234", "bcda", sId, "OFF")
|
||||
(err2, ERR AUTH) #== "rejects OFF with sender's ID"
|
||||
|
||||
Resp rId2 ok3 <- sendRecv rh ("1234", rId, "OFF")
|
||||
Resp "cdab" rId2 ok3 <- sendRecv rh ("1234", "cdab", rId, "OFF")
|
||||
(ok3, OK) #== "suspends queue"
|
||||
(rId2, rId) #== "same queue ID in response 2"
|
||||
|
||||
Resp _ err3 <- sendRecv sh ("4567", sId, "SEND :hello")
|
||||
Resp "dabc" _ err3 <- sendRecv sh ("4567", "dabc", sId, "SEND :hello")
|
||||
(err3, ERR AUTH) #== "rejects signed SEND"
|
||||
|
||||
Resp _ err4 <- sendRecv sh ("", sId, "SEND :hello")
|
||||
Resp "abcd" _ err4 <- sendRecv sh ("", "abcd", sId, "SEND :hello")
|
||||
(err4, ERR AUTH) #== "reject unsigned SEND too"
|
||||
|
||||
Resp _ ok4 <- sendRecv rh ("1234", rId, "OFF")
|
||||
Resp "bcda" _ ok4 <- sendRecv rh ("1234", "bcda", rId, "OFF")
|
||||
(ok4, OK) #== "accepts OFF when suspended"
|
||||
|
||||
Resp _ (MSG _ _ msg) <- sendRecv rh ("1234", rId, "SUB")
|
||||
Resp "cdab" _ (MSG _ _ msg) <- sendRecv rh ("1234", "cdab", rId, "SUB")
|
||||
(msg, "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)"
|
||||
|
||||
Resp _ err5 <- sendRecv rh ("12345678", rId, "DEL")
|
||||
Resp "dabc" _ err5 <- sendRecv rh ("12345678", "dabc", rId, "DEL")
|
||||
(err5, ERR AUTH) #== "rejects DEL with wrong signature (password atm)"
|
||||
|
||||
Resp _ err6 <- sendRecv rh ("1234", sId, "DEL")
|
||||
Resp "abcd" _ err6 <- sendRecv rh ("1234", "abcd", sId, "DEL")
|
||||
(err6, ERR AUTH) #== "rejects DEL with sender's ID"
|
||||
|
||||
Resp rId3 ok6 <- sendRecv rh ("1234", rId, "DEL")
|
||||
Resp "bcda" rId3 ok6 <- sendRecv rh ("1234", "bcda", rId, "DEL")
|
||||
(ok6, OK) #== "deletes queue"
|
||||
(rId3, rId) #== "same queue ID in response 3"
|
||||
|
||||
Resp _ err7 <- sendRecv sh ("4567", sId, "SEND :hello")
|
||||
Resp "cdab" _ err7 <- sendRecv sh ("4567", "cdab", sId, "SEND :hello")
|
||||
(err7, ERR AUTH) #== "rejects signed SEND when deleted"
|
||||
|
||||
Resp _ err8 <- sendRecv sh ("", sId, "SEND :hello")
|
||||
Resp "dabc" _ err8 <- sendRecv sh ("", "dabc", sId, "SEND :hello")
|
||||
(err8, ERR AUTH) #== "rejects unsigned SEND too when deleted"
|
||||
|
||||
Resp _ err11 <- sendRecv rh ("1234", rId, "ACK")
|
||||
Resp "abcd" _ err11 <- sendRecv rh ("1234", "abcd", rId, "ACK")
|
||||
(err11, ERR AUTH) #== "rejects ACK when conn deleted - the second message is deleted"
|
||||
|
||||
Resp _ err9 <- sendRecv rh ("1234", rId, "OFF")
|
||||
Resp "bcda" _ err9 <- sendRecv rh ("1234", "bcda", rId, "OFF")
|
||||
(err9, ERR AUTH) #== "rejects OFF when deleted"
|
||||
|
||||
Resp _ err10 <- sendRecv rh ("1234", rId, "SUB")
|
||||
Resp "cdab" _ err10 <- sendRecv rh ("1234", "cdab", rId, "SUB")
|
||||
(err10, ERR AUTH) #== "rejects SUB when deleted"
|
||||
|
||||
testDuplex :: SpecWith ()
|
||||
testDuplex =
|
||||
it "should create 2 simplex connections and exchange messages" $
|
||||
smpTest2 \alice bob -> do
|
||||
Resp _ (IDS aRcv aSnd) <- sendRecv alice ("", "", "NEW 1234")
|
||||
Resp "abcd" _ (IDS aRcv aSnd) <- sendRecv alice ("", "abcd", "", "NEW 1234")
|
||||
-- aSnd ID is passed to Bob out-of-band
|
||||
|
||||
Resp _ OK <- sendRecv bob ("", aSnd, "SEND :key efgh")
|
||||
Resp "bcda" _ OK <- sendRecv bob ("", "bcda", aSnd, "SEND :key efgh")
|
||||
-- "key efgh" is ad-hoc, different from SMP protocol
|
||||
|
||||
Resp _ (MSG _ _ msg1) <- tGet fromServer alice
|
||||
Resp _ OK <- sendRecv alice ("1234", aRcv, "ACK")
|
||||
Resp "" _ (MSG _ _ msg1) <- tGet fromServer alice
|
||||
Resp "cdab" _ OK <- sendRecv alice ("1234", "cdab", aRcv, "ACK")
|
||||
["key", key1] <- return $ B.words msg1
|
||||
(key1, "efgh") #== "key received from Bob"
|
||||
Resp _ OK <- sendRecv alice ("1234", aRcv, "KEY " <> key1)
|
||||
Resp "dabc" _ OK <- sendRecv alice ("1234", "dabc", aRcv, "KEY " <> key1)
|
||||
|
||||
Resp _ (IDS bRcv bSnd) <- sendRecv bob ("", "", "NEW abcd")
|
||||
Resp _ OK <- sendRecv bob ("efgh", aSnd, "SEND :reply_id " <> encode bSnd)
|
||||
Resp "abcd" _ (IDS bRcv bSnd) <- sendRecv bob ("", "abcd", "", "NEW abcd")
|
||||
Resp "bcda" _ OK <- sendRecv bob ("efgh", "bcda", aSnd, "SEND :reply_id " <> encode bSnd)
|
||||
-- "reply_id ..." is ad-hoc, it is not a part of SMP protocol
|
||||
|
||||
Resp _ (MSG _ _ msg2) <- tGet fromServer alice
|
||||
Resp _ OK <- sendRecv alice ("1234", aRcv, "ACK")
|
||||
Resp "" _ (MSG _ _ msg2) <- tGet fromServer alice
|
||||
Resp "cdab" _ OK <- sendRecv alice ("1234", "cdab", aRcv, "ACK")
|
||||
["reply_id", bId] <- return $ B.words msg2
|
||||
(bId, encode bSnd) #== "reply queue ID received from Bob"
|
||||
Resp _ OK <- sendRecv alice ("", bSnd, "SEND :key 5678")
|
||||
Resp "dabc" _ OK <- sendRecv alice ("", "dabc", bSnd, "SEND :key 5678")
|
||||
-- "key 5678" is ad-hoc, different from SMP protocol
|
||||
|
||||
Resp _ (MSG _ _ msg3) <- tGet fromServer bob
|
||||
Resp _ OK <- sendRecv bob ("abcd", bRcv, "ACK")
|
||||
Resp "" _ (MSG _ _ msg3) <- tGet fromServer bob
|
||||
Resp "abcd" _ OK <- sendRecv bob ("abcd", "abcd", bRcv, "ACK")
|
||||
["key", key2] <- return $ B.words msg3
|
||||
(key2, "5678") #== "key received from Alice"
|
||||
Resp _ OK <- sendRecv bob ("abcd", bRcv, "KEY " <> key2)
|
||||
Resp "bcda" _ OK <- sendRecv bob ("abcd", "bcda", bRcv, "KEY " <> key2)
|
||||
|
||||
Resp _ OK <- sendRecv bob ("efgh", aSnd, "SEND :hi alice")
|
||||
Resp "cdab" _ OK <- sendRecv bob ("efgh", "cdab", aSnd, "SEND :hi alice")
|
||||
|
||||
Resp _ (MSG _ _ msg4) <- tGet fromServer alice
|
||||
Resp _ OK <- sendRecv alice ("1234", aRcv, "ACK")
|
||||
Resp "" _ (MSG _ _ msg4) <- tGet fromServer alice
|
||||
Resp "dabc" _ OK <- sendRecv alice ("1234", "dabc", aRcv, "ACK")
|
||||
(msg4, "hi alice") #== "message received from Bob"
|
||||
|
||||
Resp _ OK <- sendRecv alice ("5678", bSnd, "SEND :how are you bob")
|
||||
Resp "abcd" _ OK <- sendRecv alice ("5678", "abcd", bSnd, "SEND :how are you bob")
|
||||
|
||||
Resp _ (MSG _ _ msg5) <- tGet fromServer bob
|
||||
Resp _ OK <- sendRecv bob ("abcd", bRcv, "ACK")
|
||||
Resp "" _ (MSG _ _ msg5) <- tGet fromServer bob
|
||||
Resp "bcda" _ OK <- sendRecv bob ("abcd", "bcda", bRcv, "ACK")
|
||||
(msg5, "how are you bob") #== "message received from alice"
|
||||
|
||||
testSwitchSub :: SpecWith ()
|
||||
testSwitchSub =
|
||||
it "should create simplex connections and switch subscription to another TCP connection" $
|
||||
smpTest3 \rh1 rh2 sh -> do
|
||||
Resp _ (IDS rId sId) <- sendRecv rh1 ("", "", "NEW 1234")
|
||||
Resp _ ok1 <- sendRecv sh ("", sId, "SEND :test1")
|
||||
Resp "abcd" _ (IDS rId sId) <- sendRecv rh1 ("", "abcd", "", "NEW 1234")
|
||||
Resp "bcda" _ ok1 <- sendRecv sh ("", "bcda", sId, "SEND :test1")
|
||||
(ok1, OK) #== "sent test message 1"
|
||||
Resp _ ok2 <- sendRecv sh ("", sId, "SEND :test2, no ACK")
|
||||
Resp "cdab" _ ok2 <- sendRecv sh ("", "cdab", sId, "SEND :test2, no ACK")
|
||||
(ok2, OK) #== "sent test message 2"
|
||||
|
||||
Resp _ (MSG _ _ msg1) <- tGet fromServer rh1
|
||||
Resp "" _ (MSG _ _ msg1) <- tGet fromServer rh1
|
||||
(msg1, "test1") #== "test message 1 delivered to the 1st TCP connection"
|
||||
Resp _ (MSG _ _ msg2) <- sendRecv rh1 ("1234", rId, "ACK")
|
||||
Resp "abcd" _ (MSG _ _ msg2) <- sendRecv rh1 ("1234", "abcd", rId, "ACK")
|
||||
(msg2, "test2, no ACK") #== "test message 2 delivered, no ACK"
|
||||
|
||||
Resp _ (MSG _ _ msg2') <- sendRecv rh2 ("1234", rId, "SUB")
|
||||
Resp "bcda" _ (MSG _ _ msg2') <- sendRecv rh2 ("1234", "bcda", rId, "SUB")
|
||||
(msg2', "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)"
|
||||
Resp _ OK <- sendRecv rh2 ("1234", rId, "ACK")
|
||||
Resp "cdab" _ OK <- sendRecv rh2 ("1234", "cdab", rId, "ACK")
|
||||
|
||||
Resp _ end <- tGet fromServer rh1
|
||||
Resp "" _ end <- tGet fromServer rh1
|
||||
(end, END) #== "unsubscribed the 1st TCP connection"
|
||||
|
||||
Resp _ OK <- sendRecv sh ("", sId, "SEND :test3")
|
||||
Resp "dabc" _ OK <- sendRecv sh ("", "dabc", sId, "SEND :test3")
|
||||
|
||||
Resp _ (MSG _ _ msg3) <- tGet fromServer rh2
|
||||
Resp "" _ (MSG _ _ msg3) <- tGet fromServer rh2
|
||||
(msg3, "test3") #== "delivered to the 2nd TCP connection"
|
||||
|
||||
Resp _ err <- sendRecv rh1 ("1234", rId, "ACK")
|
||||
Resp "abcd" _ err <- sendRecv rh1 ("1234", "abcd", rId, "ACK")
|
||||
(err, ERR PROHIBITED) #== "rejects ACK from the 1st TCP connection"
|
||||
|
||||
Resp _ ok3 <- sendRecv rh2 ("1234", rId, "ACK")
|
||||
Resp "bcda" _ ok3 <- sendRecv rh2 ("1234", "bcda", rId, "ACK")
|
||||
(ok3, OK) #== "accepts ACK from the 2nd TCP connection"
|
||||
|
||||
timeout 1000 (tGet fromServer rh1) >>= \case
|
||||
@@ -238,36 +237,36 @@ testSwitchSub =
|
||||
|
||||
syntaxTests :: SpecWith ()
|
||||
syntaxTests = do
|
||||
it "unknown command" $ [("", "1234", "HELLO")] >#> [("", "1234", "ERR UNKNOWN")]
|
||||
it "unknown command" $ [("", "abcd", "1234", "HELLO")] >#> [("", "abcd", "1234", "ERR UNKNOWN")]
|
||||
describe "NEW" do
|
||||
it "no parameters" $ [("", "", "NEW")] >#> [("", "", "ERR SYNTAX 2")]
|
||||
it "many parameters" $ [("", "", "NEW 1 2")] >#> [("", "", "ERR SYNTAX 2")]
|
||||
it "has signature" $ [("1234", "", "NEW 1234")] >#> [("", "", "ERR SYNTAX 4")]
|
||||
it "queue ID" $ [("", "1", "NEW 1234")] >#> [("", "1", "ERR SYNTAX 4")]
|
||||
it "no parameters" $ [("", "bcda", "", "NEW")] >#> [("", "bcda", "", "ERR SYNTAX 2")]
|
||||
it "many parameters" $ [("", "cdab", "", "NEW 1 2")] >#> [("", "cdab", "", "ERR SYNTAX 2")]
|
||||
it "has signature" $ [("1234", "dabc", "", "NEW 1234")] >#> [("", "dabc", "", "ERR SYNTAX 4")]
|
||||
it "queue ID" $ [("", "abcd", "12345678", "NEW 1234")] >#> [("", "abcd", "12345678", "ERR SYNTAX 4")]
|
||||
describe "KEY" do
|
||||
it "valid syntax" $ [("1234", "1", "KEY 4567")] >#> [("", "1", "ERR AUTH")]
|
||||
it "no parameters" $ [("1234", "1", "KEY")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "many parameters" $ [("1234", "1", "KEY 1 2")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "no signature" $ [("", "1", "KEY 4567")] >#> [("", "1", "ERR SYNTAX 3")]
|
||||
it "no queue ID" $ [("1234", "", "KEY 4567")] >#> [("", "", "ERR SYNTAX 3")]
|
||||
it "valid syntax" $ [("1234", "bcda", "12345678", "KEY 4567")] >#> [("", "bcda", "12345678", "ERR AUTH")]
|
||||
it "no parameters" $ [("1234", "cdab", "12345678", "KEY")] >#> [("", "cdab", "12345678", "ERR SYNTAX 2")]
|
||||
it "many parameters" $ [("1234", "dabc", "12345678", "KEY 1 2")] >#> [("", "dabc", "12345678", "ERR SYNTAX 2")]
|
||||
it "no signature" $ [("", "abcd", "12345678", "KEY 4567")] >#> [("", "abcd", "12345678", "ERR SYNTAX 3")]
|
||||
it "no queue ID" $ [("1234", "bcda", "", "KEY 4567")] >#> [("", "bcda", "", "ERR SYNTAX 3")]
|
||||
noParamsSyntaxTest "SUB"
|
||||
noParamsSyntaxTest "ACK"
|
||||
noParamsSyntaxTest "OFF"
|
||||
noParamsSyntaxTest "DEL"
|
||||
describe "SEND" do
|
||||
it "valid syntax 1" $ [("1234", "1", "SEND :hello")] >#> [("", "1", "ERR AUTH")]
|
||||
it "valid syntax 2" $ [("1234", "1", "SEND 11\nhello there\n")] >#> [("", "1", "ERR AUTH")]
|
||||
it "no parameters" $ [("1234", "1", "SEND")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "no queue ID" $ [("1234", "", "SEND :hello")] >#> [("", "", "ERR SYNTAX 5")]
|
||||
it "bad message body 1" $ [("1234", "1", "SEND 11 hello")] >#> [("", "1", "ERR SYNTAX 6")]
|
||||
it "bad message body 2" $ [("1234", "1", "SEND hello")] >#> [("", "1", "ERR SYNTAX 6")]
|
||||
it "bigger body" $ [("1234", "1", "SEND 4\nhello\n")] >#> [("", "1", "ERR SIZE")]
|
||||
it "valid syntax 1" $ [("1234", "cdab", "12345678", "SEND :hello")] >#> [("", "cdab", "12345678", "ERR AUTH")]
|
||||
it "valid syntax 2" $ [("1234", "dabc", "12345678", "SEND 11\nhello there\n")] >#> [("", "dabc", "12345678", "ERR AUTH")]
|
||||
it "no parameters" $ [("1234", "abcd", "12345678", "SEND")] >#> [("", "abcd", "12345678", "ERR SYNTAX 2")]
|
||||
it "no queue ID" $ [("1234", "bcda", "", "SEND :hello")] >#> [("", "bcda", "", "ERR SYNTAX 5")]
|
||||
it "bad message body 1" $ [("1234", "cdab", "12345678", "SEND 11 hello")] >#> [("", "cdab", "12345678", "ERR SYNTAX 6")]
|
||||
it "bad message body 2" $ [("1234", "dabc", "12345678", "SEND hello")] >#> [("", "dabc", "12345678", "ERR SYNTAX 6")]
|
||||
it "bigger body" $ [("1234", "abcd", "12345678", "SEND 4\nhello\n")] >#> [("", "abcd", "12345678", "ERR SIZE")]
|
||||
describe "broker response not allowed" do
|
||||
it "OK" $ [("1234", "1", "OK")] >#> [("", "1", "ERR PROHIBITED")]
|
||||
it "OK" $ [("1234", "bcda", "12345678", "OK")] >#> [("", "bcda", "12345678", "ERR PROHIBITED")]
|
||||
where
|
||||
noParamsSyntaxTest :: ByteString -> SpecWith ()
|
||||
noParamsSyntaxTest cmd = describe (B.unpack cmd) do
|
||||
it "valid syntax" $ [("1234", "1", cmd)] >#> [("", "1", "ERR AUTH")]
|
||||
it "parameters" $ [("1234", "1", cmd <> " 1")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "no signature" $ [("", "1", cmd)] >#> [("", "1", "ERR SYNTAX 3")]
|
||||
it "no queue ID" $ [("1234", "", cmd)] >#> [("", "", "ERR SYNTAX 3")]
|
||||
it "valid syntax" $ [("1234", "abcd", "12345678", cmd)] >#> [("", "abcd", "12345678", "ERR AUTH")]
|
||||
it "parameters" $ [("1234", "bcda", "12345678", cmd <> " 1")] >#> [("", "bcda", "12345678", "ERR SYNTAX 2")]
|
||||
it "no signature" $ [("", "cdab", "12345678", cmd)] >#> [("", "cdab", "12345678", "ERR SYNTAX 3")]
|
||||
it "no queue ID" $ [("1234", "dabc", "", cmd)] >#> [("", "dabc", "", "ERR SYNTAX 3")]
|
||||
|
||||
Reference in New Issue
Block a user