From d719b741dca18b0cdd1db9fa86fc852cb76b0acc Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sun, 24 Jan 2021 19:20:49 +0000 Subject: [PATCH] suspend and delete connection (#28) * suspend and delete connection * agent: OFF/DEL tests, infix operators in tests * test for subscriptions --- src/Simplex/Messaging/Agent.hs | 23 ++++++ src/Simplex/Messaging/Agent/Client.hs | 12 +++ src/Simplex/Messaging/Agent/Transmission.hs | 28 ++++--- tests/AgentTests.hs | 90 ++++++++++++++++----- tests/SMPAgentClient.hs | 21 +++++ 5 files changed, 143 insertions(+), 31 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e68a3c12d..31d31866b 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -107,6 +107,8 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = SUB -> subscribeConnection SEND msgBody -> sendMessage msgBody ACK aMsgId -> ackMessage aMsgId + OFF -> suspendConnection + DEL -> deleteConnection where createNewConnection :: SMPServer -> m () createNewConnection server = do @@ -163,6 +165,27 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = where ackMsg rq = sendAck c rq >> respond OK + suspendConnection :: m () + suspendConnection = + withStore (`getConn` connAlias) >>= \case + SomeConn _ (DuplexConnection _ rq _) -> suspend rq + SomeConn _ (ReceiveConnection _ rq) -> suspend rq + _ -> throwError PROHIBITED + where + suspend rq = suspendQueue c rq >> respond OK + + deleteConnection :: m () + deleteConnection = + withStore (`getConn` connAlias) >>= \case + SomeConn _ (DuplexConnection _ rq _) -> delete rq + SomeConn _ (ReceiveConnection _ rq) -> delete rq + _ -> throwError PROHIBITED + where + delete rq = do + deleteQueue c rq + withStore (`deleteConn` connAlias) + respond OK + sendReplyQInfo :: SMPServer -> SendQueue -> m () sendReplyQInfo srv sq = do (rq, qInfo) <- newReceiveQueue c srv connAlias diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index b29ae8d6a..d185ec35d 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -19,6 +19,8 @@ module Simplex.Messaging.Agent.Client secureQueue, sendAgentMessage, sendAck, + suspendQueue, + deleteQueue, logServer, removeSubscription, ) @@ -209,6 +211,16 @@ sendAck c ReceiveQueue {server, rcvId, rcvPrivateKey} = withLogSMP c server rcvId "ACK" $ \smp -> ackSMPMessage smp rcvPrivateKey rcvId +suspendQueue :: AgentMonad m => AgentClient -> ReceiveQueue -> m () +suspendQueue c ReceiveQueue {server, rcvId, rcvPrivateKey} = + withLogSMP c server rcvId "OFF" $ \smp -> + suspendSMPQueue smp rcvPrivateKey rcvId + +deleteQueue :: AgentMonad m => AgentClient -> ReceiveQueue -> m () +deleteQueue c ReceiveQueue {server, rcvId, rcvPrivateKey} = + withLogSMP c server rcvId "DEL" $ \smp -> + deleteSMPQueue smp rcvPrivateKey rcvId + sendAgentMessage :: AgentMonad m => AgentClient -> SendQueue -> AMessage -> m () sendAgentMessage c SendQueue {server, sndId, sndPrivateKey, encryptKey} agentMsg = do msg <- mkAgentMessage encryptKey agentMsg diff --git a/src/Simplex/Messaging/Agent/Transmission.hs b/src/Simplex/Messaging/Agent/Transmission.hs index c215d2de7..64f4fcc04 100644 --- a/src/Simplex/Messaging/Agent/Transmission.hs +++ b/src/Simplex/Messaging/Agent/Transmission.hs @@ -33,6 +33,7 @@ import Simplex.Messaging.Agent.Store.Types import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport import Simplex.Messaging.Types (CorrId (..), Encoded, ErrorType, MsgBody, PublicKey, errBadParameters, errMessageBody) +import qualified Simplex.Messaging.Types as ST import Simplex.Messaging.Util import System.IO import Text.Read @@ -71,7 +72,6 @@ data ACommand (p :: AParty) where JOIN :: SMPQueueInfo -> ReplyMode -> ACommand Client -- response OK CON :: ACommand Agent -- notification that connection is established -- TODO currently it automatically allows whoever sends the confirmation - READY :: ACommand Agent -- CONF :: OtherPartyId -> ACommand Agent -- LET :: OtherPartyId -> ACommand Client SUB :: ACommand Client @@ -82,11 +82,13 @@ data ACommand (p :: AParty) where MSG :: AgentMsgId -> UTCTime -> UTCTime -> MsgStatus -> MsgBody -> ACommand Agent ACK :: AgentMsgId -> ACommand Client -- RCVD :: AgentMsgId -> ACommand Agent - -- OFF :: ACommand Client - -- DEL :: ACommand Client + OFF :: ACommand Client + DEL :: ACommand Client OK :: ACommand Agent ERR :: AgentErrorType -> ACommand Agent +deriving instance Eq (ACommand p) + deriving instance Show (ACommand p) type Message = ByteString @@ -210,9 +212,9 @@ data Mode = On | Off deriving (Eq, Show, Read) newtype AckMode = AckMode Mode deriving (Eq, Show) data SMPQueueInfo = SMPQueueInfo SMPServer SMP.SenderId EncryptionKey - deriving (Show) + deriving (Eq, Show) -data ReplyMode = ReplyOff | ReplyOn | ReplyVia SMPServer deriving (Show) +data ReplyMode = ReplyOff | ReplyOn | ReplyVia SMPServer deriving (Eq, Show) type EncryptionKey = PublicKey @@ -226,10 +228,10 @@ data QueueStatus = New | Confirmed | Secured | Active | Disabled type AgentMsgId = Integer data MsgStatus = MsgOk | MsgError MsgErrorType - deriving (Show) + deriving (Eq, Show) data MsgErrorType = MsgSkipped AgentMsgId AgentMsgId | MsgBadId AgentMsgId | MsgBadHash - deriving (Show) + deriving (Eq, Show) data AgentErrorType = UNKNOWN @@ -240,7 +242,7 @@ data AgentErrorType | SIZE | STORE StoreError | INTERNAL -- etc. TODO SYNTAX Natural - deriving (Show, Exception) + deriving (Eq, Show, Exception) data AckStatus = AckOk | AckError AckErrorType deriving (Show) @@ -285,7 +287,9 @@ parseCommandP = <|> "SEND " *> sendCmd <|> "MSG " *> message <|> "ACK " *> acknowledge - -- <|> "ERR " *> agentError - TODO + <|> "OFF" $> ACmd SClient OFF + <|> "DEL" $> ACmd SClient DEL + <|> "ERR " *> agentError <|> "CON" $> ACmd SAgent CON <|> "OK" $> ACmd SAgent OK where @@ -297,6 +301,9 @@ parseCommandP = let sp = A.space; msgId = A.decimal <* sp; ts = tsISO8601P <* sp; body = A.takeByteString in ACmd SAgent <$> (MSG <$> msgId <*> ts <*> ts <*> status <* sp <*> body) acknowledge = ACmd SClient <$> (ACK <$> A.decimal) + -- TODO other error types + agentError = ACmd SAgent . ERR <$> ("SMP " *> smpErrorType) + smpErrorType = "AUTH" $> SMP ST.AUTH replyMode = " NO_REPLY" $> ReplyOff <|> A.space *> (ReplyVia <$> smpServerP) @@ -321,10 +328,11 @@ serializeCommand = \case MSG aMsgId aTs ts st body -> B.unwords ["MSG", B.pack $ show aMsgId, B.pack $ formatISO8601Millis aTs, B.pack $ formatISO8601Millis ts, msgStatus st, serializeMsg body] ACK aMsgId -> "ACK " <> B.pack (show aMsgId) + OFF -> "OFF" + DEL -> "DEL" CON -> "CON" ERR e -> "ERR " <> B.pack (show e) OK -> "OK" - c -> B.pack $ show c where replyMode :: ReplyMode -> ByteString replyMode = \case diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 8255fa631..dc07dac2d 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -3,6 +3,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} module AgentTests where @@ -11,7 +12,9 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import SMPAgentClient import Simplex.Messaging.Agent.Transmission +import Simplex.Messaging.Types (ErrorType (..), MsgBody) import System.IO (Handle) +import System.Timeout import Test.Hspec agentTests :: Spec @@ -20,12 +23,12 @@ agentTests = do describe "SMP agent protocol syntax" syntaxTests describe "Establishing duplex connection" do it "should connect via one server and one agent" $ - smpAgentTest2_1 testDuplexConnection1 - it "should connect via one server and two agents" $ - smpAgentTest2 testDuplexConnection1 - -sendRecv :: Handle -> (ByteString, ByteString, ByteString) -> IO (ATransmissionOrError 'Agent) -sendRecv h (corrId, connAlias, cmd) = tPutRaw h (corrId, connAlias, cmd) >> tGet SAgent h + smpAgentTest2_1 testDuplexConnection + it "should connect via one server and 2 agents" $ + smpAgentTest2 testDuplexConnection + describe "Connection subscriptions" do + it "should connect via one server and one agent" $ + smpAgentTest3_1 testSubscription (>#>) :: ARawTransmission -> ARawTransmission -> Expectation command >#> response = smpAgentTest command `shouldReturn` response @@ -33,21 +36,66 @@ command >#> response = smpAgentTest command `shouldReturn` response (>#>=) :: ARawTransmission -> ((ByteString, ByteString, [ByteString]) -> Bool) -> Expectation command >#>= p = smpAgentTest command >>= (`shouldSatisfy` p . \(cId, cAlias, cmd) -> (cId, cAlias, B.words cmd)) -testDuplexConnection1 :: Handle -> Handle -> IO () -testDuplexConnection1 alice bob = do - ("1", "bob", Right (INV qInfo)) <- sendRecv alice ("1", "bob", "NEW localhost:5000") - ("11", "alice", Right CON) <- sendRecv bob ("11", "alice", "JOIN " <> serializeSmpQueueInfo qInfo) - ("", "bob", Right CON) <- tGet SAgent alice - ("2", "bob", Right OK) <- sendRecv alice ("2", "bob", "SEND :hello") - ("3", "bob", Right OK) <- sendRecv alice ("3", "bob", "SEND :how are you?") - ("", "alice", Right (MSG _ _ _ _ "hello")) <- tGet SAgent bob - ("12", "alice", Right OK) <- sendRecv bob ("12", "alice", "ACK 0") - ("", "alice", Right (MSG _ _ _ _ "how are you?")) <- tGet SAgent bob - ("13", "alice", Right OK) <- sendRecv bob ("13", "alice", "ACK 0") - ("14", "alice", Right OK) <- sendRecv bob ("14", "alice", "SEND 9\nhello too") - ("", "bob", Right (MSG _ _ _ _ "hello too")) <- tGet SAgent alice - ("4", "bob", Right OK) <- sendRecv alice ("4", "bob", "ACK 0") - return () +(#:) :: Handle -> (ByteString, ByteString, ByteString) -> IO (ATransmissionOrError 'Agent) +h #: t = tPutRaw h t >> tGet SAgent h + +(#>) :: IO (ATransmissionOrError 'Agent) -> ATransmission 'Agent -> Expectation +action #> (corrId, cAlias, cmd) = action `shouldReturn` (corrId, cAlias, Right cmd) + +(<#) :: Handle -> ATransmission 'Agent -> Expectation +h <# (corrId, cAlias, cmd) = tGet SAgent h `shouldReturn` (corrId, cAlias, Right cmd) + +(<#=) :: Handle -> (ATransmissionOrError 'Agent -> Bool) -> Expectation +h <#= p = tGet SAgent h >>= (`shouldSatisfy` p) + +pattern Msg :: MsgBody -> Either AgentErrorType (ACommand 'Agent) +pattern Msg msg <- Right (MSG _ _ _ _ msg) + +testDuplexConnection :: Handle -> Handle -> IO () +testDuplexConnection alice bob = do + ("1", "bob", Right (INV qInfo)) <- alice #: ("1", "bob", "NEW localhost:5000") + let qInfo' = serializeSmpQueueInfo qInfo + bob #: ("11", "alice", "JOIN " <> qInfo') #> ("11", "alice", CON) + alice <# ("", "bob", CON) + alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", OK) + alice #: ("3", "bob", "SEND :how are you?") #> ("3", "bob", OK) + bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False + bob #: ("12", "alice", "ACK 0") #> ("12", "alice", OK) + bob <#= \case ("", "alice", Msg "how are you?") -> True; _ -> False + bob #: ("13", "alice", "ACK 0") #> ("13", "alice", OK) + bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", OK) + alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False + alice #: ("4", "bob", "ACK 0") #> ("4", "bob", OK) + bob #: ("15", "alice", "SEND 9\nmessage 1") #> ("15", "alice", OK) + bob #: ("16", "alice", "SEND 9\nmessage 2") #> ("16", "alice", OK) + alice <#= \case ("", "bob", Msg "message 1") -> True; _ -> False + alice #: ("5", "bob", "OFF") #> ("5", "bob", OK) + bob #: ("17", "alice", "SEND 9\nmessage 3") #> ("17", "alice", ERR (SMP AUTH)) + alice #: ("6", "bob", "DEL") #> ("6", "bob", OK) + 10000 `timeout` tGet SAgent alice >>= \case + Nothing -> return () + Just _ -> error "nothing else should be delivered to alice" + +testSubscription :: Handle -> Handle -> Handle -> IO () +testSubscription alice1 alice2 bob = do + ("1", "bob", Right (INV qInfo)) <- alice1 #: ("1", "bob", "NEW localhost:5000") + let qInfo' = serializeSmpQueueInfo qInfo + bob #: ("11", "alice", "JOIN " <> qInfo') #> ("11", "alice", CON) + bob #: ("12", "alice", "SEND 5\nhello") #> ("12", "alice", OK) + bob #: ("13", "alice", "SEND 11\nhello again") #> ("13", "alice", OK) + alice1 <# ("", "bob", CON) + alice1 <#= \case ("", "bob", Msg "hello") -> True; _ -> False + alice1 #: ("2", "bob", "ACK 0") #> ("2", "bob", OK) + alice1 <#= \case ("", "bob", Msg "hello again") -> True; _ -> False + alice2 #: ("21", "bob", "SUB") #> ("21", "bob", OK) + alice2 <#= \case ("", "bob", Msg "hello again") -> True; _ -> False + alice1 <# ("", "bob", END) + alice2 #: ("22", "bob", "ACK 0") #> ("22", "bob", OK) + bob #: ("14", "alice", "SEND 2\nhi") #> ("14", "alice", OK) + alice2 <#= \case ("", "bob", Msg "hi") -> True; _ -> False + 10000 `timeout` tGet SAgent alice1 >>= \case + Nothing -> return () + Just _ -> error "nothing else should be delivered to alice" syntaxTests :: Spec syntaxTests = do diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index f4ea8c6cd..15eb48dc6 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -30,12 +30,18 @@ agentTestPort = "5001" agentTestPort2 :: ServiceName agentTestPort2 = "5011" +agentTestPort3 :: ServiceName +agentTestPort3 = "5021" + testDB :: String testDB = "smp-agent.test.protocol.db" testDB2 :: String testDB2 = "smp-agent2.test.protocol.db" +testDB3 :: String +testDB3 = "smp-agent3.test.protocol.db" + smpAgentTest :: ARawTransmission -> IO ARawTransmission smpAgentTest cmd = runSmpAgentTest $ \h -> tPutRaw h cmd >> tGetRaw h @@ -75,6 +81,21 @@ smpAgentTest2_1 test' = smpAgentTestN_1 2 _test _test [h1, h2] = test' h1 h2 _test _ = error "expected 2 handles" +smpAgentTest3 :: (Handle -> Handle -> Handle -> IO ()) -> Expectation +smpAgentTest3 test' = + smpAgentTestN + [(agentTestPort, testDB), (agentTestPort2, testDB2), (agentTestPort3, testDB3)] + _test + where + _test [h1, h2, h3] = test' h1 h2 h3 + _test _ = error "expected 3 handles" + +smpAgentTest3_1 :: (Handle -> Handle -> Handle -> IO ()) -> Expectation +smpAgentTest3_1 test' = smpAgentTestN_1 3 _test + where + _test [h1, h2, h3] = test' h1 h2 h3 + _test _ = error "expected 3 handles" + cfg :: AgentConfig cfg = AgentConfig