mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-10 17:15:52 +00:00
transport: add send-queueing timeout
This commit is contained in:
@@ -700,7 +700,7 @@ sendBatch c@ProtocolClient {client_ = PClient {rcvConcurrency, sndQ}} b = do
|
||||
|
||||
-- | Send Protocol command
|
||||
sendProtocolCommand :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
|
||||
sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} pKey entId cmd =
|
||||
sendProtocolCommand c@ProtocolClient {client_ = PClient {tcpTimeout, sndQ}, thParams = THandleParams {batch, blockSize}} pKey entId cmd =
|
||||
ExceptT $ uncurry sendRecv =<< mkTransmission c (pKey, entId, cmd)
|
||||
where
|
||||
-- two separate "atomically" needed to avoid blocking
|
||||
@@ -711,9 +711,12 @@ sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, thParams = THand
|
||||
| B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg
|
||||
| otherwise -> do
|
||||
active <- newTVarIO True
|
||||
atomically (writeTBQueue sndQ (active, s))
|
||||
response <$> getResponse c active r
|
||||
timeout tcpSendTimeout (atomically $ writeTBQueue sndQ (active, s)) >>= \case
|
||||
Nothing -> pure $ Left PCEResponseTimeout
|
||||
Just () -> response <$> getResponse c active r
|
||||
where
|
||||
-- TODO: move to configuration
|
||||
tcpSendTimeout = tcpTimeout * 3 -- conservative timeout, allowing some asymmetry in uplink
|
||||
s
|
||||
| batch = tEncodeBatch1 t
|
||||
| otherwise = tEncode t
|
||||
|
||||
Reference in New Issue
Block a user