mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-10 23:37:00 +00:00
proxy: increase client timeout for proxied commands (#1145)
This commit is contained in:
committed by
GitHub
parent
8516b0dd5b
commit
66c916dbb3
@@ -143,6 +143,7 @@ data PClient v err msg = PClient
|
||||
{ connected :: TVar Bool,
|
||||
transportSession :: TransportSession msg,
|
||||
transportHost :: TransportHost,
|
||||
tcpConnectTimeout :: Int,
|
||||
tcpTimeout :: Int,
|
||||
rcvConcurrency :: Int,
|
||||
sendPings :: TVar Bool,
|
||||
@@ -185,6 +186,7 @@ smpClientStub g sessionId thVersion thAuth = do
|
||||
{ connected,
|
||||
transportSession = (1, "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001", Nothing),
|
||||
transportHost = "localhost",
|
||||
tcpConnectTimeout = 20_000_000,
|
||||
tcpTimeout = 15_000_000,
|
||||
rcvConcurrency = 8,
|
||||
sendPings,
|
||||
@@ -413,6 +415,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
|
||||
{ connected,
|
||||
transportSession,
|
||||
transportHost,
|
||||
tcpConnectTimeout,
|
||||
tcpTimeout,
|
||||
sendPings,
|
||||
lastReceived,
|
||||
@@ -735,9 +738,9 @@ deleteSMPQueues = okSMPCommands DEL
|
||||
-- send PRXY :: SMPServer -> Maybe BasicAuth -> Command Sender
|
||||
-- receives PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg
|
||||
connectSMPProxiedRelay :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO ProxiedRelay
|
||||
connectSMPProxiedRelay c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth
|
||||
connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, tcpTimeout}} relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth
|
||||
| thVersion (thParams c) >= sendingProxySMPVersion =
|
||||
sendSMPCommand c Nothing "" (PRXY relayServ proxyAuth) >>= \case
|
||||
sendProtocolCommand_ c Nothing tOut Nothing "" (Cmd SProxiedClient (PRXY relayServ proxyAuth)) >>= \case
|
||||
PKEY sId vr (chain, key) ->
|
||||
case supportedClientSMPRelayVRange `compatibleVersion` vr of
|
||||
Nothing -> throwE $ transportErr TEVersion
|
||||
@@ -745,6 +748,7 @@ connectSMPProxiedRelay c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxy
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
| otherwise = throwE $ PCETransportError TEVersion
|
||||
where
|
||||
tOut = Just $ tcpConnectTimeout + tcpTimeout
|
||||
transportErr = PCEProtocolError . PROXY . BROKER . TRANSPORT
|
||||
validateRelay :: X.CertificateChain -> X.SignedExact X.PubKey -> Either String C.PublicKeyX25519
|
||||
validateRelay (X.CertificateChain cert) exact = do
|
||||
@@ -819,7 +823,7 @@ proxySMPMessage ::
|
||||
MsgFlags ->
|
||||
MsgBody ->
|
||||
ExceptT SMPClientError IO (Either ProxyClientError ())
|
||||
proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g}} (ProxiedRelay sessionId v serverKey) spKey sId flags msg = do
|
||||
proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g, tcpTimeout}} (ProxiedRelay sessionId v serverKey) spKey sId flags msg = do
|
||||
-- prepare params
|
||||
let serverThAuth = (\ta -> ta {serverPeerPubKey = serverKey}) <$> thAuth proxyThParams
|
||||
serverThParams = smpTHParamsSetVersion v proxyThParams {sessionId, thAuth = serverThAuth}
|
||||
@@ -827,7 +831,7 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c
|
||||
let cmdSecret = C.dh' serverKey cmdPrivKey
|
||||
nonce@(C.CbNonce corrId) <- liftIO . atomically $ C.randomCbNonce g
|
||||
-- encode
|
||||
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth serverThParams (CorrId corrId, sId, Cmd SSender $ SEND flags msg)
|
||||
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth serverThParams (CorrId corrId, sId, Cmd SSender (SEND flags msg))
|
||||
auth <- liftEitherWith PCETransportError $ authTransmission serverThAuth spKey nonce tForAuth
|
||||
b <- case batchTransmissions (batch serverThParams) (blockSize serverThParams) [Right (auth, tToSend)] of
|
||||
[] -> throwE $ PCETransportError TELargeMsg
|
||||
@@ -836,7 +840,8 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c
|
||||
TBTransmissions s _ _ : _ -> pure s
|
||||
et <- liftEitherWith PCECryptoError $ EncTransmission <$> C.cbEncrypt cmdSecret nonce b paddedProxiedMsgLength
|
||||
-- proxy interaction errors are wrapped
|
||||
tryE (sendProtocolCommand_ c (Just nonce) Nothing sessionId (Cmd SProxiedClient (PFWD v cmdPubKey et))) >>= \case
|
||||
let tOut = Just $ 2 * tcpTimeout
|
||||
tryE (sendProtocolCommand_ c (Just nonce) tOut Nothing sessionId (Cmd SProxiedClient (PFWD v cmdPubKey et))) >>= \case
|
||||
Right r -> case r of
|
||||
PRES (EncResponse er) -> do
|
||||
-- server interaction errors are thrown directly
|
||||
@@ -872,7 +877,7 @@ forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId =
|
||||
let fwdT = FwdTransmission {fwdCorrId, fwdVersion, fwdKey, fwdTransmission}
|
||||
eft = EncFwdTransmission $ C.cbEncryptNoPad sessSecret nonce (smpEncode fwdT)
|
||||
-- send
|
||||
sendProtocolCommand_ c (Just nonce) Nothing "" (Cmd SSender (RFWD eft)) >>= \case
|
||||
sendProtocolCommand_ c (Just nonce) Nothing Nothing "" (Cmd SSender (RFWD eft)) >>= \case
|
||||
RRES (EncFwdResponse efr) -> do
|
||||
-- unwrap
|
||||
r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr
|
||||
@@ -936,19 +941,19 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do
|
||||
| n > 0 -> do
|
||||
active <- newTVarIO True
|
||||
atomically $ writeTBQueue sndQ (active, s)
|
||||
mapConcurrently (getResponse c active) rs
|
||||
mapConcurrently (getResponse c Nothing active) rs
|
||||
| otherwise -> pure []
|
||||
TBTransmission s r -> do
|
||||
active <- newTVarIO True
|
||||
atomically $ writeTBQueue sndQ (active, s)
|
||||
(: []) <$> getResponse c active r
|
||||
(: []) <$> getResponse c Nothing active r
|
||||
|
||||
-- | Send Protocol command
|
||||
sendProtocolCommand :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
|
||||
sendProtocolCommand c = sendProtocolCommand_ c Nothing
|
||||
sendProtocolCommand c = sendProtocolCommand_ c Nothing Nothing
|
||||
|
||||
sendProtocolCommand_ :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
|
||||
sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ pKey entId cmd =
|
||||
sendProtocolCommand_ :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
|
||||
sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ tOut pKey entId cmd =
|
||||
ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (pKey, entId, cmd)
|
||||
where
|
||||
-- two separate "atomically" needed to avoid blocking
|
||||
@@ -960,17 +965,17 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan
|
||||
| otherwise -> do
|
||||
active <- newTVarIO True
|
||||
atomically (writeTBQueue sndQ (active, s))
|
||||
response <$> getResponse c active r
|
||||
response <$> getResponse c tOut active r
|
||||
where
|
||||
s
|
||||
| batch = tEncodeBatch1 t
|
||||
| otherwise = tEncode t
|
||||
|
||||
-- TODO switch to timeout or TimeManager that supports Int64
|
||||
getResponse :: ProtocolClient v err msg -> TVar Bool -> Request err msg -> IO (Response err msg)
|
||||
getResponse ProtocolClient {client_ = PClient {tcpTimeout, timeoutErrorCount, sentCommands}} active Request {corrId, entityId, responseVar} = do
|
||||
getResponse :: ProtocolClient v err msg -> Maybe Int -> TVar Bool -> Request err msg -> IO (Response err msg)
|
||||
getResponse ProtocolClient {client_ = PClient {tcpTimeout, timeoutErrorCount, sentCommands}} tOut active Request {corrId, entityId, responseVar} = do
|
||||
response <-
|
||||
timeout tcpTimeout (atomically (takeTMVar responseVar)) >>= \case
|
||||
fromMaybe tcpTimeout tOut `timeout` atomically (takeTMVar responseVar) >>= \case
|
||||
Just r -> atomically (writeTVar timeoutErrorCount 0) $> r
|
||||
Nothing -> do
|
||||
atomically (writeTVar active False >> TM.delete corrId sentCommands)
|
||||
|
||||
Reference in New Issue
Block a user