mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-31 16:15:57 +00:00
Merge branch 'master' into xftp
This commit is contained in:
@@ -592,7 +592,7 @@ protocolClientError protocolError_ host = \case
|
||||
PCEIncompatibleHost -> BROKER host HOST
|
||||
PCETransportError e -> BROKER host $ TRANSPORT e
|
||||
e@PCESignatureError {} -> INTERNAL $ show e
|
||||
e@PCEIOError {} -> INTERNAL $ show e
|
||||
PCEIOError {} -> BROKER host NETWORK
|
||||
|
||||
data SMPTestStep = TSConnect | TSCreateQueue | TSSecureQueue | TSDeleteQueue | TSDisconnect
|
||||
deriving (Eq, Show, Generic)
|
||||
@@ -695,6 +695,7 @@ temporaryClientError :: ProtocolClientError -> Bool
|
||||
temporaryClientError = \case
|
||||
PCENetworkError -> True
|
||||
PCEResponseTimeout -> True
|
||||
PCEIOError _ -> True
|
||||
_ -> False
|
||||
|
||||
temporaryAgentError :: AgentErrorType -> Bool
|
||||
|
||||
@@ -62,7 +62,9 @@ module Simplex.Messaging.Client
|
||||
defaultNetworkConfig,
|
||||
transportClientConfig,
|
||||
chooseTransportHost,
|
||||
proxyUsername,
|
||||
ServerTransmission,
|
||||
ClientCommand,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -317,9 +319,6 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize,
|
||||
Just (Left e) -> Left e
|
||||
Nothing -> Left PCENetworkError
|
||||
|
||||
proxyUsername :: TransportSession msg -> ByteString
|
||||
proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_
|
||||
|
||||
useTransport :: (ServiceName, ATransport)
|
||||
useTransport = case port srv of
|
||||
"" -> defaultTransport cfg
|
||||
@@ -382,6 +381,9 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize,
|
||||
Right msg -> atomically $ mapM_ (`writeTBQueue` serverTransmission c qId msg) msgQ
|
||||
Left e -> putStrLn $ "SMP client error: " <> show e
|
||||
|
||||
proxyUsername :: TransportSession msg -> ByteString
|
||||
proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_
|
||||
|
||||
-- | Disconnects client from the server and terminates client threads.
|
||||
closeProtocolClient :: ProtocolClient msg -> IO ()
|
||||
closeProtocolClient = mapM_ uninterruptibleCancel . action
|
||||
|
||||
@@ -319,7 +319,7 @@ clientDisconnected NtfServerClient {connected} = atomically $ writeTVar connecte
|
||||
|
||||
receive :: Transport c => THandle c -> NtfServerClient -> M ()
|
||||
receive th NtfServerClient {rcvQ, sndQ, activeAt} = forever $ do
|
||||
ts <- tGet th
|
||||
ts <- liftIO $ tGet th
|
||||
forM_ ts $ \t@(_, _, (corrId, entId, cmdOrError)) -> do
|
||||
atomically . writeTVar activeAt =<< liftIO getSystemTime
|
||||
logDebug "received transmission"
|
||||
|
||||
@@ -126,6 +126,10 @@ module Simplex.Messaging.Protocol
|
||||
-- * TCP transport functions
|
||||
tPut,
|
||||
tGet,
|
||||
tParse,
|
||||
tDecodeParseValidate,
|
||||
tEncode,
|
||||
tEncodeBatch,
|
||||
|
||||
-- * exports for tests
|
||||
CommandTag (..),
|
||||
@@ -1177,7 +1181,7 @@ tPut th trs
|
||||
tPutBatch :: [Either TransportError ()] -> NonEmpty ByteString -> IO [Either TransportError ()]
|
||||
tPutBatch rs ts = do
|
||||
let (n, s, ts_) = encodeBatch 0 "" ts
|
||||
r <- if n == 0 then largeMsg else replicate n <$> tPutLog (lenEncode n `B.cons` s)
|
||||
r <- if n == 0 then largeMsg else replicate n <$> tPutLog (tEncodeBatch n s)
|
||||
let rs' = rs <> r
|
||||
case ts_ of
|
||||
Just ts' -> tPutBatch rs' ts'
|
||||
@@ -1200,7 +1204,12 @@ tPut th trs
|
||||
else case L.nonEmpty ts_ of
|
||||
Just ts' -> encodeBatch n' s' ts'
|
||||
_ -> (n', s', Nothing)
|
||||
tEncode (sig, tr) = smpEncode (C.signatureBytes sig) <> tr
|
||||
|
||||
tEncode :: C.CryptoSignature s => (s, ByteString) -> ByteString
|
||||
tEncode (sig, t) = smpEncode (C.signatureBytes sig) <> t
|
||||
|
||||
tEncodeBatch :: Int -> ByteString -> ByteString
|
||||
tEncodeBatch n s = lenEncode n `B.cons` s
|
||||
|
||||
encodeTransmission :: ProtocolEncoding c => Version -> ByteString -> Transmission c -> ByteString
|
||||
encodeTransmission v sessionId (CorrId corrId, queueId, command) =
|
||||
@@ -1208,33 +1217,36 @@ encodeTransmission v sessionId (CorrId corrId, queueId, command) =
|
||||
|
||||
-- | Receive and parse transmission from the TCP transport (ignoring any trailing padding).
|
||||
tGetParse :: Transport c => THandle c -> IO (NonEmpty (Either TransportError RawTransmission))
|
||||
tGetParse th
|
||||
| batch th = either ((:| []) . Left) id <$> runExceptT getBatch
|
||||
| otherwise = (:| []) . (parse transmissionP TEBadBlock =<<) <$> tGetBlock th
|
||||
tGetParse th = eitherList (tParse $ batch th) <$> tGetBlock th
|
||||
|
||||
tParse :: Bool -> ByteString -> NonEmpty (Either TransportError RawTransmission)
|
||||
tParse batch s
|
||||
| batch = eitherList (L.map (\(Large t) -> tParse1 t)) ts
|
||||
| otherwise = [tParse1 s]
|
||||
where
|
||||
getBatch :: ExceptT TransportError IO (NonEmpty (Either TransportError RawTransmission))
|
||||
getBatch = do
|
||||
s <- ExceptT $ tGetBlock th
|
||||
ts <- liftEither $ parse smpP TEBadBlock s
|
||||
pure $ L.map (\(Large t) -> parse transmissionP TEBadBlock t) ts
|
||||
tParse1 = parse transmissionP TEBadBlock
|
||||
ts = parse smpP TEBadBlock s
|
||||
|
||||
eitherList :: (a -> NonEmpty (Either e b)) -> Either e a -> NonEmpty (Either e b)
|
||||
eitherList = either (\e -> [Left e])
|
||||
|
||||
-- | Receive client and server transmissions (determined by `cmd` type).
|
||||
tGet :: forall cmd c m. (ProtocolEncoding cmd, Transport c, MonadIO m) => THandle c -> m (NonEmpty (SignedTransmission cmd))
|
||||
tGet th@THandle {sessionId, thVersion = v} = liftIO (tGetParse th) >>= mapM decodeParseValidate
|
||||
tGet :: forall cmd c. (ProtocolEncoding cmd, Transport c) => THandle c -> IO (NonEmpty (SignedTransmission cmd))
|
||||
tGet th@THandle {sessionId, thVersion = v} = L.map (tDecodeParseValidate sessionId v) <$> tGetParse th
|
||||
|
||||
tDecodeParseValidate :: forall cmd. ProtocolEncoding cmd => SessionId -> Version -> Either TransportError RawTransmission -> SignedTransmission cmd
|
||||
tDecodeParseValidate sessionId v = \case
|
||||
Right RawTransmission {signature, signed, sessId, corrId, entityId, command}
|
||||
| sessId == sessionId ->
|
||||
let decodedTransmission = (,corrId,entityId,command) <$> C.decodeSignature signature
|
||||
in either (const $ tError corrId) (tParseValidate signed) decodedTransmission
|
||||
| otherwise -> (Nothing, "", (CorrId corrId, "", Left SESSION))
|
||||
Left _ -> tError ""
|
||||
where
|
||||
decodeParseValidate :: Either TransportError RawTransmission -> m (SignedTransmission cmd)
|
||||
decodeParseValidate = \case
|
||||
Right RawTransmission {signature, signed, sessId, corrId, entityId, command}
|
||||
| sessId == sessionId ->
|
||||
let decodedTransmission = (,corrId,entityId,command) <$> C.decodeSignature signature
|
||||
in either (const $ tError corrId) (tParseValidate signed) decodedTransmission
|
||||
| otherwise -> pure (Nothing, "", (CorrId corrId, "", Left SESSION))
|
||||
Left _ -> tError ""
|
||||
tError :: ByteString -> SignedTransmission cmd
|
||||
tError corrId = (Nothing, "", (CorrId corrId, "", Left BLOCK))
|
||||
|
||||
tError :: ByteString -> m (SignedTransmission cmd)
|
||||
tError corrId = pure (Nothing, "", (CorrId corrId, "", Left BLOCK))
|
||||
|
||||
tParseValidate :: ByteString -> SignedRawTransmission -> m (SignedTransmission cmd)
|
||||
tParseValidate signed t@(sig, corrId, entityId, command) = do
|
||||
tParseValidate :: ByteString -> SignedRawTransmission -> SignedTransmission cmd
|
||||
tParseValidate signed t@(sig, corrId, entityId, command) =
|
||||
let cmd = parseProtocol v command >>= checkCredentials t
|
||||
pure (sig, signed, (CorrId corrId, entityId, cmd))
|
||||
in (sig, signed, (CorrId corrId, entityId, cmd))
|
||||
|
||||
@@ -260,7 +260,7 @@ cancelSub sub =
|
||||
|
||||
receive :: Transport c => THandle c -> Client -> M ()
|
||||
receive th Client {rcvQ, sndQ, activeAt} = forever $ do
|
||||
ts <- L.toList <$> tGet th
|
||||
ts <- L.toList <$> liftIO (tGet th)
|
||||
atomically . writeTVar activeAt =<< liftIO getSystemTime
|
||||
as <- partitionEithers <$> mapM cmdAction ts
|
||||
write sndQ $ fst as
|
||||
|
||||
@@ -19,6 +19,9 @@ import Simplex.Messaging.Transport (SessionId, TLS (tlsUniq), Transport (cGet, c
|
||||
import Simplex.Messaging.Transport.Buffer
|
||||
import qualified System.TimeManager as TI
|
||||
|
||||
defaultHTTP2BufferSize :: BufferSize
|
||||
defaultHTTP2BufferSize = 32768
|
||||
|
||||
withHTTP2 :: BufferSize -> (Config -> SessionId -> IO ()) -> TLS -> IO ()
|
||||
withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c)
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Transport (SessionId)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), runTLSTransportClient)
|
||||
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, http2TLSParams, withHTTP2)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import UnliftIO.STM
|
||||
import UnliftIO.Timeout
|
||||
|
||||
@@ -62,7 +62,7 @@ defaultHTTP2ClientConfig =
|
||||
{ qSize = 64,
|
||||
connTimeout = 10000000,
|
||||
transportConfig = TransportClientConfig Nothing Nothing True,
|
||||
bufferSize = 32768,
|
||||
bufferSize = defaultHTTP2BufferSize,
|
||||
bodyHeadSize = 16384,
|
||||
suportedTLSParams = http2TLSParams
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ import Network.Socket
|
||||
import qualified Network.TLS as T
|
||||
import Numeric.Natural (Natural)
|
||||
import Simplex.Messaging.Transport (SessionId)
|
||||
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body, getHTTP2Body, withHTTP2)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import Simplex.Messaging.Transport.Server (loadSupportedTLSServerParams, runTransportServer)
|
||||
|
||||
type HTTP2ServerFunc = SessionId -> Request -> (Response -> IO ()) -> IO ()
|
||||
@@ -43,6 +43,7 @@ data HTTP2Server = HTTP2Server
|
||||
reqQ :: TBQueue HTTP2Request
|
||||
}
|
||||
|
||||
-- This server is for testing only, it processes all requests in a single queue.
|
||||
getHTTP2Server :: HTTP2ServerConfig -> IO HTTP2Server
|
||||
getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, serverSupported, caCertificateFile, certificateFile, privateKeyFile, logTLSErrors} = do
|
||||
tlsServerParams <- loadSupportedTLSServerParams serverSupported caCertificateFile certificateFile privateKeyFile
|
||||
|
||||
@@ -91,7 +91,7 @@ tPut1 h t = do
|
||||
|
||||
tGet1 :: (ProtocolEncoding cmd, Transport c, MonadIO m, MonadFail m) => THandle c -> m (SignedTransmission cmd)
|
||||
tGet1 h = do
|
||||
[r] <- tGet h
|
||||
[r] <- liftIO $ tGet h
|
||||
pure r
|
||||
|
||||
(#==) :: (HasCallStack, Eq a, Show a) => (a, a) -> String -> Assertion
|
||||
|
||||
Reference in New Issue
Block a user