mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-05 00:16:07 +00:00
proxy: remember server connection error for some time
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE InstanceSigs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiWayIf #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
@@ -33,6 +34,7 @@ import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (listToMaybe)
|
||||
import Data.Set (Set)
|
||||
import Data.Text.Encoding
|
||||
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
|
||||
import Data.Tuple (swap)
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
@@ -44,14 +46,14 @@ import Simplex.Messaging.Session
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Util (catchAll_, toChunks, ($>>=))
|
||||
import Simplex.Messaging.Util (catchAll_, ifM, toChunks, ($>>=))
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (async)
|
||||
import UnliftIO.Exception (Exception)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
type SMPClientVar = SessionVar (Either SMPClientError SMPClient)
|
||||
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) SMPClient)
|
||||
|
||||
data SMPClientAgentEvent
|
||||
= CAConnected SMPServer
|
||||
@@ -70,6 +72,7 @@ type SMPSub = (SMPSubParty, QueueId)
|
||||
data SMPClientAgentConfig = SMPClientAgentConfig
|
||||
{ smpCfg :: ProtocolClientConfig SMPVersion,
|
||||
reconnectInterval :: RetryInterval,
|
||||
persistErrorInterval :: NominalDiffTime,
|
||||
msgQSize :: Natural,
|
||||
agentQSize :: Natural,
|
||||
agentSubsBatchSize :: Int
|
||||
@@ -85,6 +88,7 @@ defaultSMPClientAgentConfig =
|
||||
increaseAfter = 10 * second,
|
||||
maxInterval = 10 * second
|
||||
},
|
||||
persistErrorInterval = 0,
|
||||
msgQSize = 256,
|
||||
agentQSize = 256,
|
||||
agentSubsBatchSize = 900
|
||||
@@ -168,10 +172,15 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ,
|
||||
waitForSMPClient v = do
|
||||
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
|
||||
smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v)
|
||||
liftEither $ case smpClient_ of
|
||||
Just (Right smpClient) -> Right smpClient
|
||||
Just (Left e) -> Left e
|
||||
Nothing -> Left PCEResponseTimeout
|
||||
case smpClient_ of
|
||||
Just (Right smpClient) -> pure smpClient
|
||||
Just (Left (e, Nothing)) -> throwE e
|
||||
Just (Left (e, Just ts)) ->
|
||||
ifM
|
||||
((ts <) <$> liftIO getCurrentTime)
|
||||
(atomically (removeSessVar v srv smpClients) >> getSMPServerClient' ca srv)
|
||||
(throwE e)
|
||||
Nothing -> throwE PCEResponseTimeout
|
||||
|
||||
newSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient
|
||||
newSMPClient v = tryConnectClient pure (liftIO tryConnectAsync)
|
||||
@@ -182,15 +191,19 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ,
|
||||
Right smp -> do
|
||||
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
|
||||
atomically $ do
|
||||
putTMVar (sessionVar v) r
|
||||
putTMVar (sessionVar v) (Right smp)
|
||||
TM.insert (sessionId $ thParams smp) smp smpSessions
|
||||
successAction smp
|
||||
Left e -> do
|
||||
if e == PCENetworkError || e == PCEResponseTimeout
|
||||
then retryAction
|
||||
else atomically $ do
|
||||
putTMVar (sessionVar v) (Left e)
|
||||
removeSessVar v srv smpClients
|
||||
if
|
||||
| e == PCENetworkError || e == PCEResponseTimeout -> retryAction
|
||||
| persistErrorInterval agentCfg == 0 -> do
|
||||
atomically $ do
|
||||
putTMVar (sessionVar v) (Left (e, Nothing))
|
||||
removeSessVar v srv smpClients
|
||||
| otherwise -> do
|
||||
ts <- addUTCTime (persistErrorInterval agentCfg) <$> liftIO getCurrentTime
|
||||
atomically $ putTMVar (sessionVar v) (Left (e, Just ts))
|
||||
throwE e
|
||||
tryConnectAsync :: IO ()
|
||||
tryConnectAsync = do
|
||||
|
||||
Reference in New Issue
Block a user