mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-06 22:01:54 +00:00
notify up, fix test
This commit is contained in:
@@ -932,7 +932,20 @@ reconnectSMPServerClients c = do
|
||||
pure (clients, qs <> qs')
|
||||
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone DOWN_ALL)
|
||||
mapM_ (liftIO . forkIO . closeClient_ c) clients
|
||||
void $ subscribeQueues c qs
|
||||
(qSubRs, _) <- subscribeQueues c qs
|
||||
let upConns = subscribedConnsByServer qSubRs
|
||||
forM_ (M.toList upConns) $ \(server, connIds) ->
|
||||
liftIO $ notifyUP server (S.toList . S.fromList $ connIds)
|
||||
where
|
||||
subscribedConnsByServer :: [(RcvQueue, Either AgentErrorType ())] -> Map SMPServer [ConnId]
|
||||
subscribedConnsByServer = foldl' insertConnId M.empty
|
||||
where
|
||||
insertConnId :: Map SMPServer [ConnId] -> (RcvQueue, Either AgentErrorType ()) -> Map SMPServer [ConnId]
|
||||
insertConnId acc (RcvQueue {server, connId}, qSubResult) = case qSubResult of
|
||||
Right _ -> M.insertWith (<>) server [connId] acc
|
||||
Left _ -> acc
|
||||
notifyUP :: SMPServer -> [ConnId] -> IO ()
|
||||
notifyUP server connIds = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone (UP server connIds))
|
||||
|
||||
reconnectSMPServer :: AgentClient -> UserId -> SMPServer -> IO ()
|
||||
reconnectSMPServer c userId srv = do
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -Wno-orphans #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module AgentTests.FunctionalAPITests
|
||||
( functionalAPITests,
|
||||
@@ -2904,7 +2905,7 @@ testDeliveryReceiptsConcurrent t =
|
||||
_ -> error "timeout"
|
||||
|
||||
testTwoUsers :: HasCallStack => IO ()
|
||||
testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
testTwoUsers = withAgentClientsCfg2 aCfg aCfg $ \a b -> do
|
||||
let nc = netCfg initAgentServers
|
||||
sessionMode nc `shouldBe` TSMUser
|
||||
runRight_ $ do
|
||||
@@ -2916,8 +2917,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
b `hasClients` 1
|
||||
liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", DOWN_ALL) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 2
|
||||
|
||||
@@ -2926,7 +2926,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
liftIO $ threadDelay 250000
|
||||
liftIO $ setNetworkConfig a nc {sessionMode = TSMUser}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", DOWN_ALL) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 1
|
||||
|
||||
@@ -2939,10 +2939,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
b `hasClients` 1
|
||||
liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", DOWN_ALL) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 4
|
||||
exchangeGreetingsMsgId 6 a bId1 b aId1
|
||||
@@ -2952,8 +2949,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
liftIO $ threadDelay 250000
|
||||
liftIO $ setNetworkConfig a nc {sessionMode = TSMUser}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", DOWN_ALL) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 2
|
||||
exchangeGreetingsMsgId 8 a bId1 b aId1
|
||||
@@ -2961,6 +2957,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
exchangeGreetingsMsgId 6 a bId2 b aId2
|
||||
exchangeGreetingsMsgId 6 a bId2' b aId2'
|
||||
where
|
||||
aCfg :: AgentConfig
|
||||
aCfg = agentCfg {tbqSize = 16}
|
||||
hasClients :: HasCallStack => AgentClient -> Int -> ExceptT AgentErrorType IO ()
|
||||
hasClients c n = liftIO $ M.size <$> readTVarIO (smpClients c) `shouldReturn` n
|
||||
|
||||
|
||||
Reference in New Issue
Block a user