diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 400e58c14..9b68c4172 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -592,7 +592,7 @@ protocolClientError protocolError_ host = \case PCEIncompatibleHost -> BROKER host HOST PCETransportError e -> BROKER host $ TRANSPORT e e@PCESignatureError {} -> INTERNAL $ show e - e@PCEIOError {} -> INTERNAL $ show e + PCEIOError {} -> BROKER host NETWORK data SMPTestStep = TSConnect | TSCreateQueue | TSSecureQueue | TSDeleteQueue | TSDisconnect deriving (Eq, Show, Generic) @@ -695,6 +695,7 @@ temporaryClientError :: ProtocolClientError -> Bool temporaryClientError = \case PCENetworkError -> True PCEResponseTimeout -> True + PCEIOError _ -> True _ -> False temporaryAgentError :: AgentErrorType -> Bool diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 3072b0849..1af672873 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -62,7 +62,9 @@ module Simplex.Messaging.Client defaultNetworkConfig, transportClientConfig, chooseTransportHost, + proxyUsername, ServerTransmission, + ClientCommand, ) where @@ -317,9 +319,6 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, Just (Left e) -> Left e Nothing -> Left PCENetworkError - proxyUsername :: TransportSession msg -> ByteString - proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_ - useTransport :: (ServiceName, ATransport) useTransport = case port srv of "" -> defaultTransport cfg @@ -382,6 +381,9 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, Right msg -> atomically $ mapM_ (`writeTBQueue` serverTransmission c qId msg) msgQ Left e -> putStrLn $ "SMP client error: " <> show e +proxyUsername :: TransportSession msg -> ByteString +proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_ + -- | Disconnects client from the server and terminates client threads. closeProtocolClient :: ProtocolClient msg -> IO () closeProtocolClient = mapM_ uninterruptibleCancel . action diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index ce3a0972b..bed8ea433 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -319,7 +319,7 @@ clientDisconnected NtfServerClient {connected} = atomically $ writeTVar connecte receive :: Transport c => THandle c -> NtfServerClient -> M () receive th NtfServerClient {rcvQ, sndQ, activeAt} = forever $ do - ts <- tGet th + ts <- liftIO $ tGet th forM_ ts $ \t@(_, _, (corrId, entId, cmdOrError)) -> do atomically . writeTVar activeAt =<< liftIO getSystemTime logDebug "received transmission" diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 9ac6e3ebc..61ec2d906 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -126,6 +126,10 @@ module Simplex.Messaging.Protocol -- * TCP transport functions tPut, tGet, + tParse, + tDecodeParseValidate, + tEncode, + tEncodeBatch, -- * exports for tests CommandTag (..), @@ -1177,7 +1181,7 @@ tPut th trs tPutBatch :: [Either TransportError ()] -> NonEmpty ByteString -> IO [Either TransportError ()] tPutBatch rs ts = do let (n, s, ts_) = encodeBatch 0 "" ts - r <- if n == 0 then largeMsg else replicate n <$> tPutLog (lenEncode n `B.cons` s) + r <- if n == 0 then largeMsg else replicate n <$> tPutLog (tEncodeBatch n s) let rs' = rs <> r case ts_ of Just ts' -> tPutBatch rs' ts' @@ -1200,7 +1204,12 @@ tPut th trs else case L.nonEmpty ts_ of Just ts' -> encodeBatch n' s' ts' _ -> (n', s', Nothing) - tEncode (sig, tr) = smpEncode (C.signatureBytes sig) <> tr + +tEncode :: C.CryptoSignature s => (s, ByteString) -> ByteString +tEncode (sig, t) = smpEncode (C.signatureBytes sig) <> t + +tEncodeBatch :: Int -> ByteString -> ByteString +tEncodeBatch n s = lenEncode n `B.cons` s encodeTransmission :: ProtocolEncoding c => Version -> ByteString -> Transmission c -> ByteString encodeTransmission v sessionId (CorrId corrId, queueId, command) = @@ -1208,33 +1217,36 @@ encodeTransmission v sessionId (CorrId corrId, queueId, command) = -- | Receive and parse transmission from the TCP transport (ignoring any trailing padding). tGetParse :: Transport c => THandle c -> IO (NonEmpty (Either TransportError RawTransmission)) -tGetParse th - | batch th = either ((:| []) . Left) id <$> runExceptT getBatch - | otherwise = (:| []) . (parse transmissionP TEBadBlock =<<) <$> tGetBlock th +tGetParse th = eitherList (tParse $ batch th) <$> tGetBlock th + +tParse :: Bool -> ByteString -> NonEmpty (Either TransportError RawTransmission) +tParse batch s + | batch = eitherList (L.map (\(Large t) -> tParse1 t)) ts + | otherwise = [tParse1 s] where - getBatch :: ExceptT TransportError IO (NonEmpty (Either TransportError RawTransmission)) - getBatch = do - s <- ExceptT $ tGetBlock th - ts <- liftEither $ parse smpP TEBadBlock s - pure $ L.map (\(Large t) -> parse transmissionP TEBadBlock t) ts + tParse1 = parse transmissionP TEBadBlock + ts = parse smpP TEBadBlock s + +eitherList :: (a -> NonEmpty (Either e b)) -> Either e a -> NonEmpty (Either e b) +eitherList = either (\e -> [Left e]) -- | Receive client and server transmissions (determined by `cmd` type). -tGet :: forall cmd c m. (ProtocolEncoding cmd, Transport c, MonadIO m) => THandle c -> m (NonEmpty (SignedTransmission cmd)) -tGet th@THandle {sessionId, thVersion = v} = liftIO (tGetParse th) >>= mapM decodeParseValidate +tGet :: forall cmd c. (ProtocolEncoding cmd, Transport c) => THandle c -> IO (NonEmpty (SignedTransmission cmd)) +tGet th@THandle {sessionId, thVersion = v} = L.map (tDecodeParseValidate sessionId v) <$> tGetParse th + +tDecodeParseValidate :: forall cmd. ProtocolEncoding cmd => SessionId -> Version -> Either TransportError RawTransmission -> SignedTransmission cmd +tDecodeParseValidate sessionId v = \case + Right RawTransmission {signature, signed, sessId, corrId, entityId, command} + | sessId == sessionId -> + let decodedTransmission = (,corrId,entityId,command) <$> C.decodeSignature signature + in either (const $ tError corrId) (tParseValidate signed) decodedTransmission + | otherwise -> (Nothing, "", (CorrId corrId, "", Left SESSION)) + Left _ -> tError "" where - decodeParseValidate :: Either TransportError RawTransmission -> m (SignedTransmission cmd) - decodeParseValidate = \case - Right RawTransmission {signature, signed, sessId, corrId, entityId, command} - | sessId == sessionId -> - let decodedTransmission = (,corrId,entityId,command) <$> C.decodeSignature signature - in either (const $ tError corrId) (tParseValidate signed) decodedTransmission - | otherwise -> pure (Nothing, "", (CorrId corrId, "", Left SESSION)) - Left _ -> tError "" + tError :: ByteString -> SignedTransmission cmd + tError corrId = (Nothing, "", (CorrId corrId, "", Left BLOCK)) - tError :: ByteString -> m (SignedTransmission cmd) - tError corrId = pure (Nothing, "", (CorrId corrId, "", Left BLOCK)) - - tParseValidate :: ByteString -> SignedRawTransmission -> m (SignedTransmission cmd) - tParseValidate signed t@(sig, corrId, entityId, command) = do + tParseValidate :: ByteString -> SignedRawTransmission -> SignedTransmission cmd + tParseValidate signed t@(sig, corrId, entityId, command) = let cmd = parseProtocol v command >>= checkCredentials t - pure (sig, signed, (CorrId corrId, entityId, cmd)) + in (sig, signed, (CorrId corrId, entityId, cmd)) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e5cb7f919..e66da37be 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -260,7 +260,7 @@ cancelSub sub = receive :: Transport c => THandle c -> Client -> M () receive th Client {rcvQ, sndQ, activeAt} = forever $ do - ts <- L.toList <$> tGet th + ts <- L.toList <$> liftIO (tGet th) atomically . writeTVar activeAt =<< liftIO getSystemTime as <- partitionEithers <$> mapM cmdAction ts write sndQ $ fst as diff --git a/src/Simplex/Messaging/Transport/HTTP2.hs b/src/Simplex/Messaging/Transport/HTTP2.hs index a2d9c4a86..dc023f920 100644 --- a/src/Simplex/Messaging/Transport/HTTP2.hs +++ b/src/Simplex/Messaging/Transport/HTTP2.hs @@ -19,6 +19,9 @@ import Simplex.Messaging.Transport (SessionId, TLS (tlsUniq), Transport (cGet, c import Simplex.Messaging.Transport.Buffer import qualified System.TimeManager as TI +defaultHTTP2BufferSize :: BufferSize +defaultHTTP2BufferSize = 32768 + withHTTP2 :: BufferSize -> (Config -> SessionId -> IO ()) -> TLS -> IO () withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c) diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index 8de44cadc..c8d74dbc5 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -22,7 +22,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Transport (SessionId) import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), runTLSTransportClient) -import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, http2TLSParams, withHTTP2) +import Simplex.Messaging.Transport.HTTP2 import UnliftIO.STM import UnliftIO.Timeout @@ -62,7 +62,7 @@ defaultHTTP2ClientConfig = { qSize = 64, connTimeout = 10000000, transportConfig = TransportClientConfig Nothing Nothing True, - bufferSize = 32768, + bufferSize = defaultHTTP2BufferSize, bodyHeadSize = 16384, suportedTLSParams = http2TLSParams } diff --git a/src/Simplex/Messaging/Transport/HTTP2/Server.hs b/src/Simplex/Messaging/Transport/HTTP2/Server.hs index 54cf85bf5..9a305ed8a 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Server.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Server.hs @@ -13,7 +13,7 @@ import Network.Socket import qualified Network.TLS as T import Numeric.Natural (Natural) import Simplex.Messaging.Transport (SessionId) -import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, withHTTP2) +import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.Server (loadSupportedTLSServerParams, runTransportServer) type HTTP2ServerFunc = SessionId -> Request -> (Response -> IO ()) -> IO () @@ -43,6 +43,7 @@ data HTTP2Server = HTTP2Server reqQ :: TBQueue HTTP2Request } +-- This server is for testing only, it processes all requests in a single queue. getHTTP2Server :: HTTP2ServerConfig -> IO HTTP2Server getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, serverSupported, caCertificateFile, certificateFile, privateKeyFile, logTLSErrors} = do tlsServerParams <- loadSupportedTLSServerParams serverSupported caCertificateFile certificateFile privateKeyFile diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 5a83e5f1e..884e88a29 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -91,7 +91,7 @@ tPut1 h t = do tGet1 :: (ProtocolEncoding cmd, Transport c, MonadIO m, MonadFail m) => THandle c -> m (SignedTransmission cmd) tGet1 h = do - [r] <- tGet h + [r] <- liftIO $ tGet h pure r (#==) :: (HasCallStack, Eq a, Show a) => (a, a) -> String -> Assertion