mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
suspend and delete connection (#28)
* suspend and delete connection * agent: OFF/DEL tests, infix operators in tests * test for subscriptions
This commit is contained in:
committed by
Efim Poberezkin
parent
19dc7b3389
commit
d719b741dc
@@ -107,6 +107,8 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
SUB -> subscribeConnection
|
||||
SEND msgBody -> sendMessage msgBody
|
||||
ACK aMsgId -> ackMessage aMsgId
|
||||
OFF -> suspendConnection
|
||||
DEL -> deleteConnection
|
||||
where
|
||||
createNewConnection :: SMPServer -> m ()
|
||||
createNewConnection server = do
|
||||
@@ -163,6 +165,27 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
where
|
||||
ackMsg rq = sendAck c rq >> respond OK
|
||||
|
||||
suspendConnection :: m ()
|
||||
suspendConnection =
|
||||
withStore (`getConn` connAlias) >>= \case
|
||||
SomeConn _ (DuplexConnection _ rq _) -> suspend rq
|
||||
SomeConn _ (ReceiveConnection _ rq) -> suspend rq
|
||||
_ -> throwError PROHIBITED
|
||||
where
|
||||
suspend rq = suspendQueue c rq >> respond OK
|
||||
|
||||
deleteConnection :: m ()
|
||||
deleteConnection =
|
||||
withStore (`getConn` connAlias) >>= \case
|
||||
SomeConn _ (DuplexConnection _ rq _) -> delete rq
|
||||
SomeConn _ (ReceiveConnection _ rq) -> delete rq
|
||||
_ -> throwError PROHIBITED
|
||||
where
|
||||
delete rq = do
|
||||
deleteQueue c rq
|
||||
withStore (`deleteConn` connAlias)
|
||||
respond OK
|
||||
|
||||
sendReplyQInfo :: SMPServer -> SendQueue -> m ()
|
||||
sendReplyQInfo srv sq = do
|
||||
(rq, qInfo) <- newReceiveQueue c srv connAlias
|
||||
|
||||
@@ -19,6 +19,8 @@ module Simplex.Messaging.Agent.Client
|
||||
secureQueue,
|
||||
sendAgentMessage,
|
||||
sendAck,
|
||||
suspendQueue,
|
||||
deleteQueue,
|
||||
logServer,
|
||||
removeSubscription,
|
||||
)
|
||||
@@ -209,6 +211,16 @@ sendAck c ReceiveQueue {server, rcvId, rcvPrivateKey} =
|
||||
withLogSMP c server rcvId "ACK" $ \smp ->
|
||||
ackSMPMessage smp rcvPrivateKey rcvId
|
||||
|
||||
suspendQueue :: AgentMonad m => AgentClient -> ReceiveQueue -> m ()
|
||||
suspendQueue c ReceiveQueue {server, rcvId, rcvPrivateKey} =
|
||||
withLogSMP c server rcvId "OFF" $ \smp ->
|
||||
suspendSMPQueue smp rcvPrivateKey rcvId
|
||||
|
||||
deleteQueue :: AgentMonad m => AgentClient -> ReceiveQueue -> m ()
|
||||
deleteQueue c ReceiveQueue {server, rcvId, rcvPrivateKey} =
|
||||
withLogSMP c server rcvId "DEL" $ \smp ->
|
||||
deleteSMPQueue smp rcvPrivateKey rcvId
|
||||
|
||||
sendAgentMessage :: AgentMonad m => AgentClient -> SendQueue -> AMessage -> m ()
|
||||
sendAgentMessage c SendQueue {server, sndId, sndPrivateKey, encryptKey} agentMsg = do
|
||||
msg <- mkAgentMessage encryptKey agentMsg
|
||||
|
||||
@@ -33,6 +33,7 @@ import Simplex.Messaging.Agent.Store.Types
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Types (CorrId (..), Encoded, ErrorType, MsgBody, PublicKey, errBadParameters, errMessageBody)
|
||||
import qualified Simplex.Messaging.Types as ST
|
||||
import Simplex.Messaging.Util
|
||||
import System.IO
|
||||
import Text.Read
|
||||
@@ -71,7 +72,6 @@ data ACommand (p :: AParty) where
|
||||
JOIN :: SMPQueueInfo -> ReplyMode -> ACommand Client -- response OK
|
||||
CON :: ACommand Agent -- notification that connection is established
|
||||
-- TODO currently it automatically allows whoever sends the confirmation
|
||||
READY :: ACommand Agent
|
||||
-- CONF :: OtherPartyId -> ACommand Agent
|
||||
-- LET :: OtherPartyId -> ACommand Client
|
||||
SUB :: ACommand Client
|
||||
@@ -82,11 +82,13 @@ data ACommand (p :: AParty) where
|
||||
MSG :: AgentMsgId -> UTCTime -> UTCTime -> MsgStatus -> MsgBody -> ACommand Agent
|
||||
ACK :: AgentMsgId -> ACommand Client
|
||||
-- RCVD :: AgentMsgId -> ACommand Agent
|
||||
-- OFF :: ACommand Client
|
||||
-- DEL :: ACommand Client
|
||||
OFF :: ACommand Client
|
||||
DEL :: ACommand Client
|
||||
OK :: ACommand Agent
|
||||
ERR :: AgentErrorType -> ACommand Agent
|
||||
|
||||
deriving instance Eq (ACommand p)
|
||||
|
||||
deriving instance Show (ACommand p)
|
||||
|
||||
type Message = ByteString
|
||||
@@ -210,9 +212,9 @@ data Mode = On | Off deriving (Eq, Show, Read)
|
||||
newtype AckMode = AckMode Mode deriving (Eq, Show)
|
||||
|
||||
data SMPQueueInfo = SMPQueueInfo SMPServer SMP.SenderId EncryptionKey
|
||||
deriving (Show)
|
||||
deriving (Eq, Show)
|
||||
|
||||
data ReplyMode = ReplyOff | ReplyOn | ReplyVia SMPServer deriving (Show)
|
||||
data ReplyMode = ReplyOff | ReplyOn | ReplyVia SMPServer deriving (Eq, Show)
|
||||
|
||||
type EncryptionKey = PublicKey
|
||||
|
||||
@@ -226,10 +228,10 @@ data QueueStatus = New | Confirmed | Secured | Active | Disabled
|
||||
type AgentMsgId = Integer
|
||||
|
||||
data MsgStatus = MsgOk | MsgError MsgErrorType
|
||||
deriving (Show)
|
||||
deriving (Eq, Show)
|
||||
|
||||
data MsgErrorType = MsgSkipped AgentMsgId AgentMsgId | MsgBadId AgentMsgId | MsgBadHash
|
||||
deriving (Show)
|
||||
deriving (Eq, Show)
|
||||
|
||||
data AgentErrorType
|
||||
= UNKNOWN
|
||||
@@ -240,7 +242,7 @@ data AgentErrorType
|
||||
| SIZE
|
||||
| STORE StoreError
|
||||
| INTERNAL -- etc. TODO SYNTAX Natural
|
||||
deriving (Show, Exception)
|
||||
deriving (Eq, Show, Exception)
|
||||
|
||||
data AckStatus = AckOk | AckError AckErrorType
|
||||
deriving (Show)
|
||||
@@ -285,7 +287,9 @@ parseCommandP =
|
||||
<|> "SEND " *> sendCmd
|
||||
<|> "MSG " *> message
|
||||
<|> "ACK " *> acknowledge
|
||||
-- <|> "ERR " *> agentError - TODO
|
||||
<|> "OFF" $> ACmd SClient OFF
|
||||
<|> "DEL" $> ACmd SClient DEL
|
||||
<|> "ERR " *> agentError
|
||||
<|> "CON" $> ACmd SAgent CON
|
||||
<|> "OK" $> ACmd SAgent OK
|
||||
where
|
||||
@@ -297,6 +301,9 @@ parseCommandP =
|
||||
let sp = A.space; msgId = A.decimal <* sp; ts = tsISO8601P <* sp; body = A.takeByteString
|
||||
in ACmd SAgent <$> (MSG <$> msgId <*> ts <*> ts <*> status <* sp <*> body)
|
||||
acknowledge = ACmd SClient <$> (ACK <$> A.decimal)
|
||||
-- TODO other error types
|
||||
agentError = ACmd SAgent . ERR <$> ("SMP " *> smpErrorType)
|
||||
smpErrorType = "AUTH" $> SMP ST.AUTH
|
||||
replyMode =
|
||||
" NO_REPLY" $> ReplyOff
|
||||
<|> A.space *> (ReplyVia <$> smpServerP)
|
||||
@@ -321,10 +328,11 @@ serializeCommand = \case
|
||||
MSG aMsgId aTs ts st body ->
|
||||
B.unwords ["MSG", B.pack $ show aMsgId, B.pack $ formatISO8601Millis aTs, B.pack $ formatISO8601Millis ts, msgStatus st, serializeMsg body]
|
||||
ACK aMsgId -> "ACK " <> B.pack (show aMsgId)
|
||||
OFF -> "OFF"
|
||||
DEL -> "DEL"
|
||||
CON -> "CON"
|
||||
ERR e -> "ERR " <> B.pack (show e)
|
||||
OK -> "OK"
|
||||
c -> B.pack $ show c
|
||||
where
|
||||
replyMode :: ReplyMode -> ByteString
|
||||
replyMode = \case
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
|
||||
module AgentTests where
|
||||
|
||||
@@ -11,7 +12,9 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import SMPAgentClient
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Types (ErrorType (..), MsgBody)
|
||||
import System.IO (Handle)
|
||||
import System.Timeout
|
||||
import Test.Hspec
|
||||
|
||||
agentTests :: Spec
|
||||
@@ -20,12 +23,12 @@ agentTests = do
|
||||
describe "SMP agent protocol syntax" syntaxTests
|
||||
describe "Establishing duplex connection" do
|
||||
it "should connect via one server and one agent" $
|
||||
smpAgentTest2_1 testDuplexConnection1
|
||||
it "should connect via one server and two agents" $
|
||||
smpAgentTest2 testDuplexConnection1
|
||||
|
||||
sendRecv :: Handle -> (ByteString, ByteString, ByteString) -> IO (ATransmissionOrError 'Agent)
|
||||
sendRecv h (corrId, connAlias, cmd) = tPutRaw h (corrId, connAlias, cmd) >> tGet SAgent h
|
||||
smpAgentTest2_1 testDuplexConnection
|
||||
it "should connect via one server and 2 agents" $
|
||||
smpAgentTest2 testDuplexConnection
|
||||
describe "Connection subscriptions" do
|
||||
it "should connect via one server and one agent" $
|
||||
smpAgentTest3_1 testSubscription
|
||||
|
||||
(>#>) :: ARawTransmission -> ARawTransmission -> Expectation
|
||||
command >#> response = smpAgentTest command `shouldReturn` response
|
||||
@@ -33,21 +36,66 @@ command >#> response = smpAgentTest command `shouldReturn` response
|
||||
(>#>=) :: ARawTransmission -> ((ByteString, ByteString, [ByteString]) -> Bool) -> Expectation
|
||||
command >#>= p = smpAgentTest command >>= (`shouldSatisfy` p . \(cId, cAlias, cmd) -> (cId, cAlias, B.words cmd))
|
||||
|
||||
testDuplexConnection1 :: Handle -> Handle -> IO ()
|
||||
testDuplexConnection1 alice bob = do
|
||||
("1", "bob", Right (INV qInfo)) <- sendRecv alice ("1", "bob", "NEW localhost:5000")
|
||||
("11", "alice", Right CON) <- sendRecv bob ("11", "alice", "JOIN " <> serializeSmpQueueInfo qInfo)
|
||||
("", "bob", Right CON) <- tGet SAgent alice
|
||||
("2", "bob", Right OK) <- sendRecv alice ("2", "bob", "SEND :hello")
|
||||
("3", "bob", Right OK) <- sendRecv alice ("3", "bob", "SEND :how are you?")
|
||||
("", "alice", Right (MSG _ _ _ _ "hello")) <- tGet SAgent bob
|
||||
("12", "alice", Right OK) <- sendRecv bob ("12", "alice", "ACK 0")
|
||||
("", "alice", Right (MSG _ _ _ _ "how are you?")) <- tGet SAgent bob
|
||||
("13", "alice", Right OK) <- sendRecv bob ("13", "alice", "ACK 0")
|
||||
("14", "alice", Right OK) <- sendRecv bob ("14", "alice", "SEND 9\nhello too")
|
||||
("", "bob", Right (MSG _ _ _ _ "hello too")) <- tGet SAgent alice
|
||||
("4", "bob", Right OK) <- sendRecv alice ("4", "bob", "ACK 0")
|
||||
return ()
|
||||
(#:) :: Handle -> (ByteString, ByteString, ByteString) -> IO (ATransmissionOrError 'Agent)
|
||||
h #: t = tPutRaw h t >> tGet SAgent h
|
||||
|
||||
(#>) :: IO (ATransmissionOrError 'Agent) -> ATransmission 'Agent -> Expectation
|
||||
action #> (corrId, cAlias, cmd) = action `shouldReturn` (corrId, cAlias, Right cmd)
|
||||
|
||||
(<#) :: Handle -> ATransmission 'Agent -> Expectation
|
||||
h <# (corrId, cAlias, cmd) = tGet SAgent h `shouldReturn` (corrId, cAlias, Right cmd)
|
||||
|
||||
(<#=) :: Handle -> (ATransmissionOrError 'Agent -> Bool) -> Expectation
|
||||
h <#= p = tGet SAgent h >>= (`shouldSatisfy` p)
|
||||
|
||||
pattern Msg :: MsgBody -> Either AgentErrorType (ACommand 'Agent)
|
||||
pattern Msg msg <- Right (MSG _ _ _ _ msg)
|
||||
|
||||
testDuplexConnection :: Handle -> Handle -> IO ()
|
||||
testDuplexConnection alice bob = do
|
||||
("1", "bob", Right (INV qInfo)) <- alice #: ("1", "bob", "NEW localhost:5000")
|
||||
let qInfo' = serializeSmpQueueInfo qInfo
|
||||
bob #: ("11", "alice", "JOIN " <> qInfo') #> ("11", "alice", CON)
|
||||
alice <# ("", "bob", CON)
|
||||
alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", OK)
|
||||
alice #: ("3", "bob", "SEND :how are you?") #> ("3", "bob", OK)
|
||||
bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False
|
||||
bob #: ("12", "alice", "ACK 0") #> ("12", "alice", OK)
|
||||
bob <#= \case ("", "alice", Msg "how are you?") -> True; _ -> False
|
||||
bob #: ("13", "alice", "ACK 0") #> ("13", "alice", OK)
|
||||
bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", OK)
|
||||
alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False
|
||||
alice #: ("4", "bob", "ACK 0") #> ("4", "bob", OK)
|
||||
bob #: ("15", "alice", "SEND 9\nmessage 1") #> ("15", "alice", OK)
|
||||
bob #: ("16", "alice", "SEND 9\nmessage 2") #> ("16", "alice", OK)
|
||||
alice <#= \case ("", "bob", Msg "message 1") -> True; _ -> False
|
||||
alice #: ("5", "bob", "OFF") #> ("5", "bob", OK)
|
||||
bob #: ("17", "alice", "SEND 9\nmessage 3") #> ("17", "alice", ERR (SMP AUTH))
|
||||
alice #: ("6", "bob", "DEL") #> ("6", "bob", OK)
|
||||
10000 `timeout` tGet SAgent alice >>= \case
|
||||
Nothing -> return ()
|
||||
Just _ -> error "nothing else should be delivered to alice"
|
||||
|
||||
testSubscription :: Handle -> Handle -> Handle -> IO ()
|
||||
testSubscription alice1 alice2 bob = do
|
||||
("1", "bob", Right (INV qInfo)) <- alice1 #: ("1", "bob", "NEW localhost:5000")
|
||||
let qInfo' = serializeSmpQueueInfo qInfo
|
||||
bob #: ("11", "alice", "JOIN " <> qInfo') #> ("11", "alice", CON)
|
||||
bob #: ("12", "alice", "SEND 5\nhello") #> ("12", "alice", OK)
|
||||
bob #: ("13", "alice", "SEND 11\nhello again") #> ("13", "alice", OK)
|
||||
alice1 <# ("", "bob", CON)
|
||||
alice1 <#= \case ("", "bob", Msg "hello") -> True; _ -> False
|
||||
alice1 #: ("2", "bob", "ACK 0") #> ("2", "bob", OK)
|
||||
alice1 <#= \case ("", "bob", Msg "hello again") -> True; _ -> False
|
||||
alice2 #: ("21", "bob", "SUB") #> ("21", "bob", OK)
|
||||
alice2 <#= \case ("", "bob", Msg "hello again") -> True; _ -> False
|
||||
alice1 <# ("", "bob", END)
|
||||
alice2 #: ("22", "bob", "ACK 0") #> ("22", "bob", OK)
|
||||
bob #: ("14", "alice", "SEND 2\nhi") #> ("14", "alice", OK)
|
||||
alice2 <#= \case ("", "bob", Msg "hi") -> True; _ -> False
|
||||
10000 `timeout` tGet SAgent alice1 >>= \case
|
||||
Nothing -> return ()
|
||||
Just _ -> error "nothing else should be delivered to alice"
|
||||
|
||||
syntaxTests :: Spec
|
||||
syntaxTests = do
|
||||
|
||||
@@ -30,12 +30,18 @@ agentTestPort = "5001"
|
||||
agentTestPort2 :: ServiceName
|
||||
agentTestPort2 = "5011"
|
||||
|
||||
agentTestPort3 :: ServiceName
|
||||
agentTestPort3 = "5021"
|
||||
|
||||
testDB :: String
|
||||
testDB = "smp-agent.test.protocol.db"
|
||||
|
||||
testDB2 :: String
|
||||
testDB2 = "smp-agent2.test.protocol.db"
|
||||
|
||||
testDB3 :: String
|
||||
testDB3 = "smp-agent3.test.protocol.db"
|
||||
|
||||
smpAgentTest :: ARawTransmission -> IO ARawTransmission
|
||||
smpAgentTest cmd = runSmpAgentTest $ \h -> tPutRaw h cmd >> tGetRaw h
|
||||
|
||||
@@ -75,6 +81,21 @@ smpAgentTest2_1 test' = smpAgentTestN_1 2 _test
|
||||
_test [h1, h2] = test' h1 h2
|
||||
_test _ = error "expected 2 handles"
|
||||
|
||||
smpAgentTest3 :: (Handle -> Handle -> Handle -> IO ()) -> Expectation
|
||||
smpAgentTest3 test' =
|
||||
smpAgentTestN
|
||||
[(agentTestPort, testDB), (agentTestPort2, testDB2), (agentTestPort3, testDB3)]
|
||||
_test
|
||||
where
|
||||
_test [h1, h2, h3] = test' h1 h2 h3
|
||||
_test _ = error "expected 3 handles"
|
||||
|
||||
smpAgentTest3_1 :: (Handle -> Handle -> Handle -> IO ()) -> Expectation
|
||||
smpAgentTest3_1 test' = smpAgentTestN_1 3 _test
|
||||
where
|
||||
_test [h1, h2, h3] = test' h1 h2 h3
|
||||
_test _ = error "expected 3 handles"
|
||||
|
||||
cfg :: AgentConfig
|
||||
cfg =
|
||||
AgentConfig
|
||||
|
||||
Reference in New Issue
Block a user