mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-07-02 11:22:07 +00:00
209f7826cb
* smp-server: namespaces resolver scaffolding * smp-server: Names resolver hardening + cleanup * smp-server: fuse parallel dispatchers * smp-server: JSON wire format for NameRecord + Names.hs restructure * smp-server: redact RpcAuth in Show * smp-server: JSON wire fixups + spec rewrite + small cleanups * plan: prepend implementation-diverged banner * move SimplexName into shared module * smp-server: name + contract whitelist on RSLV * smp-server: address audit findings (canonical JSON, INI guards, SSRF, TLD case, shutdown) * smp-server: round 2 audit fixes (label case, response cap, ipv6 link-local) * smp-server: round 3 audit fixes (SSRF coverage, drop noop closeManager, CSV order) * smp-server: round 4 audit fixes (0X-hex host, expanded IPv6 forms, pingEndpoint timeout) * smp-server: hardcode TldRegistries (drop registry_tld_* INI keys) * smp-server: round 6 audit fixes (IPv6 SSRF, redirects, ASCII labels) - Reject IPv6 aliases of 169.254.169.254 (IPv4-compatible / IPv4-mapped / 6to4 / NAT64) via numeric range check on parsed IPv6. - Disable HTTP redirects on the Eth RPC request. - Restrict SimplexName labels to ASCII (Cyrillic/Greek/full-width otherwise hash to different on-chain records and diverge from UTS-46 registrars). - pingEndpoint: only JsonRpcErr means "reachable"; transport/decode failures fail startup. boundedIniInt: readMaybe over partial read. - Add 127.0.0.0/8 and 0.0.0.0 to isLoopback. - Replace hand-rolled hex helpers with Data.ByteArray.Encoding; raise managerConnCount to match rpcMaxConcurrency; hex Show for NameOwner. - Fuse parallel http/https when into unless+case; drop reverse/re-reverse in mkDomain TLDWeb; first AbiInvariantViolated; Nothing <$ decodeAddress; forM_ (eitherToMaybe ...); >>= chain in NameOwner FromJSON. - Drop dead imports/exports/pragmas and two restating comments. - Tests: factor unsafeOwner/unsafeLink, addr1/2/3, testNamesConfig; add non-ASCII label rejection coverage. * namespace: bound parser input to 253 bytes (DoS defense) The bare-name fallback and bareDomain parser would otherwise consume arbitrarily many non-space bytes via takeWhile1 before any validation or length check. A crafted multi-megabyte token would be decoded as UTF-8 and re-parsed in full before being rejected. Introduce `boundedNonSpace` (scan with 253-byte cap) at the two takeWhile1 sites. Inputs longer than 253 bytes leave residue that parseOnly's implicit endOfInput rejects, so the parser fails fast without ever allocating the full input. The bound is the DNS full-domain limit, chosen for being a familiar ceiling generous enough to cover any realistic SimpleX name (longest plausible @user.subdomain.simplex stays well under 100 bytes). No per-label cap — SimpleX names don't go through DNS label resolution and there's no semantic reason to constrain individual labels. * namespace: switch to Python HTTP resolver + agent plumbing (#1796) * namespace: relax resolver_endpoint validation (path prefix, http without auth) validateUrl gains two operator-friendly relaxations and a regression test: - Allow a path prefix (e.g. https://gw.example.com:443/snrc) for a resolver behind a reverse-proxy sub-path; /resolve/<name> and /health are appended (HttpResolver already strips one trailing slash, so root and sub-path behave identically). Query/fragment/userinfo stay rejected. - Off-loopback, reject only http WITH resolver_auth (the Authorization header would travel in cleartext). http without auth is now allowed (no secret to leak; resolver data is public — also lets dev setups reach a host resolver via http://host.docker.internal). https is always allowed, with or without auth. Plain http has no response integrity; intended for trusted/local networks only. Exports validateUrl and adds validateUrlSpec (11 cases) to SMPNamesTests. * namespace: NameRecord links as arrays (multi-link, cap 5) * namespace: distinct RSLV error responses RSLV collapsed every non-hit (no resolver, malformed name, not found, backing-store failure) to ERR AUTH, so a client iterating its configured servers could not tell "this router has no resolver, try the next" from "name not registered, stop", and a transient backend error read as an authoritative miss. Names capability is runtime config, orthogonal to the linear SMP version (a future v21 router without [NAMES] must still advertise v21), so it is signalled by a command-time error like allowSMPProxy, not by the version range: no resolver configured -> ERR CMD PROHIBITED (client skips, tries next) backing-store failure -> ERR INTERNAL (transient: retry/surface) not found / malformed -> ERR AUTH (authoritative "no such name") Update the protocol spec error table and add agent tests for the no-resolver (CMD PROHIBITED) and backend-failure (INTERNAL) paths. * refactor(names): server role + one error type Addresses epoberezkin's review (PR #1784). Name resolution becomes a server role like proxy; the agent owns resolution + server selection; one error type flows through the whole stack. - ServerRoles gains `names`; UserServers gains `nameSrvs` (opt-in list); resolveSimplexName drops the explicit server arg and picks a names-capable server via getNextServer. - RSLV carries SimplexNameDomain (was RslvRequest): no JSON on the wire, contract dropped, name validated at parse (invalid -> CMD SYNTAX). - Version check moves from the encoder to Client.hs (no ERR to server). - ErrorType.NAME {nameErr :: NameErrorType} (+ AgentErrorType.NAME), wire- and JSON-encoded; resolver errors surface with diagnostics. Success response renamed NAME -> RNAME to free the collision. - NameOwner -> EthAddress (record selector); NameRecord derives FromJSON and gains field-ordered Encoding; per-field caps removed. - Remove newEnvWithNames / runSMPServerBlockingWithNames test seams; stub resolver folded into ServerConfig.namesResolverCall_. * test(server): update stats backup line count NameResolverStatsData adds 6 lines to the server stats backup (the "rslvStats:" header plus the reqs/succ/notFound/resolverErrs/disabled fields), so testRestoreMessages' expected stats-backup line count is 95 -> 101. * feat(names): public-namespace resolution via RSLV/RNAME SNRC names resolver role: RSLV command -> HTTP resolver -> RNAME record. Agent owns server selection (ServerRoles.names); NAME error family; async, concurrency-bounded resolution; length-prefixed extensible wire; spec. * remove comments Co-authored-by: Evgeny <evgeny@poberezkin.com> * simplify * move tests name * simplify: text addresses, Tail JSON, drop admitRslv * fix * remove spaghetti * reduce diff * async again, refactor * different threads limit for name resolutions * remove comment * FromField instance for SimplexNameInfo * remove comments * unStrJSON * add sameConnShortLink * remove scheme prefix * remove unused import * remove connecttarget tests * remove comment * comment --------- Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com> Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
492 lines
22 KiB
Haskell
492 lines
22 KiB
Haskell
{-# LANGUAGE CPP #-}
|
|
{-# LANGUAGE DataKinds #-}
|
|
{-# LANGUAGE DuplicateRecordFields #-}
|
|
{-# LANGUAGE FlexibleContexts #-}
|
|
{-# LANGUAGE GADTs #-}
|
|
{-# LANGUAGE LambdaCase #-}
|
|
{-# LANGUAGE NamedFieldPuns #-}
|
|
{-# LANGUAGE NumericUnderscores #-}
|
|
{-# LANGUAGE OverloadedLists #-}
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
{-# LANGUAGE RankNTypes #-}
|
|
{-# LANGUAGE ScopedTypeVariables #-}
|
|
{-# LANGUAGE TypeApplications #-}
|
|
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
|
|
|
module SMPClient where
|
|
|
|
import Control.Monad
|
|
import Control.Monad.Except (runExceptT)
|
|
import Data.ByteString.Char8 (ByteString)
|
|
import Data.List.NonEmpty (NonEmpty)
|
|
import qualified Data.X509 as X
|
|
import qualified Data.X509.Validation as XV
|
|
import Network.Socket
|
|
import qualified Network.TLS as TLS
|
|
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
|
|
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
|
|
import Simplex.Messaging.Client (NetworkConfig (..), NetworkTimeout (..), ProtocolClientConfig (..), chooseTransportHost, defaultNetworkConfig)
|
|
import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig)
|
|
import qualified Simplex.Messaging.Crypto as C
|
|
import Simplex.Messaging.Encoding
|
|
import Simplex.Messaging.Protocol
|
|
import Simplex.Messaging.Server (runSMPServerBlocking)
|
|
import Simplex.Messaging.Server.Env.STM
|
|
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..))
|
|
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
|
|
import Simplex.Messaging.Transport
|
|
import Simplex.Messaging.Transport.Client
|
|
import Simplex.Messaging.Transport.Server
|
|
import Simplex.Messaging.Transport.Shared (ChainCertificates (..), chainIdCaCerts)
|
|
import Simplex.Messaging.Util (ifM)
|
|
import Simplex.Messaging.Version
|
|
import Simplex.Messaging.Version.Internal
|
|
import System.Info (os)
|
|
import Test.Hspec hiding (fit, it)
|
|
import UnliftIO.Concurrent
|
|
import qualified UnliftIO.Exception as E
|
|
import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar)
|
|
import UnliftIO.Timeout (timeout)
|
|
import Util
|
|
|
|
#if defined(dbServerPostgres)
|
|
import Database.PostgreSQL.Simple (defaultConnectInfo)
|
|
#endif
|
|
|
|
#if defined(dbPostgres) || defined(dbServerPostgres)
|
|
import Database.PostgreSQL.Simple (ConnectInfo (..))
|
|
import Simplex.Messaging.Agent.Store.Postgres.Util (createDBAndUserIfNotExists, dropDatabaseAndUser)
|
|
#endif
|
|
|
|
data AServerConfig =
|
|
forall qs ms. (SupportedStore qs ms, MsgStoreClass (MsgStoreType qs ms)) =>
|
|
ASrvCfg (SQSType qs) (SMSType ms) (ServerConfig (MsgStoreType qs ms))
|
|
|
|
data AServerStoreCfg =
|
|
forall qs ms. (SupportedStore qs ms, MsgStoreClass (MsgStoreType qs ms)) =>
|
|
ASSCfg (SQSType qs) (SMSType ms) (ServerStoreCfg (MsgStoreType qs ms))
|
|
|
|
testHost :: NonEmpty TransportHost
|
|
testHost = "localhost"
|
|
|
|
testHost2 :: NonEmpty TransportHost
|
|
testHost2 = "127.0.0.1"
|
|
|
|
testPort :: ServiceName
|
|
testPort = "5001"
|
|
|
|
testPort2 :: ServiceName
|
|
testPort2 = "5002"
|
|
|
|
ntfTestPort :: ServiceName
|
|
ntfTestPort = "6001"
|
|
|
|
ntfTestPort2 :: ServiceName
|
|
ntfTestPort2 = "6002"
|
|
|
|
testKeyHash :: C.KeyHash
|
|
testKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI="
|
|
|
|
testStoreLogFile :: FilePath
|
|
testStoreLogFile = "tests/tmp/smp-server-store.log"
|
|
|
|
testStoreLogFile2 :: FilePath
|
|
testStoreLogFile2 = "tests/tmp/smp-server-store.log.2"
|
|
|
|
testStoreDBOpts :: DBOpts
|
|
testStoreDBOpts =
|
|
DBOpts
|
|
{ connstr = testServerDBConnstr,
|
|
schema = "smp_server",
|
|
poolSize = 10,
|
|
createSchema = True
|
|
}
|
|
|
|
testStoreDBOpts2 :: DBOpts
|
|
testStoreDBOpts2 = testStoreDBOpts {schema = "smp_server2"}
|
|
|
|
testServerDBConnstr :: ByteString
|
|
testServerDBConnstr = "postgresql://test_server_user@/test_server_db"
|
|
|
|
#if defined(dbServerPostgres)
|
|
testServerDBConnectInfo :: ConnectInfo
|
|
testServerDBConnectInfo =
|
|
defaultConnectInfo {
|
|
connectUser = "test_server_user",
|
|
connectDatabase = "test_server_db"
|
|
}
|
|
#endif
|
|
|
|
testStoreMsgsFile :: FilePath
|
|
testStoreMsgsFile = "tests/tmp/smp-server-messages.log"
|
|
|
|
testStoreMsgsFile2 :: FilePath
|
|
testStoreMsgsFile2 = "tests/tmp/smp-server-messages.log.2"
|
|
|
|
testStoreMsgsDir :: FilePath
|
|
testStoreMsgsDir = "tests/tmp/messages"
|
|
|
|
testStoreMsgsDir2 :: FilePath
|
|
testStoreMsgsDir2 = "tests/tmp/messages.2"
|
|
|
|
testStoreNtfsFile :: FilePath
|
|
testStoreNtfsFile = "tests/tmp/smp-server-ntfs.log"
|
|
|
|
testStoreNtfsFile2 :: FilePath
|
|
testStoreNtfsFile2 = "tests/tmp/smp-server-ntfs.log.2"
|
|
|
|
testPrometheusMetricsFile :: FilePath
|
|
testPrometheusMetricsFile = "tests/tmp/smp-server-metrics.txt"
|
|
|
|
testServerStatsBackupFile :: FilePath
|
|
testServerStatsBackupFile = "tests/tmp/smp-server-stats.log"
|
|
|
|
xit' :: (HasCallStack, Example a) => String -> a -> SpecWith (Arg a)
|
|
xit' d = if os == "linux" then skip "skipped on Linux" . it d else it d
|
|
|
|
xit'' :: (HasCallStack, Example a) => String -> a -> SpecWith (Arg a)
|
|
xit'' d = skipOnCI . it d
|
|
|
|
skipOnCI :: SpecWith a -> SpecWith a
|
|
skipOnCI t = ifM (runIO envCI) (skip "skipped on CI" t) t
|
|
|
|
testSMPClient :: Transport c => (THandleSMP c 'TClient -> IO a) -> IO a
|
|
testSMPClient = testSMPClientVR supportedClientSMPRelayVRange
|
|
|
|
testSMPClientVR :: Transport c => VersionRangeSMP -> (THandleSMP c 'TClient -> IO a) -> IO a
|
|
testSMPClientVR vr client = do
|
|
Right useHost <- pure $ chooseTransportHost defaultNetworkConfig testHost
|
|
testSMPClient_ useHost testPort vr Nothing client
|
|
|
|
testSMPServiceClient :: Transport c => (TLS.Credential, C.KeyPairEd25519) -> (THandleSMP c 'TClient -> IO a) -> IO a
|
|
testSMPServiceClient serviceCreds client = do
|
|
Right useHost <- pure $ chooseTransportHost defaultNetworkConfig testHost
|
|
testSMPClient_ useHost testPort supportedClientSMPRelayVRange (Just serviceCreds) client
|
|
|
|
testSMPClient_ :: Transport c => TransportHost -> ServiceName -> VersionRangeSMP -> Maybe (TLS.Credential, C.KeyPairEd25519) -> (THandleSMP c 'TClient -> IO a) -> IO a
|
|
testSMPClient_ host port vr serviceCreds_ client = do
|
|
serviceAndKeys_ <- forM serviceCreds_ $ \(serviceCreds@(cc, pk), keys) -> do
|
|
Right serviceSignKey <- pure $ C.x509ToPrivate' pk
|
|
let idCert' = case chainIdCaCerts cc of
|
|
CCSelf cert -> cert
|
|
CCValid {idCert} -> idCert
|
|
_ -> error "bad certificate"
|
|
serviceCertHash = XV.getFingerprint idCert' X.HashSHA256
|
|
pure (ServiceCredentials {serviceRole = SRMessaging, serviceCreds, serviceCertHash, serviceSignKey}, keys)
|
|
let tcConfig = defaultTransportClientConfig {clientALPN, clientCredentials = fst <$> serviceCreds_} :: TransportClientConfig
|
|
runTransportClient tcConfig Nothing host port (Just testKeyHash) $ \h ->
|
|
runExceptT (smpClientHandshake h Nothing testKeyHash vr False serviceAndKeys_) >>= \case
|
|
Right th -> client th
|
|
Left e -> error $ show e
|
|
where
|
|
clientALPN
|
|
| authCmdsSMPVersion `isCompatible` vr = Just alpnSupportedSMPHandshakes
|
|
| otherwise = Nothing
|
|
|
|
runSMPClient :: Transport c => TProxy c 'TServer -> (THandleSMP c 'TClient -> IO a) -> IO a
|
|
runSMPClient _ test' = testSMPClient test'
|
|
|
|
runSMPServiceClient :: Transport c => TProxy c 'TServer -> (TLS.Credential, C.KeyPairEd25519) -> (THandleSMP c 'TClient -> IO a) -> IO a
|
|
runSMPServiceClient _ serviceCreds test' = testSMPServiceClient serviceCreds test'
|
|
|
|
testNtfServiceClient :: Transport c => TProxy c 'TServer -> C.KeyPairEd25519 -> (THandleSMP c 'TClient -> IO a) -> IO a
|
|
testNtfServiceClient _ keys client = do
|
|
tlsNtfServerCreds <- loadServerCredential ntfTestServerCredentials
|
|
serviceCertHash <- loadFingerprint ntfTestServerCredentials
|
|
Right serviceSignKey <- pure $ C.x509ToPrivate' $ snd tlsNtfServerCreds
|
|
let service = ServiceCredentials {serviceRole = SRNotifier, serviceCreds = tlsNtfServerCreds, serviceCertHash, serviceSignKey}
|
|
tcConfig =
|
|
defaultTransportClientConfig
|
|
{ clientCredentials = Just tlsNtfServerCreds,
|
|
clientALPN = Just alpnSupportedSMPHandshakes
|
|
}
|
|
runTransportClient tcConfig Nothing "localhost" testPort (Just testKeyHash) $ \h ->
|
|
runExceptT (smpClientHandshake h Nothing testKeyHash supportedClientSMPRelayVRange False $ Just (service, keys)) >>= \case
|
|
Right th -> client th
|
|
Left e -> error $ show e
|
|
|
|
ntfTestServerCredentials :: ServerCredentials
|
|
ntfTestServerCredentials =
|
|
ServerCredentials
|
|
{ caCertificateFile = Just "tests/fixtures/ca.crt",
|
|
privateKeyFile = "tests/fixtures/server.key",
|
|
certificateFile = "tests/fixtures/server.crt"
|
|
}
|
|
|
|
cfg :: AServerConfig
|
|
cfg = cfgMS (ASType SQSMemory SMSJournal)
|
|
|
|
cfgJ2 :: AServerConfig
|
|
cfgJ2 = journalCfg cfg testStoreLogFile2 testStoreMsgsDir2
|
|
|
|
cfgJ2QS :: SQSType s -> AServerConfig
|
|
cfgJ2QS = \case
|
|
SQSMemory -> journalCfg (cfgMS $ ASType SQSMemory SMSJournal) testStoreLogFile2 testStoreMsgsDir2
|
|
SQSPostgres -> journalCfgDB (cfgMS $ ASType SQSPostgres SMSJournal) testStoreDBOpts2 testStoreMsgsDir2
|
|
|
|
journalCfg :: AServerConfig -> FilePath -> FilePath -> AServerConfig
|
|
journalCfg (ASrvCfg _ _ cfg') storeLogFile storeMsgsPath =
|
|
ASrvCfg SQSMemory SMSJournal cfg' {serverStoreCfg = SSCMemoryJournal {storeLogFile, storeMsgsPath}}
|
|
|
|
journalCfgDB :: AServerConfig -> DBOpts -> FilePath -> AServerConfig
|
|
journalCfgDB (ASrvCfg _ _ cfg') dbOpts storeMsgsPath' =
|
|
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCYesUp, deletedTTL = 86400}
|
|
in ASrvCfg SQSPostgres SMSJournal cfg' {serverStoreCfg = SSCDatabaseJournal {storeCfg, storeMsgsPath'}}
|
|
|
|
cfgMS :: AStoreType -> AServerConfig
|
|
cfgMS msType = withStoreCfg (testServerStoreConfig msType) $ \serverStoreCfg ->
|
|
ServerConfig
|
|
{ transports = [],
|
|
smpHandshakeTimeout = 60000000,
|
|
tbqSize = 4,
|
|
msgQueueQuota = 4,
|
|
maxJournalMsgCount = 5,
|
|
maxJournalStateLines = 2,
|
|
queueIdBytes = 24,
|
|
msgIdBytes = 24,
|
|
serverStoreCfg,
|
|
storeNtfsFile = Nothing,
|
|
allowNewQueues = True,
|
|
newQueueBasicAuth = Nothing,
|
|
controlPortUserAuth = Nothing,
|
|
controlPortAdminAuth = Nothing,
|
|
dailyBlockQueueQuota = 20,
|
|
messageExpiration = Just defaultMessageExpiration,
|
|
expireMessagesOnStart = True,
|
|
expireMessagesOnSend = False,
|
|
idleQueueInterval = defaultIdleQueueInterval,
|
|
notificationExpiration = defaultNtfExpiration,
|
|
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
|
logStatsInterval = Nothing,
|
|
logStatsStartTime = 0,
|
|
serverStatsLogFile = "tests/tmp/smp-server-stats.daily.log",
|
|
serverStatsBackupFile = Nothing,
|
|
prometheusInterval = Nothing,
|
|
prometheusMetricsFile = testPrometheusMetricsFile,
|
|
pendingENDInterval = 500000,
|
|
ntfDeliveryInterval = 200000,
|
|
smpCredentials =
|
|
ServerCredentials
|
|
{ caCertificateFile = Just "tests/fixtures/ca.crt",
|
|
privateKeyFile = "tests/fixtures/server.key",
|
|
certificateFile = "tests/fixtures/server.crt"
|
|
},
|
|
httpCredentials = Nothing,
|
|
smpServerVRange = supportedServerSMPRelayVRange,
|
|
transportConfig = mkTransportServerConfig True (Just alpnSupportedSMPHandshakes) True,
|
|
controlPort = Nothing,
|
|
smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 1}, -- seconds
|
|
allowSMPProxy = False,
|
|
serverClientConcurrency = 2,
|
|
serverResolverConcurrency = defaultNameResolverConcurrency,
|
|
namesConfig = Nothing,
|
|
information = Nothing,
|
|
startOptions = defaultStartOptions
|
|
}
|
|
|
|
withStoreCfg :: AServerStoreCfg -> (forall s. ServerStoreCfg s -> ServerConfig s) -> AServerConfig
|
|
withStoreCfg (ASSCfg qt mt storeCfg) f = ASrvCfg qt mt (f storeCfg)
|
|
|
|
defaultStartOptions :: StartOptions
|
|
defaultStartOptions = StartOptions {maintenance = False, compactLog = False, logLevel = testLogLevel, skipWarnings = False, confirmMigrations = MCYesUp}
|
|
|
|
testServerStoreConfig :: AStoreType -> AServerStoreCfg
|
|
testServerStoreConfig = serverStoreConfig_ False
|
|
|
|
serverStoreConfig_ :: Bool -> AStoreType -> AServerStoreCfg
|
|
serverStoreConfig_ useDbStoreLog = \case
|
|
ASType SQSMemory SMSMemory ->
|
|
ASSCfg SQSMemory SMSMemory $ SSCMemory $ Just StorePaths {storeLogFile = testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile}
|
|
ASType SQSMemory SMSJournal ->
|
|
ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = testStoreLogFile, storeMsgsPath = testStoreMsgsDir}
|
|
ASType SQSPostgres SMSJournal ->
|
|
ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir}
|
|
#if defined(dbServerPostgres)
|
|
ASType SQSPostgres SMSPostgres ->
|
|
ASSCfg SQSPostgres SMSPostgres $ SSCDatabase storeCfg
|
|
#endif
|
|
where
|
|
dbStoreLogPath = if useDbStoreLog then Just testStoreLogFile else Nothing
|
|
storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = 86400}
|
|
|
|
cfgV7 :: AServerConfig
|
|
cfgV7 = updateCfg cfg $ \cfg' -> cfg' {smpServerVRange = mkVersionRange minServerSMPRelayVersion authCmdsSMPVersion}
|
|
|
|
cfgVPrev :: AStoreType -> AServerConfig
|
|
cfgVPrev msType = updateCfg (cfgMS msType) $ \cfg' -> cfg' {smpServerVRange = prevRange $ smpServerVRange cfg'}
|
|
|
|
prevRange :: VersionRange v -> VersionRange v
|
|
prevRange vr = vr {maxVersion = max (minVersion vr) (prevVersion $ maxVersion vr)}
|
|
|
|
prevVersion :: Version v -> Version v
|
|
prevVersion (Version v) = Version (v - 1)
|
|
|
|
proxyCfg :: AServerConfig
|
|
proxyCfg = proxyCfgMS (ASType SQSMemory SMSJournal)
|
|
|
|
proxyCfgMS :: AStoreType -> AServerConfig
|
|
proxyCfgMS msType =
|
|
updateCfg (cfgMS msType) $ \cfg' ->
|
|
let smpAgentCfg' = smpAgentCfg cfg'
|
|
in cfg'
|
|
{ allowSMPProxy = True,
|
|
smpAgentCfg = smpAgentCfg' {smpCfg = (smpCfg smpAgentCfg') {agreeSecret = True, proxyServer = True, serverVRange = supportedProxyClientSMPRelayVRange}}
|
|
}
|
|
|
|
proxyCfgJ2 :: AServerConfig
|
|
proxyCfgJ2 = journalCfg proxyCfg testStoreLogFile2 testStoreMsgsDir2
|
|
|
|
proxyCfgJ2QS :: SQSType qs -> AServerConfig
|
|
proxyCfgJ2QS = \case
|
|
SQSMemory -> journalCfg (proxyCfgMS $ ASType SQSMemory SMSJournal) testStoreLogFile2 testStoreMsgsDir2
|
|
SQSPostgres -> journalCfgDB (proxyCfgMS $ ASType SQSPostgres SMSJournal) testStoreDBOpts2 testStoreMsgsDir2
|
|
|
|
-- Proxy config with a short relay-connection timeout, to bound how long a failing
|
|
-- proxy->relay connection attempt blocks in the relay reconnection tests.
|
|
proxyCfgShortTimeout :: AServerConfig
|
|
proxyCfgShortTimeout =
|
|
updateCfg proxyCfg $ \cfg' ->
|
|
let aCfg = smpAgentCfg cfg'
|
|
cCfg = smpCfg aCfg
|
|
nt = NetworkTimeout {backgroundTimeout = 4_000000, interactiveTimeout = 4_000000}
|
|
in cfg' {smpAgentCfg = aCfg {smpCfg = cCfg {networkConfig = (networkConfig cCfg) {tcpConnectTimeout = nt}}}}
|
|
|
|
proxyVRangeV8 :: VersionRangeSMP
|
|
proxyVRangeV8 = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion
|
|
|
|
withSmpServerStoreMsgLogOn :: HasCallStack => (ASrvTransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
|
withSmpServerStoreMsgLogOn (t, msType) =
|
|
withSmpServerConfigOn t $ updateCfg (cfgMS msType) $ \cfg' -> cfg' {storeNtfsFile = Just testStoreNtfsFile, serverStatsBackupFile = Just testServerStatsBackupFile}
|
|
|
|
withSmpServerStoreLogOn :: HasCallStack => (ASrvTransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
|
withSmpServerStoreLogOn (t, msType) =
|
|
withSmpServerConfigOn t $ updateCfg (cfgMS msType) $ \cfg' -> cfg' {serverStatsBackupFile = Just testServerStatsBackupFile}
|
|
|
|
updateCfg :: AServerConfig -> (forall s. ServerConfig s -> ServerConfig s) -> AServerConfig
|
|
updateCfg (ASrvCfg qt mt cfg') f = ASrvCfg qt mt (f cfg')
|
|
|
|
withServerCfg :: AServerConfig -> (forall s. ServerConfig s -> a) -> a
|
|
withServerCfg (ASrvCfg _ _ cfg') f = f cfg'
|
|
|
|
withSmpServerConfigOn :: HasCallStack => ASrvTransport -> AServerConfig -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
|
withSmpServerConfigOn t (ASrvCfg _ _ cfg') port' =
|
|
serverBracket
|
|
(\started -> runSMPServerBlocking started cfg' {transports = [(port', t, False)]} Nothing)
|
|
(threadDelay 10000)
|
|
|
|
withSmpServerThreadOn :: HasCallStack => (ASrvTransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
|
withSmpServerThreadOn (t, msType) = withSmpServerConfigOn t (cfgMS msType)
|
|
|
|
serverBracket :: HasCallStack => (TMVar Bool -> IO ()) -> IO () -> (HasCallStack => ThreadId -> IO a) -> IO a
|
|
serverBracket process afterProcess f = do
|
|
started <- newEmptyTMVarIO
|
|
E.bracket
|
|
(forkIOWithUnmask (\unmask -> unmask (process started) `E.catchAny` handleStartError started))
|
|
(\t -> killThread t >> afterProcess >> waitFor started "stop")
|
|
(\t -> waitFor started "start" >> f t >>= \r -> r <$ threadDelay 100000)
|
|
where
|
|
-- it putTMVar is called twise to unlock both parts of the bracket in case of start failure
|
|
handleStartError started e = do
|
|
atomically $ putTMVar started False
|
|
atomically $ putTMVar started False
|
|
E.throwIO e
|
|
waitFor started s =
|
|
5_000_000 `timeout` atomically (takeTMVar started) >>= \case
|
|
Nothing -> error $ "server did not " <> s
|
|
_ -> pure ()
|
|
|
|
-- A TCP server that accepts connections but never performs a TLS handshake, so a client
|
|
-- connecting to it stays blocked in the TLS handshake until its connection timeout.
|
|
withStallingServerOn :: HasCallStack => ServiceName -> IO a -> IO a
|
|
withStallingServerOn port action =
|
|
serverBracket
|
|
(\started -> runLocalTCPServer started port (\_ -> threadDelay maxBound))
|
|
(pure ())
|
|
(const action)
|
|
|
|
withSmpServerOn :: HasCallStack => (ASrvTransport, AStoreType) -> ServiceName -> IO a -> IO a
|
|
withSmpServerOn ps port' = withSmpServerThreadOn ps port' . const
|
|
|
|
withSmpServer :: HasCallStack => (ASrvTransport, AStoreType) -> IO a -> IO a
|
|
withSmpServer ps = withSmpServerOn ps testPort
|
|
|
|
withSmpServerProxy :: HasCallStack => (ASrvTransport, AStoreType) -> IO a -> IO a
|
|
withSmpServerProxy (t, msType) = withSmpServerConfigOn t (proxyCfgMS msType) testPort . const
|
|
|
|
withSmpServers2 :: HasCallStack => (ASrvTransport, AStoreType) -> IO a -> IO a
|
|
withSmpServers2 ps@(t, ASType qs _ms) = withSmpServer ps . withSmpServerConfigOn t (cfgJ2QS qs) testPort2 . const
|
|
|
|
withSmpServersProxy2 :: HasCallStack => (ASrvTransport, AStoreType) -> IO a -> IO a
|
|
withSmpServersProxy2 ps@(t, ASType qs _ms) = withSmpServerProxy ps . withSmpServerConfigOn t (proxyCfgJ2QS qs) testPort2 . const
|
|
|
|
runSmpTest :: forall c a. (HasCallStack, Transport c) => AStoreType -> (HasCallStack => THandleSMP c 'TClient -> IO a) -> IO a
|
|
runSmpTest msType test = withSmpServerConfigOn (transport @c) (cfgMS msType) testPort $ \_ -> testSMPClient test
|
|
|
|
runSmpTestN :: forall c a. (HasCallStack, Transport c) => AStoreType -> Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO a) -> IO a
|
|
runSmpTestN msType = runSmpTestNCfg (cfgMS msType) supportedClientSMPRelayVRange
|
|
|
|
runSmpTestNCfg :: forall c a. (HasCallStack, Transport c) => AServerConfig -> VersionRangeSMP -> Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO a) -> IO a
|
|
runSmpTestNCfg srvCfg clntVR nClients test = withSmpServerConfigOn (transport @c) srvCfg testPort $ \_ -> run nClients []
|
|
where
|
|
run :: Int -> [THandleSMP c 'TClient] -> IO a
|
|
run 0 hs = test hs
|
|
run n hs = testSMPClientVR clntVR $ \h -> run (n - 1) (h : hs)
|
|
|
|
smpServerTest ::
|
|
forall c smp.
|
|
(Transport c, Encoding smp) =>
|
|
TProxy c 'TServer ->
|
|
(Maybe TAuthorizations, ByteString, ByteString, smp) ->
|
|
IO (Maybe TAuthorizations, ByteString, ByteString, BrokerMsg)
|
|
smpServerTest _ t = runSmpTest (ASType SQSMemory SMSJournal) $ \h -> tPut' h t >> tGet' h
|
|
where
|
|
tPut' :: THandleSMP c 'TClient -> (Maybe TAuthorizations, ByteString, ByteString, smp) -> IO ()
|
|
tPut' h@THandle {params = THandleParams {sessionId, implySessId}} (sig, corrId, queueId, smp) = do
|
|
let t' = if implySessId then smpEncode (corrId, queueId, smp) else smpEncode (sessionId, corrId, queueId, smp)
|
|
[Right ()] <- tPut h [Right (sig, t')]
|
|
pure ()
|
|
tGet' h = do
|
|
[(CorrId corrId, EntityId qId, Right cmd)] <- tGetClient h
|
|
pure (Nothing, corrId, qId, cmd)
|
|
|
|
smpTest :: (HasCallStack, Transport c) => TProxy c 'TServer -> AStoreType -> (HasCallStack => THandleSMP c 'TClient -> IO ()) -> Expectation
|
|
smpTest _ msType test' = runSmpTest msType test' `shouldReturn` ()
|
|
|
|
smpTestN :: (HasCallStack, Transport c) => AStoreType -> Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO ()) -> Expectation
|
|
smpTestN msType n test' = runSmpTestN msType n test' `shouldReturn` ()
|
|
|
|
smpTest2 :: forall c. (HasCallStack, Transport c) => TProxy c 'TServer -> AStoreType -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation
|
|
smpTest2 t msType = smpTest2Cfg (cfgMS msType) supportedClientSMPRelayVRange t
|
|
|
|
smpTest2Cfg :: forall c. (HasCallStack, Transport c) => AServerConfig -> VersionRangeSMP -> TProxy c 'TServer -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation
|
|
smpTest2Cfg srvCfg clntVR _ test' = runSmpTestNCfg srvCfg clntVR 2 _test `shouldReturn` ()
|
|
where
|
|
_test :: HasCallStack => [THandleSMP c 'TClient] -> IO ()
|
|
_test [h1, h2] = test' h1 h2
|
|
_test _ = error "expected 2 handles"
|
|
|
|
smpTest3 :: forall c. (HasCallStack, Transport c) => TProxy c 'TServer -> AStoreType -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation
|
|
smpTest3 _ msType test' = smpTestN msType 3 _test
|
|
where
|
|
_test :: HasCallStack => [THandleSMP c 'TClient] -> IO ()
|
|
_test [h1, h2, h3] = test' h1 h2 h3
|
|
_test _ = error "expected 3 handles"
|
|
|
|
smpTest4 :: forall c. (HasCallStack, Transport c) => TProxy c 'TServer -> AStoreType -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation
|
|
smpTest4 _ msType test' = smpTestN msType 4 _test
|
|
where
|
|
_test :: HasCallStack => [THandleSMP c 'TClient] -> IO ()
|
|
_test [h1, h2, h3, h4] = test' h1 h2 h3 h4
|
|
_test _ = error "expected 4 handles"
|
|
|
|
unexpected :: (HasCallStack, Show a) => a -> Expectation
|
|
unexpected r = expectationFailure $ "unexpected response " <> show r
|
|
|
|
#if defined(dbPostgres) || defined(dbServerPostgres)
|
|
postgressBracket :: ConnectInfo -> IO a -> IO a
|
|
postgressBracket connInfo =
|
|
E.bracket_
|
|
(dropDatabaseAndUser connInfo >> createDBAndUserIfNotExists connInfo)
|
|
(dropDatabaseAndUser connInfo)
|
|
#endif
|