diff --git a/simplexmq.cabal b/simplexmq.cabal index 357cdc17c..fb6e1728e 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -92,6 +92,7 @@ library Simplex.Messaging.Server.StoreLog Simplex.Messaging.TMap Simplex.Messaging.Transport + Simplex.Messaging.Transport.Buffer Simplex.Messaging.Transport.Client Simplex.Messaging.Transport.HTTP2 Simplex.Messaging.Transport.HTTP2.Client diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 5221d7dbc..5eee63447 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -32,10 +32,12 @@ import qualified Data.ByteString.Lazy.Char8 as LB import qualified Data.CaseInsensitive as CI import Data.Int (Int64) import Data.Map.Strict (Map) +import Data.Maybe (isNothing) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock.System import qualified Data.X509 as X +import qualified Data.X509.CertificateStore as XS import GHC.Generics (Generic) import Network.HTTP.Types (HeaderName, Status) import qualified Network.HTTP.Types as N @@ -47,6 +49,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Store (NtfTknData (..)) import Simplex.Messaging.Protocol (EncNMsgMeta) +import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..)) import Simplex.Messaging.Transport.HTTP2.Client import Simplex.Messaging.Util (safeDecodeUtf8) import System.Environment (getEnv) @@ -97,8 +100,8 @@ readECPrivateKey f = do data PushNotification = PNVerification NtfRegCode | PNMessage PNMessageData - -- | PNAlert Text - | PNCheckMessages + | -- | PNAlert Text + PNCheckMessages deriving (Show) data PNMessageData = PNMessageData @@ -194,9 +197,9 @@ data APNSPushClientConfig = APNSPushClientConfig appName :: ByteString, appTeamId :: Text, apnsPort :: ServiceName, - http2cfg :: HTTP2ClientConfig + http2cfg :: HTTP2ClientConfig, + caStoreFile :: FilePath } - deriving (Show) apnsProviderHost :: PushProvider -> HostName apnsProviderHost = \case @@ -215,7 +218,8 @@ defaultAPNSPushClientConfig = appName = "chat.simplex.app", appTeamId = "5NN7GUYB6T", apnsPort = "443", - http2cfg = defaultHTTP2ClientConfig + http2cfg = defaultHTTP2ClientConfig {bufferSize = 16384}, + caStoreFile = "/etc/ssl/cert.pem" } data APNSPushClient = APNSPushClient @@ -259,8 +263,10 @@ mkApnsJWTToken appTeamId jwtHeader privateKey = do pure (jwt, signedJWT) connectHTTPS2 :: HostName -> APNSPushClientConfig -> TVar (Maybe HTTP2Client) -> IO (Either HTTP2ClientError HTTP2Client) -connectHTTPS2 apnsHost APNSPushClientConfig {apnsPort, http2cfg} https2Client = do - r <- getHTTP2Client apnsHost apnsPort http2cfg disconnected +connectHTTPS2 apnsHost APNSPushClientConfig {apnsPort, http2cfg, caStoreFile} https2Client = do + caStore_ <- XS.readCertificateStore caStoreFile + when (isNothing caStore_) $ putStrLn $ "Error loading CertificateStore from " <> caStoreFile + r <- getHTTP2Client apnsHost apnsPort caStore_ http2cfg disconnected case r of Right client -> atomically . writeTVar https2Client $ Just client Left e -> putStrLn $ "Error connecting to APNS: " <> show e @@ -294,7 +300,8 @@ apnsNotification NtfTknData {tknDhSecret} nonce paddedLen = \case encrypt ntfData f = f . safeDecodeUtf8 . U.encode <$> C.cbEncrypt tknDhSecret nonce ntfData paddedLen apn aps notificationData = APNSNotification {aps, notificationData} apnMutableContent = APNSMutableContent {mutableContent = 1, alert = APNSAlertText "Encrypted message or another app event", category = Just ntfCategoryCheckMessage} - -- apnAlert alert = APNSAlert {alert, badge = Nothing, sound = Nothing, category = Nothing} + +-- apnAlert alert = APNSAlert {alert, badge = Nothing, sound = Nothing, category = Nothing} apnsRequest :: APNSPushClient -> ByteString -> APNSNotification -> IO Request apnsRequest c tkn ntf@APNSNotification {aps} = do @@ -336,9 +343,9 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke nonce <- atomically $ C.pseudoRandomCbNonce nonceDrg apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf - HTTP2Response {response, respBody} <- liftHTTPS2 $ sendRequest http2 req + HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req let status = H.responseStatus response - reason' = maybe "" reason $ J.decodeStrict' respBody + reason' = maybe "" reason $ J.decodeStrict' bodyHead logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason' result status reason' where diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 70b3532e1..c6b276283 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -38,7 +38,7 @@ import qualified Data.Map.Strict as M import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), ServerQueueStatus (..)) -import Simplex.Messaging.Transport (trimCR) +import Simplex.Messaging.Transport.Buffer (trimCR) import System.Directory (doesFileExist) import System.IO diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 0234c5146..2042e530e 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -56,9 +56,6 @@ module Simplex.Messaging.Transport transportErrorP, sendHandshake, getHandshake, - - -- * Trim trailing CR - trimCR, ) where @@ -86,6 +83,7 @@ import qualified Paths_simplexmq as SMQ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (dropPrefix, parse, parseRead1, sumTypeJSON) +import Simplex.Messaging.Transport.Buffer import Simplex.Messaging.Util (bshow, catchAll, catchAll_) import Simplex.Messaging.Version import Test.QuickCheck (Arbitrary (..)) @@ -152,8 +150,7 @@ data TLS = TLS { tlsContext :: T.Context, tlsPeer :: TransportPeer, tlsUniq :: ByteString, - buffer :: TVar ByteString, - getLock :: TMVar () + tlsBuffer :: TBuffer } connectTLS :: T.TLSParams p => Maybe HostName -> Bool -> p -> Socket -> IO T.Context @@ -169,9 +166,8 @@ getTLS :: TransportPeer -> T.Context -> IO TLS getTLS tlsPeer cxt = withTlsUnique tlsPeer cxt newTLS where newTLS tlsUniq = do - buffer <- newTVarIO "" - getLock <- newTMVarIO () - pure TLS {tlsContext = cxt, tlsPeer, tlsUniq, buffer, getLock} + tlsBuffer <- atomically newTBuffer + pure TLS {tlsContext = cxt, tlsPeer, tlsUniq, tlsBuffer} withTlsUnique :: TransportPeer -> T.Context -> (ByteString -> IO c) -> IO c withTlsUnique peer cxt f = @@ -209,52 +205,22 @@ instance Transport TLS where closeConnection tls = closeTLS $ tlsContext tls cGet :: TLS -> Int -> IO ByteString - cGet TLS {tlsContext, buffer, getLock} n = - E.bracket_ - (atomically $ takeTMVar getLock) - (atomically $ putTMVar getLock ()) - $ do - b <- readChunks =<< readTVarIO buffer - let (s, b') = B.splitAt n b - atomically $ writeTVar buffer $! b' - pure s - where - readChunks :: ByteString -> IO ByteString - readChunks b - | B.length b >= n = pure b - | otherwise = - T.recvData tlsContext >>= \case - -- https://hackage.haskell.org/package/tls-1.6.0/docs/Network-TLS.html#v:recvData - "" -> ioe_EOF - s -> readChunks $ b <> s + cGet TLS {tlsContext, tlsBuffer} n = do + s <- getBuffered tlsBuffer n (T.recvData tlsContext) + -- https://hackage.haskell.org/package/tls-1.6.0/docs/Network-TLS.html#v:recvData + if B.length s == n then pure s else ioe_EOF cPut :: TLS -> ByteString -> IO () cPut tls = T.sendData (tlsContext tls) . BL.fromStrict getLn :: TLS -> IO ByteString - getLn TLS {tlsContext, buffer, getLock} = do - E.bracket_ - (atomically $ takeTMVar getLock) - (atomically $ putTMVar getLock ()) - $ do - b <- readChunks =<< readTVarIO buffer - let (s, b') = B.break (== '\n') b - atomically $ writeTVar buffer $! B.drop 1 b' -- drop '\n' we made a break at - pure $ trimCR s + getLn TLS {tlsContext, tlsBuffer} = do + getLnBuffered tlsBuffer (T.recvData tlsContext) `E.catch` handleEOF where - readChunks :: ByteString -> IO ByteString - readChunks b - | B.elem '\n' b = pure b - | otherwise = readChunks . (b <>) =<< T.recvData tlsContext `E.catch` handleEOF handleEOF = \case T.Error_EOF -> E.throwIO TEBadBlock e -> E.throwIO e --- | Trim trailing CR from ByteString. -trimCR :: ByteString -> ByteString -trimCR "" = "" -trimCR s = if B.last s == '\r' then B.init s else s - -- * SMP transport -- | The handle for SMP encrypted transport connection over Transport . diff --git a/src/Simplex/Messaging/Transport/Buffer.hs b/src/Simplex/Messaging/Transport/Buffer.hs new file mode 100644 index 000000000..e789eeaac --- /dev/null +++ b/src/Simplex/Messaging/Transport/Buffer.hs @@ -0,0 +1,61 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.Messaging.Transport.Buffer where + +import Control.Concurrent.STM +import qualified Control.Exception as E +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B + +data TBuffer = TBuffer + { buffer :: TVar ByteString, + getLock :: TMVar () + } + +newTBuffer :: STM TBuffer +newTBuffer = do + buffer <- newTVar "" + getLock <- newTMVar () + pure TBuffer {buffer, getLock} + +withBufferLock :: TBuffer -> IO a -> IO a +withBufferLock TBuffer {getLock} = + E.bracket_ + (atomically $ takeTMVar getLock) + (atomically $ putTMVar getLock ()) + +getBuffered :: TBuffer -> Int -> IO ByteString -> IO ByteString +getBuffered tb@TBuffer {buffer} n getChunk = withBufferLock tb $ do + b <- readChunks =<< readTVarIO buffer + let (s, b') = B.splitAt n b + atomically $ writeTVar buffer $! b' + pure s + where + readChunks :: ByteString -> IO ByteString + readChunks b + | B.length b >= n = pure b + | otherwise = + getChunk >>= \case + "" -> pure b + s -> readChunks $ b <> s + +-- This function is only used in test and needs to be improved before it can be used in production, +-- it will never complete if TLS connection is closed before there is newline. +getLnBuffered :: TBuffer -> IO ByteString -> IO ByteString +getLnBuffered tb@TBuffer {buffer} getChunk = withBufferLock tb $ do + b <- readChunks =<< readTVarIO buffer + let (s, b') = B.break (== '\n') b + atomically $ writeTVar buffer $! B.drop 1 b' -- drop '\n' we made a break at + pure $ trimCR s + where + readChunks :: ByteString -> IO ByteString + readChunks b + | B.elem '\n' b = pure b + | otherwise = readChunks . (b <>) =<< getChunk + +-- | Trim trailing CR from ByteString. +trimCR :: ByteString -> ByteString +trimCR "" = "" +trimCR s = if B.last s == '\r' then B.init s else s diff --git a/src/Simplex/Messaging/Transport/HTTP2.hs b/src/Simplex/Messaging/Transport/HTTP2.hs index 750a5c87f..a2d9c4a86 100644 --- a/src/Simplex/Messaging/Transport/HTTP2.hs +++ b/src/Simplex/Messaging/Transport/HTTP2.hs @@ -1,20 +1,29 @@ +{-# LANGUAGE NamedFieldPuns #-} + module Simplex.Messaging.Transport.HTTP2 where +import Control.Concurrent.STM import qualified Control.Exception as E +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B import Data.Default (def) +import Data.Maybe (fromMaybe) import Foreign (mallocBytes) import Network.HPACK (BufferSize) import Network.HTTP2.Client (Config (..), defaultPositionReadMaker, freeSimpleConfig) +import qualified Network.HTTP2.Client as HC +import qualified Network.HTTP2.Server as HS import qualified Network.TLS as T import qualified Network.TLS.Extra as TE -import Simplex.Messaging.Transport (TLS, Transport (cGet, cPut)) +import Simplex.Messaging.Transport (SessionId, TLS (tlsUniq), Transport (cGet, cPut)) +import Simplex.Messaging.Transport.Buffer import qualified System.TimeManager as TI -withTlsConfig :: TLS -> BufferSize -> (Config -> IO ()) -> IO () -withTlsConfig c sz = E.bracket (allocTlsConfig c sz) freeSimpleConfig +withHTTP2 :: BufferSize -> (Config -> SessionId -> IO ()) -> TLS -> IO () +withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c) -allocTlsConfig :: TLS -> BufferSize -> IO Config -allocTlsConfig c sz = do +allocHTTP2Config :: TLS -> BufferSize -> IO Config +allocHTTP2Config c sz = do buf <- mallocBytes sz tm <- TI.initialize $ 30 * 1000000 pure @@ -34,3 +43,35 @@ http2TLSParams = T.supportedCiphers = TE.ciphersuite_strong_det, T.supportedSecureRenegotiation = False } + +data HTTP2Body = HTTP2Body + { bodyHead :: ByteString, + bodySize :: Int, + bodyPart :: Maybe (Int -> IO ByteString), + bodyBuffer :: TBuffer + } + +class HTTP2BodyChunk a where + getBodyChunk :: a -> IO ByteString + getBodeSize :: a -> Maybe Int + +instance HTTP2BodyChunk HC.Response where + getBodyChunk = HC.getResponseBodyChunk + {-# INLINE getBodyChunk #-} + getBodeSize = HC.responseBodySize + {-# INLINE getBodeSize #-} + +instance HTTP2BodyChunk HS.Request where + getBodyChunk = HS.getRequestBodyChunk + {-# INLINE getBodyChunk #-} + getBodeSize = HS.requestBodySize + {-# INLINE getBodeSize #-} + +getHTTP2Body :: HTTP2BodyChunk a => a -> Int -> IO HTTP2Body +getHTTP2Body r n = do + bodyBuffer <- atomically newTBuffer + let getPart n' = getBuffered bodyBuffer n' $ getBodyChunk r + bodyHead <- getPart n + let bodySize = fromMaybe 0 $ getBodeSize r + bodyPart = if bodySize > n && B.length bodyHead == n then Just getPart else Nothing + pure HTTP2Body {bodyHead, bodySize, bodyPart, bodyBuffer} diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index d0efe2060..8de44cadc 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -10,24 +10,32 @@ import Control.Exception (IOException) import qualified Control.Exception as E import Control.Monad.Except import Data.ByteString.Char8 (ByteString) -import qualified Data.ByteString.Char8 as B -import Data.Maybe (isNothing) +import Data.Time (UTCTime, getCurrentTime) import qualified Data.X509.CertificateStore as XS -import Network.HPACK (HeaderTable) +import Network.HPACK (BufferSize) import Network.HTTP2.Client (ClientConfig (..), Request, Response) import qualified Network.HTTP2.Client as H import Network.Socket (HostName, ServiceName) import qualified Network.TLS as T import Numeric.Natural (Natural) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Transport (SessionId) import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), runTLSTransportClient) -import Simplex.Messaging.Transport.HTTP2 (http2TLSParams, withTlsConfig) +import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, http2TLSParams, withHTTP2) import UnliftIO.STM import UnliftIO.Timeout data HTTP2Client = HTTP2Client { action :: Maybe (Async ()), - connected :: TVar Bool, - host :: HostName, + sessionId :: SessionId, + sessionTs :: UTCTime, + client_ :: HClient + } + +data HClient = HClient + { connected :: TVar Bool, + host :: TransportHost, port :: ServiceName, config :: HTTP2ClientConfig, reqQ :: TBQueue (Request, TMVar HTTP2Response) @@ -35,15 +43,15 @@ data HTTP2Client = HTTP2Client data HTTP2Response = HTTP2Response { response :: Response, - respBody :: ByteString, - respTrailers :: Maybe HeaderTable + respBody :: HTTP2Body } data HTTP2ClientConfig = HTTP2ClientConfig { qSize :: Natural, connTimeout :: Int, transportConfig :: TransportClientConfig, - caStoreFile :: FilePath, + bufferSize :: BufferSize, + bodyHeadSize :: Int, suportedTLSParams :: T.Supported } deriving (Show) @@ -54,73 +62,69 @@ defaultHTTP2ClientConfig = { qSize = 64, connTimeout = 10000000, transportConfig = TransportClientConfig Nothing Nothing True, - caStoreFile = "/etc/ssl/cert.pem", + bufferSize = 32768, + bodyHeadSize = 16384, suportedTLSParams = http2TLSParams } -data HTTP2ClientError = HCResponseTimeout | HCNetworkError | HCNetworkError1 | HCIOError IOException +data HTTP2ClientError = HCResponseTimeout | HCNetworkError | HCIOError IOException deriving (Show) -getHTTP2Client :: HostName -> ServiceName -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client) -getHTTP2Client host port config@HTTP2ClientConfig {transportConfig, connTimeout, caStoreFile, suportedTLSParams} disconnected = +getHTTP2Client :: HostName -> ServiceName -> Maybe XS.CertificateStore -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client) +getHTTP2Client host port = getVerifiedHTTP2Client Nothing (THDomainName host) port Nothing + +getVerifiedHTTP2Client :: Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> Maybe XS.CertificateStore -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client) +getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2ClientConfig {transportConfig, bufferSize, bodyHeadSize, connTimeout, suportedTLSParams} disconnected = (atomically mkHTTPS2Client >>= runClient) `E.catch` \(e :: IOException) -> pure . Left $ HCIOError e where - mkHTTPS2Client :: STM HTTP2Client + mkHTTPS2Client :: STM HClient mkHTTPS2Client = do connected <- newTVar False reqQ <- newTBQueue $ qSize config - pure HTTP2Client {action = Nothing, connected, host, port, config, reqQ} + pure HClient {connected, host, port, config, reqQ} - runClient :: HTTP2Client -> IO (Either HTTP2ClientError HTTP2Client) + runClient :: HClient -> IO (Either HTTP2ClientError HTTP2Client) runClient c = do cVar <- newEmptyTMVarIO - caStore <- XS.readCertificateStore caStoreFile - when (isNothing caStore) . putStrLn $ "Error loading CertificateStore from " <> caStoreFile action <- async $ - runHTTP2Client suportedTLSParams caStore transportConfig host port (client c cVar) + runHTTP2Client suportedTLSParams caStore transportConfig bufferSize proxyUsername host port keyHash (client c cVar) `E.finally` atomically (putTMVar cVar $ Left HCNetworkError) - conn_ <- connTimeout `timeout` atomically (takeTMVar cVar) - pure $ case conn_ of - Just (Right ()) -> Right c {action = Just action} + c_ <- connTimeout `timeout` atomically (takeTMVar cVar) + pure $ case c_ of + Just (Right c') -> Right c' {action = Just action} Just (Left e) -> Left e - Nothing -> Left HCNetworkError1 + Nothing -> Left HCNetworkError - client :: HTTP2Client -> TMVar (Either HTTP2ClientError ()) -> (Request -> (Response -> IO ()) -> IO ()) -> IO () - client c cVar sendReq = do + client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client () + client c cVar sessionId sendReq = do + sessionTs <- getCurrentTime + let c' = HTTP2Client {action = Nothing, client_ = c, sessionId, sessionTs} atomically $ do writeTVar (connected c) True - putTMVar cVar $ Right () - process c sendReq `E.finally` disconnected + putTMVar cVar (Right c') + process c' sendReq `E.finally` disconnected - process :: HTTP2Client -> (Request -> (Response -> IO ()) -> IO ()) -> IO () - process HTTP2Client {reqQ} sendReq = forever $ do + process :: HTTP2Client -> H.Client () + process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do (req, respVar) <- atomically $ readTBQueue reqQ sendReq req $ \r -> do - let writeResp respBody respTrailers = atomically $ putTMVar respVar HTTP2Response {response = r, respBody, respTrailers} - respBody <- getResponseBody r "" - respTrailers <- H.getResponseTrailers r - writeResp respBody respTrailers - - getResponseBody :: Response -> ByteString -> IO ByteString - getResponseBody r s = - H.getResponseBodyChunk r >>= \chunk -> - if B.null chunk then pure s else getResponseBody r $ s <> chunk + respBody <- getHTTP2Body r bodyHeadSize + atomically $ putTMVar respVar HTTP2Response {response = r, respBody} -- | Disconnects client from the server and terminates client threads. closeHTTP2Client :: HTTP2Client -> IO () closeHTTP2Client = mapM_ uninterruptibleCancel . action sendRequest :: HTTP2Client -> Request -> IO (Either HTTP2ClientError HTTP2Response) -sendRequest HTTP2Client {reqQ, config} req = do +sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req = do resp <- newEmptyTMVarIO atomically $ writeTBQueue reqQ (req, resp) maybe (Left HCResponseTimeout) Right <$> (connTimeout config `timeout` atomically (takeTMVar resp)) -runHTTP2Client :: T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> HostName -> ServiceName -> ((Request -> (Response -> IO ()) -> IO ()) -> IO ()) -> IO () -runHTTP2Client tlsParams caStore tcConfig host port client = - runTLSTransportClient tlsParams caStore tcConfig Nothing (THDomainName host) port Nothing $ \c -> - withTlsConfig c 16384 (`run` client) +runHTTP2Client :: T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> BufferSize -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (SessionId -> H.Client ()) -> IO () +runHTTP2Client tlsParams caStore tcConfig bufferSize proxyUsername host port keyHash client = + runTLSTransportClient tlsParams caStore tcConfig proxyUsername host port keyHash $ withHTTP2 bufferSize run where - run = H.run $ ClientConfig "https" (B.pack host) 20 + run cfg = H.run (ClientConfig "https" (strEncode host) 20) cfg . client diff --git a/src/Simplex/Messaging/Transport/HTTP2/Server.hs b/src/Simplex/Messaging/Transport/HTTP2/Server.hs index 11a7cdd3b..54cf85bf5 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Server.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Server.hs @@ -6,22 +6,23 @@ module Simplex.Messaging.Transport.HTTP2.Server where import Control.Concurrent.Async (Async, async, uninterruptibleCancel) import Control.Concurrent.STM import Control.Monad -import Data.ByteString (ByteString) -import qualified Data.ByteString.Char8 as B -import Network.HPACK (HeaderTable) -import Network.HTTP2.Server (Aux, PushPromise, Request, Response) +import Network.HPACK (BufferSize) +import Network.HTTP2.Server (Request, Response) import qualified Network.HTTP2.Server as H import Network.Socket import qualified Network.TLS as T import Numeric.Natural (Natural) -import Simplex.Messaging.Transport.HTTP2 (withTlsConfig) +import Simplex.Messaging.Transport (SessionId) +import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, withHTTP2) import Simplex.Messaging.Transport.Server (loadSupportedTLSServerParams, runTransportServer) -type HTTP2ServerFunc = (Request -> (Response -> IO ()) -> IO ()) +type HTTP2ServerFunc = SessionId -> Request -> (Response -> IO ()) -> IO () data HTTP2ServerConfig = HTTP2ServerConfig { qSize :: Natural, http2Port :: ServiceName, + bufferSize :: BufferSize, + bodyHeadSize :: Int, serverSupported :: T.Supported, caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -31,9 +32,9 @@ data HTTP2ServerConfig = HTTP2ServerConfig deriving (Show) data HTTP2Request = HTTP2Request - { request :: Request, - reqBody :: ByteString, - reqTrailers :: Maybe HeaderTable, + { sessionId :: SessionId, + request :: Request, + reqBody :: HTTP2Body, sendResponse :: Response -> IO () } @@ -43,29 +44,22 @@ data HTTP2Server = HTTP2Server } getHTTP2Server :: HTTP2ServerConfig -> IO HTTP2Server -getHTTP2Server HTTP2ServerConfig {qSize, http2Port, serverSupported, caCertificateFile, certificateFile, privateKeyFile, logTLSErrors} = do +getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, serverSupported, caCertificateFile, certificateFile, privateKeyFile, logTLSErrors} = do tlsServerParams <- loadSupportedTLSServerParams serverSupported caCertificateFile certificateFile privateKeyFile started <- newEmptyTMVarIO reqQ <- newTBQueueIO qSize action <- async $ - runHTTP2Server started http2Port tlsServerParams logTLSErrors $ \r sendResponse -> do - reqBody <- getRequestBody r "" - reqTrailers <- H.getRequestTrailers r - atomically $ writeTBQueue reqQ HTTP2Request {request = r, reqBody, reqTrailers, sendResponse} + runHTTP2Server started http2Port bufferSize tlsServerParams logTLSErrors $ \sessionId r sendResponse -> do + reqBody <- getHTTP2Body r bodyHeadSize + atomically $ writeTBQueue reqQ HTTP2Request {sessionId, request = r, reqBody, sendResponse} void . atomically $ takeTMVar started pure HTTP2Server {action, reqQ} - where - getRequestBody :: Request -> ByteString -> IO ByteString - getRequestBody r s = - H.getRequestBodyChunk r >>= \chunk -> - if B.null chunk then pure s else getRequestBody r $ s <> chunk closeHTTP2Server :: HTTP2Server -> IO () closeHTTP2Server = uninterruptibleCancel . action -runHTTP2Server :: TMVar Bool -> ServiceName -> T.ServerParams -> Bool -> HTTP2ServerFunc -> IO () -runHTTP2Server started port serverParams logTLSErrors http2Server = - runTransportServer started port serverParams logTLSErrors $ \c -> withTlsConfig c 16384 (`H.run` server) +runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.ServerParams -> Bool -> HTTP2ServerFunc -> IO () +runHTTP2Server started port bufferSize serverParams logTLSErrors http2Server = + runTransportServer started port serverParams logTLSErrors $ withHTTP2 bufferSize run where - server :: Request -> Aux -> (Response -> [PushPromise] -> IO ()) -> IO () - server req _aux sendResp = http2Server req (`sendResp` []) + run cfg sessId = H.run cfg $ \req _aux sendResp -> http2Server sessId req (`sendResp` []) diff --git a/src/Simplex/Messaging/Transport/WebSockets.hs b/src/Simplex/Messaging/Transport/WebSockets.hs index 69e137c7e..c6b1a2610 100644 --- a/src/Simplex/Messaging/Transport/WebSockets.hs +++ b/src/Simplex/Messaging/Transport/WebSockets.hs @@ -19,9 +19,9 @@ import Simplex.Messaging.Transport TransportPeer (..), closeTLS, smpBlockSize, - trimCR, withTlsUnique, ) +import Simplex.Messaging.Transport.Buffer (trimCR) data WS = WS { wsPeer :: TransportPeer, diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index 7c6c9fb2f..d3c10c80e 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -41,8 +41,7 @@ import Simplex.Messaging.Notifications.Transport import Simplex.Messaging.Protocol import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client -import Simplex.Messaging.Transport.HTTP2 (http2TLSParams) -import Simplex.Messaging.Transport.HTTP2.Client +import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..), http2TLSParams) import Simplex.Messaging.Transport.HTTP2.Server import Test.Hspec import UnliftIO.Async @@ -87,7 +86,7 @@ ntfServerCfg = apnsConfig = defaultAPNSPushClientConfig { apnsPort = apnsTestPort, - http2cfg = defaultHTTP2ClientConfig {caStoreFile = "tests/fixtures/ca.crt"} + caStoreFile = "tests/fixtures/ca.crt" }, inactiveClientExpiration = Just defaultInactiveClientExpiration, storeLogFile = Nothing, @@ -175,6 +174,8 @@ apnsMockServerConfig = HTTP2ServerConfig { qSize = 1, http2Port = apnsTestPort, + bufferSize = 16384, + bodyHeadSize = 16384, serverSupported = http2TLSParams, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", @@ -210,16 +211,16 @@ getAPNSMockServer config@HTTP2ServerConfig {qSize} = do pure APNSMockServer {action, apnsQ, http2Server} where runAPNSMockServer apnsQ HTTP2Server {reqQ} = forever $ do - HTTP2Request {reqBody, sendResponse} <- atomically $ readTBQueue reqQ + HTTP2Request {reqBody = HTTP2Body {bodyHead}, sendResponse} <- atomically $ readTBQueue reqQ let sendApnsResponse = \case APNSRespOk -> sendResponse $ H.responseNoBody N.ok200 [] APNSRespError status reason -> sendResponse . H.responseBuilder status [] . lazyByteString $ J.encode APNSErrorResponse {reason} - case J.decodeStrict' reqBody of + case J.decodeStrict' bodyHead of Just notification -> atomically $ writeTBQueue apnsQ APNSMockRequest {notification, sendApnsResponse} _ -> do - putStrLn $ "runAPNSMockServer J.decodeStrict' error, reqBody: " <> show reqBody + putStrLn $ "runAPNSMockServer J.decodeStrict' error, reqBody: " <> show bodyHead sendApnsResponse $ APNSRespError N.badRequest400 "bad_request_body" closeAPNSMockServer :: APNSMockServer -> IO ()