diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 69896408c..9b5973edd 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -117,6 +117,9 @@ xftpTransportHost XFTPClient {http2Client = HTTP2Client {client_ = HClient {host xftpSessionTs :: XFTPClient -> UTCTime xftpSessionTs = sessionTs . http2Client +xftpSessionId :: XFTPClient -> ByteString +xftpSessionId = sessionId . http2Client + xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig xftpHTTP2Config transportConfig XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpConnectTimeout}} = defaultHTTP2ClientConfig diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 29b153464..051ad136b 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1792,12 +1792,11 @@ getAgentMigrations' :: AgentMonad m => AgentClient -> m [UpMigration] getAgentMigrations' c = map upMigration <$> withStore' c (Migrations.getCurrent . DB.conn) debugAgentLocks' :: AgentMonad' m => AgentClient -> m AgentLocks -debugAgentLocks' AgentClient {connLocks = cs, invLocks = is, reconnectLocks = rs, deleteLock = d} = do +debugAgentLocks' AgentClient {connLocks = cs, invLocks = is, deleteLock = d} = do connLocks <- getLocks cs invLocks <- getLocks is - srvLocks <- getLocks rs delLock <- atomically $ tryReadTMVar d - pure AgentLocks {connLocks, invLocks, srvLocks, delLock} + pure AgentLocks {connLocks, invLocks, delLock} where getLocks ls = atomically $ M.mapKeys (B.unpack . strEncode) . M.mapMaybe id <$> (mapM tryReadTMVar =<< readTVar ls) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 42f69cc21..fd5aadecc 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -264,15 +264,19 @@ data AgentClient = AgentClient invLocks :: TMap ByteString Lock, -- lock to prevent concurrency between periodic and async connection deletions deleteLock :: Lock, - -- locks to prevent concurrent reconnections to SMP servers - reconnectLocks :: TMap SMPTransportSession Lock, - reconnections :: TAsyncs, + -- smpSubWorkers for SMP servers sessions + smpSubWorkers :: TMap SMPTransportSession (TMVar SubWorker), asyncClients :: TAsyncs, agentStats :: TMap AgentStatsKey (TVar Int), clientId :: Int, agentEnv :: Env } +data SubWorker = SubWorker + { subWorkerId :: Int, + subWorkerAsync :: Async () + } + getAgentWorker :: (AgentMonad' m, Ord k, Show k) => String -> Bool -> AgentClient -> k -> TMap k Worker -> (Worker -> ExceptT AgentErrorType m ()) -> m Worker getAgentWorker = getAgentWorker' id pure @@ -361,7 +365,6 @@ data AgentState = ASForeground | ASSuspending | ASSuspended data AgentLocks = AgentLocks { connLocks :: Map String String, invLocks :: Map String String, - srvLocks :: Map String String, delLock :: Maybe String } deriving (Show) @@ -407,8 +410,7 @@ newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do connLocks <- TM.empty invLocks <- TM.empty deleteLock <- createLock - reconnectLocks <- TM.empty - reconnections <- newTAsyncs + smpSubWorkers <- TM.empty asyncClients <- newTAsyncs agentStats <- TM.empty clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') @@ -443,8 +445,7 @@ newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do connLocks, invLocks, deleteLock, - reconnectLocks, - reconnections, + smpSubWorkers, asyncClients, agentStats, clientId, @@ -465,6 +466,7 @@ class (Encoding err, Show err) => ProtocolServerClient err msg | msg -> err wher clientServer :: Client msg -> String clientTransportHost :: Client msg -> TransportHost clientSessionTs :: Client msg -> UTCTime + clientSessionId :: Client msg -> ByteString instance ProtocolServerClient ErrorType BrokerMsg where type Client BrokerMsg = ProtocolClient ErrorType BrokerMsg @@ -474,6 +476,7 @@ instance ProtocolServerClient ErrorType BrokerMsg where clientServer = protocolClientServer clientTransportHost = transportHost' clientSessionTs = sessionTs + clientSessionId = sessionId instance ProtocolServerClient ErrorType NtfResponse where type Client NtfResponse = ProtocolClient ErrorType NtfResponse @@ -483,6 +486,7 @@ instance ProtocolServerClient ErrorType NtfResponse where clientServer = protocolClientServer clientTransportHost = transportHost' clientSessionTs = sessionTs + clientSessionId = sessionId instance ProtocolServerClient XFTPErrorType FileResponse where type Client FileResponse = XFTPClient @@ -492,16 +496,15 @@ instance ProtocolServerClient XFTPErrorType FileResponse where clientServer = X.xftpClientServer clientTransportHost = X.xftpTransportHost clientSessionTs = X.xftpSessionTs + clientSessionId = X.xftpSessionId getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do unlessM (readTVarIO active) . throwError $ INACTIVE - atomically (getClientVar tSess smpClients) + atomically (getTSessVar tSess smpClients) >>= either newClient (waitForProtocolClient c tSess) where - newClient v = do - tc <- newTVarIO 0 - newProtocolClient c tSess smpClients connectClient (reconnectSMPClient 0 tc) v + newClient = newProtocolClient c tSess smpClients connectClient resubscribeSMPSession connectClient :: m SMPClient connectClient = do cfg <- getClientConfig c smpCfg @@ -515,7 +518,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, where removeClientAndSubs :: IO ([RcvQueue], [ConnId]) removeClientAndSubs = atomically $ do - TM.delete tSess smpClients + removeClientVar client tSess smpClients qs <- RQ.getDelSessQueues tSess $ activeSubs c mapM_ (`RQ.addQueue` pendingSubs c) qs let cs = S.fromList $ map qConnId qs @@ -529,41 +532,49 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, unless (null conns) $ notifySub "" $ DOWN srv conns unless (null qs) $ do atomically $ mapM_ (releaseGetLock c) qs - unliftIO u $ reconnectServer c tSess + unliftIO u $ resubscribeSMPSession c tSess notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO () notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) -reconnectServer :: AgentMonad m => AgentClient -> SMPTransportSession -> m () -reconnectServer c tSess = newAsyncAction tryReconnectSMPClient $ reconnections c +resubscribeSMPSession :: AgentMonad' m => AgentClient -> SMPTransportSession -> m () +resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess = + atomically (getTSessVar tSess smpSubWorkers) >>= either newSubWorker (\_ -> pure ()) where - tryReconnectSMPClient aId = do + newSubWorker v = do + subWorkerId <- atomically $ stateTVar (workerSeq c) $ \next -> (next, next + 1) + subWorkerAsync <- async $ runSubWorker subWorkerId `E.catchAny` const (atomically $ cleanup subWorkerId) + atomically $ putTMVar v SubWorker {subWorkerId, subWorkerAsync} + runSubWorker swId = do ri <- asks $ reconnectInterval . config timeoutCounts <- newTVarIO 0 - withRetryIntervalCount ri $ \n _ loop -> - reconnectSMPClient n timeoutCounts c tSess `catchAgentError` const loop - atomically . removeAsyncAction aId $ reconnections c + withRetryInterval ri $ \_ loop -> do + pending <- atomically $ do + qs <- RQ.getSessQueues tSess (pendingSubs c) + when (null qs) $ cleanup swId + pure qs + forM_ (L.nonEmpty pending) $ \qs -> do + void . runExceptT $ reconnectSMPClient timeoutCounts c tSess qs `catchAgentError` \_ -> pure () + loop + cleanup :: Int -> STM () + cleanup swId = removeTSessVar ((swId ==) . subWorkerId) tSess smpSubWorkers -reconnectSMPClient :: forall m. AgentMonad m => Int -> TVar Int -> AgentClient -> SMPTransportSession -> m () -reconnectSMPClient n tc c tSess@(_, srv, _) = do - ts <- liftIO getCurrentTime - let label = unwords ["reconnect", show n, show ts] - withLockMap_ (reconnectLocks c) tSess label $ do - qs <- atomically (RQ.getSessQueues tSess $ pendingSubs c) - NetworkConfig {tcpTimeout} <- readTVarIO $ useNetworkConfig c - -- this allows 3x of timeout per batch of subscription (90 queues per batch empirically) - let t = (length qs `div` 90 + 1) * tcpTimeout * 3 - t `timeout` mapM_ resubscribe (L.nonEmpty qs) >>= \case - Just _ -> atomically $ writeTVar tc 0 - Nothing -> do - tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) - maxTC <- asks $ maxSubscriptionTimeouts . config - let err = if tc' >= maxTC then CRITICAL True else INTERNAL - msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess - atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ err msg) +reconnectSMPClient :: forall m. AgentMonad m => TVar Int -> AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> m () +reconnectSMPClient tc c tSess@(_, srv, _) qs = do + NetworkConfig {tcpTimeout} <- readTVarIO $ useNetworkConfig c + -- this allows 3x of timeout per batch of subscription (90 queues per batch empirically) + let t = (length qs `div` 90 + 1) * tcpTimeout * 3 + t `timeout` resubscribe >>= \case + Just _ -> atomically $ writeTVar tc 0 + Nothing -> do + tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) + maxTC <- asks $ maxSubscriptionTimeouts . config + let err = if tc' >= maxTC then CRITICAL True else INTERNAL + msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess + atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ err msg) where - resubscribe :: NonEmpty RcvQueue -> m () - resubscribe qs = do + resubscribe :: m () + resubscribe = do cs <- atomically . RQ.getConns $ activeSubs c rs <- subscribeQueues c $ L.toList qs let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs @@ -572,14 +583,17 @@ reconnectSMPClient n tc c tSess@(_, srv, _) = do unless (null conns) $ notifySub "" $ UP srv conns let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs - mapM_ (throwError . snd) $ listToMaybe tempErrs + forM_ (listToMaybe tempErrs) $ \(_, err) -> do + when (null okConns && S.null cs && null finalErrs) . liftIO $ + closeClient c smpClients tSess + throwError err notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO () notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) getNtfServerClient :: forall m. AgentMonad m => AgentClient -> NtfTransportSession -> m NtfClient getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = do unlessM (readTVarIO active) . throwError $ INACTIVE - atomically (getClientVar tSess ntfClients) + atomically (getTSessVar tSess ntfClients) >>= either (newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ()) (waitForProtocolClient c tSess) @@ -591,7 +605,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d clientDisconnected :: NtfClient -> IO () clientDisconnected client = do - atomically $ TM.delete tSess ntfClients + atomically $ removeClientVar client tSess ntfClients incClientStat c userId client "DISCONNECT" "" atomically $ writeTBQueue (subQ c) ("", "", APC SAENone $ hostEvent DISCONNECT client) logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv @@ -599,7 +613,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d getXFTPServerClient :: forall m. AgentMonad m => AgentClient -> XFTPTransportSession -> m XFTPClient getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do unlessM (readTVarIO active) . throwError $ INACTIVE - atomically (getClientVar tSess xftpClients) + atomically (getTSessVar tSess xftpClients) >>= either (newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ()) (waitForProtocolClient c tSess) @@ -612,13 +626,13 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@ clientDisconnected :: XFTPClient -> IO () clientDisconnected client = do - atomically $ TM.delete tSess xftpClients + atomically $ removeClientVar client tSess xftpClients incClientStat c userId client "DISCONNECT" "" atomically $ writeTBQueue (subQ c) ("", "", APC SAENone $ hostEvent DISCONNECT client) logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv -getClientVar :: forall a s. TransportSession s -> TMap (TransportSession s) (TMVar a) -> STM (Either (TMVar a) (TMVar a)) -getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup tSess clients +getTSessVar :: forall a s. TransportSession s -> TMap (TransportSession s) (TMVar a) -> STM (Either (TMVar a) (TMVar a)) +getTSessVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup tSess clients where newClientVar :: STM (TMVar a) newClientVar = do @@ -626,6 +640,18 @@ getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM TM.insert tSess var clients pure var +removeClientVar :: ProtocolServerClient err msg => Client msg -> TransportSession msg -> TMap (TransportSession msg) (ClientVar msg) -> STM () +removeClientVar = removeTSessVar . either (const False) . sameClient + +sameClient :: ProtocolServerClient err msg => Client msg -> Client msg -> Bool +sameClient c c' = clientSessionId c == clientSessionId c' + +removeTSessVar :: (a -> Bool) -> TransportSession msg -> TMap (TransportSession msg) (TMVar a) -> STM () +removeTSessVar same tSess vs = + TM.lookup tSess vs + $>>= tryReadTMVar + >>= mapM_ (\v -> when (same v) $ TM.delete tSess vs) + waitForProtocolClient :: (AgentMonad m, ProtocolTypeI (ProtoType msg)) => AgentClient -> TransportSession msg -> ClientVar msg -> m (Client msg) waitForProtocolClient c (_, srv, _) clientVar = do NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c @@ -635,6 +661,7 @@ waitForProtocolClient c (_, srv, _) clientVar = do Just (Left e) -> Left e Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT +-- clientConnected arg is only passed for SMP server newProtocolClient :: forall err msg m. (AgentMonad m, ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => @@ -645,7 +672,7 @@ newProtocolClient :: (AgentClient -> TransportSession msg -> m ()) -> ClientVar msg -> m (Client msg) -newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconnectClient clientVar = tryConnectClient pure tryConnectAsync +newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient clientConnected clientVar = tryConnectClient pure tryConnectAsync where tryConnectClient :: (Client msg -> m a) -> m () -> m a tryConnectClient successAction retryAction = @@ -669,7 +696,7 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconne connectAsync :: Int -> m () connectAsync aId = do ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \_ loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop + withRetryInterval ri $ \_ loop -> void $ tryConnectClient (const $ clientConnected c tSess) loop atomically . removeAsyncAction aId $ asyncClients c hostEvent :: forall err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent 'AENone) -> Client msg -> ACommand 'Agent 'AENone @@ -687,7 +714,7 @@ closeAgentClient c = liftIO $ do closeProtocolServerClients c smpClients closeProtocolServerClients c ntfClients closeProtocolServerClients c xftpClients - cancelActions . actions $ reconnections c + atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect cancelActions . actions $ asyncClients c clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst) clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker @@ -701,6 +728,8 @@ closeAgentClient c = liftIO $ do clearWorkers workers = atomically $ swapTVar (workers c) mempty clear :: Monoid m => (AgentClient -> TVar m) -> IO () clear sel = atomically $ writeTVar (sel c) mempty + cancelReconnect :: TMVar SubWorker -> IO () + cancelReconnect v = void . forkIO $ atomically (readTMVar v) >>= \(SubWorker _ a) -> uninterruptibleCancel a cancelWorker :: Worker -> IO () cancelWorker Worker {doWork, action} = do @@ -991,7 +1020,7 @@ temporaryOrHostError = \case e -> temporaryAgentError e -- | Subscribe to queues. The list of results can have a different order. -subscribeQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())] +subscribeQueues :: forall m. AgentMonad' m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())] subscribeQueues c qs = do (errs, qs') <- partitionEithers <$> mapM checkQueue qs forM_ qs' $ \rq@RcvQueue {connId} -> atomically $ do @@ -1009,13 +1038,13 @@ subscribeQueues c qs = do rs <- sendBatch subscribeSMPQueues smp qs' mapM_ (uncurry $ processSubResult c) rs when (any temporaryClientError . lefts . map snd $ L.toList rs) . unliftIO u $ - reconnectServer c (transportSession' smp) + resubscribeSMPSession c (transportSession' smp) pure rs type BatchResponses e r = (NonEmpty (RcvQueue, Either e r)) -- statBatchSize is not used to batch the commands, only for traffic statistics -sendTSessionBatches :: forall m q r. AgentMonad m => ByteString -> Int -> (q -> RcvQueue) -> (SMPClient -> NonEmpty q -> IO (BatchResponses SMPClientError r)) -> AgentClient -> [q] -> m [(RcvQueue, Either AgentErrorType r)] +sendTSessionBatches :: forall m q r. AgentMonad' m => ByteString -> Int -> (q -> RcvQueue) -> (SMPClient -> NonEmpty q -> IO (BatchResponses SMPClientError r)) -> AgentClient -> [q] -> m [(RcvQueue, Either AgentErrorType r)] sendTSessionBatches statCmd statBatchSize toRQ action c qs = concatMap L.toList <$> (mapConcurrently sendClientBatch =<< batchQueues) where @@ -1029,7 +1058,7 @@ sendTSessionBatches statCmd statBatchSize toRQ action c qs = in M.alter (Just . maybe [q] (q <|)) tSess m sendClientBatch :: (SMPTransportSession, NonEmpty q) -> m (BatchResponses AgentErrorType r) sendClientBatch (tSess@(userId, srv, _), qs') = - tryError (getSMPServerClient c tSess) >>= \case + runExceptT (getSMPServerClient c tSess) >>= \case Left e -> pure $ L.map ((,Left e) . toRQ) qs' Right smp -> liftIO $ do logServer "-->" c srv (bshow (length qs') <> " queues") statCmd @@ -1130,7 +1159,7 @@ enableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} notifierKey rcvNtf withSMPClient c rq "NKEY " $ \smp -> enableSMPQueueNotifications smp rcvPrivateKey rcvId notifierKey rcvNtfPublicDhKey -enableQueuesNtfs :: forall m. AgentMonad m => AgentClient -> [(RcvQueue, SMP.NtfPublicVerifyKey, SMP.RcvNtfPublicDhKey)] -> m [(RcvQueue, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))] +enableQueuesNtfs :: forall m. AgentMonad' m => AgentClient -> [(RcvQueue, SMP.NtfPublicVerifyKey, SMP.RcvNtfPublicDhKey)] -> m [(RcvQueue, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))] enableQueuesNtfs = sendTSessionBatches "NKEY" 90 fst3 enableQueues_ where fst3 (x, _, _) = x @@ -1144,7 +1173,7 @@ disableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} = withSMPClient c rq "NDEL" $ \smp -> disableSMPQueueNotifications smp rcvPrivateKey rcvId -disableQueuesNtfs :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())] +disableQueuesNtfs :: forall m. AgentMonad' m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())] disableQueuesNtfs = sendTSessionBatches "NDEL" 90 id $ sendBatch disableSMPQueuesNtfs sendAck :: AgentMonad m => AgentClient -> RcvQueue -> MsgId -> m () @@ -1171,7 +1200,7 @@ deleteQueue c rq@RcvQueue {rcvId, rcvPrivateKey} = do withSMPClient c rq "DEL" $ \smp -> deleteSMPQueue smp rcvPrivateKey rcvId -deleteQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())] +deleteQueues :: forall m. AgentMonad' m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())] deleteQueues = sendTSessionBatches "DEL" 90 id $ sendBatch deleteSMPQueues sendAgentMessage :: AgentMonad m => AgentClient -> SndQueue -> MsgFlags -> ByteString -> m () diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 059430290..9793a8220 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -788,7 +788,7 @@ testDuplicateMessage t = do -- commenting two lines below and uncommenting further two lines would also runRight_, -- it is the scenario tested above, when the message was not acknowledged by the user threadDelay 200000 - Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing + Left (BROKER _ NETWORK) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing disconnectAgentClient alice disconnectAgentClient bob1