From a734c29eeb9d4a231335bafcf37ee59f3fede73e Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 9 Feb 2024 19:53:06 +0000 Subject: [PATCH] do not send session ID in each transmission --- src/Simplex/FileTransfer/Client.hs | 21 +- src/Simplex/FileTransfer/Protocol.hs | 22 +- src/Simplex/FileTransfer/Server.hs | 20 +- src/Simplex/Messaging/Agent.hs | 3 +- src/Simplex/Messaging/Agent/Client.hs | 3 +- src/Simplex/Messaging/Client.hs | 47 +-- src/Simplex/Messaging/Notifications/Server.hs | 14 +- .../Messaging/Notifications/Server/Env.hs | 11 +- .../Messaging/Notifications/Transport.hs | 22 +- src/Simplex/Messaging/Protocol.hs | 57 ++-- src/Simplex/Messaging/Server.hs | 10 +- src/Simplex/Messaging/Transport.hs | 40 ++- tests/AgentTests/FunctionalAPITests.hs | 16 +- tests/AgentTests/NotificationTests.hs | 40 ++- tests/CoreTests/BatchingTests.hs | 269 ++++++++++-------- tests/NtfClient.hs | 8 +- tests/NtfServerTests.hs | 12 +- tests/SMPClient.hs | 12 +- tests/ServerTests.hs | 12 +- 19 files changed, 365 insertions(+), 274 deletions(-) diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 85c1200eb..86b33a40b 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -45,7 +45,7 @@ import Simplex.Messaging.Protocol RecipientId, SenderId, ) -import Simplex.Messaging.Transport (supportedParameters) +import Simplex.Messaging.Transport (THandleParams (..), supportedParameters) import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Client @@ -57,6 +57,7 @@ import UnliftIO.Directory data XFTPClient = XFTPClient { http2Client :: HTTP2Client, transportSession :: TransportSession FileResponse, + thParams :: THandleParams, config :: XFTPClientConfig } @@ -98,7 +99,9 @@ getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {xftpNetworkC let usePort = if null port then "443" else port clientDisconnected = readTVarIO clientVar >>= mapM_ disconnected http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config clientDisconnected - let c = XFTPClient {http2Client, transportSession, config} + let HTTP2Client {sessionId} = http2Client + thParams = THandleParams {sessionId, blockSize = xftpBlockSize, thVersion = currentXFTPVersion, thAuth = Nothing, encrypt = False, batch = True} + c = XFTPClient {http2Client, thParams, transportSession, config} atomically $ writeTVar clientVar $ Just c pure c @@ -132,20 +135,20 @@ xftpClientError = \case HCIOError e -> PCEIOError e sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body) -sendXFTPCommand c@XFTPClient {http2Client = HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do +sendXFTPCommand c@XFTPClient {thParams} pKey fId cmd chunkSpec_ = do t <- liftEither . first PCETransportError $ - xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd) + xftpEncodeTransmission thParams (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd) sendXFTPTransmission c t chunkSpec_ sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body) -sendXFTPTransmission XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} t chunkSpec_ = do +sendXFTPTransmission XFTPClient {config, thParams, http2Client} t chunkSpec_ = do let req = H.requestStreaming N.methodPost "/" [] streamBody reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_ - HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout + HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2Client req reqTimeout when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK -- TODO validate that the file ID is the same as in the request? - (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead + (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission thParams bodyHead case respOrErr of Right r -> case protocolError r of Just e -> throwError $ PCEProtocolError e @@ -212,10 +215,10 @@ ackXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> RecipientId -> ExceptT XFTPCl ackXFTPChunk c rpKey rId = sendXFTPCommand c rpKey rId FACK Nothing >>= okResponse pingXFTP :: XFTPClient -> ExceptT XFTPClientError IO () -pingXFTP c@XFTPClient {http2Client = HTTP2Client {sessionId}} = do +pingXFTP c@XFTPClient {thParams} = do t <- liftEither . first PCETransportError $ - xftpEncodeTransmission sessionId Nothing ("", "", FileCmd SFRecipient PING) + xftpEncodeTransmission thParams Nothing ("", "", FileCmd SFRecipient PING) (r, _) <- sendXFTPTransmission c t Nothing case r of FRPong -> pure () diff --git a/src/Simplex/FileTransfer/Protocol.hs b/src/Simplex/FileTransfer/Protocol.hs index 1269f965e..4c19fa05b 100644 --- a/src/Simplex/FileTransfer/Protocol.hs +++ b/src/Simplex/FileTransfer/Protocol.hs @@ -7,6 +7,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-} @@ -46,13 +47,14 @@ import Simplex.Messaging.Protocol SignedTransmission, SndPublicAuthKey, Transmission, - encodeTransmission, + ClntTransmission (..), + encodeClntTransmission, messageTagP, tDecodeParseValidate, tEncodeBatch1, tParse, ) -import Simplex.Messaging.Transport (SessionId, TransportError (..)) +import Simplex.Messaging.Transport (THandleParams (..), TransportError (..)) import Simplex.Messaging.Util (bshow, (<$?>)) import Simplex.Messaging.Version @@ -394,20 +396,20 @@ checkParty' c = case testEquality (sFileParty @p) (sFileParty @p') of Just Refl -> Just c _ -> Nothing -xftpEncodeTransmission :: ProtocolEncoding e c => SessionId -> Maybe C.APrivateAuthKey -> Transmission c -> Either TransportError ByteString -xftpEncodeTransmission sessionId pKey (corrId, fId, msg) = do - let t = encodeTransmission currentXFTPVersion sessionId (corrId, fId, msg) - xftpEncodeBatch1 =<< authTransmission Nothing pKey corrId t +xftpEncodeTransmission :: ProtocolEncoding e c => THandleParams -> Maybe C.APrivateAuthKey -> Transmission c -> Either TransportError ByteString +xftpEncodeTransmission thParams pKey (corrId, fId, msg) = do + let ClntTransmission {tForAuth, tToSend} = encodeClntTransmission thParams (corrId, fId, msg) + xftpEncodeBatch1 . (,tToSend) =<< authTransmission Nothing pKey corrId tForAuth -- this function uses batch syntax but puts only one transmission in the batch xftpEncodeBatch1 :: SentRawTransmission -> Either TransportError ByteString xftpEncodeBatch1 t = first (const TELargeMsg) $ C.pad (tEncodeBatch1 t) xftpBlockSize -xftpDecodeTransmission :: ProtocolEncoding e c => SessionId -> ByteString -> Either XFTPErrorType (SignedTransmission e c) -xftpDecodeTransmission sessionId t = do +xftpDecodeTransmission :: ProtocolEncoding e c => THandleParams -> ByteString -> Either XFTPErrorType (SignedTransmission e c) +xftpDecodeTransmission thParams t = do t' <- first (const BLOCK) $ C.unPad t - case tParse True t' of - t'' :| [] -> Right $ tDecodeParseValidate sessionId currentXFTPVersion t'' + case tParse thParams t' of + t'' :| [] -> Right $ tDecodeParseValidate thParams t'' _ -> Left BLOCK $(J.deriveJSON (enumJSON $ dropPrefix "F") ''FileParty) diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index 6045d4be4..abebec58d 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -51,6 +51,7 @@ import Simplex.Messaging.Protocol (CorrId, RcvPublicDhKey, RcvPublicAuthKey, Rec import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdAuthorization) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Stats +import Simplex.Messaging.Transport (THandleParams (..)) import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Server @@ -66,6 +67,14 @@ import qualified UnliftIO.Exception as E type M a = ReaderT XFTPEnv IO a +data XFTPTransportRequest = + XFTPTransportRequest + { thParams :: THandleParams, + reqBody :: HTTP2Body, + request :: H.Request, + sendResponse :: H.Response -> IO () + } + runXFTPServer :: XFTPServerConfig -> IO () runXFTPServer cfg = do started <- newEmptyTMVarIO @@ -86,7 +95,8 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira liftIO $ runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig inactiveClientExpiration $ \sessionId r sendResponse -> do reqBody <- getHTTP2Body r xftpBlockSize - processRequest HTTP2Request {sessionId, request = r, reqBody, sendResponse} `runReaderT` env + let thParams = THandleParams {sessionId, blockSize = xftpBlockSize, thVersion = currentXFTPVersion, thAuth = Nothing, encrypt = False, batch = True} + processRequest XFTPTransportRequest {thParams, request = r, reqBody, sendResponse} `runReaderT` env stopServer :: M () stopServer = do @@ -215,11 +225,11 @@ data ServerFile = ServerFile sbState :: LC.SbState } -processRequest :: HTTP2Request -> M () -processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sendResponse} +processRequest :: XFTPTransportRequest -> M () +processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHead}, sendResponse} | B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", "", FRErr BLOCK) Nothing | otherwise = do - case xftpDecodeTransmission sessionId bodyHead of + case xftpDecodeTransmission thParams bodyHead of Right (sig_, signed, (corrId, fId, cmdOrErr)) -> do case cmdOrErr of Right cmd -> do @@ -233,7 +243,7 @@ processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sen where sendXFTPResponse :: (CorrId, XFTPFileId, FileResponse) -> Maybe ServerFile -> M () sendXFTPResponse (corrId, fId, resp) serverFile_ = do - let t_ = xftpEncodeTransmission sessionId Nothing (corrId, fId, resp) + let t_ = xftpEncodeTransmission thParams Nothing (corrId, fId, resp) liftIO $ sendResponse $ H.responseStreaming N.ok200 [] $ streamBody t_ where streamBody t_ send done = do diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index b7eac5a97..77526dc7f 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -163,6 +163,7 @@ import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, XFTPServerWithAuth) import qualified Simplex.Messaging.Protocol as SMP +import Simplex.Messaging.Transport (THandleParams (sessionId)) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util import Simplex.Messaging.Version @@ -2061,7 +2062,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s where processEND = \case Just (Right clnt) - | sessId == sessionId clnt -> do + | sessId == sessionId (thParams clnt) -> do removeSubscription c connId notify' END pure "END" diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1dfdd524b..1151ab16f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -213,6 +213,7 @@ import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Transport (THandleParams (..)) import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util import Simplex.Messaging.Version @@ -1117,7 +1118,7 @@ getQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> m (Maybe SMPMsgMet getQueueMessage c rq@RcvQueue {server, rcvId, rcvPrivateKey} = do atomically createTakeGetLock (v, msg_) <- withSMPClient c rq "GET" $ \smp -> - (thVersion smp,) <$> getSMPMessage smp rcvPrivateKey rcvId + (thVersion $ thParams smp,) <$> getSMPMessage smp rcvPrivateKey rcvId mapM (decryptMeta v) msg_ where decryptMeta v msg@SMP.RcvMessage {msgId} = SMP.rcvMessageMeta msgId <$> decryptSMPMessage v rq msg diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index d9b79184f..0d1eb5f7b 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -28,7 +28,7 @@ module Simplex.Messaging.Client ( -- * Connect (disconnect) client to (from) SMP server TransportSession, - ProtocolClient (thVersion, sessionId, sessionTs), + ProtocolClient (thParams, sessionTs), SMPClient, getProtocolClient, closeProtocolClient, @@ -119,13 +119,9 @@ import System.Timeout (timeout) -- Use 'getSMPClient' to connect to an SMP server and create a client handle. data ProtocolClient err msg = ProtocolClient { action :: Maybe (Async ()), - sessionId :: SessionId, + thParams :: THandleParams, sessionTs :: UTCTime, - thVersion :: Version, - thAuth :: Maybe THandleAuth, timeoutPerBlock :: Int, - blockSize :: Int, - batch :: Bool, client_ :: PClient err msg } @@ -153,13 +149,17 @@ clientStub sessionId thVersion thAuth = do return ProtocolClient { action = Nothing, - sessionId, + thParams = + THandleParams + { sessionId, + thVersion, + thAuth, + blockSize = smpBlockSize, + encrypt = thVersion >= encryptTransmissionSMPVersion, + batch = True + }, sessionTs = undefined, - thVersion, - thAuth, timeoutPerBlock = undefined, - blockSize = smpBlockSize, - batch = undefined, client_ = PClient { connected, @@ -373,10 +373,10 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize ks <- atomically $ C.generateKeyPair g runExceptT (protocolClientHandshake @err @msg h ks (keyHash srv) serverVRange) >>= \case Left e -> atomically . putTMVar cVar . Left $ PCETransportError e - Right th@THandle {sessionId, thVersion, thAuth, blockSize, batch} -> do + Right th@THandle {params} -> do sessionTs <- getCurrentTime - let timeoutPerBlock = (blockSize * tcpTimeoutPerKb) `div` 1024 - c' = ProtocolClient {action = Nothing, client_ = c, sessionId, thVersion, thAuth, sessionTs, timeoutPerBlock, blockSize, batch} + let timeoutPerBlock = (blockSize params * tcpTimeoutPerKb) `div` 1024 + c' = ProtocolClient {action = Nothing, client_ = c, thParams = params, sessionTs, timeoutPerBlock} atomically $ do writeTVar (connected c) True putTMVar cVar $ Right c' @@ -521,7 +521,7 @@ writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO () writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c rId msg) (msgQ $ client_ c) serverTransmission :: ProtocolClient err msg -> RecipientId -> msg -> ServerTransmission msg -serverTransmission ProtocolClient {thVersion, sessionId, client_ = PClient {transportSession}} entityId message = +serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} entityId message = (transportSession, thVersion, sessionId, entityId, message) -- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue @@ -639,7 +639,7 @@ type PCTransmission err msg = (Either TransportError SentRawTransmission, Reques -- | Send multiple commands with batching and collect responses sendProtocolCommands :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> NonEmpty (ClientCommand msg) -> IO (NonEmpty (Response err msg)) -sendProtocolCommands c@ProtocolClient {batch, blockSize} cs = do +sendProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs = do bs <- batchTransmissions' batch blockSize <$> mapM (mkTransmission c) cs validate . concat =<< mapM (sendBatch c) bs where @@ -656,7 +656,7 @@ sendProtocolCommands c@ProtocolClient {batch, blockSize} cs = do diff = L.length cs - length rs streamProtocolCommands :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> NonEmpty (ClientCommand msg) -> ([Response err msg] -> IO ()) -> IO () -streamProtocolCommands c@ProtocolClient {batch, blockSize} cs cb = do +streamProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs cb = do bs <- batchTransmissions' batch blockSize <$> mapM (mkTransmission c) cs mapM_ (cb <=< sendBatch c) bs @@ -677,7 +677,7 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do -- | Send Protocol command sendProtocolCommand :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg -sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, batch, blockSize} pKey entId cmd = +sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} pKey entId cmd = ExceptT $ uncurry sendRecv =<< mkTransmission c (pKey, entId, cmd) where -- two separate "atomically" needed to avoid blocking @@ -702,11 +702,12 @@ getResponse ProtocolClient {client_ = PClient {tcpTimeout, pingErrorCount}} Requ pure Response {entityId, response} mkTransmission :: forall err msg. ProtocolEncoding err (ProtoCommand msg) => ProtocolClient err msg -> ClientCommand msg -> IO (PCTransmission err msg) -mkTransmission ProtocolClient {sessionId, thVersion = v, thAuth, client_ = PClient {clientCorrId, sentCommands}} (pKey_, entId, cmd) = do +mkTransmission ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCommands}} (pKey_, entId, cmd) = do corrId <- atomically getNextCorrId - let t = authTransmission thAuth pKey_ corrId $ encodeTransmission v sessionId (corrId, entId, cmd) + let ClntTransmission {tForAuth, tToSend} = encodeClntTransmission thParams (corrId, entId, cmd) + auth = authTransmission (thAuth thParams) pKey_ corrId tForAuth r <- atomically $ mkRequest corrId - pure (t, r) + pure ((,tToSend) <$> auth, r) where getNextCorrId :: STM CorrId getNextCorrId = do @@ -718,8 +719,8 @@ mkTransmission ProtocolClient {sessionId, thVersion = v, thAuth, client_ = PClie TM.insert corrId r sentCommands pure r -authTransmission :: Maybe THandleAuth -> Maybe C.APrivateAuthKey -> CorrId -> ByteString -> Either TransportError SentRawTransmission -authTransmission thAuth pKey_ (CorrId corrId) t = (,t) <$> traverse authenticate pKey_ +authTransmission :: Maybe THandleAuth -> Maybe C.APrivateAuthKey -> CorrId -> ByteString -> Either TransportError (Maybe TransmissionAuth) +authTransmission thAuth pKey_ (CorrId corrId) t = traverse authenticate pKey_ where authenticate :: C.APrivateAuthKey -> Either TransportError TransmissionAuth authenticate (C.APrivateAuthKey a pk) = case a of diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 26c935f1c..0822f8d2d 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -43,12 +43,12 @@ import Simplex.Messaging.Notifications.Server.Stats import Simplex.Messaging.Notifications.Server.Store import Simplex.Messaging.Notifications.Server.StoreLog import Simplex.Messaging.Notifications.Transport -import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut) +import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeSrvTransmission, tGet, tPut) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server import Simplex.Messaging.Server.Stats import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), TProxy, Transport (..)) +import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..)) import Simplex.Messaging.Transport.Server (runTransportServer) import Simplex.Messaging.Util import System.Exit (exitFailure) @@ -337,10 +337,10 @@ updateTknStatus NtfTknData {ntfTknId, tknStatus} status = do when (old /= status) $ withNtfLog $ \sl -> logTokenStatus sl ntfTknId status runNtfClientTransport :: Transport c => THandle c -> M () -runNtfClientTransport th@THandle {sessionId} = do +runNtfClientTransport th@THandle {params} = do qSize <- asks $ clientQSize . config ts <- liftIO getSystemTime - c <- atomically $ newNtfServerClient qSize sessionId ts + c <- atomically $ newNtfServerClient qSize params ts s <- asks subscriber ps <- asks pushServer expCfg <- asks $ inactiveClientExpiration . config @@ -354,7 +354,7 @@ clientDisconnected :: NtfServerClient -> IO () clientDisconnected NtfServerClient {connected} = atomically $ writeTVar connected False receive :: Transport c => THandle c -> NtfServerClient -> M () -receive th@THandle {thAuth} NtfServerClient {rcvQ, sndQ, rcvActiveAt} = forever $ do +receive th@THandle {params = THandleParams {thAuth}} NtfServerClient {rcvQ, sndQ, rcvActiveAt} = forever $ do ts <- liftIO $ tGet th forM_ ts $ \t@(_, _, (corrId, entId, cmdOrError)) -> do atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime @@ -369,9 +369,9 @@ receive th@THandle {thAuth} NtfServerClient {rcvQ, sndQ, rcvActiveAt} = forever write q t = atomically $ writeTBQueue q t send :: Transport c => THandle c -> NtfServerClient -> IO () -send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, sndActiveAt} = forever $ do +send h@THandle {params} NtfServerClient {sndQ, sndActiveAt} = forever $ do t <- atomically $ readTBQueue sndQ - void . liftIO $ tPut h [Right (Nothing, encodeTransmission v sessionId t)] + void . liftIO $ tPut h [Right (Nothing, encodeSrvTransmission params t)] atomically . writeTVar sndActiveAt =<< liftIO getSystemTime -- instance Show a => Show (TVar a) where diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 5dc580245..ec2290b40 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -12,7 +12,6 @@ import Control.Concurrent.Async (Async) import Control.Logger.Simple import Control.Monad.IO.Unlift import Crypto.Random -import Data.ByteString.Char8 (ByteString) import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) import Data.Time.Clock (getCurrentTime) @@ -33,7 +32,7 @@ import Simplex.Messaging.Protocol (CorrId, SMPServer, Transmission) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (ATransport) +import Simplex.Messaging.Transport (ATransport, THandleParams) import Simplex.Messaging.Transport.Server (TransportServerConfig, loadFingerprint, loadTLSServerParams) import Simplex.Messaging.Version (VersionRange) import System.IO (IOMode (..)) @@ -160,17 +159,17 @@ data NtfRequest data NtfServerClient = NtfServerClient { rcvQ :: TBQueue NtfRequest, sndQ :: TBQueue (Transmission NtfResponse), - sessionId :: ByteString, + ntfThParams :: THandleParams, connected :: TVar Bool, rcvActiveAt :: TVar SystemTime, sndActiveAt :: TVar SystemTime } -newNtfServerClient :: Natural -> ByteString -> SystemTime -> STM NtfServerClient -newNtfServerClient qSize sessionId ts = do +newNtfServerClient :: Natural -> THandleParams -> SystemTime -> STM NtfServerClient +newNtfServerClient qSize ntfThParams ts = do rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize connected <- newTVar True rcvActiveAt <- newTVar ts sndActiveAt <- newTVar ts - return NtfServerClient {rcvQ, sndQ, sessionId, connected, rcvActiveAt, sndActiveAt} + return NtfServerClient {rcvQ, sndQ, ntfThParams, connected, rcvActiveAt, sndActiveAt} diff --git a/src/Simplex/Messaging/Notifications/Transport.hs b/src/Simplex/Messaging/Notifications/Transport.hs index af78152a3..780edd811 100644 --- a/src/Simplex/Messaging/Notifications/Transport.hs +++ b/src/Simplex/Messaging/Notifications/Transport.hs @@ -18,14 +18,17 @@ import Simplex.Messaging.Version ntfBlockSize :: Int ntfBlockSize = 512 +encryptTransmissionNTFVersion :: Version +encryptTransmissionNTFVersion = 2 + authEncryptCmdsNTFVersion :: Version -authEncryptCmdsNTFVersion = 2 +authEncryptCmdsNTFVersion = 3 currentClientNTFVersion :: Version -currentClientNTFVersion = 1 +currentClientNTFVersion = 2 currentServerNTFVersion :: Version -currentServerNTFVersion = 1 +currentServerNTFVersion = 2 supportedClientNTFVRange :: VersionRange supportedClientNTFVRange = mkVersionRange 1 currentClientNTFVersion @@ -78,7 +81,7 @@ encodeNtfAuthPubKey v k -- | Notifcations server transport handshake. ntfServerHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c) ntfServerHandshake c (k, pk) kh ntfVRange = do - let th@THandle {sessionId} = ntfTHandle c + let th@THandle {params = THandleParams {sessionId}} = ntfTHandle c sendHandshake th $ NtfServerHandshake {sessionId, ntfVersionRange = ntfVRange, authPubKey = Just k} getHandshake th >>= \case NtfClientHandshake {ntfVersion = v, keyHash, authPubKey = k'} @@ -91,7 +94,7 @@ ntfServerHandshake c (k, pk) kh ntfVRange = do -- | Notifcations server client transport handshake. ntfClientHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c) ntfClientHandshake c (k, pk) keyHash ntfVRange = do - let th@THandle {sessionId} = ntfTHandle c + let th@THandle {params = THandleParams {sessionId}} = ntfTHandle c NtfServerHandshake {sessionId = sessId, ntfVersionRange, authPubKey = k'} <- getHandshake th if sessionId /= sessId then throwError TEBadSession @@ -102,10 +105,13 @@ ntfClientHandshake c (k, pk) keyHash ntfVRange = do Nothing -> throwError $ TEHandshake VERSION ntfThHandle :: forall c. THandle c -> Version -> C.PrivateKeyX25519 -> Maybe C.PublicKeyX25519 -> THandle c -ntfThHandle th v pk k_ = +ntfThHandle th@THandle {params} v pk k_ = -- TODO drop SMP v6: make thAuth non-optional let thAuth = (\k -> THandleAuth {peerPubKey = k, privKey = pk, dhSecret = C.dh' k pk}) <$> k_ - in (th :: THandle c) {thVersion = v, thAuth} + params' = params {thVersion = v, thAuth, encrypt = v >= encryptTransmissionNTFVersion} + in (th :: THandle c) {params = params'} ntfTHandle :: Transport c => c -> THandle c -ntfTHandle c = THandle {connection = c, sessionId = tlsUnique c, blockSize = ntfBlockSize, thVersion = 0, thAuth = Nothing, batch = False} +ntfTHandle c = THandle {connection = c, params} + where + params = THandleParams {sessionId = tlsUnique c, blockSize = ntfBlockSize, thVersion = 0, thAuth = Nothing, encrypt = False, batch = False} diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 1624735bc..8bdb03fbf 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -125,7 +125,9 @@ module Simplex.Messaging.Protocol -- * Parse and serialize ProtocolMsgTag (..), messageTagP, - encodeTransmission, + ClntTransmission (..), + encodeClntTransmission, + encodeSrvTransmission, transmissionP, _smpP, encodeRcvMsgBody, @@ -1066,18 +1068,19 @@ data CommandError deriving (Eq, Read, Show) -- | SMP transmission parser. -transmissionP :: Parser RawTransmission -transmissionP = do +transmissionP :: THandleParams -> Parser RawTransmission +transmissionP THandleParams {sessionId, encrypt} = do authenticator <- smpP authorized <- A.takeByteString either fail pure $ parseAll (trn authenticator authorized) authorized where trn authenticator authorized = do - sessId <- smpP + sessId <- if encrypt then pure "" else smpP + let authorized' = if encrypt then smpEncode sessionId <> authorized else authorized corrId <- smpP entityId <- smpP command <- A.takeByteString - pure RawTransmission {authenticator, authorized, sessId, corrId, entityId, command} + pure RawTransmission {authenticator, authorized = authorized', sessId, corrId, entityId, command} class (ProtocolEncoding err msg, ProtocolEncoding err (ProtoCommand msg), Show err, Show msg) => Protocol err msg | msg -> err where type ProtoCommand msg = cmd | cmd -> msg @@ -1312,7 +1315,7 @@ instance Encoding CommandError where -- | Send signed SMP transmission to TCP transport. tPut :: Transport c => THandle c -> NonEmpty (Either TransportError SentRawTransmission) -> IO [Either TransportError ()] -tPut th = fmap concat . mapM tPutBatch . batchTransmissions (batch th) (blockSize th) +tPut th@THandle {params} = fmap concat . mapM tPutBatch . batchTransmissions (batch params) (blockSize params) where tPutBatch :: TransportBatch () -> IO [Either TransportError ()] tPutBatch = \case @@ -1379,22 +1382,40 @@ tEncodeBatch1 :: SentRawTransmission -> ByteString tEncodeBatch1 t = lenEncode 1 `B.cons` tEncodeForBatch t {-# INLINE tEncodeBatch1 #-} -encodeTransmission :: ProtocolEncoding e c => Version -> ByteString -> Transmission c -> ByteString -encodeTransmission v sessionId (CorrId corrId, queueId, command) = - smpEncode (sessionId, corrId, queueId) <> encodeProtocol v command -{-# INLINE encodeTransmission #-} +-- tForAuth is lazy to avoid computing it when there is no key to sign +data ClntTransmission = ClntTransmission {tForAuth :: ~ByteString, tToSend :: ByteString} + +encodeClntTransmission :: ProtocolEncoding e c => THandleParams -> Transmission c -> ClntTransmission +encodeClntTransmission THandleParams {thVersion = v, sessionId, encrypt} t = + ClntTransmission {tForAuth, tToSend = if encrypt then t' else tForAuth} + where + tForAuth = smpEncode sessionId <> t' + t' = encodeTransmission_ v t +{-# INLINE encodeClntTransmission #-} + +encodeSrvTransmission :: ProtocolEncoding e c => THandleParams -> Transmission c -> ByteString +encodeSrvTransmission THandleParams {thVersion = v, sessionId, encrypt} t = + if encrypt then t' else smpEncode sessionId <> t' + where + t' = encodeTransmission_ v t +{-# INLINE encodeSrvTransmission #-} + +encodeTransmission_ :: ProtocolEncoding e c => Version -> Transmission c -> ByteString +encodeTransmission_ v (CorrId corrId, queueId, command) = + smpEncode (corrId, queueId) <> encodeProtocol v command +{-# INLINE encodeTransmission_ #-} -- | Receive and parse transmission from the TCP transport (ignoring any trailing padding). tGetParse :: Transport c => THandle c -> IO (NonEmpty (Either TransportError RawTransmission)) -tGetParse th = eitherList (tParse $ batch th) <$> tGetBlock th +tGetParse th@THandle {params} = eitherList (tParse params) <$> tGetBlock th {-# INLINE tGetParse #-} -tParse :: Bool -> ByteString -> NonEmpty (Either TransportError RawTransmission) -tParse batch s +tParse :: THandleParams -> ByteString -> NonEmpty (Either TransportError RawTransmission) +tParse thParams@THandleParams {batch} s | batch = eitherList (L.map (\(Large t) -> tParse1 t)) ts | otherwise = [tParse1 s] where - tParse1 = parse transmissionP TEBadBlock + tParse1 = parse (transmissionP thParams) TEBadBlock ts = parse smpP TEBadBlock s eitherList :: (a -> NonEmpty (Either e b)) -> Either e a -> NonEmpty (Either e b) @@ -1402,12 +1423,12 @@ eitherList = either (\e -> [Left e]) -- | Receive client and server transmissions (determined by `cmd` type). tGet :: forall err cmd c. (ProtocolEncoding err cmd, Transport c) => THandle c -> IO (NonEmpty (SignedTransmission err cmd)) -tGet th@THandle {sessionId, thVersion = v} = L.map (tDecodeParseValidate sessionId v) <$> tGetParse th +tGet th@THandle {params} = L.map (tDecodeParseValidate params) <$> tGetParse th -tDecodeParseValidate :: forall err cmd. ProtocolEncoding err cmd => SessionId -> Version -> Either TransportError RawTransmission -> SignedTransmission err cmd -tDecodeParseValidate sessionId v = \case +tDecodeParseValidate :: forall err cmd. ProtocolEncoding err cmd => THandleParams -> Either TransportError RawTransmission -> SignedTransmission err cmd +tDecodeParseValidate THandleParams {sessionId, thVersion = v, encrypt} = \case Right RawTransmission {authenticator, authorized, sessId, corrId, entityId, command} - | sessId == sessionId -> + | encrypt || sessId == sessionId -> let decodedTransmission = (,corrId,entityId,command) <$> decodeTAuthBytes authenticator in either (const $ tError corrId) (tParseValidate authorized) decodedTransmission | otherwise -> (Nothing, "", (CorrId corrId, "", Left $ fromProtocolError @err @cmd PESession)) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 279af86fc..b79398bdd 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -371,7 +371,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do CPSkip -> pure () runClientTransport :: Transport c => THandle c -> M () -runClientTransport th@THandle {thVersion, sessionId} = do +runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime active <- asks clients @@ -416,7 +416,7 @@ cancelSub sub = _ -> return () receive :: Transport c => THandle c -> Client -> M () -receive th@THandle {thAuth} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do +receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive" forever $ do ts <- L.toList <$> liftIO (tGet th) @@ -437,12 +437,12 @@ receive th@THandle {thAuth} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty send :: Transport c => THandle c -> Client -> IO () -send h@THandle {thVersion = v} Client {sndQ, sessionId, sndActiveAt} = do +send h@THandle {params} Client {sndQ, sessionId, sndActiveAt} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" forever $ do ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ -- TODO we can authorize responses as well - void . liftIO . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission v sessionId t)) ts + void . liftIO . tPut h $ L.map (\t -> Right (Nothing, encodeSrvTransmission params t)) ts atomically . writeTVar sndActiveAt =<< liftIO getSystemTime where tOrder :: Transmission BrokerMsg -> Int @@ -452,7 +452,7 @@ send h@THandle {thVersion = v} Client {sndQ, sessionId, sndActiveAt} = do _ -> 1 disconnectTransport :: Transport c => THandle c -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO () -disconnectTransport THandle {connection, sessionId} rcvActiveAt sndActiveAt expCfg noSubscriptions = do +disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disconnectTransport" loop where diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index c20037684..4ae363313 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -33,6 +33,7 @@ module Simplex.Messaging.Transport currentServerSMPRelayVersion, basicAuthSMPVersion, subModeSMPVersion, + encryptTransmissionSMPVersion, authEncryptCmdsSMPVersion, simplexMQVersion, smpBlockSize, @@ -54,6 +55,7 @@ module Simplex.Messaging.Transport -- * SMP transport THandle (..), + THandleParams (..), THandleAuth (..), TransportError (..), HandshakeError (..), @@ -120,14 +122,17 @@ basicAuthSMPVersion = 5 subModeSMPVersion :: Version subModeSMPVersion = 6 +encryptTransmissionSMPVersion :: Version +encryptTransmissionSMPVersion = 7 + authEncryptCmdsSMPVersion :: Version -authEncryptCmdsSMPVersion = 7 +authEncryptCmdsSMPVersion = 8 currentClientSMPRelayVersion :: Version -currentClientSMPRelayVersion = 6 +currentClientSMPRelayVersion = 7 currentServerSMPRelayVersion :: Version -currentServerSMPRelayVersion = 6 +currentServerSMPRelayVersion = 7 -- minimal supported protocol version is 4 -- TODO remove code that supports sending commands without batching @@ -274,14 +279,22 @@ instance Transport TLS where -- | The handle for SMP encrypted transport connection over Transport. data THandle c = THandle { connection :: c, - sessionId :: SessionId, + params :: THandleParams + } + +data THandleParams = THandleParams + { sessionId :: SessionId, blockSize :: Int, -- | agreed server protocol version thVersion :: Version, -- | peer public key for command authorization and shared secrets for entity ID encryption thAuth :: Maybe THandleAuth, + -- | additionally encrypt transmission inside transport protocol + -- to protect transmission from sending proxies + -- based on protocol version + encrypt :: Bool, -- | send multiple transmissions in a single block - -- based on protocol and protocol version + -- based on protocol version batch :: Bool } @@ -381,13 +394,13 @@ serializeTransportError = \case -- | Pad and send block to SMP transport. tPutBlock :: Transport c => THandle c -> ByteString -> IO (Either TransportError ()) -tPutBlock THandle {connection = c, blockSize} block = +tPutBlock THandle {connection = c, params = THandleParams {blockSize}} block = bimapM (const $ pure TELargeMsg) (cPut c) $ C.pad block blockSize -- | Receive block from SMP transport. tGetBlock :: Transport c => THandle c -> IO (Either TransportError ByteString) -tGetBlock THandle {connection = c, blockSize} = do +tGetBlock THandle {connection = c, params = THandleParams {blockSize}} = do msg <- cGet c blockSize if B.length msg == blockSize then pure . first (const TELargeMsg) $ C.unPad msg @@ -398,7 +411,7 @@ tGetBlock THandle {connection = c, blockSize} = do -- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#appendix-a smpServerHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c) smpServerHandshake c (k, pk) kh smpVRange = do - let th@THandle {sessionId} = smpTHandle c + let th@THandle {params = THandleParams {sessionId}} = smpTHandle c sendHandshake th $ ServerHandshake {sessionId, smpVersionRange = smpVRange, authPubKey = Just k} getHandshake th >>= \case ClientHandshake {smpVersion = v, keyHash, authPubKey = k'} @@ -413,7 +426,7 @@ smpServerHandshake c (k, pk) kh smpVRange = do -- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#appendix-a smpClientHandshake :: forall c. Transport c => c -> C.KeyPairX25519 -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c) smpClientHandshake c (k, pk) keyHash smpVRange = do - let th@THandle {sessionId} = smpTHandle c + let th@THandle {params = THandleParams {sessionId}} = smpTHandle c ServerHandshake {sessionId = sessId, smpVersionRange, authPubKey = k'} <- getHandshake th if sessionId /= sessId then throwE TEBadSession @@ -424,10 +437,11 @@ smpClientHandshake c (k, pk) keyHash smpVRange = do Nothing -> throwE $ TEHandshake VERSION smpThHandle :: forall c. THandle c -> Version -> C.PrivateKeyX25519 -> Maybe C.PublicKeyX25519 -> THandle c -smpThHandle th v pk k_ = +smpThHandle th@THandle {params} v pk k_ = -- TODO drop SMP v6: make thAuth non-optional let thAuth = (\k -> THandleAuth {peerPubKey = k, privKey = pk, dhSecret = C.dh' k pk}) <$> k_ - in (th :: THandle c) {thVersion = v, thAuth, batch = v >= batchCmdsSMPVersion} + params' = params {thVersion = v, thAuth, encrypt = v >= encryptTransmissionSMPVersion, batch = v >= batchCmdsSMPVersion} + in (th :: THandle c) {params = params'} sendHandshake :: (Transport c, Encoding smp) => THandle c -> smp -> ExceptT TransportError IO () sendHandshake th = ExceptT . tPutBlock th . smpEncode @@ -437,7 +451,9 @@ getHandshake :: (Transport c, Encoding smp) => THandle c -> ExceptT TransportErr getHandshake th = ExceptT $ (first (\_ -> TEHandshake PARSE) . A.parseOnly smpP =<<) <$> tGetBlock th smpTHandle :: Transport c => c -> THandle c -smpTHandle c = THandle {connection = c, sessionId = tlsUnique c, blockSize = smpBlockSize, thVersion = 0, thAuth = Nothing, batch = False} +smpTHandle c = THandle {connection = c, params} + where + params = THandleParams {sessionId = tlsUnique c, blockSize = smpBlockSize, thVersion = 0, thAuth = Nothing, encrypt = False, batch = False} $(J.deriveJSON (sumTypeJSON id) ''HandshakeError) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 347ec75f4..d4508d6f3 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -30,7 +30,7 @@ module AgentTests.FunctionalAPITests (##>), (=##>), pattern Msg, - agentCfgV7, + agentCfgV8, ) where @@ -51,7 +51,7 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Type.Equality import qualified Database.SQLite.Simple as SQL import SMPAgentClient -import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerV7, withSmpServerConfigOn, withSmpServerOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn) +import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerV8, withSmpServerConfigOn, withSmpServerOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn) import Simplex.Messaging.Agent import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..)) import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore) @@ -147,8 +147,8 @@ agentCfgVPrev = smpCfg = smpCfgVPrev } -agentCfgV7 :: AgentConfig -agentCfgV7 = +agentCfgV8 :: AgentConfig +agentCfgV8 = agentCfg { cmdAuthAlg = C.AuthAlg C.SX25519, smpCfg = smpCfgV7, @@ -380,10 +380,10 @@ canCreateQueue allowNew (srvAuth, srvVersion) (clntAuth, clntVersion) = testMatrix2 :: ATransport -> (AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> Spec testMatrix2 t runTest = do - it "v7" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfgV7 3 runTest - it "v7 to current" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfg 3 runTest - it "current to v7" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfgV7 3 runTest - it "current with v7 server" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfg 3 runTest + it "v8" $ withSmpServerV8 t $ runTestCfg2 agentCfgV8 agentCfgV8 3 runTest + it "v8 to current" $ withSmpServerV8 t $ runTestCfg2 agentCfgV8 agentCfg 3 runTest + it "current to v8" $ withSmpServerV8 t $ runTestCfg2 agentCfg agentCfgV8 3 runTest + it "current with v8 server" $ withSmpServerV8 t $ runTestCfg2 agentCfg agentCfg 3 runTest it "current" $ withSmpServer t $ runTestCfg2 agentCfg agentCfg 3 runTest it "prev" $ withSmpServer t $ runTestCfg2 agentCfgVPrev agentCfgVPrev 3 runTest it "prev to current" $ withSmpServer t $ runTestCfg2 agentCfgVPrev agentCfg 3 runTest diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 1ccfb9a69..1d93e50b5 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -12,7 +12,7 @@ module AgentTests.NotificationTests where -- import Control.Logger.Simple (LogConfig (..), LogLevel (..), setLogLevel, withGlobalLogging) -import AgentTests.FunctionalAPITests (agentCfgV7, exchangeGreetingsMsgId, get, getSMPAgentClient', makeConnection, nGet, runRight, runRight_, switchComplete, testServerMatrix2, withAgentClientsCfg2, (##>), (=##>), pattern Msg) +import AgentTests.FunctionalAPITests (agentCfgV8, exchangeGreetingsMsgId, get, getSMPAgentClient', makeConnection, nGet, runRight, runRight_, switchComplete, testServerMatrix2, withAgentClientsCfg2, (##>), (=##>), pattern Msg) import Control.Concurrent (ThreadId, killThread, threadDelay) import Control.Monad import Control.Monad.Except @@ -26,7 +26,7 @@ import Data.ByteString.Char8 (ByteString) import Data.Text.Encoding (encodeUtf8) import NtfClient import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testDB3, testNtfServer2) -import SMPClient (cfg, cfgV7, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn) +import SMPClient (cfg, cfgV8, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn) import Simplex.Messaging.Agent import Simplex.Messaging.Agent.Client (withStore') import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers) @@ -115,17 +115,31 @@ notificationTests t = do testNtfMatrix :: ATransport -> (APNSMockServer -> AgentClient -> AgentClient -> IO ()) -> Spec testNtfMatrix t runTest = do - it "new servers: SMP v7, NTF v2; new clients: v7/v2" $ runNtfTestCfg t cfgV7 ntfServerCfgV2 agentCfgV7 agentCfgV7 runTest - it "new servers: SMP v7, NTF v2; old clients: v6/v1" $ runNtfTestCfg t cfgV7 ntfServerCfgV2 agentCfg agentCfg runTest - it "old servers: SMP v6, NTF v1; old clients: v6/v1" $ runNtfTestCfg t cfg ntfServerCfg agentCfg agentCfg runTest - -- this case will cannot be supported - see RFC - xit "servers: SMP v6, NTF v1; clients: v7/v2 (not supported)" $ runNtfTestCfg t cfg ntfServerCfg agentCfgV7 agentCfgV7 runTest - -- servers can be migrated in any order - it "servers: new SMP v7, old NTF v1; old clients: v6/v1" $ runNtfTestCfg t cfgV7 ntfServerCfg agentCfg agentCfg runTest - it "servers: old SMP v6, new NTF v2; old clients: v6/v1" $ runNtfTestCfg t cfg ntfServerCfgV2 agentCfg agentCfg runTest - -- clients can be partially migrated - it "servers: new SMP v7, old NTF v2; clients: new/old" $ runNtfTestCfg t cfgV7 ntfServerCfgV2 agentCfgV7 agentCfg runTest - it "servers: new SMP v7, old NTF v2; clients: old/new" $ runNtfTestCfg t cfgV7 ntfServerCfgV2 agentCfg agentCfgV7 runTest + describe "next and current" $ do + it "next servers: SMP v8, NTF v3; next clients: v8/v3" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfgV8 agentCfgV8 runTest + it "next servers: SMP v8, NTF v3; curr clients: v7/v2" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfg agentCfg runTest + it "curr servers: SMP v7, NTF v2; curr clients: v7/v2" $ runNtfTestCfg t cfg ntfServerCfg agentCfg agentCfg runTest + -- this case will cannot be supported - see RFC + xit "servers: SMP v7, NTF v2; clients: v8/v3 (not supported)" $ runNtfTestCfg t cfg ntfServerCfg agentCfgV8 agentCfgV8 runTest + -- servers can be migrated in any order + it "servers: next SMP v8, curr NTF v2; curr clients: v7/v2" $ runNtfTestCfg t cfgV8 ntfServerCfg agentCfg agentCfg runTest + it "servers: curr SMP v7, next NTF v3; curr clients: v7/v2" $ runNtfTestCfg t cfg ntfServerCfgV3 agentCfg agentCfg runTest + -- clients can be partially migrated + it "servers: next SMP v8, curr NTF v3; clients: next/curr" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfgV8 agentCfg runTest + it "servers: next SMP v8, curr NTF v3; clients: curr/new" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfg agentCfgV8 runTest + describe "current and previous" $ do + it "curr servers: SMP v7, NTF v2; curr clients: v7/v2" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfgV8 agentCfgV8 runTest + it "curr servers: SMP v7, NTF v2; prev clients: v6/v1" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfg agentCfg runTest + it "prev servers: SMP v6, NTF v1; prev clients: v6/v1" $ runNtfTestCfg t cfg ntfServerCfg agentCfg agentCfg runTest + -- this case will cannot be supported - see RFC + xit "servers: SMP v6, NTF v1; clients: v7/v2 (not supported)" $ runNtfTestCfg t cfg ntfServerCfg agentCfgV8 agentCfgV8 runTest + -- servers can be migrated in any order + it "servers: curr SMP v7, prev NTF v1; prev clients: v6/v1" $ runNtfTestCfg t cfgV8 ntfServerCfg agentCfg agentCfg runTest + it "servers: prev SMP v6, curr NTF v2; prev clients: v6/v1" $ runNtfTestCfg t cfg ntfServerCfgV3 agentCfg agentCfg runTest + -- clients can be partially migrated + it "servers: curr SMP v7, prev NTF v2; clients: curr/prev" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfgV8 agentCfg runTest + it "servers: curr SMP v7, prev NTF v2; clients: prev/new" $ runNtfTestCfg t cfgV8 ntfServerCfgV3 agentCfg agentCfgV8 runTest + runNtfTestCfg :: ATransport -> ServerConfig -> NtfServerConfig -> AgentConfig -> AgentConfig -> (APNSMockServer -> AgentClient -> AgentClient -> IO ()) -> IO () runNtfTestCfg t smpCfg ntfCfg aCfg bCfg runTest = diff --git a/tests/CoreTests/BatchingTests.hs b/tests/CoreTests/BatchingTests.hs index b0e9b1529..be4928672 100644 --- a/tests/CoreTests/BatchingTests.hs +++ b/tests/CoreTests/BatchingTests.hs @@ -1,5 +1,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE TupleSections #-} module CoreTests.BatchingTests (batchingTests) where @@ -19,48 +21,48 @@ import Test.Hspec batchingTests :: Spec batchingTests = do describe "batchTransmissions" $ do - describe "SMP v6 (current)" $ do - it "should batch with 90 subscriptions per batch" $ testBatchSubscriptions + describe "SMP v7 (current)" $ do + it "should batch with 109 subscriptions per batch" $ testBatchSubscriptions it "should break on message that does not fit" testBatchWithMessage it "should break on large message" testBatchWithLargeMessage - describe "v7 (next)" $ do - it "should batch with 110 subscriptions per batch" testBatchSubscriptionsV7 - it "should break on message that does not fit" testBatchWithMessageV7 - it "should break on large message" testBatchWithLargeMessageV7 + describe "v8 (next)" $ do + it "should batch with 142 subscriptions per batch" testBatchSubscriptionsV8 + it "should break on message that does not fit" testBatchWithMessageV8 + it "should break on large message" testBatchWithLargeMessageV8 describe "batchTransmissions'" $ do - describe "SMP v6 (current)" $ do - it "should batch with 90 subscriptions per batch" testClientBatchSubscriptions + describe "SMP v7 (current)" $ do + it "should batch with 109 subscriptions per batch" testClientBatchSubscriptions it "should break on message that does not fit" testClientBatchWithMessage it "should break on large message" testClientBatchWithLargeMessage - describe "v7 (next)" $ do - it "should batch with 110 subscriptions per batch" testClientBatchSubscriptionsV7 - it "should break on message that does not fit" testClientBatchWithMessageV7 - it "should break on large message" testClientBatchWithLargeMessageV7 + describe "v8 (next)" $ do + it "should batch with 142 subscriptions per batch" testClientBatchSubscriptionsV8 + it "should break on message that does not fit" testClientBatchWithMessageV8 + it "should break on large message" testClientBatchWithLargeMessageV8 testBatchSubscriptions :: IO () testBatchSubscriptions = do sessId <- atomically . C.randomBytes 32 =<< C.newRandom - subs <- replicateM 200 $ randomSUB sessId - let batches1 = batchTransmissions False smpBlockSize $ L.fromList subs - all lenOk1 batches1 `shouldBe` True - length batches1 `shouldBe` 200 - let batches = batchTransmissions True smpBlockSize $ L.fromList subs - length batches `shouldBe` 3 - [TBTransmissions s1 n1 _, TBTransmissions s2 n2 _, TBTransmissions s3 n3 _] <- pure batches - (n1, n2, n3) `shouldBe` (20, 90, 90) - all lenOk [s1, s2, s3] `shouldBe` True - -testBatchSubscriptionsV7 :: IO () -testBatchSubscriptionsV7 = do - sessId <- atomically . C.randomBytes 32 =<< C.newRandom - subs <- replicateM 250 $ randomSUBv7 sessId + subs <- replicateM 250 $ randomSUB sessId let batches1 = batchTransmissions False smpBlockSize $ L.fromList subs all lenOk1 batches1 `shouldBe` True length batches1 `shouldBe` 250 let batches = batchTransmissions True smpBlockSize $ L.fromList subs length batches `shouldBe` 3 [TBTransmissions s1 n1 _, TBTransmissions s2 n2 _, TBTransmissions s3 n3 _] <- pure batches - (n1, n2, n3) `shouldBe` (30, 110, 110) + (n1, n2, n3) `shouldBe` (32, 109, 109) + all lenOk [s1, s2, s3] `shouldBe` True + +testBatchSubscriptionsV8 :: IO () +testBatchSubscriptionsV8 = do + sessId <- atomically . C.randomBytes 32 =<< C.newRandom + subs <- replicateM 300 $ randomSUBv8 sessId + let batches1 = batchTransmissions False smpBlockSize $ L.fromList subs + all lenOk1 batches1 `shouldBe` True + length batches1 `shouldBe` 300 + let batches = batchTransmissions True smpBlockSize $ L.fromList subs + length batches `shouldBe` 3 + [TBTransmissions s1 n1 _, TBTransmissions s2 n2 _, TBTransmissions s3 n3 _] <- pure batches + (n1, n2, n3) `shouldBe` (16, 142, 142) all lenOk [s1, s2, s3] `shouldBe` True testBatchWithMessage :: IO () @@ -76,15 +78,15 @@ testBatchWithMessage = do let batches = batchTransmissions True smpBlockSize $ L.fromList cmds length batches `shouldBe` 2 [TBTransmissions s1 n1 _, TBTransmissions s2 n2 _] <- pure batches - (n1, n2) `shouldBe` (55, 46) + (n1, n2) `shouldBe` (45, 56) all lenOk [s1, s2] `shouldBe` True -testBatchWithMessageV7 :: IO () -testBatchWithMessageV7 = do +testBatchWithMessageV8 :: IO () +testBatchWithMessageV8 = do sessId <- atomically . C.randomBytes 32 =<< C.newRandom - subs1 <- replicateM 60 $ randomSUBv7 sessId - send <- randomSENDv7 sessId 8000 - subs2 <- replicateM 40 $ randomSUBv7 sessId + subs1 <- replicateM 60 $ randomSUBv8 sessId + send <- randomSENDv8 sessId 8000 + subs2 <- replicateM 40 $ randomSUBv8 sessId let cmds = subs1 <> [send] <> subs2 batches1 = batchTransmissions False smpBlockSize $ L.fromList cmds all lenOk1 batches1 `shouldBe` True @@ -92,7 +94,7 @@ testBatchWithMessageV7 = do let batches = batchTransmissions True smpBlockSize $ L.fromList cmds length batches `shouldBe` 2 [TBTransmissions s1 n1 _, TBTransmissions s2 n2 _] <- pure batches - (n1, n2) `shouldBe` (45, 56) + (n1, n2) `shouldBe` (29, 72) all lenOk [s1, s2] `shouldBe` True testBatchWithLargeMessage :: IO () @@ -100,26 +102,7 @@ testBatchWithLargeMessage = do sessId <- atomically . C.randomBytes 32 =<< C.newRandom subs1 <- replicateM 60 $ randomSUB sessId send <- randomSEND sessId 17000 - subs2 <- replicateM 100 $ randomSUB sessId - let cmds = subs1 <> [send] <> subs2 - batches1 = batchTransmissions False smpBlockSize $ L.fromList cmds - all lenOk1 batches1 `shouldBe` False - length batches1 `shouldBe` 161 - let batches1' = take 60 batches1 <> drop 61 batches1 - all lenOk1 batches1' `shouldBe` True - length batches1' `shouldBe` 160 - let batches = batchTransmissions True smpBlockSize $ L.fromList cmds - length batches `shouldBe` 4 - [TBTransmissions s1 n1 _, TBError TELargeMsg _, TBTransmissions s2 n2 _, TBTransmissions s3 n3 _] <- pure batches - (n1, n2, n3) `shouldBe` (60, 10, 90) - all lenOk [s1, s2, s3] `shouldBe` True - -testBatchWithLargeMessageV7 :: IO () -testBatchWithLargeMessageV7 = do - sessId <- atomically . C.randomBytes 32 =<< C.newRandom - subs1 <- replicateM 60 $ randomSUBv7 sessId - send <- randomSENDv7 sessId 17000 - subs2 <- replicateM 120 $ randomSUBv7 sessId + subs2 <- replicateM 120 $ randomSUB sessId let cmds = subs1 <> [send] <> subs2 batches1 = batchTransmissions False smpBlockSize $ L.fromList cmds all lenOk1 batches1 `shouldBe` False @@ -130,34 +113,53 @@ testBatchWithLargeMessageV7 = do let batches = batchTransmissions True smpBlockSize $ L.fromList cmds length batches `shouldBe` 4 [TBTransmissions s1 n1 _, TBError TELargeMsg _, TBTransmissions s2 n2 _, TBTransmissions s3 n3 _] <- pure batches - (n1, n2, n3) `shouldBe` (60, 10, 110) + (n1, n2, n3) `shouldBe` (60, 11, 109) + all lenOk [s1, s2, s3] `shouldBe` True + +testBatchWithLargeMessageV8 :: IO () +testBatchWithLargeMessageV8 = do + sessId <- atomically . C.randomBytes 32 =<< C.newRandom + subs1 <- replicateM 60 $ randomSUBv8 sessId + send <- randomSENDv8 sessId 17000 + subs2 <- replicateM 150 $ randomSUBv8 sessId + let cmds = subs1 <> [send] <> subs2 + batches1 = batchTransmissions False smpBlockSize $ L.fromList cmds + all lenOk1 batches1 `shouldBe` False + length batches1 `shouldBe` 211 + let batches1' = take 60 batches1 <> drop 61 batches1 + all lenOk1 batches1' `shouldBe` True + length batches1' `shouldBe` 210 + let batches = batchTransmissions True smpBlockSize $ L.fromList cmds + length batches `shouldBe` 4 + [TBTransmissions s1 n1 _, TBError TELargeMsg _, TBTransmissions s2 n2 _, TBTransmissions s3 n3 _] <- pure batches + (n1, n2, n3) `shouldBe` (60, 8, 142) all lenOk [s1, s2, s3] `shouldBe` True testClientBatchSubscriptions :: IO () testClientBatchSubscriptions = do sessId <- atomically . C.randomBytes 32 =<< C.newRandom client <- atomically $ clientStub sessId currentClientSMPRelayVersion Nothing - subs <- replicateM 200 $ randomSUBCmd client + subs <- replicateM 250 $ randomSUBCmd client let batches1 = batchTransmissions' False smpBlockSize $ L.fromList subs all lenOk1 batches1 `shouldBe` True let batches = batchTransmissions' True smpBlockSize $ L.fromList subs length batches `shouldBe` 3 [TBTransmissions s1 n1 rs1, TBTransmissions s2 n2 rs2, TBTransmissions s3 n3 rs3] <- pure batches - (n1, n2, n3) `shouldBe` (20, 90, 90) - (length rs1, length rs2, length rs3) `shouldBe` (20, 90, 90) + (n1, n2, n3) `shouldBe` (31, 110, 109) + (length rs1, length rs2, length rs3) `shouldBe` (31, 110, 109) all lenOk [s1, s2, s3] `shouldBe` True -testClientBatchSubscriptionsV7 :: IO () -testClientBatchSubscriptionsV7 = do - client <- clientStubV7 - subs <- replicateM 250 $ randomSUBCmdV7 client +testClientBatchSubscriptionsV8 :: IO () +testClientBatchSubscriptionsV8 = do + client <- clientStubV8 + subs <- replicateM 300 $ randomSUBCmdV8 client let batches1 = batchTransmissions' False smpBlockSize $ L.fromList subs all lenOk1 batches1 `shouldBe` True let batches = batchTransmissions' True smpBlockSize $ L.fromList subs length batches `shouldBe` 3 [TBTransmissions s1 n1 rs1, TBTransmissions s2 n2 rs2, TBTransmissions s3 n3 rs3] <- pure batches - (n1, n2, n3) `shouldBe` (30, 110, 110) - (length rs1, length rs2, length rs3) `shouldBe` (30, 110, 110) + (n1, n2, n3) `shouldBe` (16, 142, 142) + (length rs1, length rs2, length rs3) `shouldBe` (16, 142, 142) all lenOk [s1, s2, s3] `shouldBe` True testClientBatchWithMessage :: IO () @@ -174,16 +176,16 @@ testClientBatchWithMessage = do let batches = batchTransmissions' True smpBlockSize $ L.fromList cmds length batches `shouldBe` 2 [TBTransmissions s1 n1 rs1, TBTransmissions s2 n2 rs2] <- pure batches - (n1, n2) `shouldBe` (55, 46) - (length rs1, length rs2) `shouldBe` (55, 46) + (n1, n2) `shouldBe` (45, 56) + (length rs1, length rs2) `shouldBe` (45, 56) all lenOk [s1, s2] `shouldBe` True -testClientBatchWithMessageV7 :: IO () -testClientBatchWithMessageV7 = do - client <- clientStubV7 - subs1 <- replicateM 60 $ randomSUBCmdV7 client - send <- randomSENDCmdV7 client 8000 - subs2 <- replicateM 40 $ randomSUBCmdV7 client +testClientBatchWithMessageV8 :: IO () +testClientBatchWithMessageV8 = do + client <- clientStubV8 + subs1 <- replicateM 60 $ randomSUBCmdV8 client + send <- randomSENDCmdV8 client 8000 + subs2 <- replicateM 40 $ randomSUBCmdV8 client let cmds = subs1 <> [send] <> subs2 batches1 = batchTransmissions' False smpBlockSize $ L.fromList cmds all lenOk1 batches1 `shouldBe` True @@ -191,8 +193,8 @@ testClientBatchWithMessageV7 = do let batches = batchTransmissions' True smpBlockSize $ L.fromList cmds length batches `shouldBe` 2 [TBTransmissions s1 n1 rs1, TBTransmissions s2 n2 rs2] <- pure batches - (n1, n2) `shouldBe` (45, 56) - (length rs1, length rs2) `shouldBe` (45, 56) + (n1, n2) `shouldBe` (28, 73) + (length rs1, length rs2) `shouldBe` (28, 73) all lenOk [s1, s2] `shouldBe` True testClientBatchWithLargeMessage :: IO () @@ -201,36 +203,7 @@ testClientBatchWithLargeMessage = do client <- atomically $ clientStub sessId currentClientSMPRelayVersion Nothing subs1 <- replicateM 60 $ randomSUBCmd client send <- randomSENDCmd client 17000 - subs2 <- replicateM 100 $ randomSUBCmd client - let cmds = subs1 <> [send] <> subs2 - batches1 = batchTransmissions' False smpBlockSize $ L.fromList cmds - all lenOk1 batches1 `shouldBe` False - length batches1 `shouldBe` 161 - let batches1' = take 60 batches1 <> drop 61 batches1 - all lenOk1 batches1' `shouldBe` True - length batches1' `shouldBe` 160 - -- - let batches = batchTransmissions' True smpBlockSize $ L.fromList cmds - length batches `shouldBe` 4 - [TBTransmissions s1 n1 rs1, TBError TELargeMsg _, TBTransmissions s2 n2 rs2, TBTransmissions s3 n3 rs3] <- pure batches - (n1, n2, n3) `shouldBe` (60, 10, 90) - (length rs1, length rs2, length rs3) `shouldBe` (60, 10, 90) - all lenOk [s1, s2, s3] `shouldBe` True - -- - let cmds' = [send] <> subs1 <> subs2 - let batches' = batchTransmissions' True smpBlockSize $ L.fromList cmds' - length batches' `shouldBe` 3 - [TBError TELargeMsg _, TBTransmissions s1' n1' rs1', TBTransmissions s2' n2' rs2'] <- pure batches' - (n1', n2') `shouldBe` (70, 90) - (length rs1', length rs2') `shouldBe` (70, 90) - all lenOk [s1', s2'] `shouldBe` True - -testClientBatchWithLargeMessageV7 :: IO () -testClientBatchWithLargeMessageV7 = do - client <- clientStubV7 - subs1 <- replicateM 60 $ randomSUBCmdV7 client - send <- randomSENDCmdV7 client 17000 - subs2 <- replicateM 120 $ randomSUBCmdV7 client + subs2 <- replicateM 120 $ randomSUBCmd client let cmds = subs1 <> [send] <> subs2 batches1 = batchTransmissions' False smpBlockSize $ L.fromList cmds all lenOk1 batches1 `shouldBe` False @@ -242,20 +215,49 @@ testClientBatchWithLargeMessageV7 = do let batches = batchTransmissions' True smpBlockSize $ L.fromList cmds length batches `shouldBe` 4 [TBTransmissions s1 n1 rs1, TBError TELargeMsg _, TBTransmissions s2 n2 rs2, TBTransmissions s3 n3 rs3] <- pure batches - (n1, n2, n3) `shouldBe` (60, 10, 110) - (length rs1, length rs2, length rs3) `shouldBe` (60, 10, 110) + (n1, n2, n3) `shouldBe` (60, 11, 109) + (length rs1, length rs2, length rs3) `shouldBe` (60, 11, 109) all lenOk [s1, s2, s3] `shouldBe` True -- let cmds' = [send] <> subs1 <> subs2 let batches' = batchTransmissions' True smpBlockSize $ L.fromList cmds' length batches' `shouldBe` 3 [TBError TELargeMsg _, TBTransmissions s1' n1' rs1', TBTransmissions s2' n2' rs2'] <- pure batches' - (n1', n2') `shouldBe` (70, 110) - (length rs1', length rs2') `shouldBe` (70, 110) + (n1', n2') `shouldBe` (71, 109) + (length rs1', length rs2') `shouldBe` (71, 109) all lenOk [s1', s2'] `shouldBe` True -clientStubV7 :: IO (ProtocolClient ErrorType BrokerMsg) -clientStubV7 = do +testClientBatchWithLargeMessageV8 :: IO () +testClientBatchWithLargeMessageV8 = do + client <- clientStubV8 + subs1 <- replicateM 60 $ randomSUBCmdV8 client + send <- randomSENDCmdV8 client 17000 + subs2 <- replicateM 150 $ randomSUBCmdV8 client + let cmds = subs1 <> [send] <> subs2 + batches1 = batchTransmissions' False smpBlockSize $ L.fromList cmds + all lenOk1 batches1 `shouldBe` False + length batches1 `shouldBe` 211 + let batches1' = take 60 batches1 <> drop 61 batches1 + all lenOk1 batches1' `shouldBe` True + length batches1' `shouldBe` 210 + -- + let batches = batchTransmissions' True smpBlockSize $ L.fromList cmds + length batches `shouldBe` 4 + [TBTransmissions s1 n1 rs1, TBError TELargeMsg _, TBTransmissions s2 n2 rs2, TBTransmissions s3 n3 rs3] <- pure batches + (n1, n2, n3) `shouldBe` (60, 8, 142) + (length rs1, length rs2, length rs3) `shouldBe` (60, 8, 142) + all lenOk [s1, s2, s3] `shouldBe` True + -- + let cmds' = [send] <> subs1 <> subs2 + let batches' = batchTransmissions' True smpBlockSize $ L.fromList cmds' + length batches' `shouldBe` 3 + [TBError TELargeMsg _, TBTransmissions s1' n1' rs1', TBTransmissions s2' n2' rs2'] <- pure batches' + (n1', n2') `shouldBe` (68, 142) + (length rs1', length rs2') `shouldBe` (68, 142) + all lenOk [s1', s2'] `shouldBe` True + +clientStubV8 :: IO (ProtocolClient ErrorType BrokerMsg) +clientStubV8 = do g <- C.newRandom sessId <- atomically $ C.randomBytes 32 g (rKey, _) <- atomically $ C.generateAuthKeyPair C.SX25519 g @@ -263,25 +265,27 @@ clientStubV7 = do atomically $ clientStub sessId authEncryptCmdsSMPVersion thAuth_ randomSUB :: ByteString -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) -randomSUB = randomSUB_ currentClientSMPRelayVersion C.SEd448 +randomSUB = randomSUB_ C.SEd448 currentClientSMPRelayVersion -randomSUBv7 :: ByteString -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) -randomSUBv7 = randomSUB_ authEncryptCmdsSMPVersion C.SX25519 +randomSUBv8 :: ByteString -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) +randomSUBv8 = randomSUB_ C.SX25519 authEncryptCmdsSMPVersion -randomSUB_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => Version -> C.SAlgorithm a -> ByteString -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) -randomSUB_ v a sessId = do +randomSUB_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> Version -> ByteString -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) +randomSUB_ a v sessId = do g <- C.newRandom rId <- atomically $ C.randomBytes 24 g corrId <- atomically $ CorrId <$> C.randomBytes 3 g (rKey, rpKey) <- atomically $ C.generateAuthKeyPair a g thAuth_ <- testTHandleAuth v g rKey - pure $ authTransmission thAuth_ (Just rpKey) corrId $ encodeTransmission v sessId (corrId, rId, Cmd SRecipient SUB) + let thParams = testTHandleParams v sessId + ClntTransmission {tForAuth, tToSend} = encodeClntTransmission thParams (corrId, rId, Cmd SRecipient SUB) + pure $ (,tToSend) <$> authTransmission thAuth_ (Just rpKey) corrId tForAuth randomSUBCmd :: ProtocolClient ErrorType BrokerMsg -> IO (PCTransmission ErrorType BrokerMsg) randomSUBCmd = randomSUBCmd_ C.SEd448 -randomSUBCmdV7 :: ProtocolClient ErrorType BrokerMsg -> IO (PCTransmission ErrorType BrokerMsg) -randomSUBCmdV7 = randomSUBCmd_ C.SX25519 +randomSUBCmdV8 :: ProtocolClient ErrorType BrokerMsg -> IO (PCTransmission ErrorType BrokerMsg) +randomSUBCmdV8 = randomSUBCmd_ C.SX25519 randomSUBCmd_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> ProtocolClient ErrorType BrokerMsg -> IO (PCTransmission ErrorType BrokerMsg) randomSUBCmd_ a c = do @@ -291,20 +295,33 @@ randomSUBCmd_ a c = do mkTransmission c (Just rpKey, rId, Cmd SRecipient SUB) randomSEND :: ByteString -> Int -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) -randomSEND = randomSEND_ currentClientSMPRelayVersion C.SEd448 +randomSEND = randomSEND_ C.SEd448 currentClientSMPRelayVersion -randomSENDv7 :: ByteString -> Int -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) -randomSENDv7 = randomSEND_ authEncryptCmdsSMPVersion C.SX25519 +randomSENDv8 :: ByteString -> Int -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) +randomSENDv8 = randomSEND_ C.SX25519 authEncryptCmdsSMPVersion -randomSEND_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => Version -> C.SAlgorithm a -> ByteString -> Int -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) -randomSEND_ v a sessId len = do +randomSEND_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> Version -> ByteString -> Int -> IO (Either TransportError (Maybe TransmissionAuth, ByteString)) +randomSEND_ a v sessId len = do g <- C.newRandom sId <- atomically $ C.randomBytes 24 g corrId <- atomically $ CorrId <$> C.randomBytes 3 g (rKey, rpKey) <- atomically $ C.generateAuthKeyPair a g thAuth_ <- testTHandleAuth v g rKey msg <- atomically $ C.randomBytes len g - pure $ authTransmission thAuth_ (Just rpKey) corrId $ encodeTransmission v sessId (corrId, sId, Cmd SSender $ SEND noMsgFlags msg) + let thParams = testTHandleParams v sessId + ClntTransmission {tForAuth, tToSend} = encodeClntTransmission thParams (corrId, sId, Cmd SSender $ SEND noMsgFlags msg) + pure $ (,tToSend) <$> authTransmission thAuth_ (Just rpKey) corrId tForAuth + +testTHandleParams :: Version -> ByteString -> THandleParams +testTHandleParams v sessionId = + THandleParams + { sessionId, + blockSize = smpBlockSize, + thVersion = v, + thAuth = Nothing, + encrypt = v >= encryptTransmissionSMPVersion, + batch = True + } testTHandleAuth :: Version -> TVar ChaChaDRG -> C.APublicAuthKey -> IO (Maybe THandleAuth) testTHandleAuth v g (C.APublicAuthKey a k) = case a of @@ -316,8 +333,8 @@ testTHandleAuth v g (C.APublicAuthKey a k) = case a of randomSENDCmd :: ProtocolClient ErrorType BrokerMsg -> Int -> IO (PCTransmission ErrorType BrokerMsg) randomSENDCmd = randomSENDCmd_ C.SEd448 -randomSENDCmdV7 :: ProtocolClient ErrorType BrokerMsg -> Int -> IO (PCTransmission ErrorType BrokerMsg) -randomSENDCmdV7 = randomSENDCmd_ C.SX25519 +randomSENDCmdV8 :: ProtocolClient ErrorType BrokerMsg -> Int -> IO (PCTransmission ErrorType BrokerMsg) +randomSENDCmdV8 = randomSENDCmd_ C.SX25519 randomSENDCmd_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> ProtocolClient ErrorType BrokerMsg -> Int -> IO (PCTransmission ErrorType BrokerMsg) randomSENDCmd_ a c len = do diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index 9eb53c036..c575a7640 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -111,8 +111,8 @@ ntfServerCfg = transportConfig = defaultTransportServerConfig } -ntfServerCfgV2 :: NtfServerConfig -ntfServerCfgV2 = +ntfServerCfgV3 :: NtfServerConfig +ntfServerCfgV3 = ntfServerCfg { ntfServerVRange = mkVersionRange 1 authEncryptCmdsNTFVersion, smpAgentCfg = defaultSMPClientAgentConfig {smpCfg = (smpCfg defaultSMPClientAgentConfig) {serverVRange = mkVersionRange 4 authEncryptCmdsSMPVersion}} @@ -151,8 +151,8 @@ ntfServerTest :: ntfServerTest _ t = runNtfTest $ \h -> tPut' h t >> tGet' h where tPut' :: THandle c -> (Maybe TransmissionAuth, ByteString, ByteString, smp) -> IO () - tPut' h@THandle {sessionId} (sig, corrId, queueId, smp) = do - let t' = smpEncode (sessionId, corrId, queueId, smp) + tPut' h (sig, corrId, queueId, smp) = do + let t' = smpEncode (corrId, queueId, smp) [Right ()] <- tPut h [Right (sig, t')] pure () tGet' h = do diff --git a/tests/NtfServerTests.hs b/tests/NtfServerTests.hs index 0ce6f923d..3c2e70ccf 100644 --- a/tests/NtfServerTests.hs +++ b/tests/NtfServerTests.hs @@ -68,15 +68,15 @@ pattern RespNtf :: CorrId -> QueueId -> NtfResponse -> SignedTransmission ErrorT pattern RespNtf corrId queueId command <- (_, _, (corrId, queueId, Right command)) sendRecvNtf :: forall c e. (Transport c, NtfEntityI e) => THandle c -> (Maybe TransmissionAuth, ByteString, ByteString, NtfCommand e) -> IO (SignedTransmission ErrorType NtfResponse) -sendRecvNtf h@THandle {thVersion, sessionId} (sgn, corrId, qId, cmd) = do - let t = encodeTransmission thVersion sessionId (CorrId corrId, qId, cmd) - Right () <- tPut1 h (sgn, t) +sendRecvNtf h@THandle {params} (sgn, corrId, qId, cmd) = do + let ClntTransmission {tToSend} = encodeClntTransmission params (CorrId corrId, qId, cmd) + Right () <- tPut1 h (sgn, tToSend) tGet1 h signSendRecvNtf :: forall c e. (Transport c, NtfEntityI e) => THandle c -> C.APrivateAuthKey -> (ByteString, ByteString, NtfCommand e) -> IO (SignedTransmission ErrorType NtfResponse) -signSendRecvNtf h@THandle {thVersion, sessionId} (C.APrivateAuthKey a pk) (corrId, qId, cmd) = do - let t = encodeTransmission thVersion sessionId (CorrId corrId, qId, cmd) - Right () <- tPut1 h (authorize t, t) +signSendRecvNtf h@THandle {params} (C.APrivateAuthKey a pk) (corrId, qId, cmd) = do + let ClntTransmission {tForAuth, tToSend} = encodeClntTransmission params (CorrId corrId, qId, cmd) + Right () <- tPut1 h (authorize tForAuth, tToSend) tGet1 h where authorize t = case a of diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 740d9dcc7..67396c35b 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -105,8 +105,8 @@ cfg = controlPort = Nothing } -cfgV7 :: ServerConfig -cfgV7 = cfg {smpServerVRange = mkVersionRange 4 authEncryptCmdsSMPVersion} +cfgV8 :: ServerConfig +cfgV8 = cfg {smpServerVRange = mkVersionRange 4 authEncryptCmdsSMPVersion} withSmpServerStoreMsgLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, serverStatsBackupFile = Just testServerStatsBackupFile} @@ -142,8 +142,8 @@ withSmpServerOn t port' = withSmpServerThreadOn t port' . const withSmpServer :: HasCallStack => ATransport -> IO a -> IO a withSmpServer t = withSmpServerOn t testPort -withSmpServerV7 :: HasCallStack => ATransport -> IO a -> IO a -withSmpServerV7 t = withSmpServerConfigOn t cfgV7 testPort . const +withSmpServerV8 :: HasCallStack => ATransport -> IO a -> IO a +withSmpServerV8 t = withSmpServerConfigOn t cfgV8 testPort . const runSmpTest :: forall c a. (HasCallStack, Transport c) => (HasCallStack => THandle c -> IO a) -> IO a runSmpTest test = withSmpServer (transport @c) $ testSMPClient test @@ -164,8 +164,8 @@ smpServerTest :: smpServerTest _ t = runSmpTest $ \h -> tPut' h t >> tGet' h where tPut' :: THandle c -> (Maybe TransmissionAuth, ByteString, ByteString, smp) -> IO () - tPut' h@THandle {sessionId} (sig, corrId, queueId, smp) = do - let t' = smpEncode (sessionId, corrId, queueId, smp) + tPut' h (sig, corrId, queueId, smp) = do + let t' = smpEncode (corrId, queueId, smp) [Right ()] <- tPut h [Right (sig, t')] pure () tGet' h = do diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index f78408c48..2a94de790 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -73,15 +73,15 @@ pattern Msg :: MsgId -> MsgBody -> BrokerMsg pattern Msg msgId body <- MSG RcvMessage {msgId, msgBody = EncRcvMsgBody body} sendRecv :: forall c p. (Transport c, PartyI p) => THandle c -> (Maybe TransmissionAuth, ByteString, ByteString, Command p) -> IO (SignedTransmission ErrorType BrokerMsg) -sendRecv h@THandle {thVersion, sessionId} (sgn, corrId, qId, cmd) = do - let t = encodeTransmission thVersion sessionId (CorrId corrId, qId, cmd) - Right () <- tPut1 h (sgn, t) +sendRecv h@THandle {params} (sgn, corrId, qId, cmd) = do + let ClntTransmission {tToSend} = encodeClntTransmission params (CorrId corrId, qId, cmd) + Right () <- tPut1 h (sgn, tToSend) tGet1 h signSendRecv :: forall c p. (Transport c, PartyI p) => THandle c -> C.APrivateAuthKey -> (ByteString, ByteString, Command p) -> IO (SignedTransmission ErrorType BrokerMsg) -signSendRecv h@THandle {thVersion, sessionId} (C.APrivateAuthKey a pk) (corrId, qId, cmd) = do - let t = encodeTransmission thVersion sessionId (CorrId corrId, qId, cmd) - Right () <- tPut1 h (authorize t, t) +signSendRecv h@THandle {params} (C.APrivateAuthKey a pk) (corrId, qId, cmd) = do + let ClntTransmission {tForAuth, tToSend} = encodeClntTransmission params (CorrId corrId, qId, cmd) + Right () <- tPut1 h (authorize tForAuth, tToSend) tGet1 h where authorize t = case a of