mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 23:55:14 +00:00
change block size, checkpoint after vacuum, disable auto-vacuum
This commit is contained in:
@@ -41,6 +41,7 @@ module Simplex.Messaging.Agent
|
||||
SubscriptionsInfo (..),
|
||||
getSMPAgentClient,
|
||||
disconnectAgentClient,
|
||||
disposeAgentClient,
|
||||
resumeAgentClient,
|
||||
withConnLock,
|
||||
createUser,
|
||||
@@ -179,6 +180,11 @@ disconnectAgentClient c@AgentClient {agentEnv = Env {ntfSupervisor = ns, xftpAge
|
||||
closeXFTPAgent xa
|
||||
logConnection c False
|
||||
|
||||
disposeAgentClient :: MonadUnliftIO m => AgentClient -> m ()
|
||||
disposeAgentClient c@AgentClient {agentEnv = Env {store}} = do
|
||||
disconnectAgentClient c
|
||||
liftIO $ closeSQLiteStore store
|
||||
|
||||
resumeAgentClient :: MonadIO m => AgentClient -> m ()
|
||||
resumeAgentClient c = atomically $ writeTVar (active c) True
|
||||
|
||||
|
||||
@@ -402,9 +402,10 @@ connectDB path key checkPageSize = do
|
||||
where
|
||||
prepare db = do
|
||||
unless (null key) . execSQL_ db $ "PRAGMA key = " <> sqlString key <> ";"
|
||||
when checkPageSize $ do
|
||||
pageSize :: Maybe Int <- maybeFirstRow fromOnly $ SQL.query_ (DB.conn db) "PRAGMA page_size;"
|
||||
when (pageSize == Just 16384) $ execSQL_ db
|
||||
when checkPageSize $ maybeFirstRow id (SQL.query_ (DB.conn db) "PRAGMA page_size;") >>= \case
|
||||
Nothing -> pure ()
|
||||
Just (Only (16384 :: Int)) -> pure ()
|
||||
Just _ -> execSQL_ db
|
||||
"PRAGMA wal_checkpoint(TRUNCATE);\n\
|
||||
\PRAGMA journal_mode = DELETE;\n\
|
||||
\PRAGMA page_size = 16384;\n\
|
||||
@@ -416,8 +417,7 @@ connectDB path key checkPageSize = do
|
||||
\PRAGMA busy_timeout = 100;\n\
|
||||
\PRAGMA foreign_keys = ON;\n\
|
||||
\-- PRAGMA trusted_schema = OFF;\n\
|
||||
\PRAGMA secure_delete = ON;\n\
|
||||
\PRAGMA auto_vacuum = FULL;"
|
||||
\PRAGMA secure_delete = ON;"
|
||||
|
||||
closeSQLiteStore :: SQLiteStore -> IO ()
|
||||
closeSQLiteStore st@SQLiteStore {dbClosed} =
|
||||
|
||||
@@ -121,7 +121,7 @@ getCurrent db = map toMigration <$> DB.query_ db "SELECT name, down FROM migrati
|
||||
run :: SQLiteStore -> MigrationsToRun -> IO ()
|
||||
run st = \case
|
||||
MTRUp [] -> pure ()
|
||||
MTRUp ms -> mapM_ runUp ms >> withConnection' st (`execSQL` "VACUUM;")
|
||||
MTRUp ms -> mapM_ runUp ms >> withConnection' st (`execSQL` "VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")
|
||||
MTRDown ms -> mapM_ runDown $ reverse ms
|
||||
MTRNone -> pure ()
|
||||
where
|
||||
|
||||
@@ -356,8 +356,8 @@ withAgentClientsCfg2 aCfg bCfg runTest = do
|
||||
a <- getSMPAgentClient' aCfg initAgentServers testDB
|
||||
b <- getSMPAgentClient' bCfg initAgentServers testDB2
|
||||
runTest a b
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
|
||||
withAgentClients2 :: (AgentClient -> AgentClient -> IO ()) -> IO ()
|
||||
withAgentClients2 = withAgentClientsCfg2 agentCfg agentCfg
|
||||
@@ -446,7 +446,7 @@ testAsyncInitiatingOffline :: HasCallStack => IO ()
|
||||
testAsyncInitiatingOffline =
|
||||
withAgentClients2 $ \alice bob -> runRight_ $ do
|
||||
(bobId, cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
aliceId <- joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe
|
||||
alice' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
subscribeConnection alice' bobId
|
||||
@@ -462,7 +462,7 @@ testAsyncJoiningOfflineBeforeActivation =
|
||||
withAgentClients2 $ \alice bob -> runRight_ $ do
|
||||
(bobId, qInfo) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe
|
||||
aliceId <- joinConnection bob 1 True qInfo "bob's connInfo" SMSubscribe
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
("", _, CONF confId _ "bob's connInfo") <- get alice
|
||||
allowConnection alice bobId confId "alice's connInfo"
|
||||
bob' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
@@ -476,9 +476,9 @@ testAsyncBothOffline :: HasCallStack => IO ()
|
||||
testAsyncBothOffline =
|
||||
withAgentClients2 $ \alice bob -> runRight_ $ do
|
||||
(bobId, cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
aliceId <- joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
alice' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
subscribeConnection alice' bobId
|
||||
("", _, CONF confId _ "bob's connInfo") <- get alice'
|
||||
@@ -522,7 +522,7 @@ testAsyncHelloTimeout = do
|
||||
agentCfgV1 = agentCfg {smpAgentVRange = vr11, smpClientVRange = vr11, e2eEncryptVRange = vr11, smpCfg = smpCfgV1}
|
||||
withAgentClientsCfg2 agentCfgV1 agentCfg {helloTimeout = 1} $ \alice bob -> runRight_ $ do
|
||||
(_, cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
aliceId <- joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe
|
||||
get bob ##> ("", aliceId, ERR $ CONN NOT_ACCEPTED)
|
||||
|
||||
@@ -548,7 +548,7 @@ testAllowConnectionClientRestart t = do
|
||||
pure ()
|
||||
|
||||
threadDelay 100000 -- give time to enqueue confirmation (enqueueConfirmation)
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
|
||||
alice2 <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
|
||||
@@ -563,8 +563,8 @@ testAllowConnectionClientRestart t = do
|
||||
get bob ##> ("", aliceId, CON)
|
||||
|
||||
exchangeGreetingsMsgId 4 alice2 bobId bob aliceId
|
||||
disconnectAgentClient alice2
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice2
|
||||
disposeAgentClient bob
|
||||
|
||||
testIncreaseConnAgentVersion :: HasCallStack => ATransport -> IO ()
|
||||
testIncreaseConnAgentVersion t = do
|
||||
@@ -580,7 +580,7 @@ testIncreaseConnAgentVersion t = do
|
||||
|
||||
-- version doesn't increase if incompatible
|
||||
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
alice2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB
|
||||
|
||||
runRight_ $ do
|
||||
@@ -591,7 +591,7 @@ testIncreaseConnAgentVersion t = do
|
||||
|
||||
-- version increases if compatible
|
||||
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
bob2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB2
|
||||
|
||||
runRight_ $ do
|
||||
@@ -602,7 +602,7 @@ testIncreaseConnAgentVersion t = do
|
||||
|
||||
-- version doesn't decrease, even if incompatible
|
||||
|
||||
disconnectAgentClient alice2
|
||||
disposeAgentClient alice2
|
||||
alice3 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 2 2} initAgentServers testDB
|
||||
|
||||
runRight_ $ do
|
||||
@@ -611,7 +611,7 @@ testIncreaseConnAgentVersion t = do
|
||||
checkVersion alice3 bobId 3
|
||||
checkVersion bob2 aliceId 3
|
||||
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient bob2
|
||||
bob3 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 1} initAgentServers testDB2
|
||||
|
||||
runRight_ $ do
|
||||
@@ -619,8 +619,8 @@ testIncreaseConnAgentVersion t = do
|
||||
exchangeGreetingsMsgId 12 alice3 bobId bob3 aliceId
|
||||
checkVersion alice3 bobId 3
|
||||
checkVersion bob3 aliceId 3
|
||||
disconnectAgentClient alice3
|
||||
disconnectAgentClient bob3
|
||||
disposeAgentClient alice3
|
||||
disposeAgentClient bob3
|
||||
|
||||
checkVersion :: AgentClient -> ConnId -> Version -> ExceptT AgentErrorType IO ()
|
||||
checkVersion c connId v = do
|
||||
@@ -641,9 +641,9 @@ testIncreaseConnAgentVersionMaxCompatible t = do
|
||||
|
||||
-- version increases to max compatible
|
||||
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
alice2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
bob2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB2
|
||||
|
||||
runRight_ $ do
|
||||
@@ -652,8 +652,8 @@ testIncreaseConnAgentVersionMaxCompatible t = do
|
||||
exchangeGreetingsMsgId 6 alice2 bobId bob2 aliceId
|
||||
checkVersion alice2 bobId 3
|
||||
checkVersion bob2 aliceId 3
|
||||
disconnectAgentClient alice2
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient alice2
|
||||
disposeAgentClient bob2
|
||||
|
||||
testIncreaseConnAgentVersionStartDifferentVersion :: HasCallStack => ATransport -> IO ()
|
||||
testIncreaseConnAgentVersionStartDifferentVersion t = do
|
||||
@@ -669,7 +669,7 @@ testIncreaseConnAgentVersionStartDifferentVersion t = do
|
||||
|
||||
-- version increases to max compatible
|
||||
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
alice2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB
|
||||
|
||||
runRight_ $ do
|
||||
@@ -677,8 +677,8 @@ testIncreaseConnAgentVersionStartDifferentVersion t = do
|
||||
exchangeGreetingsMsgId 6 alice2 bobId bob aliceId
|
||||
checkVersion alice2 bobId 3
|
||||
checkVersion bob aliceId 3
|
||||
disconnectAgentClient alice2
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice2
|
||||
disposeAgentClient bob
|
||||
|
||||
testDeliverClientRestart :: HasCallStack => ATransport -> IO ()
|
||||
testDeliverClientRestart t = do
|
||||
@@ -696,7 +696,7 @@ testDeliverClientRestart t = do
|
||||
|
||||
6 <- runRight $ sendMessage bob aliceId SMP.noMsgFlags "hello"
|
||||
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
|
||||
bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
|
||||
@@ -708,8 +708,8 @@ testDeliverClientRestart t = do
|
||||
|
||||
get bob2 ##> ("", aliceId, SENT 6)
|
||||
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob2
|
||||
|
||||
testDuplicateMessage :: HasCallStack => ATransport -> IO ()
|
||||
testDuplicateMessage t = do
|
||||
@@ -721,7 +721,7 @@ testDuplicateMessage t = do
|
||||
4 <- sendMessage alice bobId SMP.noMsgFlags "hello"
|
||||
get alice ##> ("", bobId, SENT 4)
|
||||
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
|
||||
-- if the agent user did not send ACK, the message will be delivered again
|
||||
bob1 <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
@@ -742,8 +742,8 @@ testDuplicateMessage t = do
|
||||
threadDelay 200000
|
||||
Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing
|
||||
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob1
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob1
|
||||
|
||||
alice2 <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
@@ -758,8 +758,8 @@ testDuplicateMessage t = do
|
||||
6 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 3"
|
||||
get alice2 ##> ("", bobId, SENT 6)
|
||||
get bob2 =##> \case ("", c, Msg "hello 3") -> c == aliceId; _ -> False
|
||||
disconnectAgentClient alice2
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient alice2
|
||||
disposeAgentClient bob2
|
||||
|
||||
testSkippedMessages :: HasCallStack => ATransport -> IO ()
|
||||
testSkippedMessages t = do
|
||||
@@ -773,7 +773,7 @@ testSkippedMessages t = do
|
||||
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
|
||||
ackMessage bob aliceId 4 Nothing
|
||||
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
|
||||
runRight_ $ do
|
||||
5 <- sendMessage alice bobId SMP.noMsgFlags "hello 2"
|
||||
@@ -788,7 +788,7 @@ testSkippedMessages t = do
|
||||
nGet alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
threadDelay 200000
|
||||
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
|
||||
alice2 <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
@@ -807,8 +807,8 @@ testSkippedMessages t = do
|
||||
get alice2 ##> ("", bobId, SENT 9)
|
||||
get bob2 =##> \case ("", c, Msg "hello 6") -> c == aliceId; _ -> False
|
||||
ackMessage bob2 aliceId 6 Nothing
|
||||
disconnectAgentClient alice2
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient alice2
|
||||
disposeAgentClient bob2
|
||||
|
||||
testRatchetSync :: HasCallStack => ATransport -> IO ()
|
||||
testRatchetSync t = withAgentClients2 $ \alice bob ->
|
||||
@@ -849,7 +849,7 @@ setupDesynchronizedRatchet alice bob = do
|
||||
get alice =##> \case ("", c, Msg "hello 4") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId 7 Nothing
|
||||
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
|
||||
-- importing database backup after progressing ratchet de-synchronizes ratchet
|
||||
liftIO $ renameFile (testDB2 <> ".bak") testDB2
|
||||
@@ -922,7 +922,7 @@ testRatchetSyncClientRestart t = do
|
||||
("", "", DOWN _ _) <- nGet bob2
|
||||
ConnectionStats {ratchetSyncState} <- runRight $ synchronizeRatchet bob2 aliceId False
|
||||
liftIO $ ratchetSyncState `shouldBe` RSStarted
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient bob2
|
||||
bob3 <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
|
||||
runRight_ $ do
|
||||
@@ -933,9 +933,9 @@ testRatchetSyncClientRestart t = do
|
||||
get alice =##> ratchetSyncP bobId RSOk
|
||||
get bob3 =##> ratchetSyncP aliceId RSOk
|
||||
exchangeGreetingsMsgIds alice bobId 12 bob3 aliceId 9
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disconnectAgentClient bob3
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
disposeAgentClient bob3
|
||||
|
||||
testRatchetSyncSuspendForeground :: HasCallStack => ATransport -> IO ()
|
||||
testRatchetSyncSuspendForeground t = do
|
||||
@@ -967,9 +967,9 @@ testRatchetSyncSuspendForeground t = do
|
||||
get alice =##> ratchetSyncP bobId RSOk
|
||||
get bob2 =##> ratchetSyncP aliceId RSOk
|
||||
exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
disposeAgentClient bob2
|
||||
|
||||
testRatchetSyncSimultaneous :: HasCallStack => ATransport -> IO ()
|
||||
testRatchetSyncSimultaneous t = do
|
||||
@@ -1000,9 +1000,9 @@ testRatchetSyncSimultaneous t = do
|
||||
get alice =##> ratchetSyncP bobId RSOk
|
||||
get bob2 =##> ratchetSyncP aliceId RSOk
|
||||
exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disconnectAgentClient bob2
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
disposeAgentClient bob2
|
||||
|
||||
testOnlyCreatePull :: IO ()
|
||||
testOnlyCreatePull = withAgentClients2 $ \alice bob -> runRight_ $ do
|
||||
@@ -1059,7 +1059,7 @@ testInactiveClientDisconnected t = do
|
||||
runRight_ $ do
|
||||
(connId, _cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe
|
||||
nGet alice ##> ("", "", DOWN testSMPServer [connId])
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
|
||||
testActiveClientNotDisconnected :: ATransport -> IO ()
|
||||
testActiveClientNotDisconnected t = do
|
||||
@@ -1070,7 +1070,7 @@ testActiveClientNotDisconnected t = do
|
||||
runRight_ $ do
|
||||
(connId, _cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe
|
||||
keepSubscribing alice connId ts
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
where
|
||||
keepSubscribing :: AgentClient -> ConnId -> SystemTime -> ExceptT AgentErrorType IO ()
|
||||
keepSubscribing alice connId ts = do
|
||||
@@ -1189,8 +1189,8 @@ testBatchedSubscriptions nCreate nDel t = do
|
||||
delete b aIds'
|
||||
deleteFail a bIds'
|
||||
deleteFail b aIds'
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
where
|
||||
subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
|
||||
subscribe c cs = do
|
||||
@@ -1269,14 +1269,14 @@ testAsyncCommandsRestore t = do
|
||||
alice <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
bobId <- runRight $ createConnectionAsync alice 1 "1" True SCMInvitation SMSubscribe
|
||||
liftIO $ noMessages alice "alice doesn't receive INV because server is down"
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
alice' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
withSmpServerStoreLogOn t testPort $ \_ -> do
|
||||
runRight_ $ do
|
||||
subscribeConnection alice' bobId
|
||||
("1", _, INV _) <- get alice'
|
||||
pure ()
|
||||
disconnectAgentClient alice'
|
||||
disposeAgentClient alice'
|
||||
|
||||
testAcceptContactAsync :: IO ()
|
||||
testAcceptContactAsync =
|
||||
@@ -1335,7 +1335,7 @@ testDeleteConnectionAsync t = do
|
||||
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
|
||||
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
|
||||
liftIO $ noMessages a "nothing else should be delivered to alice"
|
||||
disconnectAgentClient a
|
||||
disposeAgentClient a
|
||||
|
||||
testJoinConnectionAsyncReplyError :: HasCallStack => ATransport -> IO ()
|
||||
testJoinConnectionAsyncReplyError t = do
|
||||
@@ -1376,8 +1376,8 @@ testJoinConnectionAsyncReplyError t = do
|
||||
get b ##> ("", aId, INFO "alice's connInfo")
|
||||
get b ##> ("", aId, CON)
|
||||
exchangeGreetings a bId b aId
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
|
||||
testUsers :: IO ()
|
||||
testUsers =
|
||||
@@ -1440,8 +1440,8 @@ testSwitchConnection servers = do
|
||||
exchangeGreetingsMsgId 4 a bId b aId
|
||||
testFullSwitch a bId b aId 10
|
||||
testFullSwitch a bId b aId 16
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
|
||||
testFullSwitch :: AgentClient -> ByteString -> AgentClient -> ByteString -> Int64 -> ExceptT AgentErrorType IO ()
|
||||
testFullSwitch a bId b aId msgId = do
|
||||
@@ -1522,7 +1522,7 @@ testSwitchAsync servers = do
|
||||
withB = withAgent agentCfg {initialClientId = 1} servers testDB2
|
||||
|
||||
withAgent :: AgentConfig -> InitialAgentServers -> FilePath -> (AgentClient -> IO a) -> IO a
|
||||
withAgent cfg' servers dbPath = bracket (getSMPAgentClient' cfg' servers dbPath) disconnectAgentClient
|
||||
withAgent cfg' servers dbPath = bracket (getSMPAgentClient' cfg' servers dbPath) disposeAgentClient
|
||||
|
||||
sessionSubscribe :: (forall a. (AgentClient -> IO a) -> IO a) -> [ConnId] -> (AgentClient -> ExceptT AgentErrorType IO ()) -> IO ()
|
||||
sessionSubscribe withC connIds a =
|
||||
@@ -1540,7 +1540,7 @@ testSwitchDelete servers = do
|
||||
runRight_ $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetingsMsgId 4 a bId b aId
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient b
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
@@ -1549,8 +1549,8 @@ testSwitchDelete servers = do
|
||||
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId; _ -> False
|
||||
get a =##> \case ("", c, DEL_CONN) -> c == bId; _ -> False
|
||||
liftIO $ noMessages a "nothing else should be delivered to alice"
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
|
||||
testAbortSwitchStarted :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testAbortSwitchStarted servers = do
|
||||
@@ -1838,8 +1838,8 @@ testCreateQueueAuth clnt1 clnt2 = do
|
||||
get b ##> ("", aId, CON)
|
||||
exchangeGreetings a bId b aId
|
||||
pure 2
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
pure r
|
||||
where
|
||||
getClient (clntAuth, clntVersion) =
|
||||
@@ -1902,8 +1902,8 @@ testDeliveryReceiptsVersion t = do
|
||||
liftIO $ noMessages b "no delivery receipt (unsupported version)"
|
||||
pure (aId, bId)
|
||||
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
a' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB
|
||||
b' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB2
|
||||
|
||||
@@ -1925,8 +1925,8 @@ testDeliveryReceiptsVersion t = do
|
||||
ackMessage a' bId 10 $ Just ""
|
||||
get b' =##> \case ("", c, Rcvd 10) -> c == aId; _ -> False
|
||||
ackMessage b' aId 11 Nothing
|
||||
disconnectAgentClient a'
|
||||
disconnectAgentClient b'
|
||||
disposeAgentClient a'
|
||||
disposeAgentClient b'
|
||||
|
||||
testTwoUsers :: HasCallStack => IO ()
|
||||
testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
@@ -2012,7 +2012,7 @@ testServerMultipleIdentities =
|
||||
exchangeGreetings alice bobId bob aliceId
|
||||
-- this saves queue with second server identity
|
||||
Left (BROKER _ NETWORK) <- runExceptT $ joinConnection bob 1 True secondIdentityCReq "bob's connInfo" SMSubscribe
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient bob
|
||||
bob' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
subscribeConnection bob' aliceId
|
||||
exchangeGreetingsMsgId 6 alice bobId bob' aliceId
|
||||
|
||||
@@ -112,7 +112,7 @@ testNotificationToken APNSMockServer {apnsQ} = do
|
||||
deleteNtfToken a tkn
|
||||
-- agent deleted this token
|
||||
Left (CMD PROHIBITED) <- tryE $ checkNtfToken a tkn
|
||||
disconnectAgentClient a
|
||||
disposeAgentClient a
|
||||
|
||||
(.->) :: J.Value -> J.Key -> ExceptT AgentErrorType IO ByteString
|
||||
v .-> key = do
|
||||
@@ -144,7 +144,7 @@ testNtfTokenRepeatRegistration APNSMockServer {apnsQ} = do
|
||||
-- can still use the first verification code, it is the same after decryption
|
||||
verifyNtfToken a tkn nonce verification
|
||||
NTActive <- checkNtfToken a tkn
|
||||
disconnectAgentClient a
|
||||
disposeAgentClient a
|
||||
|
||||
testNtfTokenSecondRegistration :: APNSMockServer -> IO ()
|
||||
testNtfTokenSecondRegistration APNSMockServer {apnsQ} = do
|
||||
@@ -180,8 +180,8 @@ testNtfTokenSecondRegistration APNSMockServer {apnsQ} = do
|
||||
Left (NTF AUTH) <- tryE $ checkNtfToken a tkn
|
||||
-- and the second is active
|
||||
NTActive <- checkNtfToken a' tkn
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient a'
|
||||
disposeAgentClient a
|
||||
disposeAgentClient a'
|
||||
|
||||
testNtfTokenServerRestart :: ATransport -> APNSMockServer -> IO ()
|
||||
testNtfTokenServerRestart t APNSMockServer {apnsQ} = do
|
||||
@@ -195,7 +195,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do
|
||||
pure ntfData
|
||||
-- the new agent is created as otherwise when running the tests in CI the old agent was keeping the connection to the server
|
||||
threadDelay 1000000
|
||||
disconnectAgentClient a
|
||||
disposeAgentClient a
|
||||
a' <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
-- server stopped before token is verified, so now the attempt to verify it will return AUTH error but re-register token,
|
||||
-- so that repeat verification happens without restarting the clients, when notification arrives
|
||||
@@ -210,7 +210,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do
|
||||
liftIO $ sendApnsResponse' APNSRespOk
|
||||
verifyNtfToken a' tkn nonce' verification'
|
||||
NTActive <- checkNtfToken a' tkn
|
||||
disconnectAgentClient a'
|
||||
disposeAgentClient a'
|
||||
|
||||
testNotificationSubscriptionExistingConnection :: APNSMockServer -> IO ()
|
||||
testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
|
||||
@@ -251,7 +251,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
|
||||
runRight_ $ do
|
||||
(_, [SMPMsgMeta {msgFlags = MsgFlags True}]) <- getNotificationMessage aliceNtf nonce message
|
||||
pure ()
|
||||
disconnectAgentClient aliceNtf
|
||||
disposeAgentClient aliceNtf
|
||||
|
||||
runRight_ $ do
|
||||
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
|
||||
@@ -264,8 +264,8 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
|
||||
get bob ##> ("", aliceId, SENT $ baseId + 2)
|
||||
-- no notifications should follow
|
||||
noNotification apnsQ
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
where
|
||||
baseId = 3
|
||||
msgId = subtract baseId
|
||||
@@ -309,8 +309,8 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
|
||||
ackMessage bob aliceId (baseId + 2) Nothing
|
||||
-- no unexpected notifications should follow
|
||||
noNotification apnsQ
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
where
|
||||
baseId = 3
|
||||
msgId = subtract baseId
|
||||
@@ -388,8 +388,8 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
|
||||
ackMessage alice bobId (baseId + 5) Nothing
|
||||
-- no notifications should follow
|
||||
noNotification apnsQ
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
where
|
||||
baseId = 3
|
||||
msgId = subtract baseId
|
||||
@@ -417,7 +417,7 @@ testChangeToken APNSMockServer {apnsQ} = do
|
||||
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId (baseId + 1) Nothing
|
||||
pure (aliceId, bobId)
|
||||
disconnectAgentClient alice
|
||||
disposeAgentClient alice
|
||||
|
||||
alice1 <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
runRight_ $ do
|
||||
@@ -433,8 +433,8 @@ testChangeToken APNSMockServer {apnsQ} = do
|
||||
ackMessage alice1 bobId (baseId + 2) Nothing
|
||||
-- no notifications should follow
|
||||
noNotification apnsQ
|
||||
disconnectAgentClient alice1
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice1
|
||||
disposeAgentClient bob
|
||||
where
|
||||
baseId = 3
|
||||
msgId = subtract baseId
|
||||
@@ -464,8 +464,8 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do
|
||||
void $ messageNotification apnsQ
|
||||
get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
|
||||
liftIO $ killThread threadId
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
|
||||
testNotificationsSMPRestart :: ATransport -> APNSMockServer -> IO ()
|
||||
testNotificationsSMPRestart t APNSMockServer {apnsQ} = do
|
||||
@@ -496,8 +496,8 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do
|
||||
_ <- messageNotificationData alice apnsQ
|
||||
get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
|
||||
liftIO $ killThread threadId
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob
|
||||
disposeAgentClient alice
|
||||
disposeAgentClient bob
|
||||
|
||||
testNotificationsSMPRestartBatch :: Int -> ATransport -> APNSMockServer -> IO ()
|
||||
testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = do
|
||||
@@ -536,8 +536,8 @@ testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = do
|
||||
get b ##> ("", aliceId, SENT msgId)
|
||||
_ <- messageNotificationData a apnsQ
|
||||
get a =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
where
|
||||
runServers :: ExceptT AgentErrorType IO a -> IO a
|
||||
runServers a = do
|
||||
@@ -567,8 +567,8 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do
|
||||
switchComplete a bId b aId
|
||||
liftIO $ threadDelay 500000
|
||||
testMessage "hello again"
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disposeAgentClient a
|
||||
disposeAgentClient b
|
||||
|
||||
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
|
||||
messageNotification apnsQ = do
|
||||
|
||||
+13
-13
@@ -21,7 +21,7 @@ import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2, testDB3)
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH))
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
|
||||
import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendFile, xftpStartWorkers)
|
||||
import Simplex.Messaging.Agent (AgentClient, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendFile, xftpStartWorkers)
|
||||
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
|
||||
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv)
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs)
|
||||
@@ -103,7 +103,7 @@ testXFTPAgentSendReceive = withXFTPServer $ do
|
||||
runRight_ $ do
|
||||
rfId <- testReceive rcp rfd originalFilePath
|
||||
xftpDeleteRcvFile rcp rfId
|
||||
disconnectAgentClient rcp
|
||||
disposeAgentClient rcp
|
||||
|
||||
testXFTPAgentSendReceiveEncrypted :: HasCallStack => IO ()
|
||||
testXFTPAgentSendReceiveEncrypted = withXFTPServer $ do
|
||||
@@ -126,7 +126,7 @@ testXFTPAgentSendReceiveEncrypted = withXFTPServer $ do
|
||||
runRight_ $ do
|
||||
rfId <- testReceiveCF rcp rfd cfArgs originalFilePath
|
||||
xftpDeleteRcvFile rcp rfId
|
||||
disconnectAgentClient rcp
|
||||
disposeAgentClient rcp
|
||||
|
||||
createRandomFile :: HasCallStack => IO FilePath
|
||||
createRandomFile = do
|
||||
@@ -183,7 +183,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
rfId <- xftpReceiveFile rcp 1 rfd Nothing
|
||||
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
|
||||
pure rfId
|
||||
disconnectAgentClient rcp
|
||||
disposeAgentClient rcp
|
||||
|
||||
[prefixDir] <- listDirectory recipientFiles
|
||||
let tmpPath = recipientFiles </> prefixDir </> "xftp.encrypted"
|
||||
@@ -195,7 +195,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
runRight_ $ xftpStartWorkers rcp' (Just recipientFiles)
|
||||
("", rfId', RFPROG _ _) <- rfGet rcp'
|
||||
liftIO $ rfId' `shouldBe` rfId
|
||||
disconnectAgentClient rcp'
|
||||
disposeAgentClient rcp'
|
||||
|
||||
threadDelay 100000
|
||||
|
||||
@@ -231,7 +231,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
|
||||
rfId <- xftpReceiveFile rcp 1 rfd Nothing
|
||||
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
|
||||
pure rfId
|
||||
disconnectAgentClient rcp
|
||||
disposeAgentClient rcp
|
||||
|
||||
[prefixDir] <- listDirectory recipientFiles
|
||||
let tmpPath = recipientFiles </> prefixDir </> "xftp.encrypted"
|
||||
@@ -258,7 +258,7 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
sfId <- xftpSendFile sndr 1 (CF.plain filePath) 2
|
||||
liftIO $ timeout 1000000 (get sndr) `shouldReturn` Nothing -- wait for worker to encrypt and attempt to create file
|
||||
pure sfId
|
||||
disconnectAgentClient sndr
|
||||
disposeAgentClient sndr
|
||||
|
||||
dirEntries <- listDirectory senderFiles
|
||||
let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries
|
||||
@@ -273,7 +273,7 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
|
||||
("", sfId', SFPROG _ _) <- sfGet sndr'
|
||||
liftIO $ sfId' `shouldBe` sfId
|
||||
disconnectAgentClient sndr'
|
||||
disposeAgentClient sndr'
|
||||
|
||||
threadDelay 100000
|
||||
|
||||
@@ -310,7 +310,7 @@ testXFTPAgentSendCleanup = withGlobalLogging logCfgNoLogs $ do
|
||||
(_, _, SFPROG _ _) <- sfGet sndr
|
||||
pure ()
|
||||
pure sfId
|
||||
disconnectAgentClient sndr
|
||||
disposeAgentClient sndr
|
||||
pure sfId
|
||||
|
||||
dirEntries <- listDirectory senderFiles
|
||||
@@ -353,7 +353,7 @@ testXFTPAgentDelete = withGlobalLogging logCfgNoLogs $
|
||||
xftpDeleteSndFileRemote sndr 1 sfId sndDescr
|
||||
Nothing <- liftIO $ 100000 `timeout` sfGet sndr
|
||||
pure ()
|
||||
disconnectAgentClient rcp1
|
||||
disposeAgentClient rcp1
|
||||
|
||||
threadDelay 1000000
|
||||
length <$> listDirectory xftpServerFiles `shouldReturn` 0
|
||||
@@ -379,8 +379,8 @@ testXFTPAgentDeleteRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
rcp1 <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
runRight_ $
|
||||
void $ testReceive rcp1 rfd1 filePath
|
||||
disconnectAgentClient rcp1
|
||||
disconnectAgentClient sndr
|
||||
disposeAgentClient rcp1
|
||||
disposeAgentClient sndr
|
||||
pure (sfId, sndDescr, rfd2)
|
||||
|
||||
-- delete file - should not succeed with server down
|
||||
@@ -389,7 +389,7 @@ testXFTPAgentDeleteRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
xftpStartWorkers sndr (Just senderFiles)
|
||||
xftpDeleteSndFileRemote sndr 1 sfId sndDescr
|
||||
liftIO $ timeout 300000 (get sndr) `shouldReturn` Nothing -- wait for worker attempt
|
||||
disconnectAgentClient sndr
|
||||
disposeAgentClient sndr
|
||||
|
||||
threadDelay 300000
|
||||
length <$> listDirectory xftpServerFiles `shouldReturn` 6
|
||||
|
||||
Reference in New Issue
Block a user