do not send session ID in each transmission

This commit is contained in:
Evgeny Poberezkin
2024-02-09 19:53:06 +00:00
parent 3295fbee8b
commit a734c29eeb
19 changed files with 365 additions and 274 deletions
+12 -9
View File
@@ -45,7 +45,7 @@ import Simplex.Messaging.Protocol
RecipientId,
SenderId,
)
import Simplex.Messaging.Transport (supportedParameters)
import Simplex.Messaging.Transport (THandleParams (..), supportedParameters)
import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost)
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.HTTP2.Client
@@ -57,6 +57,7 @@ import UnliftIO.Directory
data XFTPClient = XFTPClient
{ http2Client :: HTTP2Client,
transportSession :: TransportSession FileResponse,
thParams :: THandleParams,
config :: XFTPClientConfig
}
@@ -98,7 +99,9 @@ getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {xftpNetworkC
let usePort = if null port then "443" else port
clientDisconnected = readTVarIO clientVar >>= mapM_ disconnected
http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config clientDisconnected
let c = XFTPClient {http2Client, transportSession, config}
let HTTP2Client {sessionId} = http2Client
thParams = THandleParams {sessionId, blockSize = xftpBlockSize, thVersion = currentXFTPVersion, thAuth = Nothing, encrypt = False, batch = True}
c = XFTPClient {http2Client, thParams, transportSession, config}
atomically $ writeTVar clientVar $ Just c
pure c
@@ -132,20 +135,20 @@ xftpClientError = \case
HCIOError e -> PCEIOError e
sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
sendXFTPCommand c@XFTPClient {http2Client = HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do
sendXFTPCommand c@XFTPClient {thParams} pKey fId cmd chunkSpec_ = do
t <-
liftEither . first PCETransportError $
xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd)
xftpEncodeTransmission thParams (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd)
sendXFTPTransmission c t chunkSpec_
sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
sendXFTPTransmission XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} t chunkSpec_ = do
sendXFTPTransmission XFTPClient {config, thParams, http2Client} t chunkSpec_ = do
let req = H.requestStreaming N.methodPost "/" [] streamBody
reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2Client req reqTimeout
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
-- TODO validate that the file ID is the same as in the request?
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission thParams bodyHead
case respOrErr of
Right r -> case protocolError r of
Just e -> throwError $ PCEProtocolError e
@@ -212,10 +215,10 @@ ackXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> RecipientId -> ExceptT XFTPCl
ackXFTPChunk c rpKey rId = sendXFTPCommand c rpKey rId FACK Nothing >>= okResponse
pingXFTP :: XFTPClient -> ExceptT XFTPClientError IO ()
pingXFTP c@XFTPClient {http2Client = HTTP2Client {sessionId}} = do
pingXFTP c@XFTPClient {thParams} = do
t <-
liftEither . first PCETransportError $
xftpEncodeTransmission sessionId Nothing ("", "", FileCmd SFRecipient PING)
xftpEncodeTransmission thParams Nothing ("", "", FileCmd SFRecipient PING)
(r, _) <- sendXFTPTransmission c t Nothing
case r of
FRPong -> pure ()
+12 -10
View File
@@ -7,6 +7,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
@@ -46,13 +47,14 @@ import Simplex.Messaging.Protocol
SignedTransmission,
SndPublicAuthKey,
Transmission,
encodeTransmission,
ClntTransmission (..),
encodeClntTransmission,
messageTagP,
tDecodeParseValidate,
tEncodeBatch1,
tParse,
)
import Simplex.Messaging.Transport (SessionId, TransportError (..))
import Simplex.Messaging.Transport (THandleParams (..), TransportError (..))
import Simplex.Messaging.Util (bshow, (<$?>))
import Simplex.Messaging.Version
@@ -394,20 +396,20 @@ checkParty' c = case testEquality (sFileParty @p) (sFileParty @p') of
Just Refl -> Just c
_ -> Nothing
xftpEncodeTransmission :: ProtocolEncoding e c => SessionId -> Maybe C.APrivateAuthKey -> Transmission c -> Either TransportError ByteString
xftpEncodeTransmission sessionId pKey (corrId, fId, msg) = do
let t = encodeTransmission currentXFTPVersion sessionId (corrId, fId, msg)
xftpEncodeBatch1 =<< authTransmission Nothing pKey corrId t
xftpEncodeTransmission :: ProtocolEncoding e c => THandleParams -> Maybe C.APrivateAuthKey -> Transmission c -> Either TransportError ByteString
xftpEncodeTransmission thParams pKey (corrId, fId, msg) = do
let ClntTransmission {tForAuth, tToSend} = encodeClntTransmission thParams (corrId, fId, msg)
xftpEncodeBatch1 . (,tToSend) =<< authTransmission Nothing pKey corrId tForAuth
-- this function uses batch syntax but puts only one transmission in the batch
xftpEncodeBatch1 :: SentRawTransmission -> Either TransportError ByteString
xftpEncodeBatch1 t = first (const TELargeMsg) $ C.pad (tEncodeBatch1 t) xftpBlockSize
xftpDecodeTransmission :: ProtocolEncoding e c => SessionId -> ByteString -> Either XFTPErrorType (SignedTransmission e c)
xftpDecodeTransmission sessionId t = do
xftpDecodeTransmission :: ProtocolEncoding e c => THandleParams -> ByteString -> Either XFTPErrorType (SignedTransmission e c)
xftpDecodeTransmission thParams t = do
t' <- first (const BLOCK) $ C.unPad t
case tParse True t' of
t'' :| [] -> Right $ tDecodeParseValidate sessionId currentXFTPVersion t''
case tParse thParams t' of
t'' :| [] -> Right $ tDecodeParseValidate thParams t''
_ -> Left BLOCK
$(J.deriveJSON (enumJSON $ dropPrefix "F") ''FileParty)
+15 -5
View File
@@ -51,6 +51,7 @@ import Simplex.Messaging.Protocol (CorrId, RcvPublicDhKey, RcvPublicAuthKey, Rec
import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdAuthorization)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Transport (THandleParams (..))
import Simplex.Messaging.Transport.Buffer (trimCR)
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.HTTP2.Server
@@ -66,6 +67,14 @@ import qualified UnliftIO.Exception as E
type M a = ReaderT XFTPEnv IO a
data XFTPTransportRequest =
XFTPTransportRequest
{ thParams :: THandleParams,
reqBody :: HTTP2Body,
request :: H.Request,
sendResponse :: H.Response -> IO ()
}
runXFTPServer :: XFTPServerConfig -> IO ()
runXFTPServer cfg = do
started <- newEmptyTMVarIO
@@ -86,7 +95,8 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
liftIO $
runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig inactiveClientExpiration $ \sessionId r sendResponse -> do
reqBody <- getHTTP2Body r xftpBlockSize
processRequest HTTP2Request {sessionId, request = r, reqBody, sendResponse} `runReaderT` env
let thParams = THandleParams {sessionId, blockSize = xftpBlockSize, thVersion = currentXFTPVersion, thAuth = Nothing, encrypt = False, batch = True}
processRequest XFTPTransportRequest {thParams, request = r, reqBody, sendResponse} `runReaderT` env
stopServer :: M ()
stopServer = do
@@ -215,11 +225,11 @@ data ServerFile = ServerFile
sbState :: LC.SbState
}
processRequest :: HTTP2Request -> M ()
processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sendResponse}
processRequest :: XFTPTransportRequest -> M ()
processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHead}, sendResponse}
| B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", "", FRErr BLOCK) Nothing
| otherwise = do
case xftpDecodeTransmission sessionId bodyHead of
case xftpDecodeTransmission thParams bodyHead of
Right (sig_, signed, (corrId, fId, cmdOrErr)) -> do
case cmdOrErr of
Right cmd -> do
@@ -233,7 +243,7 @@ processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sen
where
sendXFTPResponse :: (CorrId, XFTPFileId, FileResponse) -> Maybe ServerFile -> M ()
sendXFTPResponse (corrId, fId, resp) serverFile_ = do
let t_ = xftpEncodeTransmission sessionId Nothing (corrId, fId, resp)
let t_ = xftpEncodeTransmission thParams Nothing (corrId, fId, resp)
liftIO $ sendResponse $ H.responseStreaming N.ok200 [] $ streamBody t_
where
streamBody t_ send done = do
+2 -1
View File
@@ -163,6 +163,7 @@ import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, XFTPServerWithAuth)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport (THandleParams (sessionId))
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util
import Simplex.Messaging.Version
@@ -2061,7 +2062,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
where
processEND = \case
Just (Right clnt)
| sessId == sessionId clnt -> do
| sessId == sessionId (thParams clnt) -> do
removeSubscription c connId
notify' END
pure "END"
+2 -1
View File
@@ -213,6 +213,7 @@ import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (THandleParams (..))
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util
import Simplex.Messaging.Version
@@ -1117,7 +1118,7 @@ getQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> m (Maybe SMPMsgMet
getQueueMessage c rq@RcvQueue {server, rcvId, rcvPrivateKey} = do
atomically createTakeGetLock
(v, msg_) <- withSMPClient c rq "GET" $ \smp ->
(thVersion smp,) <$> getSMPMessage smp rcvPrivateKey rcvId
(thVersion $ thParams smp,) <$> getSMPMessage smp rcvPrivateKey rcvId
mapM (decryptMeta v) msg_
where
decryptMeta v msg@SMP.RcvMessage {msgId} = SMP.rcvMessageMeta msgId <$> decryptSMPMessage v rq msg
+24 -23
View File
@@ -28,7 +28,7 @@
module Simplex.Messaging.Client
( -- * Connect (disconnect) client to (from) SMP server
TransportSession,
ProtocolClient (thVersion, sessionId, sessionTs),
ProtocolClient (thParams, sessionTs),
SMPClient,
getProtocolClient,
closeProtocolClient,
@@ -119,13 +119,9 @@ import System.Timeout (timeout)
-- Use 'getSMPClient' to connect to an SMP server and create a client handle.
data ProtocolClient err msg = ProtocolClient
{ action :: Maybe (Async ()),
sessionId :: SessionId,
thParams :: THandleParams,
sessionTs :: UTCTime,
thVersion :: Version,
thAuth :: Maybe THandleAuth,
timeoutPerBlock :: Int,
blockSize :: Int,
batch :: Bool,
client_ :: PClient err msg
}
@@ -153,13 +149,17 @@ clientStub sessionId thVersion thAuth = do
return
ProtocolClient
{ action = Nothing,
sessionId,
thParams =
THandleParams
{ sessionId,
thVersion,
thAuth,
blockSize = smpBlockSize,
encrypt = thVersion >= encryptTransmissionSMPVersion,
batch = True
},
sessionTs = undefined,
thVersion,
thAuth,
timeoutPerBlock = undefined,
blockSize = smpBlockSize,
batch = undefined,
client_ =
PClient
{ connected,
@@ -373,10 +373,10 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
ks <- atomically $ C.generateKeyPair g
runExceptT (protocolClientHandshake @err @msg h ks (keyHash srv) serverVRange) >>= \case
Left e -> atomically . putTMVar cVar . Left $ PCETransportError e
Right th@THandle {sessionId, thVersion, thAuth, blockSize, batch} -> do
Right th@THandle {params} -> do
sessionTs <- getCurrentTime
let timeoutPerBlock = (blockSize * tcpTimeoutPerKb) `div` 1024
c' = ProtocolClient {action = Nothing, client_ = c, sessionId, thVersion, thAuth, sessionTs, timeoutPerBlock, blockSize, batch}
let timeoutPerBlock = (blockSize params * tcpTimeoutPerKb) `div` 1024
c' = ProtocolClient {action = Nothing, client_ = c, thParams = params, sessionTs, timeoutPerBlock}
atomically $ do
writeTVar (connected c) True
putTMVar cVar $ Right c'
@@ -521,7 +521,7 @@ writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO ()
writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c rId msg) (msgQ $ client_ c)
serverTransmission :: ProtocolClient err msg -> RecipientId -> msg -> ServerTransmission msg
serverTransmission ProtocolClient {thVersion, sessionId, client_ = PClient {transportSession}} entityId message =
serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} entityId message =
(transportSession, thVersion, sessionId, entityId, message)
-- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue
@@ -639,7 +639,7 @@ type PCTransmission err msg = (Either TransportError SentRawTransmission, Reques
-- | Send multiple commands with batching and collect responses
sendProtocolCommands :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> NonEmpty (ClientCommand msg) -> IO (NonEmpty (Response err msg))
sendProtocolCommands c@ProtocolClient {batch, blockSize} cs = do
sendProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs = do
bs <- batchTransmissions' batch blockSize <$> mapM (mkTransmission c) cs
validate . concat =<< mapM (sendBatch c) bs
where
@@ -656,7 +656,7 @@ sendProtocolCommands c@ProtocolClient {batch, blockSize} cs = do
diff = L.length cs - length rs
streamProtocolCommands :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> NonEmpty (ClientCommand msg) -> ([Response err msg] -> IO ()) -> IO ()
streamProtocolCommands c@ProtocolClient {batch, blockSize} cs cb = do
streamProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs cb = do
bs <- batchTransmissions' batch blockSize <$> mapM (mkTransmission c) cs
mapM_ (cb <=< sendBatch c) bs
@@ -677,7 +677,7 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do
-- | Send Protocol command
sendProtocolCommand :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, batch, blockSize} pKey entId cmd =
sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} pKey entId cmd =
ExceptT $ uncurry sendRecv =<< mkTransmission c (pKey, entId, cmd)
where
-- two separate "atomically" needed to avoid blocking
@@ -702,11 +702,12 @@ getResponse ProtocolClient {client_ = PClient {tcpTimeout, pingErrorCount}} Requ
pure Response {entityId, response}
mkTransmission :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> ClientCommand msg -> IO (PCTransmission err msg)
mkTransmission ProtocolClient {sessionId, thVersion = v, thAuth, client_ = PClient {clientCorrId, sentCommands}} (pKey_, entId, cmd) = do
mkTransmission ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCommands}} (pKey_, entId, cmd) = do
corrId <- atomically getNextCorrId
let t = authTransmission thAuth pKey_ corrId $ encodeTransmission v sessionId (corrId, entId, cmd)
let ClntTransmission {tForAuth, tToSend} = encodeClntTransmission thParams (corrId, entId, cmd)
auth = authTransmission (thAuth thParams) pKey_ corrId tForAuth
r <- atomically $ mkRequest corrId
pure (t, r)
pure ((,tToSend) <$> auth, r)
where
getNextCorrId :: STM CorrId
getNextCorrId = do
@@ -718,8 +719,8 @@ mkTransmission ProtocolClient {sessionId, thVersion = v, thAuth, client_ = PClie
TM.insert corrId r sentCommands
pure r
authTransmission :: Maybe THandleAuth -> Maybe C.APrivateAuthKey -> CorrId -> ByteString -> Either TransportError SentRawTransmission
authTransmission thAuth pKey_ (CorrId corrId) t = (,t) <$> traverse authenticate pKey_
authTransmission :: Maybe THandleAuth -> Maybe C.APrivateAuthKey -> CorrId -> ByteString -> Either TransportError (Maybe TransmissionAuth)
authTransmission thAuth pKey_ (CorrId corrId) t = traverse authenticate pKey_
where
authenticate :: C.APrivateAuthKey -> Either TransportError TransmissionAuth
authenticate (C.APrivateAuthKey a pk) = case a of
@@ -43,12 +43,12 @@ import Simplex.Messaging.Notifications.Server.Stats
import Simplex.Messaging.Notifications.Server.Store
import Simplex.Messaging.Notifications.Server.StoreLog
import Simplex.Messaging.Notifications.Transport
import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut)
import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeSrvTransmission, tGet, tPut)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Server
import Simplex.Messaging.Server.Stats
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), TProxy, Transport (..))
import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..))
import Simplex.Messaging.Transport.Server (runTransportServer)
import Simplex.Messaging.Util
import System.Exit (exitFailure)
@@ -337,10 +337,10 @@ updateTknStatus NtfTknData {ntfTknId, tknStatus} status = do
when (old /= status) $ withNtfLog $ \sl -> logTokenStatus sl ntfTknId status
runNtfClientTransport :: Transport c => THandle c -> M ()
runNtfClientTransport th@THandle {sessionId} = do
runNtfClientTransport th@THandle {params} = do
qSize <- asks $ clientQSize . config
ts <- liftIO getSystemTime
c <- atomically $ newNtfServerClient qSize sessionId ts
c <- atomically $ newNtfServerClient qSize params ts
s <- asks subscriber
ps <- asks pushServer
expCfg <- asks $ inactiveClientExpiration . config
@@ -354,7 +354,7 @@ clientDisconnected :: NtfServerClient -> IO ()
clientDisconnected NtfServerClient {connected} = atomically $ writeTVar connected False
receive :: Transport c => THandle c -> NtfServerClient -> M ()
receive th@THandle {thAuth} NtfServerClient {rcvQ, sndQ, rcvActiveAt} = forever $ do
receive th@THandle {params = THandleParams {thAuth}} NtfServerClient {rcvQ, sndQ, rcvActiveAt} = forever $ do
ts <- liftIO $ tGet th
forM_ ts $ \t@(_, _, (corrId, entId, cmdOrError)) -> do
atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime
@@ -369,9 +369,9 @@ receive th@THandle {thAuth} NtfServerClient {rcvQ, sndQ, rcvActiveAt} = forever
write q t = atomically $ writeTBQueue q t
send :: Transport c => THandle c -> NtfServerClient -> IO ()
send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, sndActiveAt} = forever $ do
send h@THandle {params} NtfServerClient {sndQ, sndActiveAt} = forever $ do
t <- atomically $ readTBQueue sndQ
void . liftIO $ tPut h [Right (Nothing, encodeTransmission v sessionId t)]
void . liftIO $ tPut h [Right (Nothing, encodeSrvTransmission params t)]
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
-- instance Show a => Show (TVar a) where
@@ -12,7 +12,6 @@ import Control.Concurrent.Async (Async)
import Control.Logger.Simple
import Control.Monad.IO.Unlift
import Crypto.Random
import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import Data.Time.Clock (getCurrentTime)
@@ -33,7 +32,7 @@ import Simplex.Messaging.Protocol (CorrId, SMPServer, Transmission)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport)
import Simplex.Messaging.Transport (ATransport, THandleParams)
import Simplex.Messaging.Transport.Server (TransportServerConfig, loadFingerprint, loadTLSServerParams)
import Simplex.Messaging.Version (VersionRange)
import System.IO (IOMode (..))
@@ -160,17 +159,17 @@ data NtfRequest
data NtfServerClient = NtfServerClient
{ rcvQ :: TBQueue NtfRequest,
sndQ :: TBQueue (Transmission NtfResponse),
sessionId :: ByteString,
ntfThParams :: THandleParams,
connected :: TVar Bool,
rcvActiveAt :: TVar SystemTime,
sndActiveAt :: TVar SystemTime
}
newNtfServerClient :: Natural -> ByteString -> SystemTime -> STM NtfServerClient
newNtfServerClient qSize sessionId ts = do
newNtfServerClient :: Natural -> THandleParams -> SystemTime -> STM NtfServerClient
newNtfServerClient qSize ntfThParams ts = do
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
connected <- newTVar True
rcvActiveAt <- newTVar ts
sndActiveAt <- newTVar ts
return NtfServerClient {rcvQ, sndQ, sessionId, connected, rcvActiveAt, sndActiveAt}
return NtfServerClient {rcvQ, sndQ, ntfThParams, connected, rcvActiveAt, sndActiveAt}
@@ -18,14 +18,17 @@ import Simplex.Messaging.Version
ntfBlockSize :: Int
ntfBlockSize = 512
encryptTransmissionNTFVersion :: Version
encryptTransmissionNTFVersion = 2
authEncryptCmdsNTFVersion :: Version
authEncryptCmdsNTFVersion = 2
authEncryptCmdsNTFVersion = 3
currentClientNTFVersion :: Version
currentClientNTFVersion = 1
currentClientNTFVersion = 2
currentServerNTFVersion :: Version
currentServerNTFVersion = 1
currentServerNTFVersion = 2
supportedClientNTFVRange :: VersionRange
supportedClientNTFVRange = mkVersionRange 1 currentClientNTFVersion
@@ -78,7 +81,7 @@ encodeNtfAuthPubKey v k
-- | Notifcations server transport handshake.
ntfServerHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c)
ntfServerHandshake c (k, pk) kh ntfVRange = do
let th@THandle {sessionId} = ntfTHandle c
let th@THandle {params = THandleParams {sessionId}} = ntfTHandle c
sendHandshake th $ NtfServerHandshake {sessionId, ntfVersionRange = ntfVRange, authPubKey = Just k}
getHandshake th >>= \case
NtfClientHandshake {ntfVersion = v, keyHash, authPubKey = k'}
@@ -91,7 +94,7 @@ ntfServerHandshake c (k, pk) kh ntfVRange = do
-- | Notifcations server client transport handshake.
ntfClientHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c)
ntfClientHandshake c (k, pk) keyHash ntfVRange = do
let th@THandle {sessionId} = ntfTHandle c
let th@THandle {params = THandleParams {sessionId}} = ntfTHandle c
NtfServerHandshake {sessionId = sessId, ntfVersionRange, authPubKey = k'} <- getHandshake th
if sessionId /= sessId
then throwError TEBadSession
@@ -102,10 +105,13 @@ ntfClientHandshake c (k, pk) keyHash ntfVRange = do
Nothing -> throwError $ TEHandshake VERSION
ntfThHandle :: forall c. THandle c -> Version -> C.PrivateKeyX25519 -> Maybe C.PublicKeyX25519 -> THandle c
ntfThHandle th v pk k_ =
ntfThHandle th@THandle {params} v pk k_ =
-- TODO drop SMP v6: make thAuth non-optional
let thAuth = (\k -> THandleAuth {peerPubKey = k, privKey = pk, dhSecret = C.dh' k pk}) <$> k_
in (th :: THandle c) {thVersion = v, thAuth}
params' = params {thVersion = v, thAuth, encrypt = v >= encryptTransmissionNTFVersion}
in (th :: THandle c) {params = params'}
ntfTHandle :: Transport c => c -> THandle c
ntfTHandle c = THandle {connection = c, sessionId = tlsUnique c, blockSize = ntfBlockSize, thVersion = 0, thAuth = Nothing, batch = False}
ntfTHandle c = THandle {connection = c, params}
where
params = THandleParams {sessionId = tlsUnique c, blockSize = ntfBlockSize, thVersion = 0, thAuth = Nothing, encrypt = False, batch = False}
+39 -18
View File
@@ -125,7 +125,9 @@ module Simplex.Messaging.Protocol
-- * Parse and serialize
ProtocolMsgTag (..),
messageTagP,
encodeTransmission,
ClntTransmission (..),
encodeClntTransmission,
encodeSrvTransmission,
transmissionP,
_smpP,
encodeRcvMsgBody,
@@ -1066,18 +1068,19 @@ data CommandError
deriving (Eq, Read, Show)
-- | SMP transmission parser.
transmissionP :: Parser RawTransmission
transmissionP = do
transmissionP :: THandleParams -> Parser RawTransmission
transmissionP THandleParams {sessionId, encrypt} = do
authenticator <- smpP
authorized <- A.takeByteString
either fail pure $ parseAll (trn authenticator authorized) authorized
where
trn authenticator authorized = do
sessId <- smpP
sessId <- if encrypt then pure "" else smpP
let authorized' = if encrypt then smpEncode sessionId <> authorized else authorized
corrId <- smpP
entityId <- smpP
command <- A.takeByteString
pure RawTransmission {authenticator, authorized, sessId, corrId, entityId, command}
pure RawTransmission {authenticator, authorized = authorized', sessId, corrId, entityId, command}
class (ProtocolEncoding err msg, ProtocolEncoding err (ProtoCommand msg), Show err, Show msg) => Protocol err msg | msg -> err where
type ProtoCommand msg = cmd | cmd -> msg
@@ -1312,7 +1315,7 @@ instance Encoding CommandError where
-- | Send signed SMP transmission to TCP transport.
tPut :: Transport c => THandle c -> NonEmpty (Either TransportError SentRawTransmission) -> IO [Either TransportError ()]
tPut th = fmap concat . mapM tPutBatch . batchTransmissions (batch th) (blockSize th)
tPut th@THandle {params} = fmap concat . mapM tPutBatch . batchTransmissions (batch params) (blockSize params)
where
tPutBatch :: TransportBatch () -> IO [Either TransportError ()]
tPutBatch = \case
@@ -1379,22 +1382,40 @@ tEncodeBatch1 :: SentRawTransmission -> ByteString
tEncodeBatch1 t = lenEncode 1 `B.cons` tEncodeForBatch t
{-# INLINE tEncodeBatch1 #-}
encodeTransmission :: ProtocolEncoding e c => Version -> ByteString -> Transmission c -> ByteString
encodeTransmission v sessionId (CorrId corrId, queueId, command) =
smpEncode (sessionId, corrId, queueId) <> encodeProtocol v command
{-# INLINE encodeTransmission #-}
-- tForAuth is lazy to avoid computing it when there is no key to sign
data ClntTransmission = ClntTransmission {tForAuth :: ~ByteString, tToSend :: ByteString}
encodeClntTransmission :: ProtocolEncoding e c => THandleParams -> Transmission c -> ClntTransmission
encodeClntTransmission THandleParams {thVersion = v, sessionId, encrypt} t =
ClntTransmission {tForAuth, tToSend = if encrypt then t' else tForAuth}
where
tForAuth = smpEncode sessionId <> t'
t' = encodeTransmission_ v t
{-# INLINE encodeClntTransmission #-}
encodeSrvTransmission :: ProtocolEncoding e c => THandleParams -> Transmission c -> ByteString
encodeSrvTransmission THandleParams {thVersion = v, sessionId, encrypt} t =
if encrypt then t' else smpEncode sessionId <> t'
where
t' = encodeTransmission_ v t
{-# INLINE encodeSrvTransmission #-}
encodeTransmission_ :: ProtocolEncoding e c => Version -> Transmission c -> ByteString
encodeTransmission_ v (CorrId corrId, queueId, command) =
smpEncode (corrId, queueId) <> encodeProtocol v command
{-# INLINE encodeTransmission_ #-}
-- | Receive and parse transmission from the TCP transport (ignoring any trailing padding).
tGetParse :: Transport c => THandle c -> IO (NonEmpty (Either TransportError RawTransmission))
tGetParse th = eitherList (tParse $ batch th) <$> tGetBlock th
tGetParse th@THandle {params} = eitherList (tParse params) <$> tGetBlock th
{-# INLINE tGetParse #-}
tParse :: Bool -> ByteString -> NonEmpty (Either TransportError RawTransmission)
tParse batch s
tParse :: THandleParams -> ByteString -> NonEmpty (Either TransportError RawTransmission)
tParse thParams@THandleParams {batch} s
| batch = eitherList (L.map (\(Large t) -> tParse1 t)) ts
| otherwise = [tParse1 s]
where
tParse1 = parse transmissionP TEBadBlock
tParse1 = parse (transmissionP thParams) TEBadBlock
ts = parse smpP TEBadBlock s
eitherList :: (a -> NonEmpty (Either e b)) -> Either e a -> NonEmpty (Either e b)
@@ -1402,12 +1423,12 @@ eitherList = either (\e -> [Left e])
-- | Receive client and server transmissions (determined by `cmd` type).
tGet :: forall err cmd c. (ProtocolEncoding err cmd, Transport c) => THandle c -> IO (NonEmpty (SignedTransmission err cmd))
tGet th@THandle {sessionId, thVersion = v} = L.map (tDecodeParseValidate sessionId v) <$> tGetParse th
tGet th@THandle {params} = L.map (tDecodeParseValidate params) <$> tGetParse th
tDecodeParseValidate :: forall err cmd. ProtocolEncoding err cmd => SessionId -> Version -> Either TransportError RawTransmission -> SignedTransmission err cmd
tDecodeParseValidate sessionId v = \case
tDecodeParseValidate :: forall err cmd. ProtocolEncoding err cmd => THandleParams -> Either TransportError RawTransmission -> SignedTransmission err cmd
tDecodeParseValidate THandleParams {sessionId, thVersion = v, encrypt} = \case
Right RawTransmission {authenticator, authorized, sessId, corrId, entityId, command}
| sessId == sessionId ->
| encrypt || sessId == sessionId ->
let decodedTransmission = (,corrId,entityId,command) <$> decodeTAuthBytes authenticator
in either (const $ tError corrId) (tParseValidate authorized) decodedTransmission
| otherwise -> (Nothing, "", (CorrId corrId, "", Left $ fromProtocolError @err @cmd PESession))
+5 -5
View File
@@ -371,7 +371,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
CPSkip -> pure ()
runClientTransport :: Transport c => THandle c -> M ()
runClientTransport th@THandle {thVersion, sessionId} = do
runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
@@ -416,7 +416,7 @@ cancelSub sub =
_ -> return ()
receive :: Transport c => THandle c -> Client -> M ()
receive th@THandle {thAuth} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
forever $ do
ts <- L.toList <$> liftIO (tGet th)
@@ -437,12 +437,12 @@ receive th@THandle {thAuth} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
send :: Transport c => THandle c -> Client -> IO ()
send h@THandle {thVersion = v} Client {sndQ, sessionId, sndActiveAt} = do
send h@THandle {params} Client {sndQ, sessionId, sndActiveAt} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
forever $ do
ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ
-- TODO we can authorize responses as well
void . liftIO . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission v sessionId t)) ts
void . liftIO . tPut h $ L.map (\t -> Right (Nothing, encodeSrvTransmission params t)) ts
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
where
tOrder :: Transmission BrokerMsg -> Int
@@ -452,7 +452,7 @@ send h@THandle {thVersion = v} Client {sndQ, sessionId, sndActiveAt} = do
_ -> 1
disconnectTransport :: Transport c => THandle c -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO ()
disconnectTransport THandle {connection, sessionId} rcvActiveAt sndActiveAt expCfg noSubscriptions = do
disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disconnectTransport"
loop
where
+28 -12
View File
@@ -33,6 +33,7 @@ module Simplex.Messaging.Transport
currentServerSMPRelayVersion,
basicAuthSMPVersion,
subModeSMPVersion,
encryptTransmissionSMPVersion,
authEncryptCmdsSMPVersion,
simplexMQVersion,
smpBlockSize,
@@ -54,6 +55,7 @@ module Simplex.Messaging.Transport
-- * SMP transport
THandle (..),
THandleParams (..),
THandleAuth (..),
TransportError (..),
HandshakeError (..),
@@ -120,14 +122,17 @@ basicAuthSMPVersion = 5
subModeSMPVersion :: Version
subModeSMPVersion = 6
encryptTransmissionSMPVersion :: Version
encryptTransmissionSMPVersion = 7
authEncryptCmdsSMPVersion :: Version
authEncryptCmdsSMPVersion = 7
authEncryptCmdsSMPVersion = 8
currentClientSMPRelayVersion :: Version
currentClientSMPRelayVersion = 6
currentClientSMPRelayVersion = 7
currentServerSMPRelayVersion :: Version
currentServerSMPRelayVersion = 6
currentServerSMPRelayVersion = 7
-- minimal supported protocol version is 4
-- TODO remove code that supports sending commands without batching
@@ -274,14 +279,22 @@ instance Transport TLS where
-- | The handle for SMP encrypted transport connection over Transport.
data THandle c = THandle
{ connection :: c,
sessionId :: SessionId,
params :: THandleParams
}
data THandleParams = THandleParams
{ sessionId :: SessionId,
blockSize :: Int,
-- | agreed server protocol version
thVersion :: Version,
-- | peer public key for command authorization and shared secrets for entity ID encryption
thAuth :: Maybe THandleAuth,
-- | additionally encrypt transmission inside transport protocol
-- to protect transmission from sending proxies
-- based on protocol version
encrypt :: Bool,
-- | send multiple transmissions in a single block
-- based on protocol and protocol version
-- based on protocol version
batch :: Bool
}
@@ -381,13 +394,13 @@ serializeTransportError = \case
-- | Pad and send block to SMP transport.
tPutBlock :: Transport c => THandle c -> ByteString -> IO (Either TransportError ())
tPutBlock THandle {connection = c, blockSize} block =
tPutBlock THandle {connection = c, params = THandleParams {blockSize}} block =
bimapM (const $ pure TELargeMsg) (cPut c) $
C.pad block blockSize
-- | Receive block from SMP transport.
tGetBlock :: Transport c => THandle c -> IO (Either TransportError ByteString)
tGetBlock THandle {connection = c, blockSize} = do
tGetBlock THandle {connection = c, params = THandleParams {blockSize}} = do
msg <- cGet c blockSize
if B.length msg == blockSize
then pure . first (const TELargeMsg) $ C.unPad msg
@@ -398,7 +411,7 @@ tGetBlock THandle {connection = c, blockSize} = do
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#appendix-a
smpServerHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c)
smpServerHandshake c (k, pk) kh smpVRange = do
let th@THandle {sessionId} = smpTHandle c
let th@THandle {params = THandleParams {sessionId}} = smpTHandle c
sendHandshake th $ ServerHandshake {sessionId, smpVersionRange = smpVRange, authPubKey = Just k}
getHandshake th >>= \case
ClientHandshake {smpVersion = v, keyHash, authPubKey = k'}
@@ -413,7 +426,7 @@ smpServerHandshake c (k, pk) kh smpVRange = do
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#appendix-a
smpClientHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c)
smpClientHandshake c (k, pk) keyHash smpVRange = do
let th@THandle {sessionId} = smpTHandle c
let th@THandle {params = THandleParams {sessionId}} = smpTHandle c
ServerHandshake {sessionId = sessId, smpVersionRange, authPubKey = k'} <- getHandshake th
if sessionId /= sessId
then throwE TEBadSession
@@ -424,10 +437,11 @@ smpClientHandshake c (k, pk) keyHash smpVRange = do
Nothing -> throwE $ TEHandshake VERSION
smpThHandle :: forall c. THandle c -> Version -> C.PrivateKeyX25519 -> Maybe C.PublicKeyX25519 -> THandle c
smpThHandle th v pk k_ =
smpThHandle th@THandle {params} v pk k_ =
-- TODO drop SMP v6: make thAuth non-optional
let thAuth = (\k -> THandleAuth {peerPubKey = k, privKey = pk, dhSecret = C.dh' k pk}) <$> k_
in (th :: THandle c) {thVersion = v, thAuth, batch = v >= batchCmdsSMPVersion}
params' = params {thVersion = v, thAuth, encrypt = v >= encryptTransmissionSMPVersion, batch = v >= batchCmdsSMPVersion}
in (th :: THandle c) {params = params'}
sendHandshake :: (Transport c, Encoding smp) => THandle c -> smp -> ExceptT TransportError IO ()
sendHandshake th = ExceptT . tPutBlock th . smpEncode
@@ -437,7 +451,9 @@ getHandshake :: (Transport c, Encoding smp) => THandle c -> ExceptT TransportErr
getHandshake th = ExceptT $ (first (\_ -> TEHandshake PARSE) . A.parseOnly smpP =<<) <$> tGetBlock th
smpTHandle :: Transport c => c -> THandle c
smpTHandle c = THandle {connection = c, sessionId = tlsUnique c, blockSize = smpBlockSize, thVersion = 0, thAuth = Nothing, batch = False}
smpTHandle c = THandle {connection = c, params}
where
params = THandleParams {sessionId = tlsUnique c, blockSize = smpBlockSize, thVersion = 0, thAuth = Nothing, encrypt = False, batch = False}
$(J.deriveJSON (sumTypeJSON id) ''HandshakeError)