diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index e899e3c36..98d0cb6ea 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -85,7 +85,7 @@ import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) +import Data.Maybe (listToMaybe) import Data.Set (Set) import Data.Text.Encoding import Data.Tuple (swap) @@ -111,7 +111,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util import Simplex.Messaging.Version import System.Timeout (timeout) -import UnliftIO (async, pooledForConcurrentlyN) +import UnliftIO (async) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -270,34 +270,18 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do reconnectClient `catchError` const loop reconnectClient :: m () - reconnectClient = do - n <- asks $ resubscriptionConcurrency . config - withAgentLock c . withClient c srv $ \smp -> do - cs <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSubscrSrvrs c) - -- TODO if any of the subscriptions fails here (e.g. because of timeout), it terminates the whole process for all subscriptions - -- instead it should only report successful subscriptions and schedule the next call to reconnectClient to subscribe for the remaining subscriptions - -- this way, for each DOWN event there can be several UP events - conns <- pooledForConcurrentlyN n (maybe [] M.toList cs) $ \sub@(connId, _) -> - ifM - (atomically $ hasActiveSubscription c connId) - (pure $ Just connId) - (subscribe_ smp sub `catchError` handleError connId) - liftIO . unless (null conns) . notifySub "" . UP srv $ catMaybes conns + reconnectClient = + withAgentLock c $ + atomically (TM.lookup srv (pendingSubscrSrvrs c) >>= mapM readTVar) + >>= mapM_ resubscribe where - subscribe_ :: SMPClient -> (ConnId, RcvQueue) -> ExceptT ProtocolClientError IO (Maybe ConnId) - subscribe_ smp (connId, rq@RcvQueue {rcvPrivateKey, rcvId}) = do - subscribeSMPQueue smp rcvPrivateKey rcvId - addSubscription c rq connId - pure $ Just connId - - handleError :: ConnId -> ProtocolClientError -> ExceptT ProtocolClientError IO (Maybe ConnId) - handleError connId = \case - e@PCEResponseTimeout -> throwError e - e@PCENetworkError -> throwError e - e -> do - liftIO . notifySub connId . ERR $ protocolClientError SMP e - atomically $ removePendingSubscription c srv connId - pure Nothing + resubscribe :: Map ConnId RcvQueue -> m () + resubscribe qs = do + (errs, oks) <- M.mapEither id <$> subscribeQueues c srv qs + liftIO . unless (M.null oks) . notifySub "" . UP srv $ M.keys oks + let (tempErrs, finalErrs) = M.partition temporaryAgentError errs + liftIO . mapM_ (\(connId, e) -> notifySub connId $ ERR e) $ M.assocs finalErrs + mapM_ throwError . listToMaybe $ M.elems tempErrs notifySub :: ConnId -> ACommand 'Agent -> IO () notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd) @@ -489,11 +473,23 @@ processSubResult :: AgentClient -> RcvQueue -> ConnId -> Either ProtocolClientEr processSubResult c rq@RcvQueue {server} connId r = do case r of Left e -> - atomically . when (e /= PCENetworkError && e /= PCEResponseTimeout) $ + atomically . unless (temporaryClientError e) $ removePendingSubscription c server connId _ -> addSubscription c rq connId pure r +temporaryClientError :: ProtocolClientError -> Bool +temporaryClientError = \case + PCENetworkError -> True + PCEResponseTimeout -> True + _ -> False + +temporaryAgentError :: AgentErrorType -> Bool +temporaryAgentError = \case + BROKER NETWORK -> True + BROKER TIMEOUT -> True + _ -> False + -- | subscribe multiple queues - all passed queues should be on the same server subscribeQueues :: AgentMonad m => AgentClient -> SMPServer -> Map ConnId RcvQueue -> m (Map ConnId (Either AgentErrorType ())) subscribeQueues c srv qs = do diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 6dd481ee9..4dd7cad75 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -67,7 +67,6 @@ data AgentConfig = AgentConfig ntfCfg :: ProtocolClientConfig, reconnectInterval :: RetryInterval, helloTimeout :: NominalDiffTime, - resubscriptionConcurrency :: Int, ntfCron :: Word16, ntfWorkerDelay :: Int, ntfSMPWorkerDelay :: Int, @@ -103,7 +102,6 @@ defaultAgentConfig = ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)}, reconnectInterval = defaultReconnectInterval, helloTimeout = 2 * nominalDay, - resubscriptionConcurrency = 16, ntfCron = 20, -- minutes ntfWorkerDelay = 100000, -- microseconds ntfSMPWorkerDelay = 500000, -- microseconds diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 17716cf82..104a8c8a6 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -80,6 +80,7 @@ cfg = inactiveClientExpiration = Just defaultInactiveClientExpiration, logStatsInterval = Nothing, logStatsStartTime = 0, + serverStatsLogFile = "tests/smp-server-stats.daily.log", serverStatsBackupFile = Nothing, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key",