mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 18:35:59 +00:00
extend HTTP2 transport (#632)
* extend HTTP2 transport * refactor caStore * HTTP2 body * enable test * remove maxBodySize
This commit is contained in:
committed by
GitHub
parent
ee2a764f93
commit
b342b1dc59
@@ -92,6 +92,7 @@ library
|
||||
Simplex.Messaging.Server.StoreLog
|
||||
Simplex.Messaging.TMap
|
||||
Simplex.Messaging.Transport
|
||||
Simplex.Messaging.Transport.Buffer
|
||||
Simplex.Messaging.Transport.Client
|
||||
Simplex.Messaging.Transport.HTTP2
|
||||
Simplex.Messaging.Transport.HTTP2.Client
|
||||
|
||||
@@ -32,10 +32,12 @@ import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import qualified Data.CaseInsensitive as CI
|
||||
import Data.Int (Int64)
|
||||
import Data.Map.Strict (Map)
|
||||
import Data.Maybe (isNothing)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock.System
|
||||
import qualified Data.X509 as X
|
||||
import qualified Data.X509.CertificateStore as XS
|
||||
import GHC.Generics (Generic)
|
||||
import Network.HTTP.Types (HeaderName, Status)
|
||||
import qualified Network.HTTP.Types as N
|
||||
@@ -47,6 +49,7 @@ import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Notifications.Server.Store (NtfTknData (..))
|
||||
import Simplex.Messaging.Protocol (EncNMsgMeta)
|
||||
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..))
|
||||
import Simplex.Messaging.Transport.HTTP2.Client
|
||||
import Simplex.Messaging.Util (safeDecodeUtf8)
|
||||
import System.Environment (getEnv)
|
||||
@@ -97,8 +100,8 @@ readECPrivateKey f = do
|
||||
data PushNotification
|
||||
= PNVerification NtfRegCode
|
||||
| PNMessage PNMessageData
|
||||
-- | PNAlert Text
|
||||
| PNCheckMessages
|
||||
| -- | PNAlert Text
|
||||
PNCheckMessages
|
||||
deriving (Show)
|
||||
|
||||
data PNMessageData = PNMessageData
|
||||
@@ -194,9 +197,9 @@ data APNSPushClientConfig = APNSPushClientConfig
|
||||
appName :: ByteString,
|
||||
appTeamId :: Text,
|
||||
apnsPort :: ServiceName,
|
||||
http2cfg :: HTTP2ClientConfig
|
||||
http2cfg :: HTTP2ClientConfig,
|
||||
caStoreFile :: FilePath
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
apnsProviderHost :: PushProvider -> HostName
|
||||
apnsProviderHost = \case
|
||||
@@ -215,7 +218,8 @@ defaultAPNSPushClientConfig =
|
||||
appName = "chat.simplex.app",
|
||||
appTeamId = "5NN7GUYB6T",
|
||||
apnsPort = "443",
|
||||
http2cfg = defaultHTTP2ClientConfig
|
||||
http2cfg = defaultHTTP2ClientConfig {bufferSize = 16384},
|
||||
caStoreFile = "/etc/ssl/cert.pem"
|
||||
}
|
||||
|
||||
data APNSPushClient = APNSPushClient
|
||||
@@ -259,8 +263,10 @@ mkApnsJWTToken appTeamId jwtHeader privateKey = do
|
||||
pure (jwt, signedJWT)
|
||||
|
||||
connectHTTPS2 :: HostName -> APNSPushClientConfig -> TVar (Maybe HTTP2Client) -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
connectHTTPS2 apnsHost APNSPushClientConfig {apnsPort, http2cfg} https2Client = do
|
||||
r <- getHTTP2Client apnsHost apnsPort http2cfg disconnected
|
||||
connectHTTPS2 apnsHost APNSPushClientConfig {apnsPort, http2cfg, caStoreFile} https2Client = do
|
||||
caStore_ <- XS.readCertificateStore caStoreFile
|
||||
when (isNothing caStore_) $ putStrLn $ "Error loading CertificateStore from " <> caStoreFile
|
||||
r <- getHTTP2Client apnsHost apnsPort caStore_ http2cfg disconnected
|
||||
case r of
|
||||
Right client -> atomically . writeTVar https2Client $ Just client
|
||||
Left e -> putStrLn $ "Error connecting to APNS: " <> show e
|
||||
@@ -294,7 +300,8 @@ apnsNotification NtfTknData {tknDhSecret} nonce paddedLen = \case
|
||||
encrypt ntfData f = f . safeDecodeUtf8 . U.encode <$> C.cbEncrypt tknDhSecret nonce ntfData paddedLen
|
||||
apn aps notificationData = APNSNotification {aps, notificationData}
|
||||
apnMutableContent = APNSMutableContent {mutableContent = 1, alert = APNSAlertText "Encrypted message or another app event", category = Just ntfCategoryCheckMessage}
|
||||
-- apnAlert alert = APNSAlert {alert, badge = Nothing, sound = Nothing, category = Nothing}
|
||||
|
||||
-- apnAlert alert = APNSAlert {alert, badge = Nothing, sound = Nothing, category = Nothing}
|
||||
|
||||
apnsRequest :: APNSPushClient -> ByteString -> APNSNotification -> IO Request
|
||||
apnsRequest c tkn ntf@APNSNotification {aps} = do
|
||||
@@ -336,9 +343,9 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke
|
||||
nonce <- atomically $ C.pseudoRandomCbNonce nonceDrg
|
||||
apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn
|
||||
req <- liftIO $ apnsRequest c tknStr apnsNtf
|
||||
HTTP2Response {response, respBody} <- liftHTTPS2 $ sendRequest http2 req
|
||||
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req
|
||||
let status = H.responseStatus response
|
||||
reason' = maybe "" reason $ J.decodeStrict' respBody
|
||||
reason' = maybe "" reason $ J.decodeStrict' bodyHead
|
||||
logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason'
|
||||
result status reason'
|
||||
where
|
||||
|
||||
@@ -38,7 +38,7 @@ import qualified Data.Map.Strict as M
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), ServerQueueStatus (..))
|
||||
import Simplex.Messaging.Transport (trimCR)
|
||||
import Simplex.Messaging.Transport.Buffer (trimCR)
|
||||
import System.Directory (doesFileExist)
|
||||
import System.IO
|
||||
|
||||
|
||||
@@ -56,9 +56,6 @@ module Simplex.Messaging.Transport
|
||||
transportErrorP,
|
||||
sendHandshake,
|
||||
getHandshake,
|
||||
|
||||
-- * Trim trailing CR
|
||||
trimCR,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -86,6 +83,7 @@ import qualified Paths_simplexmq as SMQ
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Parsers (dropPrefix, parse, parseRead1, sumTypeJSON)
|
||||
import Simplex.Messaging.Transport.Buffer
|
||||
import Simplex.Messaging.Util (bshow, catchAll, catchAll_)
|
||||
import Simplex.Messaging.Version
|
||||
import Test.QuickCheck (Arbitrary (..))
|
||||
@@ -152,8 +150,7 @@ data TLS = TLS
|
||||
{ tlsContext :: T.Context,
|
||||
tlsPeer :: TransportPeer,
|
||||
tlsUniq :: ByteString,
|
||||
buffer :: TVar ByteString,
|
||||
getLock :: TMVar ()
|
||||
tlsBuffer :: TBuffer
|
||||
}
|
||||
|
||||
connectTLS :: T.TLSParams p => Maybe HostName -> Bool -> p -> Socket -> IO T.Context
|
||||
@@ -169,9 +166,8 @@ getTLS :: TransportPeer -> T.Context -> IO TLS
|
||||
getTLS tlsPeer cxt = withTlsUnique tlsPeer cxt newTLS
|
||||
where
|
||||
newTLS tlsUniq = do
|
||||
buffer <- newTVarIO ""
|
||||
getLock <- newTMVarIO ()
|
||||
pure TLS {tlsContext = cxt, tlsPeer, tlsUniq, buffer, getLock}
|
||||
tlsBuffer <- atomically newTBuffer
|
||||
pure TLS {tlsContext = cxt, tlsPeer, tlsUniq, tlsBuffer}
|
||||
|
||||
withTlsUnique :: TransportPeer -> T.Context -> (ByteString -> IO c) -> IO c
|
||||
withTlsUnique peer cxt f =
|
||||
@@ -209,52 +205,22 @@ instance Transport TLS where
|
||||
closeConnection tls = closeTLS $ tlsContext tls
|
||||
|
||||
cGet :: TLS -> Int -> IO ByteString
|
||||
cGet TLS {tlsContext, buffer, getLock} n =
|
||||
E.bracket_
|
||||
(atomically $ takeTMVar getLock)
|
||||
(atomically $ putTMVar getLock ())
|
||||
$ do
|
||||
b <- readChunks =<< readTVarIO buffer
|
||||
let (s, b') = B.splitAt n b
|
||||
atomically $ writeTVar buffer $! b'
|
||||
pure s
|
||||
where
|
||||
readChunks :: ByteString -> IO ByteString
|
||||
readChunks b
|
||||
| B.length b >= n = pure b
|
||||
| otherwise =
|
||||
T.recvData tlsContext >>= \case
|
||||
-- https://hackage.haskell.org/package/tls-1.6.0/docs/Network-TLS.html#v:recvData
|
||||
"" -> ioe_EOF
|
||||
s -> readChunks $ b <> s
|
||||
cGet TLS {tlsContext, tlsBuffer} n = do
|
||||
s <- getBuffered tlsBuffer n (T.recvData tlsContext)
|
||||
-- https://hackage.haskell.org/package/tls-1.6.0/docs/Network-TLS.html#v:recvData
|
||||
if B.length s == n then pure s else ioe_EOF
|
||||
|
||||
cPut :: TLS -> ByteString -> IO ()
|
||||
cPut tls = T.sendData (tlsContext tls) . BL.fromStrict
|
||||
|
||||
getLn :: TLS -> IO ByteString
|
||||
getLn TLS {tlsContext, buffer, getLock} = do
|
||||
E.bracket_
|
||||
(atomically $ takeTMVar getLock)
|
||||
(atomically $ putTMVar getLock ())
|
||||
$ do
|
||||
b <- readChunks =<< readTVarIO buffer
|
||||
let (s, b') = B.break (== '\n') b
|
||||
atomically $ writeTVar buffer $! B.drop 1 b' -- drop '\n' we made a break at
|
||||
pure $ trimCR s
|
||||
getLn TLS {tlsContext, tlsBuffer} = do
|
||||
getLnBuffered tlsBuffer (T.recvData tlsContext) `E.catch` handleEOF
|
||||
where
|
||||
readChunks :: ByteString -> IO ByteString
|
||||
readChunks b
|
||||
| B.elem '\n' b = pure b
|
||||
| otherwise = readChunks . (b <>) =<< T.recvData tlsContext `E.catch` handleEOF
|
||||
handleEOF = \case
|
||||
T.Error_EOF -> E.throwIO TEBadBlock
|
||||
e -> E.throwIO e
|
||||
|
||||
-- | Trim trailing CR from ByteString.
|
||||
trimCR :: ByteString -> ByteString
|
||||
trimCR "" = ""
|
||||
trimCR s = if B.last s == '\r' then B.init s else s
|
||||
|
||||
-- * SMP transport
|
||||
|
||||
-- | The handle for SMP encrypted transport connection over Transport .
|
||||
|
||||
61
src/Simplex/Messaging/Transport/Buffer.hs
Normal file
61
src/Simplex/Messaging/Transport/Buffer.hs
Normal file
@@ -0,0 +1,61 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Transport.Buffer where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import qualified Control.Exception as E
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
|
||||
data TBuffer = TBuffer
|
||||
{ buffer :: TVar ByteString,
|
||||
getLock :: TMVar ()
|
||||
}
|
||||
|
||||
newTBuffer :: STM TBuffer
|
||||
newTBuffer = do
|
||||
buffer <- newTVar ""
|
||||
getLock <- newTMVar ()
|
||||
pure TBuffer {buffer, getLock}
|
||||
|
||||
withBufferLock :: TBuffer -> IO a -> IO a
|
||||
withBufferLock TBuffer {getLock} =
|
||||
E.bracket_
|
||||
(atomically $ takeTMVar getLock)
|
||||
(atomically $ putTMVar getLock ())
|
||||
|
||||
getBuffered :: TBuffer -> Int -> IO ByteString -> IO ByteString
|
||||
getBuffered tb@TBuffer {buffer} n getChunk = withBufferLock tb $ do
|
||||
b <- readChunks =<< readTVarIO buffer
|
||||
let (s, b') = B.splitAt n b
|
||||
atomically $ writeTVar buffer $! b'
|
||||
pure s
|
||||
where
|
||||
readChunks :: ByteString -> IO ByteString
|
||||
readChunks b
|
||||
| B.length b >= n = pure b
|
||||
| otherwise =
|
||||
getChunk >>= \case
|
||||
"" -> pure b
|
||||
s -> readChunks $ b <> s
|
||||
|
||||
-- This function is only used in test and needs to be improved before it can be used in production,
|
||||
-- it will never complete if TLS connection is closed before there is newline.
|
||||
getLnBuffered :: TBuffer -> IO ByteString -> IO ByteString
|
||||
getLnBuffered tb@TBuffer {buffer} getChunk = withBufferLock tb $ do
|
||||
b <- readChunks =<< readTVarIO buffer
|
||||
let (s, b') = B.break (== '\n') b
|
||||
atomically $ writeTVar buffer $! B.drop 1 b' -- drop '\n' we made a break at
|
||||
pure $ trimCR s
|
||||
where
|
||||
readChunks :: ByteString -> IO ByteString
|
||||
readChunks b
|
||||
| B.elem '\n' b = pure b
|
||||
| otherwise = readChunks . (b <>) =<< getChunk
|
||||
|
||||
-- | Trim trailing CR from ByteString.
|
||||
trimCR :: ByteString -> ByteString
|
||||
trimCR "" = ""
|
||||
trimCR s = if B.last s == '\r' then B.init s else s
|
||||
@@ -1,20 +1,29 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
|
||||
module Simplex.Messaging.Transport.HTTP2 where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import qualified Control.Exception as E
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Default (def)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Foreign (mallocBytes)
|
||||
import Network.HPACK (BufferSize)
|
||||
import Network.HTTP2.Client (Config (..), defaultPositionReadMaker, freeSimpleConfig)
|
||||
import qualified Network.HTTP2.Client as HC
|
||||
import qualified Network.HTTP2.Server as HS
|
||||
import qualified Network.TLS as T
|
||||
import qualified Network.TLS.Extra as TE
|
||||
import Simplex.Messaging.Transport (TLS, Transport (cGet, cPut))
|
||||
import Simplex.Messaging.Transport (SessionId, TLS (tlsUniq), Transport (cGet, cPut))
|
||||
import Simplex.Messaging.Transport.Buffer
|
||||
import qualified System.TimeManager as TI
|
||||
|
||||
withTlsConfig :: TLS -> BufferSize -> (Config -> IO ()) -> IO ()
|
||||
withTlsConfig c sz = E.bracket (allocTlsConfig c sz) freeSimpleConfig
|
||||
withHTTP2 :: BufferSize -> (Config -> SessionId -> IO ()) -> TLS -> IO ()
|
||||
withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c)
|
||||
|
||||
allocTlsConfig :: TLS -> BufferSize -> IO Config
|
||||
allocTlsConfig c sz = do
|
||||
allocHTTP2Config :: TLS -> BufferSize -> IO Config
|
||||
allocHTTP2Config c sz = do
|
||||
buf <- mallocBytes sz
|
||||
tm <- TI.initialize $ 30 * 1000000
|
||||
pure
|
||||
@@ -34,3 +43,35 @@ http2TLSParams =
|
||||
T.supportedCiphers = TE.ciphersuite_strong_det,
|
||||
T.supportedSecureRenegotiation = False
|
||||
}
|
||||
|
||||
data HTTP2Body = HTTP2Body
|
||||
{ bodyHead :: ByteString,
|
||||
bodySize :: Int,
|
||||
bodyPart :: Maybe (Int -> IO ByteString),
|
||||
bodyBuffer :: TBuffer
|
||||
}
|
||||
|
||||
class HTTP2BodyChunk a where
|
||||
getBodyChunk :: a -> IO ByteString
|
||||
getBodeSize :: a -> Maybe Int
|
||||
|
||||
instance HTTP2BodyChunk HC.Response where
|
||||
getBodyChunk = HC.getResponseBodyChunk
|
||||
{-# INLINE getBodyChunk #-}
|
||||
getBodeSize = HC.responseBodySize
|
||||
{-# INLINE getBodeSize #-}
|
||||
|
||||
instance HTTP2BodyChunk HS.Request where
|
||||
getBodyChunk = HS.getRequestBodyChunk
|
||||
{-# INLINE getBodyChunk #-}
|
||||
getBodeSize = HS.requestBodySize
|
||||
{-# INLINE getBodeSize #-}
|
||||
|
||||
getHTTP2Body :: HTTP2BodyChunk a => a -> Int -> IO HTTP2Body
|
||||
getHTTP2Body r n = do
|
||||
bodyBuffer <- atomically newTBuffer
|
||||
let getPart n' = getBuffered bodyBuffer n' $ getBodyChunk r
|
||||
bodyHead <- getPart n
|
||||
let bodySize = fromMaybe 0 $ getBodeSize r
|
||||
bodyPart = if bodySize > n && B.length bodyHead == n then Just getPart else Nothing
|
||||
pure HTTP2Body {bodyHead, bodySize, bodyPart, bodyBuffer}
|
||||
|
||||
@@ -10,24 +10,32 @@ import Control.Exception (IOException)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Monad.Except
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Maybe (isNothing)
|
||||
import Data.Time (UTCTime, getCurrentTime)
|
||||
import qualified Data.X509.CertificateStore as XS
|
||||
import Network.HPACK (HeaderTable)
|
||||
import Network.HPACK (BufferSize)
|
||||
import Network.HTTP2.Client (ClientConfig (..), Request, Response)
|
||||
import qualified Network.HTTP2.Client as H
|
||||
import Network.Socket (HostName, ServiceName)
|
||||
import qualified Network.TLS as T
|
||||
import Numeric.Natural (Natural)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Transport (SessionId)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), runTLSTransportClient)
|
||||
import Simplex.Messaging.Transport.HTTP2 (http2TLSParams, withTlsConfig)
|
||||
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, http2TLSParams, withHTTP2)
|
||||
import UnliftIO.STM
|
||||
import UnliftIO.Timeout
|
||||
|
||||
data HTTP2Client = HTTP2Client
|
||||
{ action :: Maybe (Async ()),
|
||||
connected :: TVar Bool,
|
||||
host :: HostName,
|
||||
sessionId :: SessionId,
|
||||
sessionTs :: UTCTime,
|
||||
client_ :: HClient
|
||||
}
|
||||
|
||||
data HClient = HClient
|
||||
{ connected :: TVar Bool,
|
||||
host :: TransportHost,
|
||||
port :: ServiceName,
|
||||
config :: HTTP2ClientConfig,
|
||||
reqQ :: TBQueue (Request, TMVar HTTP2Response)
|
||||
@@ -35,15 +43,15 @@ data HTTP2Client = HTTP2Client
|
||||
|
||||
data HTTP2Response = HTTP2Response
|
||||
{ response :: Response,
|
||||
respBody :: ByteString,
|
||||
respTrailers :: Maybe HeaderTable
|
||||
respBody :: HTTP2Body
|
||||
}
|
||||
|
||||
data HTTP2ClientConfig = HTTP2ClientConfig
|
||||
{ qSize :: Natural,
|
||||
connTimeout :: Int,
|
||||
transportConfig :: TransportClientConfig,
|
||||
caStoreFile :: FilePath,
|
||||
bufferSize :: BufferSize,
|
||||
bodyHeadSize :: Int,
|
||||
suportedTLSParams :: T.Supported
|
||||
}
|
||||
deriving (Show)
|
||||
@@ -54,73 +62,69 @@ defaultHTTP2ClientConfig =
|
||||
{ qSize = 64,
|
||||
connTimeout = 10000000,
|
||||
transportConfig = TransportClientConfig Nothing Nothing True,
|
||||
caStoreFile = "/etc/ssl/cert.pem",
|
||||
bufferSize = 32768,
|
||||
bodyHeadSize = 16384,
|
||||
suportedTLSParams = http2TLSParams
|
||||
}
|
||||
|
||||
data HTTP2ClientError = HCResponseTimeout | HCNetworkError | HCNetworkError1 | HCIOError IOException
|
||||
data HTTP2ClientError = HCResponseTimeout | HCNetworkError | HCIOError IOException
|
||||
deriving (Show)
|
||||
|
||||
getHTTP2Client :: HostName -> ServiceName -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
getHTTP2Client host port config@HTTP2ClientConfig {transportConfig, connTimeout, caStoreFile, suportedTLSParams} disconnected =
|
||||
getHTTP2Client :: HostName -> ServiceName -> Maybe XS.CertificateStore -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
getHTTP2Client host port = getVerifiedHTTP2Client Nothing (THDomainName host) port Nothing
|
||||
|
||||
getVerifiedHTTP2Client :: Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> Maybe XS.CertificateStore -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2ClientConfig {transportConfig, bufferSize, bodyHeadSize, connTimeout, suportedTLSParams} disconnected =
|
||||
(atomically mkHTTPS2Client >>= runClient)
|
||||
`E.catch` \(e :: IOException) -> pure . Left $ HCIOError e
|
||||
where
|
||||
mkHTTPS2Client :: STM HTTP2Client
|
||||
mkHTTPS2Client :: STM HClient
|
||||
mkHTTPS2Client = do
|
||||
connected <- newTVar False
|
||||
reqQ <- newTBQueue $ qSize config
|
||||
pure HTTP2Client {action = Nothing, connected, host, port, config, reqQ}
|
||||
pure HClient {connected, host, port, config, reqQ}
|
||||
|
||||
runClient :: HTTP2Client -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
runClient :: HClient -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
runClient c = do
|
||||
cVar <- newEmptyTMVarIO
|
||||
caStore <- XS.readCertificateStore caStoreFile
|
||||
when (isNothing caStore) . putStrLn $ "Error loading CertificateStore from " <> caStoreFile
|
||||
action <-
|
||||
async $
|
||||
runHTTP2Client suportedTLSParams caStore transportConfig host port (client c cVar)
|
||||
runHTTP2Client suportedTLSParams caStore transportConfig bufferSize proxyUsername host port keyHash (client c cVar)
|
||||
`E.finally` atomically (putTMVar cVar $ Left HCNetworkError)
|
||||
conn_ <- connTimeout `timeout` atomically (takeTMVar cVar)
|
||||
pure $ case conn_ of
|
||||
Just (Right ()) -> Right c {action = Just action}
|
||||
c_ <- connTimeout `timeout` atomically (takeTMVar cVar)
|
||||
pure $ case c_ of
|
||||
Just (Right c') -> Right c' {action = Just action}
|
||||
Just (Left e) -> Left e
|
||||
Nothing -> Left HCNetworkError1
|
||||
Nothing -> Left HCNetworkError
|
||||
|
||||
client :: HTTP2Client -> TMVar (Either HTTP2ClientError ()) -> (Request -> (Response -> IO ()) -> IO ()) -> IO ()
|
||||
client c cVar sendReq = do
|
||||
client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client ()
|
||||
client c cVar sessionId sendReq = do
|
||||
sessionTs <- getCurrentTime
|
||||
let c' = HTTP2Client {action = Nothing, client_ = c, sessionId, sessionTs}
|
||||
atomically $ do
|
||||
writeTVar (connected c) True
|
||||
putTMVar cVar $ Right ()
|
||||
process c sendReq `E.finally` disconnected
|
||||
putTMVar cVar (Right c')
|
||||
process c' sendReq `E.finally` disconnected
|
||||
|
||||
process :: HTTP2Client -> (Request -> (Response -> IO ()) -> IO ()) -> IO ()
|
||||
process HTTP2Client {reqQ} sendReq = forever $ do
|
||||
process :: HTTP2Client -> H.Client ()
|
||||
process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do
|
||||
(req, respVar) <- atomically $ readTBQueue reqQ
|
||||
sendReq req $ \r -> do
|
||||
let writeResp respBody respTrailers = atomically $ putTMVar respVar HTTP2Response {response = r, respBody, respTrailers}
|
||||
respBody <- getResponseBody r ""
|
||||
respTrailers <- H.getResponseTrailers r
|
||||
writeResp respBody respTrailers
|
||||
|
||||
getResponseBody :: Response -> ByteString -> IO ByteString
|
||||
getResponseBody r s =
|
||||
H.getResponseBodyChunk r >>= \chunk ->
|
||||
if B.null chunk then pure s else getResponseBody r $ s <> chunk
|
||||
respBody <- getHTTP2Body r bodyHeadSize
|
||||
atomically $ putTMVar respVar HTTP2Response {response = r, respBody}
|
||||
|
||||
-- | Disconnects client from the server and terminates client threads.
|
||||
closeHTTP2Client :: HTTP2Client -> IO ()
|
||||
closeHTTP2Client = mapM_ uninterruptibleCancel . action
|
||||
|
||||
sendRequest :: HTTP2Client -> Request -> IO (Either HTTP2ClientError HTTP2Response)
|
||||
sendRequest HTTP2Client {reqQ, config} req = do
|
||||
sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req = do
|
||||
resp <- newEmptyTMVarIO
|
||||
atomically $ writeTBQueue reqQ (req, resp)
|
||||
maybe (Left HCResponseTimeout) Right <$> (connTimeout config `timeout` atomically (takeTMVar resp))
|
||||
|
||||
runHTTP2Client :: T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> HostName -> ServiceName -> ((Request -> (Response -> IO ()) -> IO ()) -> IO ()) -> IO ()
|
||||
runHTTP2Client tlsParams caStore tcConfig host port client =
|
||||
runTLSTransportClient tlsParams caStore tcConfig Nothing (THDomainName host) port Nothing $ \c ->
|
||||
withTlsConfig c 16384 (`run` client)
|
||||
runHTTP2Client :: T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> BufferSize -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (SessionId -> H.Client ()) -> IO ()
|
||||
runHTTP2Client tlsParams caStore tcConfig bufferSize proxyUsername host port keyHash client =
|
||||
runTLSTransportClient tlsParams caStore tcConfig proxyUsername host port keyHash $ withHTTP2 bufferSize run
|
||||
where
|
||||
run = H.run $ ClientConfig "https" (B.pack host) 20
|
||||
run cfg = H.run (ClientConfig "https" (strEncode host) 20) cfg . client
|
||||
|
||||
@@ -6,22 +6,23 @@ module Simplex.Messaging.Transport.HTTP2.Server where
|
||||
import Control.Concurrent.Async (Async, async, uninterruptibleCancel)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Network.HPACK (HeaderTable)
|
||||
import Network.HTTP2.Server (Aux, PushPromise, Request, Response)
|
||||
import Network.HPACK (BufferSize)
|
||||
import Network.HTTP2.Server (Request, Response)
|
||||
import qualified Network.HTTP2.Server as H
|
||||
import Network.Socket
|
||||
import qualified Network.TLS as T
|
||||
import Numeric.Natural (Natural)
|
||||
import Simplex.Messaging.Transport.HTTP2 (withTlsConfig)
|
||||
import Simplex.Messaging.Transport (SessionId)
|
||||
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, withHTTP2)
|
||||
import Simplex.Messaging.Transport.Server (loadSupportedTLSServerParams, runTransportServer)
|
||||
|
||||
type HTTP2ServerFunc = (Request -> (Response -> IO ()) -> IO ())
|
||||
type HTTP2ServerFunc = SessionId -> Request -> (Response -> IO ()) -> IO ()
|
||||
|
||||
data HTTP2ServerConfig = HTTP2ServerConfig
|
||||
{ qSize :: Natural,
|
||||
http2Port :: ServiceName,
|
||||
bufferSize :: BufferSize,
|
||||
bodyHeadSize :: Int,
|
||||
serverSupported :: T.Supported,
|
||||
caCertificateFile :: FilePath,
|
||||
privateKeyFile :: FilePath,
|
||||
@@ -31,9 +32,9 @@ data HTTP2ServerConfig = HTTP2ServerConfig
|
||||
deriving (Show)
|
||||
|
||||
data HTTP2Request = HTTP2Request
|
||||
{ request :: Request,
|
||||
reqBody :: ByteString,
|
||||
reqTrailers :: Maybe HeaderTable,
|
||||
{ sessionId :: SessionId,
|
||||
request :: Request,
|
||||
reqBody :: HTTP2Body,
|
||||
sendResponse :: Response -> IO ()
|
||||
}
|
||||
|
||||
@@ -43,29 +44,22 @@ data HTTP2Server = HTTP2Server
|
||||
}
|
||||
|
||||
getHTTP2Server :: HTTP2ServerConfig -> IO HTTP2Server
|
||||
getHTTP2Server HTTP2ServerConfig {qSize, http2Port, serverSupported, caCertificateFile, certificateFile, privateKeyFile, logTLSErrors} = do
|
||||
getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, serverSupported, caCertificateFile, certificateFile, privateKeyFile, logTLSErrors} = do
|
||||
tlsServerParams <- loadSupportedTLSServerParams serverSupported caCertificateFile certificateFile privateKeyFile
|
||||
started <- newEmptyTMVarIO
|
||||
reqQ <- newTBQueueIO qSize
|
||||
action <- async $
|
||||
runHTTP2Server started http2Port tlsServerParams logTLSErrors $ \r sendResponse -> do
|
||||
reqBody <- getRequestBody r ""
|
||||
reqTrailers <- H.getRequestTrailers r
|
||||
atomically $ writeTBQueue reqQ HTTP2Request {request = r, reqBody, reqTrailers, sendResponse}
|
||||
runHTTP2Server started http2Port bufferSize tlsServerParams logTLSErrors $ \sessionId r sendResponse -> do
|
||||
reqBody <- getHTTP2Body r bodyHeadSize
|
||||
atomically $ writeTBQueue reqQ HTTP2Request {sessionId, request = r, reqBody, sendResponse}
|
||||
void . atomically $ takeTMVar started
|
||||
pure HTTP2Server {action, reqQ}
|
||||
where
|
||||
getRequestBody :: Request -> ByteString -> IO ByteString
|
||||
getRequestBody r s =
|
||||
H.getRequestBodyChunk r >>= \chunk ->
|
||||
if B.null chunk then pure s else getRequestBody r $ s <> chunk
|
||||
|
||||
closeHTTP2Server :: HTTP2Server -> IO ()
|
||||
closeHTTP2Server = uninterruptibleCancel . action
|
||||
|
||||
runHTTP2Server :: TMVar Bool -> ServiceName -> T.ServerParams -> Bool -> HTTP2ServerFunc -> IO ()
|
||||
runHTTP2Server started port serverParams logTLSErrors http2Server =
|
||||
runTransportServer started port serverParams logTLSErrors $ \c -> withTlsConfig c 16384 (`H.run` server)
|
||||
runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.ServerParams -> Bool -> HTTP2ServerFunc -> IO ()
|
||||
runHTTP2Server started port bufferSize serverParams logTLSErrors http2Server =
|
||||
runTransportServer started port serverParams logTLSErrors $ withHTTP2 bufferSize run
|
||||
where
|
||||
server :: Request -> Aux -> (Response -> [PushPromise] -> IO ()) -> IO ()
|
||||
server req _aux sendResp = http2Server req (`sendResp` [])
|
||||
run cfg sessId = H.run cfg $ \req _aux sendResp -> http2Server sessId req (`sendResp` [])
|
||||
|
||||
@@ -19,9 +19,9 @@ import Simplex.Messaging.Transport
|
||||
TransportPeer (..),
|
||||
closeTLS,
|
||||
smpBlockSize,
|
||||
trimCR,
|
||||
withTlsUnique,
|
||||
)
|
||||
import Simplex.Messaging.Transport.Buffer (trimCR)
|
||||
|
||||
data WS = WS
|
||||
{ wsPeer :: TransportPeer,
|
||||
|
||||
@@ -41,8 +41,7 @@ import Simplex.Messaging.Notifications.Transport
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.Client
|
||||
import Simplex.Messaging.Transport.HTTP2 (http2TLSParams)
|
||||
import Simplex.Messaging.Transport.HTTP2.Client
|
||||
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..), http2TLSParams)
|
||||
import Simplex.Messaging.Transport.HTTP2.Server
|
||||
import Test.Hspec
|
||||
import UnliftIO.Async
|
||||
@@ -87,7 +86,7 @@ ntfServerCfg =
|
||||
apnsConfig =
|
||||
defaultAPNSPushClientConfig
|
||||
{ apnsPort = apnsTestPort,
|
||||
http2cfg = defaultHTTP2ClientConfig {caStoreFile = "tests/fixtures/ca.crt"}
|
||||
caStoreFile = "tests/fixtures/ca.crt"
|
||||
},
|
||||
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
||||
storeLogFile = Nothing,
|
||||
@@ -175,6 +174,8 @@ apnsMockServerConfig =
|
||||
HTTP2ServerConfig
|
||||
{ qSize = 1,
|
||||
http2Port = apnsTestPort,
|
||||
bufferSize = 16384,
|
||||
bodyHeadSize = 16384,
|
||||
serverSupported = http2TLSParams,
|
||||
caCertificateFile = "tests/fixtures/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/server.key",
|
||||
@@ -210,16 +211,16 @@ getAPNSMockServer config@HTTP2ServerConfig {qSize} = do
|
||||
pure APNSMockServer {action, apnsQ, http2Server}
|
||||
where
|
||||
runAPNSMockServer apnsQ HTTP2Server {reqQ} = forever $ do
|
||||
HTTP2Request {reqBody, sendResponse} <- atomically $ readTBQueue reqQ
|
||||
HTTP2Request {reqBody = HTTP2Body {bodyHead}, sendResponse} <- atomically $ readTBQueue reqQ
|
||||
let sendApnsResponse = \case
|
||||
APNSRespOk -> sendResponse $ H.responseNoBody N.ok200 []
|
||||
APNSRespError status reason ->
|
||||
sendResponse . H.responseBuilder status [] . lazyByteString $ J.encode APNSErrorResponse {reason}
|
||||
case J.decodeStrict' reqBody of
|
||||
case J.decodeStrict' bodyHead of
|
||||
Just notification ->
|
||||
atomically $ writeTBQueue apnsQ APNSMockRequest {notification, sendApnsResponse}
|
||||
_ -> do
|
||||
putStrLn $ "runAPNSMockServer J.decodeStrict' error, reqBody: " <> show reqBody
|
||||
putStrLn $ "runAPNSMockServer J.decodeStrict' error, reqBody: " <> show bodyHead
|
||||
sendApnsResponse $ APNSRespError N.badRequest400 "bad_request_body"
|
||||
|
||||
closeAPNSMockServer :: APNSMockServer -> IO ()
|
||||
|
||||
Reference in New Issue
Block a user