mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
transport: increase client timeouts, don't send command after timeout (#1110)
* transport: fix client handshake timeouts
* fix handshake timeout
* skip sending requests for timed out responses
* expose batch concurrency as PClient field
* move to NetworkConfig
* remove Request on timeout
* use record

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
committed by
GitHub
parent
3ba3172aaf
commit
b98fdb672d
@@ -702,8 +702,8 @@ getXFTPServerClient c@AgentClient {active, xftpClients, workerSeq} tSess@(userId
|
||||
|
||||
waitForProtocolClient :: ProtocolTypeI (ProtoType msg) => AgentClient -> TransportSession msg -> ClientVar msg -> AM (Client msg)
|
||||
waitForProtocolClient c (_, srv, _) v = do
|
||||
NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c
|
||||
client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v)
|
||||
NetworkConfig {tcpConnectTimeout, tcpTimeout} <- atomically $ getNetworkConfig c
|
||||
client_ <- liftIO $ (tcpConnectTimeout + tcpTimeout) `timeout` atomically (readTMVar $ sessionVar v)
|
||||
liftEither $ case client_ of
|
||||
Just (Right smpClient) -> Right smpClient
|
||||
Just (Left e) -> Left e
|
||||
|
||||
@@ -107,12 +107,13 @@ import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.Client (SocksProxy, TransportClientConfig (..), TransportHost (..), runTransportClient)
|
||||
import Simplex.Messaging.Transport.Client (SocksProxy, TransportClientConfig (..), TransportHost (..), defaultTcpConnectTimeout, runTransportClient)
|
||||
import Simplex.Messaging.Transport.KeepAlive
|
||||
import Simplex.Messaging.Transport.WebSockets (WS)
|
||||
import Simplex.Messaging.Util (bshow, raceAny_, threadDelay')
|
||||
import Simplex.Messaging.Util (bshow, raceAny_, threadDelay', whenM)
|
||||
import Simplex.Messaging.Version
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (pooledMapConcurrentlyN)
|
||||
|
||||
-- | 'SMPClient' is a handle used to send commands to a specific SMP server.
|
||||
--
|
||||
@@ -129,10 +130,11 @@ data PClient v err msg = PClient
|
||||
transportSession :: TransportSession msg,
|
||||
transportHost :: TransportHost,
|
||||
tcpTimeout :: Int,
|
||||
rcvConcurrency :: Int,
|
||||
pingErrorCount :: TVar Int,
|
||||
clientCorrId :: TVar ChaChaDRG,
|
||||
sentCommands :: TMap CorrId (Request err msg),
|
||||
sndQ :: TBQueue ByteString,
|
||||
sndQ :: TBQueue (TVar Bool, ByteString),
|
||||
rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)),
|
||||
msgQ :: Maybe (TBQueue (ServerTransmission v msg))
|
||||
}
|
||||
@@ -164,6 +166,7 @@ smpClientStub g sessionId thVersion thAuth = do
|
||||
transportSession = (1, "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001", Nothing),
|
||||
transportHost = "localhost",
|
||||
tcpTimeout = 15_000_000,
|
||||
rcvConcurrency = 8,
|
||||
pingErrorCount,
|
||||
clientCorrId,
|
||||
sentCommands,
|
||||
@@ -206,6 +209,8 @@ data NetworkConfig = NetworkConfig
|
||||
tcpTimeout :: Int,
|
||||
-- | additional timeout per kilobyte (1024 bytes) to be sent
|
||||
tcpTimeoutPerKb :: Int64,
|
||||
-- | break response timeouts into groups, so later responses get later deadlines
|
||||
rcvConcurrency :: Int,
|
||||
-- | TCP keep-alive options, Nothing to skip enabling keep-alive
|
||||
tcpKeepAlive :: Maybe KeepAliveOpts,
|
||||
-- | period for SMP ping commands (microseconds, 0 to disable)
|
||||
@@ -226,9 +231,10 @@ defaultNetworkConfig =
|
||||
hostMode = HMOnionViaSocks,
|
||||
requiredHostMode = False,
|
||||
sessionMode = TSMUser,
|
||||
tcpConnectTimeout = 20_000_000,
|
||||
tcpConnectTimeout = defaultTcpConnectTimeout,
|
||||
tcpTimeout = 15_000_000,
|
||||
tcpTimeoutPerKb = 5_000,
|
||||
rcvConcurrency = 8,
|
||||
tcpKeepAlive = Just defaultKeepAliveOpts,
|
||||
smpPingInterval = 600_000_000, -- 10min
|
||||
smpPingCount = 3,
|
||||
@@ -236,8 +242,8 @@ defaultNetworkConfig =
|
||||
}
|
||||
|
||||
transportClientConfig :: NetworkConfig -> TransportClientConfig
|
||||
transportClientConfig NetworkConfig {socksProxy, tcpKeepAlive, logTLSErrors} =
|
||||
TransportClientConfig {socksProxy, tcpKeepAlive, logTLSErrors, clientCredentials = Nothing, alpn = Nothing}
|
||||
transportClientConfig NetworkConfig {socksProxy, tcpConnectTimeout, tcpKeepAlive, logTLSErrors} =
|
||||
TransportClientConfig {socksProxy, tcpConnectTimeout, tcpKeepAlive, logTLSErrors, clientCredentials = Nothing, alpn = Nothing}
|
||||
{-# INLINE transportClientConfig #-}
|
||||
|
||||
-- | protocol client configuration.
|
||||
@@ -271,7 +277,8 @@ defaultSMPClientConfig = defaultClientConfig supportedClientSMPRelayVRange
|
||||
{-# INLINE defaultSMPClientConfig #-}
|
||||
|
||||
data Request err msg = Request
|
||||
{ entityId :: EntityId,
|
||||
{ corrId :: CorrId,
|
||||
entityId :: EntityId,
|
||||
responseVar :: TMVar (Either (ProtocolClientError err) msg)
|
||||
}
|
||||
|
||||
@@ -326,7 +333,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
|
||||
`catch` \(e :: IOException) -> pure . Left $ PCEIOError e
|
||||
Left e -> pure $ Left e
|
||||
where
|
||||
NetworkConfig {tcpConnectTimeout, tcpTimeout, smpPingInterval} = networkConfig
|
||||
NetworkConfig {tcpConnectTimeout, tcpTimeout, rcvConcurrency, smpPingInterval} = networkConfig
|
||||
mkProtocolClient :: TransportHost -> STM (PClient v err msg)
|
||||
mkProtocolClient transportHost = do
|
||||
connected <- newTVar False
|
||||
@@ -344,6 +351,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
|
||||
pingErrorCount,
|
||||
clientCorrId,
|
||||
sentCommands,
|
||||
rcvConcurrency,
|
||||
sndQ,
|
||||
rcvQ,
|
||||
msgQ
|
||||
@@ -358,7 +366,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
|
||||
async $
|
||||
runTransportClient tcConfig (Just username) useHost port' (Just $ keyHash srv) (client t c cVar)
|
||||
`finally` atomically (tryPutTMVar cVar $ Left PCENetworkError)
|
||||
c_ <- tcpConnectTimeout `timeout` atomically (takeTMVar cVar)
|
||||
c_ <- (tcpConnectTimeout + tcpTimeout) `timeout` atomically (takeTMVar cVar)
|
||||
case c_ of
|
||||
Just (Right c') -> pure $ Right c' {action = Just action}
|
||||
Just (Left e) -> pure $ Left e
|
||||
@@ -385,7 +393,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
|
||||
`finally` disconnected c'
|
||||
|
||||
send :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
|
||||
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPutLog h
|
||||
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= \(active, s) -> whenM (readTVarIO active) (void $ tPutLog h s)
|
||||
|
||||
receive :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
|
||||
receive ProtocolClient {client_ = PClient {rcvQ}} h = forever $ tGet h >>= atomically . writeTBQueue rcvQ
|
||||
@@ -674,19 +682,21 @@ streamProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockS
|
||||
mapM_ (cb <=< sendBatch c) bs
|
||||
|
||||
sendBatch :: ProtocolClient v err msg -> TransportBatch (Request err msg) -> IO [Response err msg]
|
||||
sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do
|
||||
sendBatch c@ProtocolClient {client_ = PClient {rcvConcurrency, sndQ}} b = do
|
||||
case b of
|
||||
TBError e Request {entityId} -> do
|
||||
putStrLn "send error: large message"
|
||||
pure [Response entityId $ Left $ PCETransportError e]
|
||||
TBTransmissions s n rs
|
||||
| n > 0 -> do
|
||||
atomically $ writeTBQueue sndQ s
|
||||
mapConcurrently (getResponse c) rs
|
||||
active <- newTVarIO True
|
||||
atomically $ writeTBQueue sndQ (active, s)
|
||||
pooledMapConcurrentlyN rcvConcurrency (getResponse c active) rs
|
||||
| otherwise -> pure []
|
||||
TBTransmission s r -> do
|
||||
atomically $ writeTBQueue sndQ s
|
||||
(: []) <$> getResponse c r
|
||||
active <- newTVarIO True
|
||||
atomically $ writeTBQueue sndQ (active, s)
|
||||
(: []) <$> getResponse c active r
|
||||
|
||||
-- | Send Protocol command
|
||||
sendProtocolCommand :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
|
||||
@@ -699,19 +709,22 @@ sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, thParams = THand
|
||||
Left e -> pure . Left $ PCETransportError e
|
||||
Right t
|
||||
| B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg
|
||||
| otherwise -> atomically (writeTBQueue sndQ s) >> response <$> getResponse c r
|
||||
| otherwise -> do
|
||||
active <- newTVarIO True
|
||||
atomically (writeTBQueue sndQ (active, s))
|
||||
response <$> getResponse c active r
|
||||
where
|
||||
s
|
||||
| batch = tEncodeBatch1 t
|
||||
| otherwise = tEncode t
|
||||
|
||||
-- TODO switch to timeout or TimeManager that supports Int64
|
||||
getResponse :: ProtocolClient v err msg -> Request err msg -> IO (Response err msg)
|
||||
getResponse ProtocolClient {client_ = PClient {tcpTimeout, pingErrorCount}} Request {entityId, responseVar} = do
|
||||
getResponse :: ProtocolClient v err msg -> TVar Bool -> Request err msg -> IO (Response err msg)
|
||||
getResponse ProtocolClient {client_ = PClient {tcpTimeout, pingErrorCount, sentCommands}} active Request {corrId, entityId, responseVar} = do
|
||||
response <-
|
||||
timeout tcpTimeout (atomically (takeTMVar responseVar)) >>= \case
|
||||
Just r -> atomically (writeTVar pingErrorCount 0) $> r
|
||||
Nothing -> pure $ Left PCEResponseTimeout
|
||||
Nothing -> atomically (writeTVar active False >> TM.delete corrId sentCommands) $> Left PCEResponseTimeout
|
||||
pure Response {entityId, response}
|
||||
|
||||
mkTransmission :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> ClientCommand msg -> IO (PCTransmission err msg)
|
||||
@@ -726,7 +739,7 @@ mkTransmission ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCo
|
||||
getNextCorrId = CorrId <$> C.randomBytes 24 clientCorrId -- also used as nonce
|
||||
mkRequest :: CorrId -> STM (Request err msg)
|
||||
mkRequest corrId = do
|
||||
r <- Request entId <$> newEmptyTMVar
|
||||
r <- Request corrId entId <$> newEmptyTMVar
|
||||
TM.insert corrId r sentCommands
|
||||
pure r
|
||||
|
||||
|
||||
@@ -163,8 +163,8 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ, randomDrg, wo
|
||||
|
||||
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient
|
||||
waitForSMPClient v = do
|
||||
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
|
||||
smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v)
|
||||
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout, tcpTimeout}} = smpCfg agentCfg
|
||||
smpClient_ <- liftIO $ (tcpConnectTimeout + tcpTimeout) `timeout` atomically (readTMVar $ sessionVar v)
|
||||
liftEither $ case smpClient_ of
|
||||
Just (Right smpClient) -> Right smpClient
|
||||
Just (Left e) -> Left e
|
||||
|
||||
@@ -10,6 +10,7 @@ module Simplex.Messaging.Transport.Client
|
||||
runTLSTransportClient,
|
||||
smpClientHandshake,
|
||||
defaultSMPPort,
|
||||
defaultTcpConnectTimeout,
|
||||
defaultTransportClientConfig,
|
||||
defaultSocksProxy,
|
||||
TransportClientConfig (..),
|
||||
@@ -52,6 +53,7 @@ import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.KeepAlive
|
||||
import Simplex.Messaging.Util (bshow, (<$?>), catchAll, tshow)
|
||||
import System.IO.Error
|
||||
import System.Timeout (timeout)
|
||||
import Text.Read (readMaybe)
|
||||
import UnliftIO.Exception (IOException)
|
||||
import qualified UnliftIO.Exception as E
|
||||
@@ -112,6 +114,7 @@ instance IsString (NonEmpty TransportHost) where fromString = parseString strDec
|
||||
|
||||
data TransportClientConfig = TransportClientConfig
|
||||
{ socksProxy :: Maybe SocksProxy,
|
||||
tcpConnectTimeout :: Int,
|
||||
tcpKeepAlive :: Maybe KeepAliveOpts,
|
||||
logTLSErrors :: Bool,
|
||||
clientCredentials :: Maybe (X.CertificateChain, T.PrivKey),
|
||||
@@ -119,8 +122,12 @@ data TransportClientConfig = TransportClientConfig
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- time to resolve host, connect socket, set up TLS
|
||||
defaultTcpConnectTimeout :: Int
|
||||
defaultTcpConnectTimeout = 10000000
|
||||
|
||||
defaultTransportClientConfig :: TransportClientConfig
|
||||
defaultTransportClientConfig = TransportClientConfig Nothing (Just defaultKeepAliveOpts) True Nothing Nothing
|
||||
defaultTransportClientConfig = TransportClientConfig Nothing defaultTcpConnectTimeout (Just defaultKeepAliveOpts) True Nothing Nothing
|
||||
|
||||
clientTransportConfig :: TransportClientConfig -> TransportConfig
|
||||
clientTransportConfig TransportClientConfig {logTLSErrors} =
|
||||
@@ -131,7 +138,7 @@ runTransportClient :: Transport c => TransportClientConfig -> Maybe ByteString -
|
||||
runTransportClient = runTLSTransportClient supportedParameters Nothing
|
||||
|
||||
runTLSTransportClient :: Transport c => T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (c -> IO a) -> IO a
|
||||
runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, tcpKeepAlive, clientCredentials, alpn} proxyUsername host port keyHash client = do
|
||||
runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, tcpConnectTimeout, tcpKeepAlive, clientCredentials, alpn} proxyUsername host port keyHash client = do
|
||||
serverCert <- newEmptyTMVarIO
|
||||
let hostName = B.unpack $ strEncode host
|
||||
clientParams = mkTLSClientParams tlsParams caStore_ hostName port keyHash clientCredentials alpn serverCert
|
||||
@@ -142,13 +149,19 @@ runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy,
|
||||
sock <- connectTCP port
|
||||
mapM_ (setSocketKeepAlive sock) tcpKeepAlive `catchAll` \e -> logError ("Error setting TCP keep-alive" <> tshow e)
|
||||
let tCfg = clientTransportConfig cfg
|
||||
connectTLS (Just hostName) tCfg clientParams sock >>= \tls -> do
|
||||
chain <- atomically (tryTakeTMVar serverCert) >>= \case
|
||||
Nothing -> do
|
||||
logError "onServerCertificate didn't fire or failed to get cert chain"
|
||||
closeTLS tls >> error "onServerCertificate failed"
|
||||
Just c -> pure c
|
||||
getClientConnection tCfg chain tls
|
||||
timeout tcpConnectTimeout (connectTLS (Just hostName) tCfg clientParams sock) >>= \case
|
||||
Nothing -> do
|
||||
close sock
|
||||
logError "connection timed out"
|
||||
fail "connection timed out"
|
||||
Just tls -> do
|
||||
chain <-
|
||||
atomically (tryTakeTMVar serverCert) >>= \case
|
||||
Nothing -> do
|
||||
logError "onServerCertificate didn't fire or failed to get cert chain"
|
||||
closeTLS tls >> error "onServerCertificate failed"
|
||||
Just c -> pure c
|
||||
getClientConnection tCfg chain tls
|
||||
client c `E.finally` closeConnection c
|
||||
where
|
||||
hostAddr = \case
|
||||
|
||||
@@ -24,7 +24,7 @@ import Numeric.Natural (Natural)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Transport (ALPN, SessionId, TLS (tlsALPN), getServerCerts, getServerVerifyKey, tlsUniq)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), runTLSTransportClient)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), defaultTcpConnectTimeout, runTLSTransportClient)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import Simplex.Messaging.Util (eitherToMaybe)
|
||||
import UnliftIO.STM
|
||||
@@ -71,7 +71,15 @@ defaultHTTP2ClientConfig =
|
||||
HTTP2ClientConfig
|
||||
{ qSize = 64,
|
||||
connTimeout = 10000000,
|
||||
transportConfig = TransportClientConfig Nothing Nothing True Nothing Nothing,
|
||||
transportConfig =
|
||||
TransportClientConfig
|
||||
{ socksProxy = Nothing,
|
||||
tcpConnectTimeout = defaultTcpConnectTimeout,
|
||||
tcpKeepAlive = Nothing,
|
||||
logTLSErrors = True,
|
||||
clientCredentials = Nothing,
|
||||
alpn = Nothing
|
||||
},
|
||||
bufferSize = defaultHTTP2BufferSize,
|
||||
bodyHeadSize = 16384,
|
||||
suportedTLSParams = http2TLSParams
|
||||
|
||||
@@ -34,7 +34,7 @@ import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Server (runSMPAgentBlocking)
|
||||
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew))
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction')
|
||||
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultSMPClientConfig, defaultNetworkConfig)
|
||||
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultNetworkConfig, defaultSMPClientConfig)
|
||||
import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig)
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth)
|
||||
@@ -215,7 +215,7 @@ agentCfg =
|
||||
certificateFile = "tests/fixtures/server.crt"
|
||||
}
|
||||
where
|
||||
networkConfig = defaultNetworkConfig {tcpConnectTimeout = 3_000_000, tcpTimeout = 2_000_000}
|
||||
networkConfig = defaultNetworkConfig {tcpConnectTimeout = 1_000_000, tcpTimeout = 2_000_000}
|
||||
|
||||
fastRetryInterval :: RetryInterval
|
||||
fastRetryInterval = defaultReconnectInterval {initialInterval = 50_000}
|
||||
|
||||
Reference in New Issue
Block a user