From 88f1b727e0e4bfef01a346d818b7f6d94a181b16 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 30 May 2024 18:49:43 +0100 Subject: [PATCH] SMP protocol extension to debug subscribed SMP queues (#1181) * SMP protocol extension to debug subscribed SMP queues * fix, test * corrections Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * increase delays * increase timeout * delay * delay * enable all tests --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> --- simplexmq.cabal | 1 + src/Simplex/Messaging/Agent.hs | 16 +++ src/Simplex/Messaging/Agent/Client.hs | 7 ++ src/Simplex/Messaging/Client.hs | 8 ++ src/Simplex/Messaging/Protocol.hs | 25 ++++- src/Simplex/Messaging/Server.hs | 24 ++++- .../Messaging/Server/QueueStore/QueueInfo.hs | 55 +++++++++++ src/Simplex/RemoteControl/Invitation.hs | 2 +- tests/AgentTests/FunctionalAPITests.hs | 98 ++++++++++++++++++- tests/CLITests.hs | 4 +- 10 files changed, 232 insertions(+), 8 deletions(-) create mode 100644 src/Simplex/Messaging/Server/QueueStore/QueueInfo.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 829fd23df..31dfff436 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -172,6 +172,7 @@ library Simplex.Messaging.Server.MsgStore Simplex.Messaging.Server.MsgStore.STM Simplex.Messaging.Server.QueueStore + Simplex.Messaging.Server.QueueStore.QueueInfo Simplex.Messaging.Server.QueueStore.STM Simplex.Messaging.Server.Stats Simplex.Messaging.Server.StoreLog diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 76d7d333d..073660ba0 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -70,6 +70,7 @@ module Simplex.Messaging.Agent sendMessages, sendMessagesB, ackMessage, + getConnectionQueueInfo, switchConnection, abortConnectionSwitch, synchronizeRatchet, @@ -176,6 +177,7 @@ import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) import Simplex.Messaging.Protocol (BrokerMsg, Cmd (..), EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SParty (..), SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth) import qualified Simplex.Messaging.Protocol as SMP +import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.ServiceScheme (ServiceScheme (..)) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPVersion, THandleParams (sessionId)) @@ -371,6 +373,10 @@ ackMessage :: AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> AE ackMessage c = withAgentEnv c .:. ackMessage' c {-# INLINE ackMessage #-} +getConnectionQueueInfo :: AgentClient -> ConnId -> AE QueueInfo +getConnectionQueueInfo c = withAgentEnv c . getConnectionQueueInfo' c +{-# INLINE getConnectionQueueInfo #-} + -- | Switch connection to the new receive queue switchConnection :: AgentClient -> ConnId -> AE ConnectionStats switchConnection c = withAgentEnv c . switchConnection' c @@ -1510,6 +1516,16 @@ ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do withStore' c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId _ -> pure () +getConnectionQueueInfo' :: AgentClient -> ConnId -> AM QueueInfo +getConnectionQueueInfo' c connId = do + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection _ (rq :| _) _ -> getQueueInfo c rq + RcvConnection _ rq -> getQueueInfo c rq + ContactConnection _ rq -> getQueueInfo c rq + SndConnection {} -> throwE $ CONN SIMPLEX + NewConnection _ -> throwE $ CMD PROHIBITED "getConnectionQueueInfo': NewConnection" + switchConnection' :: AgentClient -> ConnId -> AM ConnectionStats switchConnection' c connId = withConnLock c connId "switchConnection" $ diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index bc2a41ae2..e63ab5d1d 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -56,6 +56,7 @@ module Simplex.Messaging.Agent.Client disableQueueNotifications, disableQueuesNtfs, sendAgentMessage, + getQueueInfo, agentNtfRegisterToken, agentNtfVerifyToken, agentNtfCheckToken, @@ -238,6 +239,7 @@ import Simplex.Messaging.Protocol sameSrvAddr', ) import qualified Simplex.Messaging.Protocol as SMP +import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Session import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -1577,6 +1579,11 @@ sendAgentMessage c sq@SndQueue {userId, server, sndId, sndPrivateKey} msgFlags a msg <- agentCbEncrypt sq Nothing $ smpEncode clientMsg sendOrProxySMPMessage c userId server "" (Just sndPrivateKey) sndId msgFlags msg +getQueueInfo :: AgentClient -> RcvQueue -> AM QueueInfo +getQueueInfo c rq@RcvQueue {rcvId, rcvPrivateKey} = + withSMPClient c rq "QUE" $ \smp -> + getSMPQueueInfo smp rcvPrivateKey rcvId + agentNtfRegisterToken :: AgentClient -> NtfToken -> NtfPublicAuthKey -> C.PublicKeyX25519 -> AM (NtfTokenId, C.PublicKeyX25519) agentNtfRegisterToken c NtfToken {deviceToken, ntfServer, ntfPrivKey} ntfPubKey pubDhKey = withClient c (0, ntfServer, Nothing) "TNEW" $ \ntf -> ntfRegisterToken ntf ntfPrivKey (NewNtfTkn deviceToken ntfPubKey pubDhKey) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 8c23db822..79838792e 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -59,6 +59,7 @@ module Simplex.Messaging.Client connectSMPProxiedRelay, proxySMPMessage, forwardSMPMessage, + getSMPQueueInfo, sendProtocolCommand, -- * Supporting types and client configuration @@ -128,6 +129,7 @@ import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON) import Simplex.Messaging.Protocol +import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport @@ -920,6 +922,12 @@ forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId = pure fwdResponse r -> throwE $ unexpectedResponse r +getSMPQueueInfo :: SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO QueueInfo +getSMPQueueInfo c pKey qId = + sendSMPCommand c (Just pKey) qId QUE >>= \case + INFO info -> pure info + r -> throwE $ unexpectedResponse r + okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO () okSMPCommand cmd c pKey qId = sendSMPCommand c (Just pKey) qId cmd >>= \case diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 1812dce37..5edea1719 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -156,6 +156,7 @@ module Simplex.Messaging.Protocol sameSrvAddr, sameSrvAddr', noAuthSrv, + toMsgInfo, -- * TCP transport functions TransportBatch (..), @@ -197,8 +198,8 @@ import qualified Data.List.NonEmpty as L import Data.Maybe (isJust, isNothing) import Data.String import qualified Data.Text as T -import Data.Text.Encoding (encodeUtf8) -import Data.Time.Clock.System (SystemTime (..)) +import Data.Text.Encoding (decodeLatin1, encodeUtf8) +import Data.Time.Clock.System (SystemTime (..), systemToUTCTime) import Data.Type.Equality import Data.Word (Word16) import qualified Data.X509 as X @@ -210,6 +211,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers +import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.ServiceScheme import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client (TransportHost, TransportHosts (..)) @@ -386,6 +388,7 @@ data Command (p :: Party) where ACK :: MsgId -> Command Recipient OFF :: Command Recipient DEL :: Command Recipient + QUE :: Command Recipient -- SMP sender commands -- SEND v1 has to be supported for encoding/decoding -- SEND :: MsgBody -> Command Sender @@ -463,6 +466,7 @@ data BrokerMsg where RRES :: EncFwdResponse -> BrokerMsg -- relay to proxy PRES :: EncResponse -> BrokerMsg -- proxy to client END :: BrokerMsg + INFO :: QueueInfo -> BrokerMsg OK :: BrokerMsg ERR :: ErrorType -> BrokerMsg PONG :: BrokerMsg @@ -505,6 +509,13 @@ data Message msgTs :: SystemTime } +toMsgInfo :: Message -> MsgInfo +toMsgInfo = \case + Message {msgId, msgTs} -> msgInfo msgId msgTs MTMessage + MessageQuota {msgId, msgTs} -> msgInfo msgId msgTs MTQuota + where + msgInfo msgId msgTs msgType = MsgInfo {msgId = decodeLatin1 $ B64.encode msgId, msgTs = systemToUTCTime msgTs, msgType} + messageId :: Message -> MsgId messageId = \case Message {msgId} -> msgId @@ -652,6 +663,7 @@ data CommandTag (p :: Party) where ACK_ :: CommandTag Recipient OFF_ :: CommandTag Recipient DEL_ :: CommandTag Recipient + QUE_ :: CommandTag Recipient SEND_ :: CommandTag Sender PING_ :: CommandTag Sender PRXY_ :: CommandTag ProxiedClient @@ -674,6 +686,7 @@ data BrokerMsgTag | RRES_ | PRES_ | END_ + | INFO_ | OK_ | ERR_ | PONG_ @@ -698,6 +711,7 @@ instance PartyI p => Encoding (CommandTag p) where ACK_ -> "ACK" OFF_ -> "OFF" DEL_ -> "DEL" + QUE_ -> "QUE" SEND_ -> "SEND" PING_ -> "PING" PRXY_ -> "PRXY" @@ -717,6 +731,7 @@ instance ProtocolMsgTag CmdTag where "ACK" -> Just $ CT SRecipient ACK_ "OFF" -> Just $ CT SRecipient OFF_ "DEL" -> Just $ CT SRecipient DEL_ + "QUE" -> Just $ CT SRecipient QUE_ "SEND" -> Just $ CT SSender SEND_ "PING" -> Just $ CT SSender PING_ "PRXY" -> Just $ CT SProxiedClient PRXY_ @@ -742,6 +757,7 @@ instance Encoding BrokerMsgTag where RRES_ -> "RRES" PRES_ -> "PRES" END_ -> "END" + INFO_ -> "INFO" OK_ -> "OK" ERR_ -> "ERR" PONG_ -> "PONG" @@ -757,6 +773,7 @@ instance ProtocolMsgTag BrokerMsgTag where "RRES" -> Just RRES_ "PRES" -> Just PRES_ "END" -> Just END_ + "INFO" -> Just INFO_ "OK" -> Just OK_ "ERR" -> Just ERR_ "PONG" -> Just PONG_ @@ -1275,6 +1292,7 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where ACK msgId -> e (ACK_, ' ', msgId) OFF -> e OFF_ DEL -> e DEL_ + QUE -> e QUE_ SEND flags msg -> e (SEND_, ' ', flags, ' ', Tail msg) PING -> e PING_ NSUB -> e NSUB_ @@ -1340,6 +1358,7 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where ACK_ -> ACK <$> _smpP OFF_ -> pure OFF DEL_ -> pure DEL + QUE_ -> pure QUE CT SSender tag -> Cmd SSender <$> case tag of SEND_ -> SEND <$> _smpP <*> (unTail <$> _smpP) @@ -1368,6 +1387,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where RRES (EncFwdResponse encBlock) -> e (RRES_, ' ', Tail encBlock) PRES (EncResponse encBlock) -> e (PRES_, ' ', Tail encBlock) END -> e END_ + INFO info -> e (INFO_, ' ', info) OK -> e OK_ ERR err -> e (ERR_, ' ', err) PONG -> e PONG_ @@ -1388,6 +1408,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where RRES_ -> RRES <$> (EncFwdResponse . unTail <$> _smpP) PRES_ -> PRES <$> (EncResponse . unTail <$> _smpP) END_ -> pure END + INFO_ -> INFO <$> _smpP OK_ -> pure OK ERR_ -> ERR <$> _smpP PONG_ -> pure PONG diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 0d209229c..d34c0002c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -59,7 +59,7 @@ import Data.List (intercalate, mapAccumR) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, isNothing) +import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) @@ -82,6 +82,7 @@ import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Server.QueueStore.STM as QS import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog @@ -791,6 +792,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi NDEL -> deleteQueueNotifier_ st OFF -> suspendQueue_ st DEL -> delQueueAndMsgs st + QUE -> withQueue getQueueInfo where createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> M (Transmission BrokerMsg) createQueue st recipientKey dhKey subMode = time "NEW" $ do @@ -1162,6 +1164,26 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi Right q -> updateDeletedStats q $> ok Left e -> pure $ err e + getQueueInfo :: QueueRec -> M (Transmission BrokerMsg) + getQueueInfo QueueRec {senderKey, notifier} = do + q@MsgQueue {size} <- getStoreMsgQueue "getQueueInfo" queueId + info <- atomically $ do + qiSub <- TM.lookup queueId subscriptions >>= mapM mkQSub + qiSize <- readTVar size + qiMsg <- toMsgInfo <$$> tryPeekMsg q + pure QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} + pure (corrId, queueId, INFO info) + where + mkQSub sub = do + Sub {subThread, delivered} <- readTVar sub + let qSubThread = case subThread of + NoSub -> QNoSub + SubPending -> QSubPending + SubThread _ -> QSubThread + ProhibitSub -> QProhibitSub + qDelivered <- decodeLatin1 . encode <$$> tryReadTMVar delivered + pure QSub {qSubThread, qDelivered} + ok :: Transmission BrokerMsg ok = (corrId, queueId, OK) diff --git a/src/Simplex/Messaging/Server/QueueStore/QueueInfo.hs b/src/Simplex/Messaging/Server/QueueStore/QueueInfo.hs new file mode 100644 index 000000000..b329a54ff --- /dev/null +++ b/src/Simplex/Messaging/Server/QueueStore/QueueInfo.hs @@ -0,0 +1,55 @@ +{-# LANGUAGE TemplateHaskell #-} + +module Simplex.Messaging.Server.QueueStore.QueueInfo where + +import qualified Data.Aeson as J +import qualified Data.Aeson.TH as JQ +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Lazy.Char8 as LB +import Data.Text (Text) +import Data.Time.Clock (UTCTime) +import Simplex.Messaging.Encoding +import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON) +import Simplex.Messaging.Util ((<$?>)) + +data QueueInfo = QueueInfo + { qiSnd :: Bool, + qiNtf :: Bool, + qiSub :: Maybe QSub, + qiSize :: Int, + qiMsg :: Maybe MsgInfo + } + deriving (Eq, Show) + +data QSub = QSub + { qSubThread :: QSubThread, + qDelivered :: Maybe Text + } + deriving (Eq, Show) + +data QSubThread = QNoSub | QSubPending | QSubThread | QProhibitSub + deriving (Eq, Show) + +data MsgInfo = MsgInfo + { msgId :: Text, + msgTs :: UTCTime, + msgType :: MsgType + } + deriving (Eq, Show) + +data MsgType = MTMessage | MTQuota + deriving (Eq, Show) + +$(JQ.deriveJSON (enumJSON $ dropPrefix "Q") ''QSubThread) + +$(JQ.deriveJSON defaultJSON ''QSub) + +$(JQ.deriveJSON (enumJSON $ dropPrefix "MT") ''MsgType) + +$(JQ.deriveJSON defaultJSON ''MsgInfo) + +$(JQ.deriveJSON defaultJSON ''QueueInfo) + +instance Encoding QueueInfo where + smpEncode = LB.toStrict . J.encode + smpP = J.eitherDecodeStrict <$?> A.takeByteString diff --git a/src/Simplex/RemoteControl/Invitation.hs b/src/Simplex/RemoteControl/Invitation.hs index 712c41a9d..d606c4ff0 100644 --- a/src/Simplex/RemoteControl/Invitation.hs +++ b/src/Simplex/RemoteControl/Invitation.hs @@ -123,7 +123,7 @@ instance StrEncoding RCSignedInvitation where idsig <- requiredP sigs "idsig" $ parseAll strP pure RCSignedInvitation {invitation, ssig, idsig} -signInvitation :: C.PrivateKey C.Ed25519 -> C.PrivateKey C.Ed25519 -> RCInvitation -> RCSignedInvitation +signInvitation :: C.PrivateKey 'C.Ed25519 -> C.PrivateKey 'C.Ed25519 -> RCInvitation -> RCSignedInvitation signInvitation sKey idKey invitation = RCSignedInvitation {invitation, ssig, idsig} where uri = strEncode invitation diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 2b21ff3f7..c2badea63 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -57,6 +57,7 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Reader import Data.Bifunctor (first) +import qualified Data.ByteString.Base64 as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (isRight) @@ -66,6 +67,7 @@ import Data.List.NonEmpty (NonEmpty) import qualified Data.Map as M import Data.Maybe (isJust, isNothing) import qualified Data.Set as S +import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (diffUTCTime, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Type.Equality (testEquality, (:~:) (Refl)) @@ -92,6 +94,7 @@ import Simplex.Messaging.Protocol (BasicAuth, ErrorType (..), MsgBody, ProtocolS import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server.Env.STM (ServerConfig (..)) import Simplex.Messaging.Server.Expiration +import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Transport (ATransport (..), SMPVersion, VersionSMP, authCmdsSMPVersion, basicAuthSMPVersion, batchCmdsSMPVersion, currentServerSMPRelayVersion, supportedSMPHandshakes) import Simplex.Messaging.Util (diffToMicroseconds) import Simplex.Messaging.Version (VersionRange (..)) @@ -107,8 +110,11 @@ type AEntityTransmission e = (ACorrId, ConnId, ACommand 'Agent e) -- deriving instance Eq (ValidFileDescription p) +shouldRespond :: (HasCallStack, MonadUnliftIO m, Eq a, Show a) => m a -> a -> m () +a `shouldRespond` r = withFrozenCallStack $ withTimeout a (`shouldBe` r) + (##>) :: (HasCallStack, MonadUnliftIO m) => m (AEntityTransmission e) -> AEntityTransmission e -> m () -a ##> t = withTimeout a (`shouldBe` t) +a ##> t = a `shouldRespond` t (=##>) :: (Show a, HasCallStack, MonadUnliftIO m) => m a -> (HasCallStack => a -> Bool) -> m () a =##> p = @@ -228,7 +234,7 @@ mkVersionRange :: Word16 -> Word16 -> VersionRange v mkVersionRange v1 v2 = V.mkVersionRange (Version v1) (Version v2) runRight_ :: (Eq e, Show e, HasCallStack) => ExceptT e IO () -> Expectation -runRight_ action = runExceptT action `shouldReturn` Right () +runRight_ action = withFrozenCallStack $ runExceptT action `shouldReturn` Right () runRight :: (Show e, HasCallStack) => ExceptT e IO a -> IO a runRight action = @@ -444,6 +450,9 @@ functionalAPITests t = do it "should wait for user network" testWaitForUserNetwork it "should not reset online to offline if happens too quickly" testDoNotResetOnlineToOffline it "should resume multiple threads" testResumeMultipleThreads + describe "SMP queue info" $ do + it "server should respond with queue and subscription information" $ + withSmpServer t testServerQueueInfo testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do @@ -2755,6 +2764,91 @@ testResumeMultipleThreads = do where aCfg = agentCfg {userOfflineDelay = 0} +testServerQueueInfo :: IO () +testServerQueueInfo = do + withAgentClients2 $ \alice bob -> runRight_ $ do + (bobId, cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe + liftIO $ threadDelay 200000 + checkEmptyQ alice bobId False + aliceId <- joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe + ("", _, CONF confId _ "bob's connInfo") <- get alice + liftIO $ threadDelay 200000 + checkEmptyQ alice bobId False + allowConnection alice bobId confId "alice's connInfo" + get alice ##> ("", bobId, CON) + get bob ##> ("", aliceId, INFO "alice's connInfo") + get bob ##> ("", aliceId, CON) + liftIO $ threadDelay 200000 + checkEmptyQ alice bobId True + checkEmptyQ bob aliceId True + let msgId = 4 + (msgId', PQEncOn) <- A.sendMessage alice bobId PQEncOn SMP.noMsgFlags "hello" + liftIO $ msgId' `shouldBe` msgId + get alice ##> ("", bobId, SENT msgId) + liftIO $ threadDelay 200000 + Just srvMsgId <- checkMsgQ bob aliceId 1 + get bob =##> \case + ("", c, MSG MsgMeta {integrity = MsgOk, broker = (smId, _), recipient = (mId, _), pqEncryption = PQEncOn} _ "hello") -> + c == aliceId && decodeLatin1 (B64.encode smId) == srvMsgId && mId == msgId + _ -> False + ackMessage bob aliceId msgId Nothing + liftIO $ threadDelay 200000 + checkEmptyQ bob aliceId True + (msgId1, PQEncOn) <- A.sendMessage alice bobId PQEncOn SMP.noMsgFlags "hello 1" + get alice ##> ("", bobId, SENT msgId1) + Just _ <- checkMsgQ bob aliceId 1 + (msgId2, PQEncOn) <- A.sendMessage alice bobId PQEncOn SMP.noMsgFlags "hello 2" + get alice ##> ("", bobId, SENT msgId2) + (msgId3, PQEncOn) <- A.sendMessage alice bobId PQEncOn SMP.noMsgFlags "hello 3" + get alice ##> ("", bobId, SENT msgId3) + (msgId4, PQEncOn) <- A.sendMessage alice bobId PQEncOn SMP.noMsgFlags "hello 4" + get alice ##> ("", bobId, SENT msgId4) + Just _ <- checkMsgQ bob aliceId 4 + (msgId5, PQEncOn) <- A.sendMessage alice bobId PQEncOn SMP.noMsgFlags "hello: quota exceeded" + liftIO $ threadDelay 200000 + Just _ <- checkMsgQ bob aliceId 5 + get bob =##> \case ("", c, Msg' mId PQEncOn "hello 1") -> c == aliceId && mId == msgId1; _ -> False + ackMessage bob aliceId msgId1 Nothing + liftIO $ threadDelay 200000 + Just _ <- checkMsgQ bob aliceId 4 + get bob =##> \case ("", c, Msg' mId PQEncOn "hello 2") -> c == aliceId && mId == msgId2; _ -> False + ackMessage bob aliceId msgId2 Nothing + get bob =##> \case ("", c, Msg' mId PQEncOn "hello 3") -> c == aliceId && mId == msgId3; _ -> False + ackMessage bob aliceId msgId3 Nothing + liftIO $ threadDelay 200000 + Just _ <- checkMsgQ bob aliceId 2 + get bob =##> \case ("", c, Msg' mId PQEncOn "hello 4") -> c == aliceId && mId == msgId4; _ -> False + ackMessage bob aliceId msgId4 Nothing + liftIO $ threadDelay 200000 + Just _ <- checkMsgQ bob aliceId 1 -- the one that did not fit now accepted + get alice ##> ("", bobId, QCONT) + get alice ##> ("", bobId, SENT msgId5) + liftIO $ threadDelay 200000 + Just _srvMsgId <- checkQ bob aliceId True (Just QNoSub) 1 (Just MTMessage) + get bob =##> \case ("", c, Msg' mId PQEncOn "hello: quota exceeded") -> c == aliceId && mId == msgId5 + 1; _ -> False + ackMessage bob aliceId (msgId5 + 1) Nothing + liftIO $ threadDelay 200000 + checkEmptyQ bob aliceId True + pure () + where + checkEmptyQ c cId qiSnd' = do + r <- checkQ c cId qiSnd' (Just QSubThread) 0 Nothing + liftIO $ r `shouldBe` Nothing + checkMsgQ c cId qiSize' = do + r <- checkQ c cId True (Just QNoSub) qiSize' (Just MTMessage) + liftIO $ isJust r `shouldBe` True + pure r + checkQ c cId qiSnd' qiSubThread_ qiSize' msgType_ = do + QueueInfo {qiSnd, qiNtf, qiSub, qiSize, qiMsg} <- getConnectionQueueInfo c cId + liftIO $ do + qiSnd `shouldBe` qiSnd' + qiNtf `shouldBe` False + qSubThread <$> qiSub `shouldBe` qiSubThread_ + qiSize `shouldBe` qiSize' + msgId_ <- forM qiMsg $ \MsgInfo {msgId, msgType} -> msgId <$ (Just msgType `shouldBe` msgType_) + qDelivered <$> qiSub `shouldBe` Just msgId_ + pure msgId_ + noNetworkDelay :: AgentClient -> IO () noNetworkDelay a = do d <- waitNetwork a diff --git a/tests/CLITests.hs b/tests/CLITests.hs index 2193de5e9..1310665ee 100644 --- a/tests/CLITests.hs +++ b/tests/CLITests.hs @@ -77,13 +77,13 @@ smpServerTest storeLog basicAuth = do let certPath = cfgPath "server.crt" oldCrt@X.Certificate {} <- XF.readSignedObject certPath >>= \case - [cert] -> pure . X.signedObject $ X.getSigned cert + [cert'] -> pure . X.signedObject $ X.getSigned cert' _ -> error "bad crt format" r' <- lines <$> capture_ (withArgs ["cert"] $ (100000 `timeout` smpServerCLI cfgPath logPath) `catchAll_` pure (Just ())) r' `shouldContain` ["Generated new server credentials"] newCrt <- XF.readSignedObject certPath >>= \case - [cert] -> pure . X.signedObject $ X.getSigned cert + [cert'] -> pure . X.signedObject $ X.getSigned cert' _ -> error "bad crt format after cert" X.certSignatureAlg oldCrt `shouldBe` X.certSignatureAlg newCrt X.certSubjectDN oldCrt `shouldBe` X.certSubjectDN newCrt