diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 27223b12f..41a7b60c1 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -702,8 +702,8 @@ getXFTPServerClient c@AgentClient {active, xftpClients, workerSeq} tSess@(userId waitForProtocolClient :: ProtocolTypeI (ProtoType msg) => AgentClient -> TransportSession msg -> ClientVar msg -> AM (Client msg) waitForProtocolClient c (_, srv, _) v = do - NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c - client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) + NetworkConfig {tcpConnectTimeout, tcpTimeout} <- atomically $ getNetworkConfig c + client_ <- liftIO $ (tcpConnectTimeout + tcpTimeout) `timeout` atomically (readTMVar $ sessionVar v) liftEither $ case client_ of Just (Right smpClient) -> Right smpClient Just (Left e) -> Left e diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 60ec0e25f..6d0e16865 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -107,12 +107,13 @@ import Simplex.Messaging.Protocol import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport -import Simplex.Messaging.Transport.Client (SocksProxy, TransportClientConfig (..), TransportHost (..), runTransportClient) +import Simplex.Messaging.Transport.Client (SocksProxy, TransportClientConfig (..), TransportHost (..), defaultTcpConnectTimeout, runTransportClient) import Simplex.Messaging.Transport.KeepAlive import Simplex.Messaging.Transport.WebSockets (WS) -import Simplex.Messaging.Util (bshow, raceAny_, threadDelay') +import Simplex.Messaging.Util (bshow, raceAny_, threadDelay', whenM) import Simplex.Messaging.Version import System.Timeout (timeout) +import UnliftIO (pooledMapConcurrentlyN) -- | 'SMPClient' is a handle used to send commands to a specific SMP server. -- @@ -129,10 +130,11 @@ data PClient v err msg = PClient transportSession :: TransportSession msg, transportHost :: TransportHost, tcpTimeout :: Int, + rcvConcurrency :: Int, pingErrorCount :: TVar Int, clientCorrId :: TVar ChaChaDRG, sentCommands :: TMap CorrId (Request err msg), - sndQ :: TBQueue ByteString, + sndQ :: TBQueue (TVar Bool, ByteString), rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)), msgQ :: Maybe (TBQueue (ServerTransmission v msg)) } @@ -164,6 +166,7 @@ smpClientStub g sessionId thVersion thAuth = do transportSession = (1, "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001", Nothing), transportHost = "localhost", tcpTimeout = 15_000_000, + rcvConcurrency = 8, pingErrorCount, clientCorrId, sentCommands, @@ -206,6 +209,8 @@ data NetworkConfig = NetworkConfig tcpTimeout :: Int, -- | additional timeout per kilobyte (1024 bytes) to be sent tcpTimeoutPerKb :: Int64, + -- | break response timeouts into groups, so later responses get later deadlines + rcvConcurrency :: Int, -- | TCP keep-alive options, Nothing to skip enabling keep-alive tcpKeepAlive :: Maybe KeepAliveOpts, -- | period for SMP ping commands (microseconds, 0 to disable) @@ -226,9 +231,10 @@ defaultNetworkConfig = hostMode = HMOnionViaSocks, requiredHostMode = False, sessionMode = TSMUser, - tcpConnectTimeout = 20_000_000, + tcpConnectTimeout = defaultTcpConnectTimeout, tcpTimeout = 15_000_000, tcpTimeoutPerKb = 5_000, + rcvConcurrency = 8, tcpKeepAlive = Just defaultKeepAliveOpts, smpPingInterval = 600_000_000, -- 10min smpPingCount = 3, @@ -236,8 +242,8 @@ defaultNetworkConfig = } transportClientConfig :: NetworkConfig -> TransportClientConfig -transportClientConfig NetworkConfig {socksProxy, tcpKeepAlive, logTLSErrors} = - TransportClientConfig {socksProxy, tcpKeepAlive, logTLSErrors, clientCredentials = Nothing, alpn = Nothing} +transportClientConfig NetworkConfig {socksProxy, tcpConnectTimeout, tcpKeepAlive, logTLSErrors} = + TransportClientConfig {socksProxy, tcpConnectTimeout, tcpKeepAlive, logTLSErrors, clientCredentials = Nothing, alpn = Nothing} {-# INLINE transportClientConfig #-} -- | protocol client configuration. @@ -271,7 +277,8 @@ defaultSMPClientConfig = defaultClientConfig supportedClientSMPRelayVRange {-# INLINE defaultSMPClientConfig #-} data Request err msg = Request - { entityId :: EntityId, + { corrId :: CorrId, + entityId :: EntityId, responseVar :: TMVar (Either (ProtocolClientError err) msg) } @@ -326,7 +333,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize `catch` \(e :: IOException) -> pure . Left $ PCEIOError e Left e -> pure $ Left e where - NetworkConfig {tcpConnectTimeout, tcpTimeout, smpPingInterval} = networkConfig + NetworkConfig {tcpConnectTimeout, tcpTimeout, rcvConcurrency, smpPingInterval} = networkConfig mkProtocolClient :: TransportHost -> STM (PClient v err msg) mkProtocolClient transportHost = do connected <- newTVar False @@ -344,6 +351,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize pingErrorCount, clientCorrId, sentCommands, + rcvConcurrency, sndQ, rcvQ, msgQ @@ -358,7 +366,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize async $ runTransportClient tcConfig (Just username) useHost port' (Just $ keyHash srv) (client t c cVar) `finally` atomically (tryPutTMVar cVar $ Left PCENetworkError) - c_ <- tcpConnectTimeout `timeout` atomically (takeTMVar cVar) + c_ <- (tcpConnectTimeout + tcpTimeout) `timeout` atomically (takeTMVar cVar) case c_ of Just (Right c') -> pure $ Right c' {action = Just action} Just (Left e) -> pure $ Left e @@ -385,7 +393,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize `finally` disconnected c' send :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO () - send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPutLog h + send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= \(active, s) -> whenM (readTVarIO active) (void $ tPutLog h s) receive :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO () receive ProtocolClient {client_ = PClient {rcvQ}} h = forever $ tGet h >>= atomically . writeTBQueue rcvQ @@ -674,19 +682,21 @@ streamProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockS mapM_ (cb <=< sendBatch c) bs sendBatch :: ProtocolClient v err msg -> TransportBatch (Request err msg) -> IO [Response err msg] -sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do +sendBatch c@ProtocolClient {client_ = PClient {rcvConcurrency, sndQ}} b = do case b of TBError e Request {entityId} -> do putStrLn "send error: large message" pure [Response entityId $ Left $ PCETransportError e] TBTransmissions s n rs | n > 0 -> do - atomically $ writeTBQueue sndQ s - mapConcurrently (getResponse c) rs + active <- newTVarIO True + atomically $ writeTBQueue sndQ (active, s) + pooledMapConcurrentlyN rcvConcurrency (getResponse c active) rs | otherwise -> pure [] TBTransmission s r -> do - atomically $ writeTBQueue sndQ s - (: []) <$> getResponse c r + active <- newTVarIO True + atomically $ writeTBQueue sndQ (active, s) + (: []) <$> getResponse c active r -- | Send Protocol command sendProtocolCommand :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg @@ -699,19 +709,22 @@ sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, thParams = THand Left e -> pure . Left $ PCETransportError e Right t | B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg - | otherwise -> atomically (writeTBQueue sndQ s) >> response <$> getResponse c r + | otherwise -> do + active <- newTVarIO True + atomically (writeTBQueue sndQ (active, s)) + response <$> getResponse c active r where s | batch = tEncodeBatch1 t | otherwise = tEncode t -- TODO switch to timeout or TimeManager that supports Int64 -getResponse :: ProtocolClient v err msg -> Request err msg -> IO (Response err msg) -getResponse ProtocolClient {client_ = PClient {tcpTimeout, pingErrorCount}} Request {entityId, responseVar} = do +getResponse :: ProtocolClient v err msg -> TVar Bool -> Request err msg -> IO (Response err msg) +getResponse ProtocolClient {client_ = PClient {tcpTimeout, pingErrorCount, sentCommands}} active Request {corrId, entityId, responseVar} = do response <- timeout tcpTimeout (atomically (takeTMVar responseVar)) >>= \case Just r -> atomically (writeTVar pingErrorCount 0) $> r - Nothing -> pure $ Left PCEResponseTimeout + Nothing -> atomically (writeTVar active False >> TM.delete corrId sentCommands) $> Left PCEResponseTimeout pure Response {entityId, response} mkTransmission :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> ClientCommand msg -> IO (PCTransmission err msg) @@ -726,7 +739,7 @@ mkTransmission ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCo getNextCorrId = CorrId <$> C.randomBytes 24 clientCorrId -- also used as nonce mkRequest :: CorrId -> STM (Request err msg) mkRequest corrId = do - r <- Request entId <$> newEmptyTMVar + r <- Request corrId entId <$> newEmptyTMVar TM.insert corrId r sentCommands pure r diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 4b925c6f6..c93fe06e5 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -163,8 +163,8 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ, randomDrg, wo waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient waitForSMPClient v = do - let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg - smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) + let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout, tcpTimeout}} = smpCfg agentCfg + smpClient_ <- liftIO $ (tcpConnectTimeout + tcpTimeout) `timeout` atomically (readTMVar $ sessionVar v) liftEither $ case smpClient_ of Just (Right smpClient) -> Right smpClient Just (Left e) -> Left e diff --git a/src/Simplex/Messaging/Transport/Client.hs b/src/Simplex/Messaging/Transport/Client.hs index daea3982e..3d5188348 100644 --- a/src/Simplex/Messaging/Transport/Client.hs +++ b/src/Simplex/Messaging/Transport/Client.hs @@ -10,6 +10,7 @@ module Simplex.Messaging.Transport.Client runTLSTransportClient, smpClientHandshake, defaultSMPPort, + defaultTcpConnectTimeout, defaultTransportClientConfig, defaultSocksProxy, TransportClientConfig (..), @@ -52,6 +53,7 @@ import Simplex.Messaging.Transport import Simplex.Messaging.Transport.KeepAlive import Simplex.Messaging.Util (bshow, (<$?>), catchAll, tshow) import System.IO.Error +import System.Timeout (timeout) import Text.Read (readMaybe) import UnliftIO.Exception (IOException) import qualified UnliftIO.Exception as E @@ -112,6 +114,7 @@ instance IsString (NonEmpty TransportHost) where fromString = parseString strDec data TransportClientConfig = TransportClientConfig { socksProxy :: Maybe SocksProxy, + tcpConnectTimeout :: Int, tcpKeepAlive :: Maybe KeepAliveOpts, logTLSErrors :: Bool, clientCredentials :: Maybe (X.CertificateChain, T.PrivKey), @@ -119,8 +122,12 @@ data TransportClientConfig = TransportClientConfig } deriving (Eq, Show) +-- time to resolve host, connect socket, set up TLS +defaultTcpConnectTimeout :: Int +defaultTcpConnectTimeout = 10000000 + defaultTransportClientConfig :: TransportClientConfig -defaultTransportClientConfig = TransportClientConfig Nothing (Just defaultKeepAliveOpts) True Nothing Nothing +defaultTransportClientConfig = TransportClientConfig Nothing defaultTcpConnectTimeout (Just defaultKeepAliveOpts) True Nothing Nothing clientTransportConfig :: TransportClientConfig -> TransportConfig clientTransportConfig TransportClientConfig {logTLSErrors} = @@ -131,7 +138,7 @@ runTransportClient :: Transport c => TransportClientConfig -> Maybe ByteString - runTransportClient = runTLSTransportClient supportedParameters Nothing runTLSTransportClient :: Transport c => T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (c -> IO a) -> IO a -runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, tcpKeepAlive, clientCredentials, alpn} proxyUsername host port keyHash client = do +runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, tcpConnectTimeout, tcpKeepAlive, clientCredentials, alpn} proxyUsername host port keyHash client = do serverCert <- newEmptyTMVarIO let hostName = B.unpack $ strEncode host clientParams = mkTLSClientParams tlsParams caStore_ hostName port keyHash clientCredentials alpn serverCert @@ -142,13 +149,19 @@ runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, sock <- connectTCP port mapM_ (setSocketKeepAlive sock) tcpKeepAlive `catchAll` \e -> logError ("Error setting TCP keep-alive" <> tshow e) let tCfg = clientTransportConfig cfg - connectTLS (Just hostName) tCfg clientParams sock >>= \tls -> do - chain <- atomically (tryTakeTMVar serverCert) >>= \case - Nothing -> do - logError "onServerCertificate didn't fire or failed to get cert chain" - closeTLS tls >> error "onServerCertificate failed" - Just c -> pure c - getClientConnection tCfg chain tls + timeout tcpConnectTimeout (connectTLS (Just hostName) tCfg clientParams sock) >>= \case + Nothing -> do + close sock + logError "connection timed out" + fail "connection timed out" + Just tls -> do + chain <- + atomically (tryTakeTMVar serverCert) >>= \case + Nothing -> do + logError "onServerCertificate didn't fire or failed to get cert chain" + closeTLS tls >> error "onServerCertificate failed" + Just c -> pure c + getClientConnection tCfg chain tls client c `E.finally` closeConnection c where hostAddr = \case diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index b279c1805..be37a6887 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -24,7 +24,7 @@ import Numeric.Natural (Natural) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Transport (ALPN, SessionId, TLS (tlsALPN), getServerCerts, getServerVerifyKey, tlsUniq) -import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), runTLSTransportClient) +import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), defaultTcpConnectTimeout, runTLSTransportClient) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Util (eitherToMaybe) import UnliftIO.STM @@ -71,7 +71,15 @@ defaultHTTP2ClientConfig = HTTP2ClientConfig { qSize = 64, connTimeout = 10000000, - transportConfig = TransportClientConfig Nothing Nothing True Nothing Nothing, + transportConfig = + TransportClientConfig + { socksProxy = Nothing, + tcpConnectTimeout = defaultTcpConnectTimeout, + tcpKeepAlive = Nothing, + logTLSErrors = True, + clientCredentials = Nothing, + alpn = Nothing + }, bufferSize = defaultHTTP2BufferSize, bodyHeadSize = 16384, suportedTLSParams = http2TLSParams diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index 59370e654..5ff9197cb 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -34,7 +34,7 @@ import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Server (runSMPAgentBlocking) import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew)) import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction') -import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultSMPClientConfig, defaultNetworkConfig) +import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultNetworkConfig, defaultSMPClientConfig) import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig) import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth) @@ -215,7 +215,7 @@ agentCfg = certificateFile = "tests/fixtures/server.crt" } where - networkConfig = defaultNetworkConfig {tcpConnectTimeout = 3_000_000, tcpTimeout = 2_000_000} + networkConfig = defaultNetworkConfig {tcpConnectTimeout = 1_000_000, tcpTimeout = 2_000_000} fastRetryInterval :: RetryInterval fastRetryInterval = defaultReconnectInterval {initialInterval = 50_000}