mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
tests: pass AStoreType to agent test as a parameter (#1479)
* tests: run agent tests with PostgreSQL SMP servers * agent tests with postgres database * enable tests * fix store log tests * fix test
This commit is contained in:
@@ -113,7 +113,7 @@ doesSchemaExist db schema = do
|
||||
closeDBStore :: DBStore -> IO ()
|
||||
closeDBStore DBStore {dbPool, dbPoolSize, dbClosed} =
|
||||
ifM (readTVarIO dbClosed) (putStrLn "closeDBStore: already closed") $ uninterruptibleMask_ $ do
|
||||
replicateM_ dbPoolSize $ atomically $ readTBQueue dbPool
|
||||
replicateM_ dbPoolSize $ atomically (readTBQueue dbPool) >>= DB.close
|
||||
atomically $ writeTVar dbClosed True
|
||||
|
||||
reopenDBStore :: DBStore -> IO ()
|
||||
|
||||
@@ -40,7 +40,6 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.List (sort, stripPrefix)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (mapMaybe)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (UTCTime, addUTCTime, getCurrentTime, nominalDay)
|
||||
|
||||
@@ -14,6 +14,7 @@ import AgentTests.FunctionalAPITests (functionalAPITests)
|
||||
import AgentTests.MigrationTests (migrationTests)
|
||||
import AgentTests.NotificationTests (notificationTests)
|
||||
import AgentTests.ServerChoice (serverChoiceTests)
|
||||
import Simplex.Messaging.Server.Env.STM (AStoreType (..))
|
||||
import Simplex.Messaging.Transport (ATransport (..))
|
||||
import Test.Hspec
|
||||
#if defined(dbPostgres)
|
||||
@@ -23,8 +24,8 @@ import Simplex.Messaging.Agent.Store.Postgres.Util (dropAllSchemasExceptSystem)
|
||||
import AgentTests.SQLiteTests (storeTests)
|
||||
#endif
|
||||
|
||||
agentTests :: ATransport -> Spec
|
||||
agentTests (ATransport t) = do
|
||||
agentTests :: (ATransport, AStoreType) -> Spec
|
||||
agentTests ps = do
|
||||
describe "Migration tests" migrationTests
|
||||
describe "Connection request" connectionRequestTests
|
||||
describe "Double ratchet tests" doubleRatchetTests
|
||||
@@ -33,9 +34,9 @@ agentTests (ATransport t) = do
|
||||
#else
|
||||
do
|
||||
#endif
|
||||
describe "Functional API" $ functionalAPITests (ATransport t)
|
||||
describe "Functional API" $ functionalAPITests ps
|
||||
describe "Chosen servers" serverChoiceTests
|
||||
describe "Notification tests" $ notificationTests (ATransport t)
|
||||
describe "Notification tests" $ notificationTests ps
|
||||
#if !defined(dbPostgres)
|
||||
describe "SQLite store" storeTests
|
||||
#endif
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -13,6 +13,7 @@ import Simplex.Messaging.Agent.Store.Shared
|
||||
import System.Random (randomIO)
|
||||
import Test.Hspec
|
||||
#if defined(dbPostgres)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Database.PostgreSQL.Simple (fromOnly)
|
||||
import Fixtures
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Util (dropSchema)
|
||||
@@ -206,7 +207,9 @@ createStore randSuffix migrations confirmMigrations = do
|
||||
let dbOpts =
|
||||
DBOpts {
|
||||
connstr = testDBConnstr,
|
||||
schema = testSchema randSuffix
|
||||
schema = B.pack $ testSchema randSuffix,
|
||||
poolSize = 1,
|
||||
createSchema = True
|
||||
}
|
||||
createDBStore dbOpts migrations confirmMigrations
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ import Data.Text.Encoding (encodeUtf8)
|
||||
import qualified Data.Text.IO as TIO
|
||||
import NtfClient
|
||||
import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer, testNtfServer2)
|
||||
import SMPClient (cfg, cfgJ2, cfgVPrev, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'')
|
||||
import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'')
|
||||
import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage)
|
||||
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), withStore')
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers)
|
||||
@@ -77,7 +77,7 @@ import Simplex.Messaging.Notifications.Types (NtfTknAction (..), NtfToken (..))
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..))
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Server.Env.STM (ServerConfig (..))
|
||||
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..))
|
||||
import Simplex.Messaging.Transport (ATransport)
|
||||
import Test.Hspec
|
||||
import UnliftIO
|
||||
@@ -87,8 +87,8 @@ import Database.PostgreSQL.Simple.SqlQQ (sql)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
#endif
|
||||
|
||||
notificationTests :: ATransport -> Spec
|
||||
notificationTests t = do
|
||||
notificationTests :: (ATransport, AStoreType) -> Spec
|
||||
notificationTests ps@(t, _) = do
|
||||
describe "Managing notification tokens" $ do
|
||||
it "should register and verify notification token" $
|
||||
withAPNSMockServer $ \apns ->
|
||||
@@ -133,61 +133,65 @@ notificationTests t = do
|
||||
testRunNTFServerTests t srv1 `shouldReturn` Just (ProtocolTestFailure TSConnect $ BROKER (B.unpack $ strEncode srv1) NETWORK)
|
||||
describe "Managing notification subscriptions" $ do
|
||||
describe "should create notification subscription for existing connection" $
|
||||
testNtfMatrix t testNotificationSubscriptionExistingConnection
|
||||
testNtfMatrix ps testNotificationSubscriptionExistingConnection
|
||||
describe "should create notification subscription for new connection" $
|
||||
testNtfMatrix t testNotificationSubscriptionNewConnection
|
||||
testNtfMatrix ps testNotificationSubscriptionNewConnection
|
||||
it "should change notifications mode" $
|
||||
withSmpServer t $
|
||||
withSmpServer ps $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testChangeNotificationsMode apns
|
||||
it "should change token" $
|
||||
withSmpServer t $
|
||||
withSmpServer ps $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testChangeToken apns
|
||||
describe "Notifications server store log" $
|
||||
it "should save and restore tokens and subscriptions" $
|
||||
withAPNSMockServer $ \apns ->
|
||||
testNotificationsStoreLog t apns
|
||||
testNotificationsStoreLog ps apns
|
||||
describe "Notifications after SMP server restart" $
|
||||
it "should resume subscriptions after SMP server is restarted" $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testNotificationsSMPRestart t apns
|
||||
withNtfServer t $ testNotificationsSMPRestart ps apns
|
||||
describe "Notifications after SMP server restart" $
|
||||
it "should resume batched subscriptions after SMP server is restarted" $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testNotificationsSMPRestartBatch 100 t apns
|
||||
withNtfServer t $ testNotificationsSMPRestartBatch 100 ps apns
|
||||
describe "should switch notifications to the new queue" $
|
||||
testServerMatrix2 t $ \servers ->
|
||||
testServerMatrix2 ps $ \servers ->
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testSwitchNotifications servers apns
|
||||
it "should keep sending notifications for old token" $
|
||||
withSmpServer t $
|
||||
withSmpServer ps $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServerOn t ntfTestPort $
|
||||
testNotificationsOldToken apns
|
||||
it "should update server from new token" $
|
||||
withSmpServer t $
|
||||
withSmpServer ps $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServerOn t ntfTestPort2 . withNtfServerThreadOn t ntfTestPort $ \ntf ->
|
||||
testNotificationsNewToken apns ntf
|
||||
|
||||
testNtfMatrix :: HasCallStack => ATransport -> (APNSMockServer -> AgentMsgId -> AgentClient -> AgentClient -> IO ()) -> Spec
|
||||
testNtfMatrix t runTest = do
|
||||
testNtfMatrix :: HasCallStack => (ATransport, AStoreType) -> (APNSMockServer -> AgentMsgId -> AgentClient -> AgentClient -> IO ()) -> Spec
|
||||
testNtfMatrix ps@(_, msType) runTest = do
|
||||
describe "next and current" $ do
|
||||
it "curr servers; curr clients" $ runNtfTestCfg t 1 cfg ntfServerCfg agentCfg agentCfg runTest
|
||||
it "curr servers; prev clients" $ runNtfTestCfg t 3 cfg ntfServerCfg agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
it "prev servers; prev clients" $ runNtfTestCfg t 3 cfgVPrev ntfServerCfgVPrev agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
it "prev servers; curr clients" $ runNtfTestCfg t 1 cfgVPrev ntfServerCfgVPrev agentCfg agentCfg runTest
|
||||
it "curr servers; curr clients" $ runNtfTestCfg ps 1 cfg' ntfServerCfg agentCfg agentCfg runTest
|
||||
it "curr servers; prev clients" $ runNtfTestCfg ps 3 cfg' ntfServerCfg agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
it "prev servers; prev clients" $ runNtfTestCfg ps 3 cfgVPrev' ntfServerCfgVPrev agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
it "prev servers; curr clients" $ runNtfTestCfg ps 1 cfgVPrev' ntfServerCfgVPrev agentCfg agentCfg runTest
|
||||
-- servers can be upgraded in any order
|
||||
it "servers: curr SMP, prev NTF; prev clients" $ runNtfTestCfg t 3 cfg ntfServerCfgVPrev agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
it "servers: prev SMP, curr NTF; prev clients" $ runNtfTestCfg t 3 cfgVPrev ntfServerCfg agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
it "servers: curr SMP, prev NTF; prev clients" $ runNtfTestCfg ps 3 cfg' ntfServerCfgVPrev agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
it "servers: prev SMP, curr NTF; prev clients" $ runNtfTestCfg ps 3 cfgVPrev' ntfServerCfg agentCfgVPrevPQ agentCfgVPrevPQ runTest
|
||||
-- one of two clients can be upgraded
|
||||
it "servers: curr SMP, curr NTF; clients: curr/prev" $ runNtfTestCfg t 3 cfg ntfServerCfg agentCfg agentCfgVPrevPQ runTest
|
||||
it "servers: curr SMP, curr NTF; clients: prev/curr" $ runNtfTestCfg t 3 cfg ntfServerCfg agentCfgVPrevPQ agentCfg runTest
|
||||
it "servers: curr SMP, curr NTF; clients: curr/prev" $ runNtfTestCfg ps 3 cfg' ntfServerCfg agentCfg agentCfgVPrevPQ runTest
|
||||
it "servers: curr SMP, curr NTF; clients: prev/curr" $ runNtfTestCfg ps 3 cfg' ntfServerCfg agentCfgVPrevPQ agentCfg runTest
|
||||
where
|
||||
cfg' = cfgMS msType
|
||||
cfgVPrev' = cfgVPrev msType
|
||||
|
||||
runNtfTestCfg :: HasCallStack => ATransport -> AgentMsgId -> ServerConfig -> NtfServerConfig -> AgentConfig -> AgentConfig -> (APNSMockServer -> AgentMsgId -> AgentClient -> AgentClient -> IO ()) -> IO ()
|
||||
runNtfTestCfg t baseId smpCfg ntfCfg aCfg bCfg runTest = do
|
||||
withSmpServerConfigOn t smpCfg testPort $ \_ ->
|
||||
runNtfTestCfg :: HasCallStack => (ATransport, AStoreType) -> AgentMsgId -> ServerConfig -> NtfServerConfig -> AgentConfig -> AgentConfig -> (APNSMockServer -> AgentMsgId -> AgentClient -> AgentClient -> IO ()) -> IO ()
|
||||
runNtfTestCfg (t, msType) baseId smpCfg ntfCfg aCfg bCfg runTest = do
|
||||
let smpCfg' = smpCfg {serverStoreCfg = serverStoreConfig msType}
|
||||
withSmpServerConfigOn t smpCfg' testPort $ \_ ->
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServerCfg ntfCfg {transports = [(ntfTestPort, t, False)]} $ \_ ->
|
||||
withAgentClientsCfg2 aCfg bCfg $ runTest apns baseId
|
||||
@@ -746,9 +750,9 @@ testChangeToken apns = withAgent 1 agentCfg initAgentServers testDB2 $ \bob -> d
|
||||
baseId = 1
|
||||
msgId = subtract baseId
|
||||
|
||||
testNotificationsStoreLog :: ATransport -> APNSMockServer -> IO ()
|
||||
testNotificationsStoreLog t apns = withAgentClients2 $ \alice bob -> do
|
||||
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
|
||||
testNotificationsStoreLog :: (ATransport, AStoreType) -> APNSMockServer -> IO ()
|
||||
testNotificationsStoreLog ps@(t, _) apns = withAgentClients2 $ \alice bob -> do
|
||||
withSmpServerStoreMsgLogOn ps testPort $ \_ -> do
|
||||
(aliceId, bobId) <- withNtfServerStoreLog t $ \threadId -> runRight $ do
|
||||
(aliceId, bobId) <- makeConnection alice bob
|
||||
_ <- registerTestToken alice "abcd" NMInstant apns
|
||||
@@ -779,13 +783,13 @@ testNotificationsStoreLog t apns = withAgentClients2 $ \alice bob -> do
|
||||
ackMessage alice bobId 4 Nothing
|
||||
noNotifications apns
|
||||
|
||||
withSmpServerStoreMsgLogOn t testPort $ \_ ->
|
||||
withSmpServerStoreMsgLogOn ps testPort $ \_ ->
|
||||
withNtfServerStoreLog t $ \_ -> runRight_ $ do
|
||||
void $ messageNotificationData alice apns
|
||||
|
||||
testNotificationsSMPRestart :: ATransport -> APNSMockServer -> IO ()
|
||||
testNotificationsSMPRestart t apns = withAgentClients2 $ \alice bob -> do
|
||||
(aliceId, bobId) <- withSmpServerStoreLogOn t testPort $ \threadId -> runRight $ do
|
||||
testNotificationsSMPRestart :: (ATransport, AStoreType) -> APNSMockServer -> IO ()
|
||||
testNotificationsSMPRestart ps apns = withAgentClients2 $ \alice bob -> do
|
||||
(aliceId, bobId) <- withSmpServerStoreLogOn ps testPort $ \threadId -> runRight $ do
|
||||
(aliceId, bobId) <- makeConnection alice bob
|
||||
_ <- registerTestToken alice "abcd" NMInstant apns
|
||||
liftIO $ threadDelay 250000
|
||||
@@ -801,7 +805,7 @@ testNotificationsSMPRestart t apns = withAgentClients2 $ \alice bob -> do
|
||||
nGet alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
nGet bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False
|
||||
|
||||
withSmpServerStoreLogOn t testPort $ \threadId -> runRight_ $ do
|
||||
withSmpServerStoreLogOn ps testPort $ \threadId -> runRight_ $ do
|
||||
nGet alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False
|
||||
nGet bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False
|
||||
liftIO $ threadDelay 1000000
|
||||
@@ -811,8 +815,8 @@ testNotificationsSMPRestart t apns = withAgentClients2 $ \alice bob -> do
|
||||
get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
|
||||
liftIO $ killThread threadId
|
||||
|
||||
testNotificationsSMPRestartBatch :: Int -> ATransport -> APNSMockServer -> IO ()
|
||||
testNotificationsSMPRestartBatch n t apns =
|
||||
testNotificationsSMPRestartBatch :: Int -> (ATransport, AStoreType) -> APNSMockServer -> IO ()
|
||||
testNotificationsSMPRestartBatch n ps@(t, ASType qsType _) apns =
|
||||
withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do
|
||||
threadDelay 1000000
|
||||
conns <- runServers $ do
|
||||
@@ -851,8 +855,8 @@ testNotificationsSMPRestartBatch n t apns =
|
||||
where
|
||||
runServers :: ExceptT AgentErrorType IO a -> IO a
|
||||
runServers a = do
|
||||
withSmpServerStoreLogOn t testPort $ \t1 -> do
|
||||
res <- withSmpServerConfigOn t cfgJ2 testPort2 $ \t2 ->
|
||||
withSmpServerStoreLogOn ps testPort $ \t1 -> do
|
||||
res <- withSmpServerConfigOn t (cfgJ2QS qsType) testPort2 $ \t2 ->
|
||||
runRight a `finally` killThread t2
|
||||
killThread t1
|
||||
pure res
|
||||
|
||||
@@ -146,23 +146,24 @@ testSMPClient_ host port vr client = do
|
||||
cfg :: ServerConfig
|
||||
cfg = cfgMS (ASType SQSMemory SMSJournal)
|
||||
|
||||
-- TODO [postgres]
|
||||
-- cfg :: ServerConfig
|
||||
-- cfg = cfgMS (ASType SQSPostgres SMSJournal)
|
||||
cfgDB :: ServerConfig
|
||||
cfgDB = cfgMS (ASType SQSPostgres SMSJournal)
|
||||
|
||||
cfgJ2 :: ServerConfig
|
||||
cfgJ2 = journalCfg cfg testStoreLogFile2 testStoreMsgsDir2
|
||||
|
||||
-- TODO [postgres]
|
||||
-- cfgJ2 :: ServerConfig
|
||||
-- cfgJ2 = journalCfg cfg testStoreDBOpts2 testStoreMsgsDir2
|
||||
cfgJ2QS :: SQSType s -> ServerConfig
|
||||
cfgJ2QS = \case
|
||||
SQSMemory -> journalCfg (cfgMS $ ASType SQSMemory SMSJournal) testStoreLogFile2 testStoreMsgsDir2
|
||||
SQSPostgres -> journalCfgDB (cfgMS $ ASType SQSPostgres SMSJournal) testStoreDBOpts2 testStoreMsgsDir2
|
||||
|
||||
journalCfg :: ServerConfig -> FilePath -> FilePath -> ServerConfig
|
||||
journalCfg cfg' storeLogFile storeMsgsPath = cfg' {serverStoreCfg = ASSCfg SQSMemory SMSJournal SSCMemoryJournal {storeLogFile, storeMsgsPath}}
|
||||
|
||||
-- TODO [postgres]
|
||||
-- journalCfg :: ServerConfig -> DBOpts -> FilePath -> ServerConfig
|
||||
-- journalCfg cfg' storeDBOpts storeMsgsPath' = cfg' {serverStoreCfg = ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeDBOpts, storeMsgsPath'}}
|
||||
journalCfgDB :: ServerConfig -> DBOpts -> FilePath -> ServerConfig
|
||||
journalCfgDB cfg' dbOpts storeMsgsPath' =
|
||||
let storeCfg = PostgresStoreCfg {dbOpts, confirmMigrations = MCYesUp, deletedTTL = 86400}
|
||||
in cfg' {serverStoreCfg = ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath'}}
|
||||
|
||||
cfgMS :: AStoreType -> ServerConfig
|
||||
cfgMS msType =
|
||||
@@ -175,14 +176,7 @@ cfgMS msType =
|
||||
maxJournalStateLines = 2,
|
||||
queueIdBytes = 24,
|
||||
msgIdBytes = 24,
|
||||
serverStoreCfg = case msType of
|
||||
ASType SQSMemory SMSMemory ->
|
||||
ASSCfg SQSMemory SMSMemory $ SSCMemory $ Just StorePaths {storeLogFile = testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile}
|
||||
ASType SQSMemory SMSJournal ->
|
||||
ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = testStoreLogFile, storeMsgsPath = testStoreMsgsDir}
|
||||
ASType SQSPostgres SMSJournal ->
|
||||
let storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, confirmMigrations = MCYesUp, deletedTTL = 86400}
|
||||
in ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir},
|
||||
serverStoreCfg = serverStoreConfig msType,
|
||||
storeNtfsFile = Nothing,
|
||||
allowNewQueues = True,
|
||||
newQueueBasicAuth = Nothing,
|
||||
@@ -218,14 +212,24 @@ cfgMS msType =
|
||||
startOptions = StartOptions {maintenance = False, skipWarnings = False, confirmMigrations = MCYesUp}
|
||||
}
|
||||
|
||||
serverStoreConfig :: AStoreType -> AServerStoreCfg
|
||||
serverStoreConfig = \case
|
||||
ASType SQSMemory SMSMemory ->
|
||||
ASSCfg SQSMemory SMSMemory $ SSCMemory $ Just StorePaths {storeLogFile = testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile}
|
||||
ASType SQSMemory SMSJournal ->
|
||||
ASSCfg SQSMemory SMSJournal $ SSCMemoryJournal {storeLogFile = testStoreLogFile, storeMsgsPath = testStoreMsgsDir}
|
||||
ASType SQSPostgres SMSJournal ->
|
||||
let storeCfg = PostgresStoreCfg {dbOpts = testStoreDBOpts, confirmMigrations = MCYesUp, deletedTTL = 86400}
|
||||
in ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath' = testStoreMsgsDir}
|
||||
|
||||
cfgV7 :: ServerConfig
|
||||
cfgV7 = cfg {smpServerVRange = mkVersionRange minServerSMPRelayVersion authCmdsSMPVersion}
|
||||
|
||||
cfgV8 :: ServerConfig
|
||||
cfgV8 = cfg {smpServerVRange = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion}
|
||||
cfgV8 :: AStoreType -> ServerConfig
|
||||
cfgV8 msType = (cfgMS msType) {smpServerVRange = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion}
|
||||
|
||||
cfgVPrev :: ServerConfig
|
||||
cfgVPrev = cfg {smpServerVRange = prevRange $ smpServerVRange cfg}
|
||||
cfgVPrev :: AStoreType -> ServerConfig
|
||||
cfgVPrev msType = (cfgMS msType) {smpServerVRange = prevRange $ smpServerVRange cfg}
|
||||
|
||||
prevRange :: VersionRange v -> VersionRange v
|
||||
prevRange vr = vr {maxVersion = max (minVersion vr) (prevVersion $ maxVersion vr)}
|
||||
@@ -234,8 +238,11 @@ prevVersion :: Version v -> Version v
|
||||
prevVersion (Version v) = Version (v - 1)
|
||||
|
||||
proxyCfg :: ServerConfig
|
||||
proxyCfg =
|
||||
cfg
|
||||
proxyCfg = proxyCfgMS (ASType SQSMemory SMSJournal)
|
||||
|
||||
proxyCfgMS :: AStoreType -> ServerConfig
|
||||
proxyCfgMS msType =
|
||||
(cfgMS msType)
|
||||
{ allowSMPProxy = True,
|
||||
smpAgentCfg = smpAgentCfg' {smpCfg = (smpCfg smpAgentCfg') {agreeSecret = True, proxyServer = True, serverVRange = supportedProxyClientSMPRelayVRange}}
|
||||
}
|
||||
@@ -252,18 +259,12 @@ proxyCfgJ2 = journalCfg proxyCfg testStoreLogFile2 testStoreMsgsDir2
|
||||
proxyVRangeV8 :: VersionRangeSMP
|
||||
proxyVRangeV8 = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion
|
||||
|
||||
withSmpServerStoreMsgLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerStoreMsgLogOn = (`withSmpServerStoreMsgLogOnMS` ASType SQSMemory SMSJournal)
|
||||
|
||||
withSmpServerStoreMsgLogOnMS :: HasCallStack => ATransport -> AStoreType -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerStoreMsgLogOnMS t msType =
|
||||
withSmpServerStoreMsgLogOn :: HasCallStack => (ATransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerStoreMsgLogOn (t, msType) =
|
||||
withSmpServerConfigOn t (cfgMS msType) {storeNtfsFile = Just testStoreNtfsFile, serverStatsBackupFile = Just testServerStatsBackupFile}
|
||||
|
||||
withSmpServerStoreLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerStoreLogOn = (`withSmpServerStoreLogOnMS` ASType SQSMemory SMSJournal)
|
||||
|
||||
withSmpServerStoreLogOnMS :: HasCallStack => ATransport -> AStoreType -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerStoreLogOnMS t msType = withSmpServerConfigOn t (cfgMS msType) {serverStatsBackupFile = Just testServerStatsBackupFile}
|
||||
withSmpServerStoreLogOn :: HasCallStack => (ATransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerStoreLogOn (t, msType) = withSmpServerConfigOn t (cfgMS msType) {serverStatsBackupFile = Just testServerStatsBackupFile}
|
||||
|
||||
withSmpServerConfigOn :: HasCallStack => ATransport -> ServerConfig -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerConfigOn t cfg' port' =
|
||||
@@ -271,8 +272,8 @@ withSmpServerConfigOn t cfg' port' =
|
||||
(\started -> runSMPServerBlocking started cfg' {transports = [(port', t, False)]} Nothing)
|
||||
(threadDelay 10000)
|
||||
|
||||
withSmpServerThreadOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerThreadOn t = withSmpServerConfigOn t cfg
|
||||
withSmpServerThreadOn :: HasCallStack => (ATransport, AStoreType) -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withSmpServerThreadOn (t, msType) = withSmpServerConfigOn t (cfgMS msType)
|
||||
|
||||
serverBracket :: HasCallStack => (TMVar Bool -> IO ()) -> IO () -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
serverBracket process afterProcess f = do
|
||||
@@ -292,14 +293,14 @@ serverBracket process afterProcess f = do
|
||||
Nothing -> error $ "server did not " <> s
|
||||
_ -> pure ()
|
||||
|
||||
withSmpServerOn :: HasCallStack => ATransport -> ServiceName -> IO a -> IO a
|
||||
withSmpServerOn t port' = withSmpServerThreadOn t port' . const
|
||||
withSmpServerOn :: HasCallStack => (ATransport, AStoreType) -> ServiceName -> IO a -> IO a
|
||||
withSmpServerOn ps port' = withSmpServerThreadOn ps port' . const
|
||||
|
||||
withSmpServer :: HasCallStack => ATransport -> IO a -> IO a
|
||||
withSmpServer t = withSmpServerOn t testPort
|
||||
withSmpServer :: HasCallStack => (ATransport, AStoreType) -> IO a -> IO a
|
||||
withSmpServer ps = withSmpServerOn ps testPort
|
||||
|
||||
withSmpServerProxy :: HasCallStack => ATransport -> IO a -> IO a
|
||||
withSmpServerProxy t = withSmpServerConfigOn t proxyCfg testPort . const
|
||||
withSmpServerProxy :: HasCallStack => (ATransport, AStoreType) -> IO a -> IO a
|
||||
withSmpServerProxy (t, msType) = withSmpServerConfigOn t (proxyCfgMS msType) testPort . const
|
||||
|
||||
runSmpTest :: forall c a. (HasCallStack, Transport c) => AStoreType -> (HasCallStack => THandleSMP c 'TClient -> IO a) -> IO a
|
||||
runSmpTest msType test = withSmpServerConfigOn (transport @c) (cfgMS msType) testPort $ \_ -> testSMPClient test
|
||||
|
||||
@@ -38,7 +38,8 @@ import Simplex.Messaging.Crypto.Ratchet (pattern PQSupportOn)
|
||||
import qualified Simplex.Messaging.Crypto.Ratchet as CR
|
||||
import Simplex.Messaging.Protocol (EncRcvMsgBody (..), MsgBody, RcvMessage (..), SubscriptionMode (..), maxMessageLength, noMsgFlags, pattern NoEntity)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Server.Env.STM (ServerConfig (..))
|
||||
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..))
|
||||
import Simplex.Messaging.Server.MsgStore.Types (SQSType (..))
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Util (bshow, tshow)
|
||||
import Simplex.Messaging.Version (mkVersionRange)
|
||||
@@ -52,7 +53,7 @@ import Fixtures
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Util (dropAllSchemasExceptSystem)
|
||||
#endif
|
||||
|
||||
smpProxyTests :: Spec
|
||||
smpProxyTests :: SpecWith AStoreType
|
||||
smpProxyTests = do
|
||||
describe "server configuration" $ do
|
||||
it "refuses proxy handshake unless enabled" testNoProxy
|
||||
@@ -117,8 +118,9 @@ smpProxyTests = do
|
||||
it "without proxy" . oneServer $
|
||||
agentDeliverMessageViaProxy ([srv1], SPMNever, False) ([srv1], SPMNever, False) C.SEd448 "hello 1" "hello 2" 1
|
||||
describe "two servers" $ do
|
||||
it "always via proxy" . twoServers $
|
||||
agentDeliverMessageViaProxy ([srv1], SPMAlways, True) ([srv2], SPMAlways, True) C.SEd448 "hello 1" "hello 2" 1
|
||||
it "always via proxy" $ \msType -> twoServers
|
||||
(agentDeliverMessageViaProxy ([srv1], SPMAlways, True) ([srv2], SPMAlways, True) C.SEd448 "hello 1" "hello 2" 1)
|
||||
msType
|
||||
it "both via proxy" . twoServers $
|
||||
agentDeliverMessageViaProxy ([srv1], SPMUnknown, True) ([srv2], SPMUnknown, True) C.SEd448 "hello 1" "hello 2" 1
|
||||
it "first via proxy" . twoServers $
|
||||
@@ -131,9 +133,9 @@ smpProxyTests = do
|
||||
agentDeliverMessageViaProxy ([srv1], SPMUnknown, False) ([srv2], SPMUnknown, False) C.SEd448 "hello 1" "hello 2" 3
|
||||
it "fails when fallback is prohibited" . twoServers_ proxyCfg cfgV7 $
|
||||
agentViaProxyVersionError
|
||||
it "retries sending when destination or proxy relay is offline" $
|
||||
it "retries sending when destination or proxy relay is offline" $ \_ ->
|
||||
agentViaProxyRetryOffline
|
||||
it "retries sending when destination relay session disconnects in proxy" $
|
||||
it "retries sending when destination relay session disconnects in proxy" $ \_ ->
|
||||
agentViaProxyRetryNoSession
|
||||
describe "stress test 1k" $ do
|
||||
let deliver nAgents nMsgs = agentDeliverMessagesViaProxyConc (replicate nAgents [srv1]) (map bshow [1 :: Int .. nMsgs])
|
||||
@@ -144,16 +146,17 @@ smpProxyTests = do
|
||||
let deliver nAgents nMsgs = agentDeliverMessagesViaProxyConc (replicate nAgents [srv1]) (map bshow [1 :: Int .. nMsgs])
|
||||
it "25 agents, 300 pairs, 17 messages" . oneServer . withNumCapabilities 4 $ deliver 25 17
|
||||
where
|
||||
oneServer = withSmpServerConfigOn (transport @TLS) proxyCfg {msgQueueQuota = 128, maxJournalMsgCount = 256} testPort . const
|
||||
twoServers = twoServers_ proxyCfg proxyCfg
|
||||
twoServersFirstProxy = twoServers_ proxyCfg cfgV8 {msgQueueQuota = 128, maxJournalMsgCount = 256}
|
||||
twoServersMoreConc = twoServers_ proxyCfg {serverClientConcurrency = 128} cfgV8 {msgQueueQuota = 128, maxJournalMsgCount = 256}
|
||||
twoServersNoConc = twoServers_ proxyCfg {serverClientConcurrency = 1} cfgV8 {msgQueueQuota = 128, maxJournalMsgCount = 256}
|
||||
twoServers_ cfg1 cfg2 runTest =
|
||||
oneServer test msType = withSmpServerConfigOn (transport @TLS) (proxyCfgMS msType) {msgQueueQuota = 128, maxJournalMsgCount = 256} testPort $ const test
|
||||
twoServers test msType = twoServers_ (proxyCfgMS msType) (proxyCfgMS msType) test msType
|
||||
twoServersFirstProxy test msType = twoServers_ (proxyCfgMS msType) (cfgV8 msType) {msgQueueQuota = 128, maxJournalMsgCount = 256} test msType
|
||||
twoServersMoreConc test msType = twoServers_ (proxyCfgMS msType) {serverClientConcurrency = 128} (cfgV8 msType) {msgQueueQuota = 128, maxJournalMsgCount = 256} test msType
|
||||
twoServersNoConc test msType = twoServers_ (proxyCfgMS msType) {serverClientConcurrency = 1} (cfgV8 msType) {msgQueueQuota = 128, maxJournalMsgCount = 256} test msType
|
||||
twoServers_ :: ServerConfig -> ServerConfig -> IO () -> AStoreType -> IO ()
|
||||
twoServers_ cfg1 cfg2 runTest (ASType qsType _) =
|
||||
withSmpServerConfigOn (transport @TLS) cfg1 testPort $ \_ ->
|
||||
let cfg2' = journalCfg cfg2 testStoreLogFile2 testStoreMsgsDir2
|
||||
-- TODO [postgres]
|
||||
-- let cfg2' = journalCfg cfg2 testStoreDBOpts2 testStoreMsgsDir2
|
||||
let cfg2' = case qsType of
|
||||
SQSMemory -> journalCfg cfg2 testStoreLogFile2 testStoreMsgsDir2
|
||||
SQSPostgres -> journalCfgDB cfg2 testStoreDBOpts2 testStoreMsgsDir2
|
||||
in withSmpServerConfigOn (transport @TLS) cfg2' testPort2 $ const runTest
|
||||
|
||||
deliverMessageViaProxy :: (C.AlgorithmI a, C.AuthAlgorithm a) => SMPServer -> SMPServer -> C.SAlgorithm a -> ByteString -> ByteString -> IO ()
|
||||
@@ -427,25 +430,24 @@ agentViaProxyRetryNoSession = do
|
||||
withServer2 = withSmpServerConfigOn (transport @TLS) proxyCfgJ2 testPort2
|
||||
servers srv = (initAgentServersProxy SPMAlways SPFProhibit) {smp = userServers [srv]}
|
||||
|
||||
testNoProxy :: IO ()
|
||||
testNoProxy = do
|
||||
withSmpServerConfigOn (transport @TLS) cfg testPort2 $ \_ -> do
|
||||
testNoProxy :: AStoreType -> IO ()
|
||||
testNoProxy msType = do
|
||||
withSmpServerConfigOn (transport @TLS) (cfgMS msType) testPort2 $ \_ -> do
|
||||
testSMPClient_ "127.0.0.1" testPort2 proxyVRangeV8 $ \(th :: THandleSMP TLS 'TClient) -> do
|
||||
(_, _, (_corrId, _entityId, reply)) <- sendRecv th (Nothing, "0", NoEntity, SMP.PRXY testSMPServer Nothing)
|
||||
reply `shouldBe` Right (SMP.ERR $ SMP.PROXY SMP.BASIC_AUTH)
|
||||
|
||||
testProxyAuth :: IO ()
|
||||
testProxyAuth = do
|
||||
testProxyAuth :: AStoreType -> IO ()
|
||||
testProxyAuth msType = do
|
||||
withSmpServerConfigOn (transport @TLS) proxyCfgAuth testPort $ \_ -> do
|
||||
testSMPClient_ "127.0.0.1" testPort proxyVRangeV8 $ \(th :: THandleSMP TLS 'TClient) -> do
|
||||
(_, _s, (_corrId, _entityId, reply)) <- sendRecv th (Nothing, "0", NoEntity, SMP.PRXY testSMPServer2 $ Just "wrong")
|
||||
reply `shouldBe` Right (SMP.ERR $ SMP.PROXY SMP.BASIC_AUTH)
|
||||
where
|
||||
proxyCfgAuth = proxyCfg {newQueueBasicAuth = Just "correct"}
|
||||
proxyCfgAuth = (proxyCfgMS msType) {newQueueBasicAuth = Just "correct"}
|
||||
|
||||
todo :: IO ()
|
||||
todo = do
|
||||
fail "TODO"
|
||||
todo :: AStoreType -> IO ()
|
||||
todo _ = fail "TODO"
|
||||
|
||||
runExceptT' :: Exception e => ExceptT e IO a -> IO a
|
||||
runExceptT' a = runExceptT a >>= either throwIO pure
|
||||
|
||||
@@ -17,7 +17,7 @@ module ServerTests where
|
||||
|
||||
import Control.Concurrent (ThreadId, killThread, threadDelay)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception (SomeException, try)
|
||||
import Control.Exception (SomeException, try, throwIO)
|
||||
import Control.Monad
|
||||
import Control.Monad.IO.Class
|
||||
import CoreTests.MsgStoreTests (testJournalStoreCfg)
|
||||
@@ -70,8 +70,8 @@ serverTests = do
|
||||
describe "GET & SUB commands" testGetSubCommands
|
||||
describe "Exceeding queue quota" testExceedQueueQuota
|
||||
describe "Store log" testWithStoreLog
|
||||
xdescribe "Restore messages" testRestoreMessages -- TODO [postgres]
|
||||
xdescribe "Restore messages (old / v2)" testRestoreExpireMessages -- TODO [postgres]
|
||||
describe "Restore messages" testRestoreMessages
|
||||
describe "Restore messages (old / v2)" testRestoreExpireMessages
|
||||
describe "Save prometheus metrics" testPrometheusMetrics
|
||||
describe "Timing of AUTH error" testTiming
|
||||
describe "Message notifications" testMessageNotifications
|
||||
@@ -564,7 +564,7 @@ testExceedQueueQuota =
|
||||
|
||||
testWithStoreLog :: SpecWith (ATransport, AStoreType)
|
||||
testWithStoreLog =
|
||||
xit "should store simplex queues to log and restore them after server restart" $ \(at@(ATransport t), msType) -> do
|
||||
it "should store simplex queues to log and restore them after server restart" $ \ps@(at@(ATransport t), _) -> do
|
||||
g <- C.newRandom
|
||||
(sPub1, sKey1) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
|
||||
(sPub2, sKey2) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
|
||||
@@ -576,7 +576,7 @@ testWithStoreLog =
|
||||
senderId2 <- newTVarIO NoEntity
|
||||
notifierId <- newTVarIO NoEntity
|
||||
|
||||
withSmpServerStoreLogOnMS at msType testPort . runTest t $ \h -> runClient t $ \h1 -> do
|
||||
withSmpServerStoreLogOn ps testPort . runTest t $ \h -> runClient t $ \h1 -> do
|
||||
(sId1, rId1, rKey1, dhShared) <- createAndSecureQueue h sPub1
|
||||
(rcvNtfPubDhKey, _) <- atomically $ C.generateKeyPair g
|
||||
Resp "abcd" _ (NID nId _) <- signSendRecv h rKey1 ("abcd", rId1, NKEY nPub rcvNtfPubDhKey)
|
||||
@@ -607,7 +607,7 @@ testWithStoreLog =
|
||||
Resp "dabc" _ OK <- signSendRecv h rKey2 ("dabc", rId2, DEL)
|
||||
pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 6
|
||||
when (usesStoreLog ps) $ logSize testStoreLogFile `shouldReturn` 6
|
||||
|
||||
let cfg' = cfg {serverStoreCfg = ASSCfg SQSMemory SMSMemory $ SSCMemory Nothing}
|
||||
withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do
|
||||
@@ -616,7 +616,7 @@ testWithStoreLog =
|
||||
Resp "bcda" _ (ERR AUTH) <- signSendRecv h sKey1 ("bcda", sId1, _SEND "hello")
|
||||
pure ()
|
||||
|
||||
withSmpServerStoreLogOnMS at msType testPort . runTest t $ \h -> runClient t $ \h1 -> do
|
||||
withSmpServerStoreLogOn ps testPort . runTest t $ \h -> runClient t $ \h1 -> do
|
||||
-- this queue is restored
|
||||
rId1 <- readTVarIO recipientId1
|
||||
Just rKey1 <- readTVarIO recipientKey1
|
||||
@@ -633,8 +633,9 @@ testWithStoreLog =
|
||||
Resp "cdab" _ (ERR AUTH) <- signSendRecv h sKey2 ("cdab", sId2, _SEND "hello too")
|
||||
pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
removeFile testStoreLogFile
|
||||
when (usesStoreLog ps) $ do
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
removeFile testStoreLogFile
|
||||
where
|
||||
runTest :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation
|
||||
runTest _ test' server = do
|
||||
@@ -644,15 +645,24 @@ testWithStoreLog =
|
||||
runClient :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> Expectation
|
||||
runClient _ test' = testSMPClient test' `shouldReturn` ()
|
||||
|
||||
usesStoreLog :: (ATransport, AStoreType) -> Bool
|
||||
usesStoreLog (_, ASType qsType _) = case qsType of
|
||||
SQSMemory -> True
|
||||
SQSPostgres -> False
|
||||
|
||||
logSize :: FilePath -> IO Int
|
||||
logSize f =
|
||||
try (length . B.lines <$> B.readFile f) >>= \case
|
||||
Right l -> pure l
|
||||
Left (_ :: SomeException) -> logSize f
|
||||
logSize f = go (10 :: Int)
|
||||
where
|
||||
go n =
|
||||
try (length . B.lines <$> B.readFile f) >>= \case
|
||||
Right l -> pure l
|
||||
Left (e :: SomeException)
|
||||
| n > 0 -> threadDelay 100000 >> go (n - 1)
|
||||
| otherwise -> throwIO e
|
||||
|
||||
testRestoreMessages :: SpecWith (ATransport, AStoreType)
|
||||
testRestoreMessages =
|
||||
it "should store messages on exit and restore on start" $ \(at@(ATransport t), msType) -> do
|
||||
it "should store messages on exit and restore on start" $ \ps@(ATransport t, _) -> do
|
||||
removeFileIfExists testStoreLogFile
|
||||
removeFileIfExists testStoreMsgsFile
|
||||
whenM (doesDirectoryExist testStoreMsgsDir) $ removeDirectoryRecursive testStoreMsgsDir
|
||||
@@ -664,8 +674,7 @@ testRestoreMessages =
|
||||
recipientKey <- newTVarIO Nothing
|
||||
dhShared <- newTVarIO Nothing
|
||||
senderId <- newTVarIO NoEntity
|
||||
|
||||
withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do
|
||||
withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do
|
||||
runClient t $ \h1 -> do
|
||||
(sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub
|
||||
atomically $ do
|
||||
@@ -685,16 +694,12 @@ testRestoreMessages =
|
||||
Resp "5" _ OK <- signSendRecv h sKey ("5", sId, _SEND "hello 5")
|
||||
Resp "6" _ (ERR QUOTA) <- signSendRecv h sKey ("6", sId, _SEND "hello 6")
|
||||
pure ()
|
||||
|
||||
rId <- readTVarIO recipientId
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 2
|
||||
-- logSize testStoreMsgsFile `shouldReturn` 5
|
||||
when (usesStoreLog ps) $ logSize testStoreLogFile `shouldReturn` 2
|
||||
logSize testServerStatsBackupFile `shouldReturn` 76
|
||||
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats1 [rId] 5 1
|
||||
|
||||
withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do
|
||||
withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do
|
||||
Just rKey <- readTVarIO recipientKey
|
||||
Just dh <- readTVarIO dhShared
|
||||
let dec = decryptMsgV3 dh
|
||||
@@ -704,15 +709,14 @@ testRestoreMessages =
|
||||
(dec mId3 msg3, Right "hello 3") #== "restored message delivered"
|
||||
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, ACK mId3)
|
||||
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
when (usesStoreLog ps) $ logSize testStoreLogFile `shouldReturn` 1
|
||||
-- the last message is not removed because it was not ACK'd
|
||||
-- logSize testStoreMsgsFile `shouldReturn` 3
|
||||
logSize testServerStatsBackupFile `shouldReturn` 76
|
||||
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats2 [rId] 5 3
|
||||
|
||||
withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do
|
||||
withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do
|
||||
Just rKey <- readTVarIO recipientKey
|
||||
Just dh <- readTVarIO dhShared
|
||||
let dec = decryptMsgV3 dh
|
||||
@@ -724,13 +728,12 @@ testRestoreMessages =
|
||||
(dec mId6 msg6, Left "ClientRcvMsgQuota") #== "restored message delivered"
|
||||
Resp "7" _ OK <- signSendRecv h rKey ("7", rId, ACK mId6)
|
||||
pure ()
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
-- logSize testStoreMsgsFile `shouldReturn` 0
|
||||
when (usesStoreLog ps) $ do
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
removeFile testStoreLogFile
|
||||
logSize testServerStatsBackupFile `shouldReturn` 76
|
||||
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats3 [rId] 5 5
|
||||
|
||||
removeFile testStoreLogFile
|
||||
removeFileIfExists testStoreMsgsFile
|
||||
whenM (doesDirectoryExist testStoreMsgsDir) $ removeDirectoryRecursive testStoreMsgsDir
|
||||
removeFile testServerStatsBackupFile
|
||||
@@ -761,15 +764,14 @@ checkStats s qs sent received = do
|
||||
|
||||
testRestoreExpireMessages :: SpecWith (ATransport, AStoreType)
|
||||
testRestoreExpireMessages =
|
||||
it "should store messages on exit and restore on start" $ \(at@(ATransport t), msType) -> do
|
||||
it "should store messages on exit and restore on start (old / v2)" $ \ps@(at@(ATransport t), msType) -> do
|
||||
g <- C.newRandom
|
||||
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
|
||||
recipientId <- newTVarIO NoEntity
|
||||
recipientKey <- newTVarIO Nothing
|
||||
dhShared <- newTVarIO Nothing
|
||||
senderId <- newTVarIO NoEntity
|
||||
|
||||
withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do
|
||||
withSmpServerStoreMsgLogOn ps testPort . runTest t $ \h -> do
|
||||
runClient t $ \h1 -> do
|
||||
(sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub
|
||||
atomically $ do
|
||||
@@ -784,31 +786,36 @@ testRestoreExpireMessages =
|
||||
Resp "3" _ OK <- signSendRecv h sKey ("3", sId, _SEND "hello 3")
|
||||
Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4")
|
||||
pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 2
|
||||
exportStoreMessages msType
|
||||
msgs <- B.readFile testStoreMsgsFile
|
||||
length (B.lines msgs) `shouldBe` 4
|
||||
msgs <-
|
||||
if usesStoreLog ps
|
||||
then do
|
||||
logSize testStoreLogFile `shouldReturn` 2
|
||||
exportStoreMessages msType
|
||||
msgs <- B.readFile testStoreMsgsFile
|
||||
length (B.lines msgs) `shouldBe` 4
|
||||
pure msgs
|
||||
else pure []
|
||||
|
||||
let expCfg1 = Just ExpirationConfig {ttl = 86400, checkInterval = 43200}
|
||||
cfg1 = (cfgMS msType) {messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile}
|
||||
withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
exportStoreMessages msType
|
||||
msgs' <- B.readFile testStoreMsgsFile
|
||||
msgs' `shouldBe` msgs
|
||||
|
||||
when (usesStoreLog ps) $ do
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
exportStoreMessages msType
|
||||
msgs' <- B.readFile testStoreMsgsFile
|
||||
msgs' `shouldBe` msgs
|
||||
let expCfg2 = Just ExpirationConfig {ttl = 2, checkInterval = 43200}
|
||||
cfg2 = (cfgMS msType) {messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile}
|
||||
withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
-- two messages expired
|
||||
exportStoreMessages msType
|
||||
msgs'' <- B.readFile testStoreMsgsFile
|
||||
length (B.lines msgs'') `shouldBe` 2
|
||||
B.lines msgs'' `shouldBe` drop 2 (B.lines msgs)
|
||||
when (usesStoreLog ps) $ do
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
-- two messages expired
|
||||
exportStoreMessages msType
|
||||
msgs'' <- B.readFile testStoreMsgsFile
|
||||
length (B.lines msgs'') `shouldBe` 2
|
||||
B.lines msgs'' `shouldBe` drop 2 (B.lines msgs)
|
||||
Right ServerStatsData {_msgExpired} <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
_msgExpired `shouldBe` 2
|
||||
where
|
||||
@@ -822,6 +829,7 @@ testRestoreExpireMessages =
|
||||
readWriteQueueStore True (mkQueue ms) testStoreLogFile (queueStore ms) >>= closeStoreLog
|
||||
removeFileIfExists testStoreMsgsFile
|
||||
exportMessages False ms testStoreMsgsFile False
|
||||
closeMsgStore ms
|
||||
runTest :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation
|
||||
runTest _ test' server = do
|
||||
testSMPClient test' `shouldReturn` ()
|
||||
@@ -1020,9 +1028,9 @@ testMsgNOTExpireOnInterval =
|
||||
testBlockMessageQueue :: SpecWith (ATransport, AStoreType)
|
||||
testBlockMessageQueue =
|
||||
-- TODO [postgres]
|
||||
xit "should return BLOCKED error when queue is blocked" $ \(at@(ATransport (t :: TProxy c)), msType) -> do
|
||||
xit "should return BLOCKED error when queue is blocked" $ \ps@(ATransport (t :: TProxy c), _) -> do
|
||||
g <- C.newRandom
|
||||
(rId, sId) <- withSmpServerStoreLogOnMS at msType testPort $ runTest t $ \h -> do
|
||||
(rId, sId) <- withSmpServerStoreLogOn ps testPort $ runTest t $ \h -> do
|
||||
(rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g
|
||||
(dhPub, _dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g
|
||||
Resp "abcd" rId1 (Ids rId sId _srvDh) <- signSendRecv h rKey ("abcd", NoEntity, NEW rPub dhPub Nothing SMSubscribe True)
|
||||
@@ -1032,7 +1040,7 @@ testBlockMessageQueue =
|
||||
-- TODO [postgres] block via control port
|
||||
withFile testStoreLogFile AppendMode $ \h -> B.hPutStrLn h $ strEncode $ BlockQueue rId $ BlockingInfo BRContent
|
||||
|
||||
withSmpServerStoreLogOnMS at msType testPort $ runTest t $ \h -> do
|
||||
withSmpServerStoreLogOn ps testPort $ runTest t $ \h -> do
|
||||
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g
|
||||
Resp "dabc" sId2 (ERR (BLOCKED (BlockingInfo BRContent))) <- signSendRecv h sKey ("dabc", sId, SKEY sPub)
|
||||
(sId2, sId) #== "same queue ID in response"
|
||||
|
||||
@@ -26,7 +26,6 @@ import RemoteControl (remoteControlTests)
|
||||
import SMPClient (testServerDBConnectInfo)
|
||||
import SMPProxyTests (smpProxyTests)
|
||||
import ServerTests
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Util (createDBAndUserIfNotExists, dropDatabaseAndUser)
|
||||
import Simplex.Messaging.Server.Env.STM (AStoreType (..))
|
||||
import Simplex.Messaging.Server.MsgStore.Types (SMSType (..), SQSType (..))
|
||||
import Simplex.Messaging.Transport (TLS, Transport (..))
|
||||
@@ -34,12 +33,12 @@ import Simplex.Messaging.Transport (TLS, Transport (..))
|
||||
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
|
||||
import System.Environment (setEnv)
|
||||
import Test.Hspec
|
||||
import Util (postgressBracket)
|
||||
import XFTPAgent
|
||||
import XFTPCLI
|
||||
import XFTPServerTests (xftpServerTests)
|
||||
#if defined(dbPostgres)
|
||||
import Fixtures
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Util (createDBAndUserIfNotExists, dropDatabaseAndUser)
|
||||
#else
|
||||
import AgentTests.SchemaDump (schemaDumpTest)
|
||||
#endif
|
||||
@@ -54,10 +53,8 @@ main = do
|
||||
setEnv "APNS_KEY_ID" "H82WD9K9AQ"
|
||||
setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8"
|
||||
hspec
|
||||
-- TODO [postgres] run tests with postgres server locally and maybe in CI
|
||||
#if defined(dbPostgres)
|
||||
. beforeAll_ (dropDatabaseAndUser testDBConnectInfo >> createDBAndUserIfNotExists testDBConnectInfo)
|
||||
. afterAll_ (dropDatabaseAndUser testDBConnectInfo)
|
||||
. aroundAll_ (postgressBracket testDBConnectInfo)
|
||||
#endif
|
||||
. before_ (createDirectoryIfMissing False "tests/tmp")
|
||||
. after_ (eventuallyRemove "tests/tmp" 3)
|
||||
@@ -78,8 +75,7 @@ main = do
|
||||
describe "Store log tests" storeLogTests
|
||||
describe "TRcvQueues tests" tRcvQueuesTests
|
||||
describe "Util tests" utilTests
|
||||
beforeAll_ (dropDatabaseAndUser testServerDBConnectInfo >> createDBAndUserIfNotExists testServerDBConnectInfo)
|
||||
$ afterAll_ (dropDatabaseAndUser testServerDBConnectInfo)
|
||||
aroundAll_ (postgressBracket testServerDBConnectInfo)
|
||||
-- TODO [postgres] fix store log tests
|
||||
$ describe "SMP server via TLS, postgres+jornal message store" $ do
|
||||
describe "SMP syntax" $ serverSyntaxTests (transport @TLS)
|
||||
@@ -93,8 +89,13 @@ main = do
|
||||
-- describe "SMP syntax" $ serverSyntaxTests (transport @WS)
|
||||
-- before (pure (transport @WS, ASType SQSMemory SMSJournal)) serverTests
|
||||
describe "Notifications server" $ ntfServerTests (transport @TLS)
|
||||
describe "SMP client agent" $ agentTests (transport @TLS)
|
||||
describe "SMP proxy" smpProxyTests
|
||||
aroundAll_ (postgressBracket testServerDBConnectInfo) $ do
|
||||
describe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
|
||||
describe "SMP proxy, postgres+jornal message store" $
|
||||
before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests
|
||||
describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
|
||||
describe "SMP proxy, jornal message store" $
|
||||
before (pure $ ASType SQSMemory SMSJournal) smpProxyTests
|
||||
describe "XFTP" $ do
|
||||
describe "XFTP server" xftpServerTests
|
||||
describe "XFTP file description" fileDescriptionTests
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
module Util where
|
||||
|
||||
import qualified Control.Exception as E
|
||||
import Control.Monad (replicateM, when)
|
||||
import Data.Either (partitionEithers)
|
||||
import Data.List (tails)
|
||||
import Database.PostgreSQL.Simple (ConnectInfo (..))
|
||||
import GHC.Conc (getNumCapabilities, getNumProcessors, setNumCapabilities)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Util (createDBAndUserIfNotExists, dropDatabaseAndUser)
|
||||
import System.Directory (doesFileExist, removeFile)
|
||||
import Test.Hspec
|
||||
import UnliftIO
|
||||
@@ -32,3 +35,9 @@ removeFileIfExists :: FilePath -> IO ()
|
||||
removeFileIfExists filePath = do
|
||||
fileExists <- doesFileExist filePath
|
||||
when fileExists $ removeFile filePath
|
||||
|
||||
postgressBracket :: ConnectInfo -> IO a -> IO a
|
||||
postgressBracket connInfo =
|
||||
E.bracket_
|
||||
(dropDatabaseAndUser connInfo >> createDBAndUserIfNotExists connInfo)
|
||||
(dropDatabaseAndUser connInfo)
|
||||
|
||||
Reference in New Issue
Block a user