fix functional tests

This commit is contained in:
Alexander Bondarenko
2024-05-30 17:20:51 +03:00
parent 7f9b013a13
commit 2ce3c712e1

View File

@@ -13,6 +13,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wno-ambiguous-fields #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module AgentTests.FunctionalAPITests
@@ -114,7 +115,7 @@ a ##> t = withTimeout a (`shouldBe` t)
a =##> p =
withTimeout a $ \r -> do
unless (p r) $ liftIO $ putStrLn $ "value failed predicate: " <> show r
r `shouldSatisfy` p
withFrozenCallStack $ r `shouldSatisfy` p
withTimeout :: (HasCallStack, MonadUnliftIO m) => m a -> (HasCallStack => a -> Expectation) -> m ()
withTimeout a test =
@@ -134,6 +135,13 @@ sfGet c = withFrozenCallStack $ get' @'AESndFile c
nGet :: (MonadIO m, HasCallStack) => AgentClient -> m (AEntityTransmission 'AENone)
nGet c = withFrozenCallStack $ get' @'AENone c
nGetUP :: (MonadIO m, HasCallStack) => AgentClient -> m (AEntityTransmission 'AENone)
nGetUP c = withFrozenCallStack $ liftIO $ do
timeout 15000000 (pGet_ c True) >>= \case
Just (corrId, connId, APC _ cmd@UP {}) -> pure (corrId, connId, cmd)
Just (_, _, APC _ cmd) -> error $ "unexpected command " <> show cmd
Nothing -> error "timed out waiting for UP"
get' :: forall e m. (MonadIO m, AEntityI e, HasCallStack) => AgentClient -> m (AEntityTransmission e)
get' c = withFrozenCallStack $ do
(corrId, connId, APC e cmd) <- pGet c
@@ -141,14 +149,18 @@ get' c = withFrozenCallStack $ do
Just Refl -> pure (corrId, connId, cmd)
_ -> error $ "unexpected command " <> show cmd
pGet :: forall m. MonadIO m => AgentClient -> m (ATransmission 'Agent)
pGet c = do
pGet :: forall m. (MonadIO m, HasCallStack) => AgentClient -> m (ATransmission 'Agent)
pGet c = withFrozenCallStack $ pGet_ c False
pGet_ :: forall m. (MonadIO m, HasCallStack) => AgentClient -> Bool -> m (ATransmission 'Agent)
pGet_ c expectUp = withFrozenCallStack $ do
t@(_, _, APC _ cmd) <- atomically (readTBQueue $ subQ c)
case cmd of
CONNECT {} -> pGet c
DISCONNECT {} -> pGet c
ERR (BROKER _ NETWORK) -> pGet c
MWARN {} -> pGet c
CONNECT {} -> pGet_ c expectUp
DISCONNECT {} -> pGet_ c expectUp
ERR (BROKER _ NETWORK) -> pGet_ c expectUp
MWARN {} -> pGet_ c expectUp
UP {} | not expectUp -> pGet_ c expectUp
_ -> pure t
pattern CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> ACommand 'Agent e
@@ -767,7 +779,7 @@ testAsyncServerOffline t = withAgentClients2 $ \alice bob -> do
conns `shouldBe` [bobId]
-- connection succeeds after server start
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
("", "", UP srv1 conns1) <- nGet alice
("", "", UP srv1 conns1) <- nGetUP alice
liftIO $ do
srv1 `shouldBe` testSMPServer
conns1 `shouldBe` [bobId]
@@ -807,8 +819,7 @@ testAllowConnectionClientRestart t = do
withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile2} testPort2 $ \_ -> do
runRight $ do
("", "", UP _ _) <- nGet bob
("", "", UP _ _) <- nGetUP bob
subscribeConnection alice2 bobId
get alice2 ##> ("", bobId, CON)
@@ -955,7 +966,7 @@ testDeliverClientRestart t = do
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
runRight_ $ do
("", "", UP _ _) <- nGet alice
("", "", UP _ _) <- nGetUP alice
subscribeConnection bob2 aliceId
@@ -1075,8 +1086,8 @@ testExpireMessage t =
5 <- runRight $ sendMessage a bId SMP.noMsgFlags "2" -- this won't expire
get a =##> \case ("", c, MERR 4 (BROKER _ e)) -> bId == c && (e == TIMEOUT || e == NETWORK); _ -> False
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
withUP a bId $ \case ("", _, SENT 5) -> True; _ -> False
withUP b aId $ \case ("", _, MsgErr 4 (MsgSkipped 3 3) "2") -> True; _ -> False
get a =##> \case ("", _, SENT 5) -> True; _ -> False
get b =##> \case ("", _, MsgErr 4 (MsgSkipped 3 3) "2") -> True; _ -> False
ackMessage b aId 4 Nothing
testExpireManyMessages :: HasCallStack => ATransport -> IO ()
@@ -1106,19 +1117,10 @@ testExpireManyMessages t =
liftIO $ expected c e `shouldBe` True
r -> error $ show r
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
withUP a bId $ \case ("", _, SENT 7) -> True; _ -> False
withUP b aId $ \case ("", _, MsgErr 4 (MsgSkipped 3 5) "4") -> True; _ -> False
get a =##> \case ("", _, SENT 7) -> True; _ -> False
get b =##> \case ("", _, MsgErr 4 (MsgSkipped 3 5) "4") -> True; _ -> False
ackMessage b aId 4 Nothing
withUP :: AgentClient -> ConnId -> (AEntityTransmission 'AEConn -> Bool) -> ExceptT AgentErrorType IO ()
withUP a bId p =
liftIO $
getInAnyOrder
a
[ \case ("", "", APC SAENone (UP _ [c])) -> c == bId; _ -> False,
\case (corrId, c, APC SAEConn cmd) -> c == bId && p (corrId, c, cmd); _ -> False
]
testExpireMessageQuota :: HasCallStack => ATransport -> IO ()
testExpireMessageQuota t = withSmpServerConfigOn t cfg {msgQueueQuota = 1} testPort $ \_ -> do
a <- getSMPAgentClient' 1 agentCfg {quotaExceededTimeout = 1, messageRetryInterval = fastMessageRetryInterval} initAgentServers testDB
@@ -1268,19 +1270,14 @@ testRatchetSyncServerOffline t = withAgentClients2 $ \alice bob -> do
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
concurrently_
(getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP])
(getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP])
(pGet alice =##> ratchetSyncP' bobId RSAgreed)
(pGet bob2 =##> ratchetSyncP' aliceId RSAgreed)
runRight_ $ do
get alice =##> ratchetSyncP bobId RSOk
get bob2 =##> ratchetSyncP aliceId RSOk
exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9
disposeAgentClient bob2
serverUpP :: ATransmission 'Agent -> Bool
serverUpP = \case
("", "", APC SAENone (UP _ _)) -> True
_ -> False
testRatchetSyncClientRestart :: HasCallStack => ATransport -> IO ()
testRatchetSyncClientRestart t = do
alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
@@ -1295,7 +1292,7 @@ testRatchetSyncClientRestart t = do
bob3 <- getSMPAgentClient' 3 agentCfg initAgentServers testDB2
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
runRight_ $ do
("", "", UP _ _) <- nGet alice
("", "", UP _ _) <- nGetUP alice
subscribeConnection bob3 aliceId
get alice =##> ratchetSyncP bobId RSAgreed
get bob3 =##> ratchetSyncP aliceId RSAgreed
@@ -1324,13 +1321,11 @@ testRatchetSyncSuspendForeground t = do
foregroundAgent bob2
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
concurrently_
(getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP])
(getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP])
runRight_ $ do
get alice =##> ratchetSyncP bobId RSOk
get bob2 =##> ratchetSyncP aliceId RSOk
exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9
get alice =##> ratchetSyncP bobId RSAgreed
get bob2 =##> ratchetSyncP aliceId RSAgreed
get alice =##> ratchetSyncP bobId RSOk
get bob2 =##> ratchetSyncP aliceId RSOk
runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9
disposeAgentClient alice
disposeAgentClient bob
disposeAgentClient bob2
@@ -1352,13 +1347,11 @@ testRatchetSyncSimultaneous t = do
liftIO $ aRSS `shouldBe` RSStarted
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
concurrently_
(getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP])
(getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP])
runRight_ $ do
get alice =##> ratchetSyncP bobId RSOk
get bob2 =##> ratchetSyncP aliceId RSOk
exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9
get alice =##> ratchetSyncP bobId RSAgreed
get bob2 =##> ratchetSyncP aliceId RSAgreed
get alice =##> ratchetSyncP bobId RSOk
get bob2 =##> ratchetSyncP aliceId RSOk
runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9
disposeAgentClient alice
disposeAgentClient bob
disposeAgentClient bob2
@@ -1454,6 +1447,7 @@ testActiveClientNotDisconnected t = do
where
keepSubscribing :: AgentClient -> ConnId -> SystemTime -> ExceptT AgentErrorType IO ()
keepSubscribing alice connId ts = do
atomically $ void . flushTBQueue $ subQ alice -- drain queue so subscribeConnection may proceed with UPs
ts' <- liftIO getSystemTime
if milliseconds ts' - milliseconds ts < 2200
then do
@@ -1504,13 +1498,11 @@ testSuspendingAgentCompleteSending t = withAgentClients2 $ \a b -> do
liftIO $ suspendAgent b 5000000
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ @AgentErrorType $ do
pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC SAEConn (SENT 6)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; _ -> False
pGet b =##> \case ("", c, APC SAEConn (SENT 6)) -> c == aId; _ -> False
("", "", SUSPENDED) <- nGet b
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; _ -> False
ackMessage a bId 5 Nothing
get a =##> \case ("", c, Msg "how are you?") -> c == bId; _ -> False
ackMessage a bId 6 Nothing
@@ -1536,7 +1528,7 @@ testSuspendingAgentTimeout t = withAgentClients2 $ \a b -> do
testBatchedSubscriptions :: Int -> Int -> ATransport -> IO ()
testBatchedSubscriptions nCreate nDel t =
withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do
withAgentClientsCfgServers2 agentCfgN agentCfgN initAgentServers2 $ \a b -> do
conns <- runServers $ do
conns <- replicateM nCreate $ makeConnection_ PQSupportOff a b
forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId
@@ -1550,10 +1542,10 @@ testBatchedSubscriptions nCreate nDel t =
("", "", DOWN {}) <- nGet b
("", "", DOWN {}) <- nGet b
runServers $ do
("", "", UP {}) <- nGet a
("", "", UP {}) <- nGet a
("", "", UP {}) <- nGet b
("", "", UP {}) <- nGet b
("", "", UP {}) <- nGetUP a
("", "", UP {}) <- nGetUP a
("", "", UP {}) <- nGetUP b
("", "", UP {}) <- nGetUP b
liftIO $ threadDelay 1000000
let (aIds, bIds) = unzip conns
conns' = drop nDel conns
@@ -1569,6 +1561,8 @@ testBatchedSubscriptions nCreate nDel t =
deleteFail a bIds'
deleteFail b aIds'
where
agentCfgN :: AgentConfig
agentCfgN = agentCfg {tbqSize = fromIntegral nCreate} -- without reader thread sub notifications would be flushed until all batches finish, blocking on `notifySub`
subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
subscribe c cs = do
r <- subscribeConnections c cs
@@ -1797,13 +1791,7 @@ testWaitDelivery t =
get alice ##> ("", bobId, SENT $ baseId + 3)
get alice ##> ("", bobId, SENT $ baseId + 4)
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
liftIO $
getInAnyOrder
bob
[ \case ("", "", APC SAENone (UP _ [cId])) -> cId == aliceId; _ -> False,
\case ("", cId, APC SAEConn (Msg "how are you?")) -> cId == aliceId; _ -> False
]
get bob =##> \case ("", cId, Msg "how are you?") -> cId == aliceId; _ -> False
ackMessage bob aliceId (baseId + 3) Nothing
get bob =##> \case ("", c, Msg "message 1") -> c == aliceId; _ -> False
ackMessage bob aliceId (baseId + 4) Nothing
@@ -1894,7 +1882,7 @@ testWaitDeliveryTimeout t =
liftIO $ threadDelay 100000
withSmpServerStoreLogOn t testPort $ \_ -> do
nGet bob =##> \case ("", "", UP _ [cId]) -> cId == aliceId; _ -> False
nGetUP bob =##> \case ("", "", UP _ [cId]) -> cId == aliceId; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"
where
@@ -1935,12 +1923,7 @@ testWaitDeliveryTimeout2 t =
get alice ##> ("", bobId, SENT $ baseId + 3)
-- "message 1" not delivered
liftIO $
getInAnyOrder
bob
[ \case ("", "", APC SAENone (UP _ [cId])) -> cId == aliceId; _ -> False,
\case ("", cId, APC SAEConn (Msg "how are you?")) -> cId == aliceId; _ -> False
]
get bob =##> \case ("", cId, Msg "how are you?") -> cId == aliceId; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"
where
@@ -1964,14 +1947,8 @@ testJoinConnectionAsyncReplyError t = do
withSmpServerOn t testPort2 $ do
get b =##> \case ("2", c, OK) -> c == aId; _ -> False
confId <- withSmpServerStoreLogOn t testPort $ \_ -> do
pGet a >>= \case
("", "", APC _ (UP _ [_])) -> do
("", _, CONF confId _ "bob's connInfo") <- get a
pure confId
("", _, APC _ (CONF confId _ "bob's connInfo")) -> do
("", "", UP _ [_]) <- nGet a
pure confId
r -> error $ "unexpected response " <> show r
("", _, CONF confId _ "bob's connInfo") <- get a
pure confId
nGet a =##> \case ("", "", DOWN _ [c]) -> c == bId; _ -> False
runRight_ $ do
allowConnectionAsync a "3" bId confId "alice's connInfo"
@@ -1979,8 +1956,7 @@ testJoinConnectionAsyncReplyError t = do
ConnectionStats {rcvQueuesInfo = [RcvQueueInfo {}], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId
pure ()
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
pGet a =##> \case ("3", c, APC _ OK) -> c == bId; ("", "", APC _ (UP _ [c])) -> c == bId; _ -> False
pGet a =##> \case ("3", c, APC _ OK) -> c == bId; ("", "", APC _ (UP _ [c])) -> c == bId; _ -> False
pGet a =##> \case ("3", c, APC _ OK) -> c == bId; _ -> False
get a ##> ("", bId, CON)
get b ##> ("", aId, INFO "alice's connInfo")
get b ##> ("", aId, CON)
@@ -2032,8 +2008,8 @@ testUsersNoServer t = withAgentClientsCfg2 aCfg agentCfg $ \a b -> do
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
nGet a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False
nGet b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False
nGetUP a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False
nGetUP b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False
exchangeGreetingsMsgId 6 a bId b aId
where
aCfg = agentCfg {initialCleanupDelay = 10000, cleanupInterval = 10000, deleteErrorCount = 3}
@@ -2600,7 +2576,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGetUP a
a `hasClients` 2
exchangeGreetingsMsgId 6 a bId1 b aId1
@@ -2610,8 +2586,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGetUP a
("", "", UP _ _) <- nGetUP a
a `hasClients` 1
aUserId2 <- createUser a [noAuthSrv testSMPServer] [noAuthSrv testXFTPServer]
@@ -2625,8 +2601,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGetUP a
("", "", UP _ _) <- nGetUP a
a `hasClients` 4
exchangeGreetingsMsgId 8 a bId1 b aId1
exchangeGreetingsMsgId 8 a bId1' b aId1'
@@ -2639,10 +2615,10 @@ testTwoUsers = withAgentClients2 $ \a b -> do
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGetUP a
("", "", UP _ _) <- nGetUP a
("", "", UP _ _) <- nGetUP a
("", "", UP _ _) <- nGetUP a
a `hasClients` 2
exchangeGreetingsMsgId 10 a bId1 b aId1
exchangeGreetingsMsgId 10 a bId1' b aId1'