From 66c916dbb33e625cc40cc43aa2aededab859156b Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 12 May 2024 21:12:01 +0100 Subject: [PATCH] proxy: increase client timeout for proxied commands (#1145) --- src/Simplex/Messaging/Client.hs | 35 +++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 92dd4047b..62d75c3e2 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -143,6 +143,7 @@ data PClient v err msg = PClient { connected :: TVar Bool, transportSession :: TransportSession msg, transportHost :: TransportHost, + tcpConnectTimeout :: Int, tcpTimeout :: Int, rcvConcurrency :: Int, sendPings :: TVar Bool, @@ -185,6 +186,7 @@ smpClientStub g sessionId thVersion thAuth = do { connected, transportSession = (1, "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001", Nothing), transportHost = "localhost", + tcpConnectTimeout = 20_000_000, tcpTimeout = 15_000_000, rcvConcurrency = 8, sendPings, @@ -413,6 +415,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize { connected, transportSession, transportHost, + tcpConnectTimeout, tcpTimeout, sendPings, lastReceived, @@ -735,9 +738,9 @@ deleteSMPQueues = okSMPCommands DEL -- send PRXY :: SMPServer -> Maybe BasicAuth -> Command Sender -- receives PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg connectSMPProxiedRelay :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO ProxiedRelay -connectSMPProxiedRelay c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth +connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, tcpTimeout}} relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth | thVersion (thParams c) >= sendingProxySMPVersion = - sendSMPCommand c Nothing "" (PRXY relayServ proxyAuth) >>= \case + sendProtocolCommand_ c Nothing tOut Nothing "" (Cmd SProxiedClient (PRXY relayServ proxyAuth)) >>= \case PKEY sId vr (chain, key) -> case supportedClientSMPRelayVRange `compatibleVersion` vr of Nothing -> throwE $ transportErr TEVersion @@ -745,6 +748,7 @@ connectSMPProxiedRelay c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxy r -> throwE . PCEUnexpectedResponse $ bshow r | otherwise = throwE $ PCETransportError TEVersion where + tOut = Just $ tcpConnectTimeout + tcpTimeout transportErr = PCEProtocolError . PROXY . BROKER . TRANSPORT validateRelay :: X.CertificateChain -> X.SignedExact X.PubKey -> Either String C.PublicKeyX25519 validateRelay (X.CertificateChain cert) exact = do @@ -819,7 +823,7 @@ proxySMPMessage :: MsgFlags -> MsgBody -> ExceptT SMPClientError IO (Either ProxyClientError ()) -proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g}} (ProxiedRelay sessionId v serverKey) spKey sId flags msg = do +proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g, tcpTimeout}} (ProxiedRelay sessionId v serverKey) spKey sId flags msg = do -- prepare params let serverThAuth = (\ta -> ta {serverPeerPubKey = serverKey}) <$> thAuth proxyThParams serverThParams = smpTHParamsSetVersion v proxyThParams {sessionId, thAuth = serverThAuth} @@ -827,7 +831,7 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c let cmdSecret = C.dh' serverKey cmdPrivKey nonce@(C.CbNonce corrId) <- liftIO . atomically $ C.randomCbNonce g -- encode - let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth serverThParams (CorrId corrId, sId, Cmd SSender $ SEND flags msg) + let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth serverThParams (CorrId corrId, sId, Cmd SSender (SEND flags msg)) auth <- liftEitherWith PCETransportError $ authTransmission serverThAuth spKey nonce tForAuth b <- case batchTransmissions (batch serverThParams) (blockSize serverThParams) [Right (auth, tToSend)] of [] -> throwE $ PCETransportError TELargeMsg @@ -836,7 +840,8 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c TBTransmissions s _ _ : _ -> pure s et <- liftEitherWith PCECryptoError $ EncTransmission <$> C.cbEncrypt cmdSecret nonce b paddedProxiedMsgLength -- proxy interaction errors are wrapped - tryE (sendProtocolCommand_ c (Just nonce) Nothing sessionId (Cmd SProxiedClient (PFWD v cmdPubKey et))) >>= \case + let tOut = Just $ 2 * tcpTimeout + tryE (sendProtocolCommand_ c (Just nonce) tOut Nothing sessionId (Cmd SProxiedClient (PFWD v cmdPubKey et))) >>= \case Right r -> case r of PRES (EncResponse er) -> do -- server interaction errors are thrown directly @@ -872,7 +877,7 @@ forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId = let fwdT = FwdTransmission {fwdCorrId, fwdVersion, fwdKey, fwdTransmission} eft = EncFwdTransmission $ C.cbEncryptNoPad sessSecret nonce (smpEncode fwdT) -- send - sendProtocolCommand_ c (Just nonce) Nothing "" (Cmd SSender (RFWD eft)) >>= \case + sendProtocolCommand_ c (Just nonce) Nothing Nothing "" (Cmd SSender (RFWD eft)) >>= \case RRES (EncFwdResponse efr) -> do -- unwrap r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr @@ -936,19 +941,19 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do | n > 0 -> do active <- newTVarIO True atomically $ writeTBQueue sndQ (active, s) - mapConcurrently (getResponse c active) rs + mapConcurrently (getResponse c Nothing active) rs | otherwise -> pure [] TBTransmission s r -> do active <- newTVarIO True atomically $ writeTBQueue sndQ (active, s) - (: []) <$> getResponse c active r + (: []) <$> getResponse c Nothing active r -- | Send Protocol command sendProtocolCommand :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg -sendProtocolCommand c = sendProtocolCommand_ c Nothing +sendProtocolCommand c = sendProtocolCommand_ c Nothing Nothing -sendProtocolCommand_ :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg -sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ pKey entId cmd = +sendProtocolCommand_ :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg +sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ tOut pKey entId cmd = ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (pKey, entId, cmd) where -- two separate "atomically" needed to avoid blocking @@ -960,17 +965,17 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan | otherwise -> do active <- newTVarIO True atomically (writeTBQueue sndQ (active, s)) - response <$> getResponse c active r + response <$> getResponse c tOut active r where s | batch = tEncodeBatch1 t | otherwise = tEncode t -- TODO switch to timeout or TimeManager that supports Int64 -getResponse :: ProtocolClient v err msg -> TVar Bool -> Request err msg -> IO (Response err msg) -getResponse ProtocolClient {client_ = PClient {tcpTimeout, timeoutErrorCount, sentCommands}} active Request {corrId, entityId, responseVar} = do +getResponse :: ProtocolClient v err msg -> Maybe Int -> TVar Bool -> Request err msg -> IO (Response err msg) +getResponse ProtocolClient {client_ = PClient {tcpTimeout, timeoutErrorCount, sentCommands}} tOut active Request {corrId, entityId, responseVar} = do response <- - timeout tcpTimeout (atomically (takeTMVar responseVar)) >>= \case + fromMaybe tcpTimeout tOut `timeout` atomically (takeTMVar responseVar) >>= \case Just r -> atomically (writeTVar timeoutErrorCount 0) $> r Nothing -> do atomically (writeTVar active False >> TM.delete corrId sentCommands)