diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index d32d59b3f..e54c6e5ff 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -2,6 +2,7 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} @@ -33,6 +34,7 @@ import qualified Data.Map.Strict as M import Data.Maybe (listToMaybe) import Data.Set (Set) import Data.Text.Encoding +import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) import Data.Tuple (swap) import Numeric.Natural import Simplex.Messaging.Agent.RetryInterval @@ -44,14 +46,14 @@ import Simplex.Messaging.Session import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport -import Simplex.Messaging.Util (catchAll_, toChunks, ($>>=)) +import Simplex.Messaging.Util (catchAll_, ifM, toChunks, ($>>=)) import System.Timeout (timeout) import UnliftIO (async) import UnliftIO.Exception (Exception) import qualified UnliftIO.Exception as E import UnliftIO.STM -type SMPClientVar = SessionVar (Either SMPClientError SMPClient) +type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) SMPClient) data SMPClientAgentEvent = CAConnected SMPServer @@ -70,6 +72,7 @@ type SMPSub = (SMPSubParty, QueueId) data SMPClientAgentConfig = SMPClientAgentConfig { smpCfg :: ProtocolClientConfig SMPVersion, reconnectInterval :: RetryInterval, + persistErrorInterval :: NominalDiffTime, msgQSize :: Natural, agentQSize :: Natural, agentSubsBatchSize :: Int @@ -85,6 +88,7 @@ defaultSMPClientAgentConfig = increaseAfter = 10 * second, maxInterval = 10 * second }, + persistErrorInterval = 0, msgQSize = 256, agentQSize = 256, agentSubsBatchSize = 900 @@ -168,10 +172,15 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, waitForSMPClient v = do let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) - liftEither $ case smpClient_ of - Just (Right smpClient) -> Right smpClient - Just (Left e) -> Left e - Nothing -> Left PCEResponseTimeout + case smpClient_ of + Just (Right smpClient) -> pure smpClient + Just (Left (e, Nothing)) -> throwE e + Just (Left (e, Just ts)) -> + ifM + ((ts <) <$> liftIO getCurrentTime) + (atomically (removeSessVar v srv smpClients) >> getSMPServerClient' ca srv) + (throwE e) + Nothing -> throwE PCEResponseTimeout newSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient newSMPClient v = tryConnectClient pure (liftIO tryConnectAsync) @@ -182,15 +191,19 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, Right smp -> do logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv atomically $ do - putTMVar (sessionVar v) r + putTMVar (sessionVar v) (Right smp) TM.insert (sessionId $ thParams smp) smp smpSessions successAction smp Left e -> do - if e == PCENetworkError || e == PCEResponseTimeout - then retryAction - else atomically $ do - putTMVar (sessionVar v) (Left e) - removeSessVar v srv smpClients + if + | e == PCENetworkError || e == PCEResponseTimeout -> retryAction + | persistErrorInterval agentCfg == 0 -> do + atomically $ do + putTMVar (sessionVar v) (Left (e, Nothing)) + removeSessVar v srv smpClients + | otherwise -> do + ts <- addUTCTime (persistErrorInterval agentCfg) <$> liftIO getCurrentTime + atomically $ putTMVar (sessionVar v) (Left (e, Just ts)) throwE e tryConnectAsync :: IO () tryConnectAsync = do