mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-12 01:34:45 +00:00
return error from request
This commit is contained in:
@@ -390,8 +390,6 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
else pure sndFile
|
||||
maxRecipients <- asks (xftpMaxRecipientsPerRequest . config)
|
||||
let numRecipients' = min numRecipients maxRecipients
|
||||
liftIO $ print "finished encrypting"
|
||||
threadDelay 30000000
|
||||
-- concurrently?
|
||||
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
|
||||
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
|
||||
|
||||
@@ -135,15 +135,18 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}}
|
||||
xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd)
|
||||
let req = H.requestStreaming N.methodPost "/" [] $ streamBody t
|
||||
reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_
|
||||
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout
|
||||
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
|
||||
-- TODO validate that the file ID is the same as in the request?
|
||||
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead
|
||||
case respOrErr of
|
||||
Right r -> case protocolError r of
|
||||
Just e -> throwError $ PCEProtocolError e
|
||||
_ -> pure (r, body)
|
||||
Left e -> throwError $ PCEResponseError e
|
||||
res <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout
|
||||
case res of
|
||||
HTTP2RequestResponse HTTP2Response {respBody = body@HTTP2Body {bodyHead}} -> do
|
||||
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
|
||||
-- TODO validate that the file ID is the same as in the request?
|
||||
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead
|
||||
case respOrErr of
|
||||
Right r -> case protocolError r of
|
||||
Just e -> throwError $ PCEProtocolError e
|
||||
_ -> pure (r, body)
|
||||
Left e -> throwError $ PCEResponseError e
|
||||
HTTP2RequestError e -> throwError $ PCEInternalError $ show e
|
||||
where
|
||||
streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO ()
|
||||
streamBody t send done = do
|
||||
|
||||
@@ -702,6 +702,7 @@ protocolClientError protocolError_ host = \case
|
||||
PCETransportError e -> BROKER host $ TRANSPORT e
|
||||
e@PCECryptoError {} -> INTERNAL $ show e
|
||||
PCEIOError {} -> BROKER host NETWORK
|
||||
PCEInternalError e -> INTERNAL e
|
||||
|
||||
data ProtocolTestStep
|
||||
= TSConnect
|
||||
|
||||
@@ -417,6 +417,8 @@ data ProtocolClientError err
|
||||
PCECryptoError C.CryptoError
|
||||
| -- | IO Error
|
||||
PCEIOError IOException
|
||||
| -- | Internal error
|
||||
PCEInternalError String
|
||||
deriving (Eq, Show, Exception)
|
||||
|
||||
type SMPClientError = ProtocolClientError ErrorType
|
||||
|
||||
@@ -233,6 +233,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
|
||||
PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth
|
||||
PCEProtocolError e -> updateErr "SMP error " e
|
||||
PCEIOError e -> updateErr "IOError " e
|
||||
PCEInternalError e -> updateErr "InternalError " e
|
||||
PCEResponseError e -> updateErr "ResponseError " e
|
||||
PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r
|
||||
PCETransportError e -> updateErr "TransportError " e
|
||||
|
||||
@@ -344,11 +344,14 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke
|
||||
apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn
|
||||
req <- liftIO $ apnsRequest c tknStr apnsNtf
|
||||
-- TODO when HTTP2 client is thread-safe, we can use sendRequestDirect
|
||||
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing
|
||||
let status = H.responseStatus response
|
||||
reason' = maybe "" reason $ J.decodeStrict' bodyHead
|
||||
logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason'
|
||||
result status reason'
|
||||
res <- liftHTTPS2 $ sendRequest http2 req Nothing
|
||||
case res of
|
||||
HTTP2RequestResponse HTTP2Response {response, respBody = HTTP2Body {bodyHead}} -> do
|
||||
let status = H.responseStatus response
|
||||
reason' = maybe "" reason $ J.decodeStrict' bodyHead
|
||||
logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason'
|
||||
result status reason'
|
||||
HTTP2RequestError _e -> throwError PPPermanentError
|
||||
where
|
||||
result :: Maybe Status -> Text -> ExceptT PushProviderError IO ()
|
||||
result status reason'
|
||||
|
||||
@@ -29,10 +29,10 @@ import UnliftIO.STM
|
||||
import UnliftIO.Timeout
|
||||
|
||||
data HTTP2Client = HTTP2Client
|
||||
{ action :: Maybe (Async HTTP2Response),
|
||||
{ action :: Maybe (Async HTTP2RequestResult),
|
||||
sessionId :: SessionId,
|
||||
sessionTs :: UTCTime,
|
||||
sendReq :: Request -> (Response -> IO HTTP2Response) -> IO HTTP2Response,
|
||||
sendReq :: Request -> (Response -> IO HTTP2RequestResult) -> IO HTTP2RequestResult,
|
||||
client_ :: HClient
|
||||
}
|
||||
|
||||
@@ -42,9 +42,13 @@ data HClient = HClient
|
||||
host :: TransportHost,
|
||||
port :: ServiceName,
|
||||
config :: HTTP2ClientConfig,
|
||||
reqQ :: TBQueue (Request, TMVar HTTP2Response)
|
||||
reqQ :: TBQueue (Request, TMVar HTTP2RequestResult)
|
||||
}
|
||||
|
||||
data HTTP2RequestResult
|
||||
= HTTP2RequestResponse HTTP2Response
|
||||
| HTTP2RequestError E.SomeException
|
||||
|
||||
data HTTP2Response = HTTP2Response
|
||||
{ response :: Response,
|
||||
respBody :: HTTP2Body
|
||||
@@ -101,40 +105,42 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien
|
||||
Just (Left e) -> Left e
|
||||
Nothing -> Left HCNetworkError
|
||||
|
||||
client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2Response
|
||||
client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2RequestResult
|
||||
client c cVar sessionId sendReq = do
|
||||
sessionTs <- getCurrentTime
|
||||
let c' = HTTP2Client {action = Nothing, client_ = c, sendReq, sessionId, sessionTs}
|
||||
atomically $ do
|
||||
writeTVar (connected c) True
|
||||
putTMVar cVar (Right c')
|
||||
process c' sendReq `E.finally` (putStrLn "process error" >> disconnected)
|
||||
process c' sendReq `E.finally` disconnected
|
||||
|
||||
process :: HTTP2Client -> H.Client HTTP2Response
|
||||
process :: HTTP2Client -> H.Client HTTP2RequestResult
|
||||
process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do
|
||||
(req, respVar) <- atomically $ readTBQueue reqQ
|
||||
do
|
||||
( sendReq req $ \r -> do
|
||||
respBody <- getHTTP2Body r bodyHeadSize
|
||||
let resp = HTTP2Response {response = r, respBody}
|
||||
atomically $ putTMVar respVar resp
|
||||
pure resp
|
||||
)
|
||||
`E.finally` print "sendReq error"
|
||||
(req, resVar) <- atomically $ readTBQueue reqQ
|
||||
sendReq req (processResp resVar) `E.catch` \e -> do
|
||||
let res = HTTP2RequestError e
|
||||
atomically $ putTMVar resVar res
|
||||
pure res
|
||||
where
|
||||
processResp resVar r = do
|
||||
respBody <- getHTTP2Body r bodyHeadSize
|
||||
let res = HTTP2RequestResponse $ HTTP2Response {response = r, respBody}
|
||||
atomically $ putTMVar resVar res
|
||||
pure res
|
||||
|
||||
-- | Disconnects client from the server and terminates client threads.
|
||||
closeHTTP2Client :: HTTP2Client -> IO ()
|
||||
closeHTTP2Client = mapM_ uninterruptibleCancel . action
|
||||
|
||||
sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response)
|
||||
sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult)
|
||||
sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req reqTimeout_ = do
|
||||
resp <- newEmptyTMVarIO
|
||||
atomically $ writeTBQueue reqQ (req, resp)
|
||||
resVar <- newEmptyTMVarIO
|
||||
atomically $ writeTBQueue reqQ (req, resVar)
|
||||
let reqTimeout = http2RequestTimeout config reqTimeout_
|
||||
maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resp))
|
||||
maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resVar))
|
||||
|
||||
-- | this function should not be used until HTTP2 is thread safe, use sendRequest
|
||||
sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response)
|
||||
sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult)
|
||||
sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq} req reqTimeout_ = do
|
||||
let reqTimeout = http2RequestTimeout config reqTimeout_
|
||||
reqTimeout `timeout` try (sendReq req process) >>= \case
|
||||
@@ -144,7 +150,7 @@ sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq
|
||||
where
|
||||
process r = do
|
||||
respBody <- getHTTP2Body r $ bodyHeadSize config
|
||||
pure HTTP2Response {response = r, respBody}
|
||||
pure $ HTTP2RequestResponse $ HTTP2Response {response = r, respBody}
|
||||
|
||||
http2RequestTimeout :: HTTP2ClientConfig -> Maybe Int -> Int
|
||||
http2RequestTimeout HTTP2ClientConfig {connTimeout} = maybe connTimeout (connTimeout +)
|
||||
|
||||
Reference in New Issue
Block a user