From 6f832733189f769e195e8b3951e28d88b8118ec4 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 2 May 2024 15:14:01 +0100 Subject: [PATCH 01/10] client: increase timeout for SOCKS connection, increase timeout for direct connection (#1123) --- src/Simplex/Messaging/Transport/Client.hs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/Simplex/Messaging/Transport/Client.hs b/src/Simplex/Messaging/Transport/Client.hs index 08cff1d0d..a943b36d9 100644 --- a/src/Simplex/Messaging/Transport/Client.hs +++ b/src/Simplex/Messaging/Transport/Client.hs @@ -19,7 +19,7 @@ module Simplex.Messaging.Transport.Client TransportHost (..), TransportHosts (..), TransportHosts_ (..), - validateCertificateChain + validateCertificateChain, ) where @@ -52,7 +52,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (parseAll, parseString) import Simplex.Messaging.Transport import Simplex.Messaging.Transport.KeepAlive -import Simplex.Messaging.Util (bshow, (<$?>), catchAll, tshow) +import Simplex.Messaging.Util (bshow, catchAll, tshow, (<$?>)) import System.IO.Error import System.Timeout (timeout) import Text.Read (readMaybe) @@ -143,14 +143,19 @@ runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, serverCert <- newEmptyTMVarIO let hostName = B.unpack $ strEncode host clientParams = mkTLSClientParams tlsParams caStore_ hostName port keyHash clientCredentials alpn serverCert - connectTCP = case socksProxy of - Just proxy -> connectSocksClient proxy proxyUsername $ hostAddr host - _ -> connectTCPClient hostName + (connectTCP, tlsTimeout) = case socksProxy of + -- We use a much larger timeout for connections via SOCKS proxy, to allow the circuits created + -- in the socket connection that would otherwise timeout to be used in the next connection attempt. + -- Using standard timeout results in permanent timeout for the clients using SOCKS in cases + -- when SOCKS proxy is very slow (bad network, congestion in underlying network, etc.), + -- because SOCKS proxy destroys circuits when the last session using them is closed. + Just proxy -> (connectSocksClient proxy proxyUsername (hostAddr host), tcpConnectTimeout * 10) + _ -> (connectTCPClient hostName, tcpConnectTimeout) c <- do sock <- connectTCP port mapM_ (setSocketKeepAlive sock) tcpKeepAlive `catchAll` \e -> logError ("Error setting TCP keep-alive" <> tshow e) let tCfg = clientTransportConfig cfg - tcpConnectTimeout `timeout` connectTLS (Just hostName) tCfg clientParams sock >>= \case + tlsTimeout `timeout` connectTLS (Just hostName) tCfg clientParams sock >>= \case Nothing -> do close sock logError "connection timed out" From 9e49c289b4273e69ed763b38b58bd45adb299065 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 1 May 2024 00:51:08 +0100 Subject: [PATCH 02/10] upgrade SMP/NTF servers to v7/v2 protocol versions (#996) * upgrade SMP/NTF servers to v7/v2 protocol versions * 5.6.0.0 --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> --- src/Simplex/Messaging/Notifications/Transport.hs | 2 +- src/Simplex/Messaging/Transport.hs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Simplex/Messaging/Notifications/Transport.hs b/src/Simplex/Messaging/Notifications/Transport.hs index 022403471..e2c287437 100644 --- a/src/Simplex/Messaging/Notifications/Transport.hs +++ b/src/Simplex/Messaging/Notifications/Transport.hs @@ -47,7 +47,7 @@ currentClientNTFVersion :: VersionNTF currentClientNTFVersion = VersionNTF 1 currentServerNTFVersion :: VersionNTF -currentServerNTFVersion = VersionNTF 1 +currentServerNTFVersion = VersionNTF 2 supportedClientNTFVRange :: VersionRangeNTF supportedClientNTFVRange = mkVersionRange initialNTFVersion currentClientNTFVersion diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 519154bb5..8dfd15813 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -153,7 +153,7 @@ currentClientSMPRelayVersion :: VersionSMP currentClientSMPRelayVersion = VersionSMP 6 currentServerSMPRelayVersion :: VersionSMP -currentServerSMPRelayVersion = VersionSMP 6 +currentServerSMPRelayVersion = VersionSMP 7 -- minimal supported protocol version is 4 -- TODO remove code that supports sending commands without batching From 60403955c05b9f8e72709fc724d5f9f68b216ea4 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 1 May 2024 00:56:33 +0100 Subject: [PATCH 03/10] 5.7.0.4 --- CHANGELOG.md | 20 ++++++++++++++++++++ package.yaml | 2 +- simplexmq.cabal | 2 +- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e06ac1caa..ad8862b0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,23 @@ +# 5.7.0 + +Version 5.7.0.4 + +_Please note_: the earliest SimpleX Chat clients supported by this version of the servers is 5.5.3 (released on February 11, 2024). + +SMP server: +- increase max SMP protocol version to 7 (support for deniable authenticators). + +NTF server: +- increase max NTF protocol version to 2 (support for deniable authenticators). + +XFTP server: +- version handshake using ALPN. + +SMP agent: +- increase timeouts for XFTP files. +- don't send commands after timeout. +- PQ encryption support. + # 5.6.2 Version 5.6.2.2. diff --git a/package.yaml b/package.yaml index b4c1f1d78..084fe3a8f 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.0.3 +version: 5.7.0.4 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index f353925bd..1bdd67c0b 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.0.3 +version: 5.7.0.4 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and From c5941b790b10e0896fb9c8c59b5123657f827003 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 2 May 2024 15:14:01 +0100 Subject: [PATCH 04/10] client: increase timeout for SOCKS connection, increase timeout for direct connection (#1123) --- src/Simplex/Messaging/Transport/Client.hs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/Simplex/Messaging/Transport/Client.hs b/src/Simplex/Messaging/Transport/Client.hs index 08cff1d0d..a943b36d9 100644 --- a/src/Simplex/Messaging/Transport/Client.hs +++ b/src/Simplex/Messaging/Transport/Client.hs @@ -19,7 +19,7 @@ module Simplex.Messaging.Transport.Client TransportHost (..), TransportHosts (..), TransportHosts_ (..), - validateCertificateChain + validateCertificateChain, ) where @@ -52,7 +52,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (parseAll, parseString) import Simplex.Messaging.Transport import Simplex.Messaging.Transport.KeepAlive -import Simplex.Messaging.Util (bshow, (<$?>), catchAll, tshow) +import Simplex.Messaging.Util (bshow, catchAll, tshow, (<$?>)) import System.IO.Error import System.Timeout (timeout) import Text.Read (readMaybe) @@ -143,14 +143,19 @@ runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, serverCert <- newEmptyTMVarIO let hostName = B.unpack $ strEncode host clientParams = mkTLSClientParams tlsParams caStore_ hostName port keyHash clientCredentials alpn serverCert - connectTCP = case socksProxy of - Just proxy -> connectSocksClient proxy proxyUsername $ hostAddr host - _ -> connectTCPClient hostName + (connectTCP, tlsTimeout) = case socksProxy of + -- We use a much larger timeout for connections via SOCKS proxy, to allow the circuits created + -- in the socket connection that would otherwise timeout to be used in the next connection attempt. + -- Using standard timeout results in permanent timeout for the clients using SOCKS in cases + -- when SOCKS proxy is very slow (bad network, congestion in underlying network, etc.), + -- because SOCKS proxy destroys circuits when the last session using them is closed. + Just proxy -> (connectSocksClient proxy proxyUsername (hostAddr host), tcpConnectTimeout * 10) + _ -> (connectTCPClient hostName, tcpConnectTimeout) c <- do sock <- connectTCP port mapM_ (setSocketKeepAlive sock) tcpKeepAlive `catchAll` \e -> logError ("Error setting TCP keep-alive" <> tshow e) let tCfg = clientTransportConfig cfg - tcpConnectTimeout `timeout` connectTLS (Just hostName) tCfg clientParams sock >>= \case + tlsTimeout `timeout` connectTLS (Just hostName) tCfg clientParams sock >>= \case Nothing -> do close sock logError "connection timed out" From 8d8010a62aef2241fec3876fcfe57d51456b2bc0 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 2 May 2024 16:22:55 +0100 Subject: [PATCH 05/10] 5.7.1.0 --- CHANGELOG.md | 5 +++++ package.yaml | 2 +- simplexmq.cabal | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad8862b0f..ffdaa7ff2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 5.7.1 + +SMP agent: +- increase timeout for TLS connection via SOCKS + # 5.7.0 Version 5.7.0.4 diff --git a/package.yaml b/package.yaml index 084fe3a8f..ccf0794ae 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.0.4 +version: 5.7.1.0 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 1bdd67c0b..9b5fa36b0 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.0.4 +version: 5.7.1.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and From b586a6e90af5eef0e974ac9209f7ee20a7bf6d35 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 3 May 2024 22:16:52 +0100 Subject: [PATCH 06/10] client: removed concurrency limit when waiting for subscription results (#1126) --- src/Simplex/Messaging/Client.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index f54fe686f..8d3c5e54f 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -718,7 +718,7 @@ sendBatch c@ProtocolClient {client_ = PClient {rcvConcurrency, sndQ}} b = do | n > 0 -> do active <- newTVarIO True atomically $ writeTBQueue sndQ (active, s) - pooledMapConcurrentlyN rcvConcurrency (getResponse c active) rs + mapConcurrently (getResponse c active) rs | otherwise -> pure [] TBTransmission s r -> do active <- newTVarIO True From 0e205e70adbe269d43914ef62f04bbbfd04fbeec Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Sat, 4 May 2024 01:39:00 +0300 Subject: [PATCH 07/10] add TRcvQueues tests (#1117) Co-authored-by: Evgeny Poberezkin --- tests/CoreTests/TRcvQueuesTests.hs | 50 ++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/CoreTests/TRcvQueuesTests.hs b/tests/CoreTests/TRcvQueuesTests.hs index 91722228b..2b0009344 100644 --- a/tests/CoreTests/TRcvQueuesTests.hs +++ b/tests/CoreTests/TRcvQueuesTests.hs @@ -22,10 +22,13 @@ tRcvQueuesTests = do describe "connection API" $ do it "hasConn" hasConnTest it "hasConn, batch add" hasConnTestBatch + it "hasConn, batch idempotent" batchIdempotentTest it "deleteConn" deleteConnTest describe "session API" $ do it "getSessQueues" getSessQueuesTest it "getDelSessQueues" getDelSessQueuesTest + describe "queue transfer" $ do + it "getDelSessQueues-batchAddQueues preserves total length" removeSubsTest checkDataInvariant :: RQ.TRcvQueues -> IO Bool checkDataInvariant trq = atomically $ do @@ -62,6 +65,19 @@ hasConnTestBatch = do atomically (RQ.hasConn "c3" trq) `shouldReturn` True atomically (RQ.hasConn "nope" trq) `shouldReturn` False +batchIdempotentTest :: IO () +batchIdempotentTest = do + trq <- atomically RQ.empty + let qs = [dummyRQ 0 "smp://1234-w==@alpha" "c1", dummyRQ 0 "smp://1234-w==@alpha" "c2", dummyRQ 0 "smp://1234-w==@beta" "c3"] + atomically $ RQ.batchAddQueues trq qs + checkDataInvariant trq `shouldReturn` True + qs' <- readTVarIO $ RQ.getRcvQueues trq + cs' <- readTVarIO $ RQ.getConnections trq + atomically $ RQ.batchAddQueues trq qs + checkDataInvariant trq `shouldReturn` True + readTVarIO (RQ.getRcvQueues trq) `shouldReturn` qs' + fmap L.nub <$> readTVarIO (RQ.getConnections trq) `shouldReturn`cs' -- connections get duplicated, but that doesn't appear to affect anybody + deleteConnTest :: IO () deleteConnTest = do trq <- atomically RQ.empty @@ -121,6 +137,40 @@ getDelSessQueuesTest = do atomically (RQ.hasConn "c3" trq) `shouldReturn` True atomically (RQ.hasConn "c4" trq) `shouldReturn` True +removeSubsTest :: IO () +removeSubsTest = do + aq <- atomically RQ.empty + let qs = + [ dummyRQ 0 "smp://1234-w==@alpha" "c1", + dummyRQ 0 "smp://1234-w==@alpha" "c2", + dummyRQ 0 "smp://1234-w==@beta" "c3", + dummyRQ 1 "smp://1234-w==@beta" "c4" + ] + atomically $ RQ.batchAddQueues aq qs + + pq <- atomically RQ.empty + atomically (totalSize aq pq) `shouldReturn` (4, 4) + + atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@alpha", Nothing) aq >>= RQ.batchAddQueues pq . fst + atomically (totalSize aq pq) `shouldReturn` (4, 4) + + atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@beta", Just "non-existent") aq >>= RQ.batchAddQueues pq . fst + atomically (totalSize aq pq) `shouldReturn` (4, 4) + + atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@localhost", Nothing) aq >>= RQ.batchAddQueues pq . fst + atomically (totalSize aq pq) `shouldReturn` (4, 4) + + atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@beta", Just "c3") aq >>= RQ.batchAddQueues pq . fst + atomically (totalSize aq pq) `shouldReturn` (4, 4) + +totalSize :: RQ.TRcvQueues -> RQ.TRcvQueues -> STM (Int, Int) +totalSize a b = do + qsizeA <- M.size <$> readTVar (RQ.getRcvQueues a) + qsizeB <- M.size <$> readTVar (RQ.getRcvQueues b) + csizeA <- M.size <$> readTVar (RQ.getConnections a) + csizeB <- M.size <$> readTVar (RQ.getConnections b) + pure (qsizeA + qsizeB, csizeA + csizeB) + dummyRQ :: UserId -> SMPServer -> ConnId -> RcvQueue dummyRQ userId server connId = RcvQueue From ee8e4067b02c520a41b82ab972e262b24f58cd69 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 5 May 2024 12:12:19 +0100 Subject: [PATCH 08/10] agent: prepare connection record before joining to prevent race conditions (#1128) * agent: prepare connection record before joining to prevent race conditions * prepare connection for contact address as well * clean up --- src/Simplex/Messaging/Agent.hs | 100 +++++++++++++++++-------- src/Simplex/Messaging/Client.hs | 1 - tests/AgentTests/FunctionalAPITests.hs | 12 ++- 3 files changed, 75 insertions(+), 38 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 38112b030..f93d03e35 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -55,6 +55,7 @@ module Simplex.Messaging.Agent deleteConnectionAsync, deleteConnectionsAsync, createConnection, + prepareConnectionToJoin, joinConnection, allowConnection, acceptContact, @@ -149,7 +150,7 @@ import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Util (removePath) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite -import Simplex.Messaging.Agent.Lock (withLock', withLock) +import Simplex.Messaging.Agent.Lock (withLock, withLock') import Simplex.Messaging.Agent.NtfSubSupervisor import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval @@ -160,7 +161,7 @@ import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs) -import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOn, pattern PQEncOff, pattern PQSupportOn, pattern PQSupportOff) +import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn) import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String @@ -198,7 +199,7 @@ getSMPAgentClient_ clientId cfg initServers store backgroundMode = liftIO $ newSMPAgentEnv cfg store >>= runReaderT runAgent where runAgent = do - c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask + c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c) atomically . writeTVar acThread . Just =<< mkWeakThreadId t pure c @@ -239,7 +240,7 @@ createUser c = withAgentEnv c .: createUser' c {-# INLINE createUser #-} -- | Delete user record optionally deleting all user's connections on SMP servers -deleteUser :: AgentClient -> UserId -> Bool -> AE () +deleteUser :: AgentClient -> UserId -> Bool -> AE () deleteUser c = withAgentEnv c .: deleteUser' c {-# INLINE deleteUser #-} @@ -288,9 +289,18 @@ createConnection :: AgentClient -> UserId -> Bool -> SConnectionMode c -> Maybe createConnection c userId enableNtfs = withAgentEnv c .:: newConn c userId "" enableNtfs {-# INLINE createConnection #-} --- | Join SMP agent connection (JOIN command) -joinConnection :: AgentClient -> UserId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId -joinConnection c userId enableNtfs = withAgentEnv c .:: joinConn c userId "" enableNtfs +-- | Create SMP agent connection without queue (to be joined with joinConnection passing connection ID). +-- This method is required to prevent race condition when confirmation from peer is received before +-- the caller of joinConnection saves connection ID to the database. +-- Instead of it we could send confirmation asynchronously, but then it would be harder to report +-- "link deleted" (SMP AUTH) interactively, so this approach is simpler overall. +prepareConnectionToJoin :: AgentClient -> UserId -> Bool -> ConnectionRequestUri c -> PQSupport -> AE ConnId +prepareConnectionToJoin c userId enableNtfs = withAgentEnv c .: newConnToJoin c userId "" enableNtfs + +-- | Join SMP agent connection (JOIN command). +joinConnection :: AgentClient -> UserId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId +joinConnection c userId Nothing enableNtfs = withAgentEnv c .:: joinConn c userId "" False enableNtfs +joinConnection c userId (Just connId) enableNtfs = withAgentEnv c .:: joinConn c userId connId True enableNtfs {-# INLINE joinConnection #-} -- | Allow connection to continue after CONF notification (LET command) @@ -575,7 +585,7 @@ processCommand :: AgentClient -> (EntityId, APartyCmd 'Client) -> AM (EntityId, processCommand c (connId, APC e cmd) = second (APC e) <$> case cmd of NEW enableNtfs (ACM cMode) pqIK subMode -> second (INV . ACR cMode) <$> newConn c userId connId enableNtfs cMode Nothing pqIK subMode - JOIN enableNtfs (ACR _ cReq) pqEnc subMode connInfo -> (,OK) <$> joinConn c userId connId enableNtfs cReq connInfo pqEnc subMode + JOIN enableNtfs (ACR _ cReq) pqEnc subMode connInfo -> (,OK) <$> joinConn c userId connId False enableNtfs cReq connInfo pqEnc subMode LET confId ownCInfo -> allowConnection' c connId confId ownCInfo $> (connId, OK) ACPT invId pqEnc ownCInfo -> (,OK) <$> acceptContact' c connId True invId ownCInfo pqEnc SMSubscribe RJCT invId -> rejectContact' c connId invId $> (connId, OK) @@ -708,11 +718,14 @@ switchConnectionAsync' c corrId connId = newConn :: AgentClient -> UserId -> ConnId -> Bool -> SConnectionMode c -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, ConnectionRequestUri c) newConn c userId connId enableNtfs cMode clientData pqInitKeys subMode = - getSMPServer c userId >>= newConnSrv c userId connId enableNtfs cMode clientData pqInitKeys subMode + getSMPServer c userId >>= newConnSrv c userId connId False enableNtfs cMode clientData pqInitKeys subMode -newConnSrv :: AgentClient -> UserId -> ConnId -> Bool -> SConnectionMode c -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> SMPServerWithAuth -> AM (ConnId, ConnectionRequestUri c) -newConnSrv c userId connId enableNtfs cMode clientData pqInitKeys subMode srv = do - connId' <- newConnNoQueues c userId connId enableNtfs cMode (CR.connPQEncryption pqInitKeys) +newConnSrv :: AgentClient -> UserId -> ConnId -> Bool -> Bool -> SConnectionMode c -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> SMPServerWithAuth -> AM (ConnId, ConnectionRequestUri c) +newConnSrv c userId connId hasNewConn enableNtfs cMode clientData pqInitKeys subMode srv = do + connId' <- + if hasNewConn + then pure connId + else newConnNoQueues c userId connId enableNtfs cMode (CR.connPQEncryption pqInitKeys) newRcvConnSrv c userId connId' enableNtfs cMode clientData pqInitKeys subMode srv newRcvConnSrv :: AgentClient -> UserId -> ConnId -> Bool -> SConnectionMode c -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> SMPServerWithAuth -> AM (ConnId, ConnectionRequestUri c) @@ -738,18 +751,36 @@ newRcvConnSrv c userId connId enableNtfs cMode clientData pqInitKeys subMode srv withStore' c $ \db -> createRatchetX3dhKeys db connId pk1 pk2 pKem pure (connId, CRInvitationUri crData $ toVersionRangeT e2eRcvParams e2eEncryptVRange) -joinConn :: AgentClient -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId -joinConn c userId connId enableNtfs cReq cInfo pqSupport subMode = do +newConnToJoin :: forall c. AgentClient -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> PQSupport -> AM ConnId +newConnToJoin c userId connId enableNtfs cReq pqSup = case cReq of + CRInvitationUri {} -> + lift (compatibleInvitationUri cReq) >>= \case + Just (_, (Compatible (CR.E2ERatchetParams v _ _ _)), aVersion) -> create aVersion (Just v) + Nothing -> throwError $ AGENT A_VERSION + CRContactUri {} -> + lift (compatibleContactUri cReq) >>= \case + Just (_, aVersion) -> create aVersion Nothing + Nothing -> throwError $ AGENT A_VERSION + where + create :: Compatible VersionSMPA -> Maybe CR.VersionE2E -> AM ConnId + create (Compatible connAgentVersion) e2eV_ = do + g <- asks random + let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion e2eV_ + cData = ConnData {userId, connId, connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport} + withStore c $ \db -> createNewConn db g cData SCMInvitation + +joinConn :: AgentClient -> UserId -> ConnId -> Bool -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId +joinConn c userId connId hasNewConn enableNtfs cReq cInfo pqSupport subMode = do srv <- case cReq of CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _ -> getNextServer c userId [qServer q] _ -> getSMPServer c userId - joinConnSrv c userId connId enableNtfs cReq cInfo pqSupport subMode srv + joinConnSrv c userId connId hasNewConn enableNtfs cReq cInfo pqSupport subMode srv -startJoinInvitation :: UserId -> ConnId -> Bool -> ConnectionRequestUri 'CMInvitation -> PQSupport -> AM (Compatible VersionSMPA, ConnData, NewSndQueue, CR.Ratchet 'C.X448, CR.SndE2ERatchetParams 'C.X448) +startJoinInvitation :: UserId -> ConnId -> Bool -> ConnectionRequestUri 'CMInvitation -> PQSupport -> AM (ConnData, NewSndQueue, CR.Ratchet 'C.X448, CR.SndE2ERatchetParams 'C.X448) startJoinInvitation userId connId enableNtfs cReqUri pqSup = lift (compatibleInvitationUri cReqUri) >>= \case - Just (qInfo, (Compatible e2eRcvParams@(CR.E2ERatchetParams v _ rcDHRr kem_)), aVersion@(Compatible connAgentVersion)) -> do + Just (qInfo, (Compatible e2eRcvParams@(CR.E2ERatchetParams v _ rcDHRr kem_)), Compatible connAgentVersion) -> do g <- asks random let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v) (pk1, pk2, pKem, e2eSndParams) <- liftIO $ CR.generateSndE2EParams g v (CR.replyKEM_ v kem_ pqSupport) @@ -760,7 +791,7 @@ startJoinInvitation userId connId enableNtfs cReqUri pqSup = rc = CR.initSndRatchet rcVs rcDHRr rcDHRs rcParams q <- lift $ newSndQueue userId "" qInfo let cData = ConnData {userId, connId, connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport} - pure (aVersion, cData, q, rc, e2eSndParams) + pure (cData, q, rc, e2eSndParams) Nothing -> throwError $ AGENT A_VERSION connRequestPQSupport :: AgentClient -> PQSupport -> ConnectionRequestUri c -> IO (Maybe (VersionSMPA, PQSupport)) @@ -786,40 +817,43 @@ compatibleContactUri (CRContactUri ConnReqUriData {crAgentVRange, crSmpQueues = AgentConfig {smpClientVRange, smpAgentVRange} <- asks config pure $ (,) - <$> (qUri `compatibleVersion` smpClientVRange) + <$> (qUri `compatibleVersion` smpClientVRange) <*> (crAgentVRange `compatibleVersion` smpAgentVRange) versionPQSupport_ :: VersionSMPA -> Maybe CR.VersionE2E -> PQSupport versionPQSupport_ agentV e2eV_ = PQSupport $ agentV >= pqdrSMPAgentVersion && maybe True (>= CR.pqRatchetE2EEncryptVersion) e2eV_ {-# INLINE versionPQSupport_ #-} -joinConnSrv :: AgentClient -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> SMPServerWithAuth -> AM ConnId -joinConnSrv c userId connId enableNtfs inv@CRInvitationUri {} cInfo pqSup subMode srv = +joinConnSrv :: AgentClient -> UserId -> ConnId -> Bool -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> SMPServerWithAuth -> AM ConnId +joinConnSrv c userId connId hasNewConn enableNtfs inv@CRInvitationUri {} cInfo pqSup subMode srv = withInvLock c (strEncode inv) "joinConnSrv" $ do - (aVersion, cData, q, rc, e2eSndParams) <- startJoinInvitation userId connId enableNtfs inv pqSup + (cData, q, rc, e2eSndParams) <- startJoinInvitation userId connId enableNtfs inv pqSup g <- asks random (connId', sq) <- withStore c $ \db -> runExceptT $ do - r@(connId', _) <- ExceptT $ createSndConn db g cData q + r@(connId', _) <- + if hasNewConn + then (connId,) <$> ExceptT (updateNewConnSnd db connId q) + else ExceptT $ createSndConn db g cData q liftIO $ createRatchet db connId' rc pure r let cData' = (cData :: ConnData) {connId = connId'} - tryError (confirmQueue aVersion c cData' sq srv cInfo (Just e2eSndParams) subMode) >>= \case + tryError (confirmQueue c cData' sq srv cInfo (Just e2eSndParams) subMode) >>= \case Right _ -> pure connId' Left e -> do -- possible improvement: recovery for failure on network timeout, see rfcs/2022-04-20-smp-conf-timeout-recovery.md void $ withStore' c $ \db -> deleteConn db Nothing connId' throwError e -joinConnSrv c userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup subMode srv = +joinConnSrv c userId connId hasNewConn enableNtfs cReqUri@CRContactUri {} cInfo pqSup subMode srv = lift (compatibleContactUri cReqUri) >>= \case Just (qInfo, vrsn) -> do - (connId', cReq) <- newConnSrv c userId connId enableNtfs SCMInvitation Nothing (CR.IKNoPQ pqSup) subMode srv + (connId', cReq) <- newConnSrv c userId connId hasNewConn enableNtfs SCMInvitation Nothing (CR.IKNoPQ pqSup) subMode srv sendInvitation c userId qInfo vrsn cReq cInfo pure connId' Nothing -> throwError $ AGENT A_VERSION joinConnSrvAsync :: AgentClient -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> SMPServerWithAuth -> AM () joinConnSrvAsync c userId connId enableNtfs inv@CRInvitationUri {} cInfo pqSupport subMode srv = do - (_aVersion, cData, q, rc, e2eSndParams) <- startJoinInvitation userId connId enableNtfs inv pqSupport + (cData, q, rc, e2eSndParams) <- startJoinInvitation userId connId enableNtfs inv pqSupport q' <- withStore c $ \db -> runExceptT $ do liftIO $ createRatchet db connId rc ExceptT $ updateNewConnSnd db connId q @@ -861,7 +895,7 @@ acceptContact' c connId enableNtfs invId ownConnInfo pqSupport subMode = withCon withStore c (`getConn` contactConnId) >>= \case SomeConn _ (ContactConnection ConnData {userId} _) -> do withStore' c $ \db -> acceptInvitation db invId ownConnInfo - joinConn c userId connId enableNtfs connReq ownConnInfo pqSupport subMode `catchAgentError` \err -> do + joinConn c userId connId False enableNtfs connReq ownConnInfo pqSupport subMode `catchAgentError` \err -> do withStore' c (`unacceptInvitation` invId) throwError err _ -> throwError $ CMD PROHIBITED @@ -1207,7 +1241,7 @@ enqueueMessage c cData sq msgFlags aMessage = {-# INLINE enqueueMessage #-} -- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries -enqueueMessageB :: forall t. (Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId)))) +enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId)))) enqueueMessageB c reqs = do cfg <- asks config reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db cfg) reqs @@ -1239,7 +1273,7 @@ enqueueSavedMessage :: AgentClient -> ConnData -> AgentMsgId -> SndQueue -> AM' enqueueSavedMessage c cData msgId sq = enqueueSavedMessageB c $ Identity (cData, [sq], msgId) {-# INLINE enqueueSavedMessage #-} -enqueueSavedMessageB :: (Foldable t) => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' () +enqueueSavedMessageB :: Foldable t => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' () enqueueSavedMessageB c reqs = do -- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs @@ -2565,8 +2599,8 @@ confirmQueueAsync c cData sq srv connInfo e2eEncryption_ subMode = do storeConfirmation c cData sq e2eEncryption_ =<< mkAgentConfirmation c cData sq srv connInfo subMode lift $ submitPendingMsg c cData sq -confirmQueue :: Compatible VersionSMPA -> AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM () -confirmQueue (Compatible agentVersion) c cData@ConnData {connId, pqSupport} sq srv connInfo e2eEncryption_ subMode = do +confirmQueue :: AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM () +confirmQueue c cData@ConnData {connId, connAgentVersion, pqSupport} sq srv connInfo e2eEncryption_ subMode = do msg <- mkConfirmation =<< mkAgentConfirmation c cData sq srv connInfo subMode sendConfirmation c sq msg withStore' c $ \db -> setSndQueueStatus db sq Confirmed @@ -2578,7 +2612,7 @@ confirmQueue (Compatible agentVersion) c cData@ConnData {connId, pqSupport} sq s void . liftIO $ updateSndIds db connId let pqEnc = CR.pqSupportToEnc pqSupport (encConnInfo, _) <- agentRatchetEncrypt db cData (smpEncode aMessage) e2eEncConnInfoLength (Just pqEnc) currentE2EVersion - pure . smpEncode $ AgentConfirmation {agentVersion, e2eEncryption_, encConnInfo} + pure . smpEncode $ AgentConfirmation {agentVersion = connAgentVersion, e2eEncryption_, encConnInfo} mkAgentConfirmation :: AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> SubscriptionMode -> AM AgentMessage mkAgentConfirmation c cData sq srv connInfo subMode = do diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 8d3c5e54f..38c36f9f2 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -113,7 +113,6 @@ import Simplex.Messaging.Transport.WebSockets (WS) import Simplex.Messaging.Util (bshow, diffToMicroseconds, raceAny_, threadDelay', whenM) import Simplex.Messaging.Version import System.Timeout (timeout) -import UnliftIO (pooledMapConcurrentlyN) -- | 'SMPClient' is a handle used to send commands to a specific SMP server. -- diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 79742efab..d3e4e6924 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -244,7 +244,7 @@ createConnection :: AgentClient -> UserId -> Bool -> SConnectionMode c -> Maybe createConnection c userId enableNtfs cMode clientData = A.createConnection c userId enableNtfs cMode clientData (IKNoPQ PQSupportOn) joinConnection :: AgentClient -> UserId -> Bool -> ConnectionRequestUri c -> ConnInfo -> SubscriptionMode -> AE ConnId -joinConnection c userId enableNtfs cReq connInfo = A.joinConnection c userId enableNtfs cReq connInfo PQSupportOn +joinConnection c userId enableNtfs cReq connInfo = A.joinConnection c userId Nothing enableNtfs cReq connInfo PQSupportOn sendMessage :: AgentClient -> ConnId -> SMP.MsgFlags -> MsgBody -> AE AgentMsgId sendMessage c connId msgFlags msgBody = do @@ -503,7 +503,7 @@ runAgentClientTest :: HasCallStack => PQSupport -> AgentClient -> AgentClient -> runAgentClientTest pqSupport alice@AgentClient {} bob baseId = runRight_ $ do (bobId, qInfo) <- A.createConnection alice 1 True SCMInvitation Nothing (IKNoPQ pqSupport) SMSubscribe - aliceId <- A.joinConnection bob 1 True qInfo "bob's connInfo" pqSupport SMSubscribe + aliceId <- A.joinConnection bob 1 Nothing True qInfo "bob's connInfo" pqSupport SMSubscribe ("", _, A.CONF confId pqSup' _ "bob's connInfo") <- get alice liftIO $ pqSup' `shouldBe` pqSupport allowConnection alice bobId confId "alice's connInfo" @@ -630,7 +630,9 @@ runAgentClientContactTest :: HasCallStack => PQSupport -> AgentClient -> AgentCl runAgentClientContactTest pqSupport alice bob baseId = runRight_ $ do (_, qInfo) <- A.createConnection alice 1 True SCMContact Nothing (IKNoPQ pqSupport) SMSubscribe - aliceId <- A.joinConnection bob 1 True qInfo "bob's connInfo" pqSupport SMSubscribe + aliceId <- A.prepareConnectionToJoin bob 1 True qInfo pqSupport + aliceId' <- A.joinConnection bob 1 (Just aliceId) True qInfo "bob's connInfo" pqSupport SMSubscribe + liftIO $ aliceId' `shouldBe` aliceId ("", _, A.REQ invId pqSup' _ "bob's connInfo") <- get alice liftIO $ pqSup' `shouldBe` pqSupport bobId <- acceptContact alice True invId "alice's connInfo" PQSupportOn SMSubscribe @@ -1399,7 +1401,9 @@ makeConnectionForUsers = makeConnectionForUsers_ PQSupportOn makeConnectionForUsers_ :: PQSupport -> AgentClient -> UserId -> AgentClient -> UserId -> ExceptT AgentErrorType IO (ConnId, ConnId) makeConnectionForUsers_ pqSupport alice aliceUserId bob bobUserId = do (bobId, qInfo) <- A.createConnection alice aliceUserId True SCMInvitation Nothing (CR.IKNoPQ pqSupport) SMSubscribe - aliceId <- A.joinConnection bob bobUserId True qInfo "bob's connInfo" pqSupport SMSubscribe + aliceId <- A.prepareConnectionToJoin bob bobUserId True qInfo pqSupport + aliceId' <- A.joinConnection bob bobUserId (Just aliceId) True qInfo "bob's connInfo" pqSupport SMSubscribe + liftIO $ aliceId' `shouldBe` aliceId ("", _, A.CONF confId pqSup' _ "bob's connInfo") <- get alice liftIO $ pqSup' `shouldBe` pqSupport allowConnection alice bobId confId "alice's connInfo" From e13b0df539cd259187277ad1fb790fae8d505574 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 5 May 2024 17:05:51 +0100 Subject: [PATCH 09/10] client: remove TLS handshake timeout (#1129) * client: remove TLS handshake timeout * remove comment --- src/Simplex/Messaging/Transport/Client.hs | 36 ++++++++--------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/src/Simplex/Messaging/Transport/Client.hs b/src/Simplex/Messaging/Transport/Client.hs index a943b36d9..da2c6c253 100644 --- a/src/Simplex/Messaging/Transport/Client.hs +++ b/src/Simplex/Messaging/Transport/Client.hs @@ -54,7 +54,6 @@ import Simplex.Messaging.Transport import Simplex.Messaging.Transport.KeepAlive import Simplex.Messaging.Util (bshow, catchAll, tshow, (<$?>)) import System.IO.Error -import System.Timeout (timeout) import Text.Read (readMaybe) import UnliftIO.Exception (IOException) import qualified UnliftIO.Exception as E @@ -139,35 +138,26 @@ runTransportClient :: Transport c => TransportClientConfig -> Maybe ByteString - runTransportClient = runTLSTransportClient supportedParameters Nothing runTLSTransportClient :: Transport c => T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (c -> IO a) -> IO a -runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, tcpConnectTimeout, tcpKeepAlive, clientCredentials, alpn} proxyUsername host port keyHash client = do +runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, tcpKeepAlive, clientCredentials, alpn} proxyUsername host port keyHash client = do serverCert <- newEmptyTMVarIO let hostName = B.unpack $ strEncode host clientParams = mkTLSClientParams tlsParams caStore_ hostName port keyHash clientCredentials alpn serverCert - (connectTCP, tlsTimeout) = case socksProxy of - -- We use a much larger timeout for connections via SOCKS proxy, to allow the circuits created - -- in the socket connection that would otherwise timeout to be used in the next connection attempt. - -- Using standard timeout results in permanent timeout for the clients using SOCKS in cases - -- when SOCKS proxy is very slow (bad network, congestion in underlying network, etc.), - -- because SOCKS proxy destroys circuits when the last session using them is closed. - Just proxy -> (connectSocksClient proxy proxyUsername (hostAddr host), tcpConnectTimeout * 10) - _ -> (connectTCPClient hostName, tcpConnectTimeout) + connectTCP = case socksProxy of + Just proxy -> connectSocksClient proxy proxyUsername (hostAddr host) + _ -> connectTCPClient hostName c <- do sock <- connectTCP port mapM_ (setSocketKeepAlive sock) tcpKeepAlive `catchAll` \e -> logError ("Error setting TCP keep-alive" <> tshow e) let tCfg = clientTransportConfig cfg - tlsTimeout `timeout` connectTLS (Just hostName) tCfg clientParams sock >>= \case - Nothing -> do - close sock - logError "connection timed out" - fail "connection timed out" - Just tls -> do - chain <- - atomically (tryTakeTMVar serverCert) >>= \case - Nothing -> do - logError "onServerCertificate didn't fire or failed to get cert chain" - closeTLS tls >> error "onServerCertificate failed" - Just c -> pure c - getClientConnection tCfg chain tls + -- No TLS timeout to avoid failing connections via SOCKS + tls <- connectTLS (Just hostName) tCfg clientParams sock + chain <- + atomically (tryTakeTMVar serverCert) >>= \case + Nothing -> do + logError "onServerCertificate didn't fire or failed to get cert chain" + closeTLS tls >> error "onServerCertificate failed" + Just c -> pure c + getClientConnection tCfg chain tls client c `E.finally` closeConnection c where hostAddr = \case From 93fd424f86086c6f378b50e343f32ec47f8b0c3f Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 5 May 2024 17:13:26 +0100 Subject: [PATCH 10/10] 5.7.2.0 --- CHANGELOG.md | 7 +++++++ package.yaml | 2 +- simplexmq.cabal | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffdaa7ff2..b5792195d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# 5.7.2 + +SMP agent: +- fix connections failing when connecting via link due to race condition on slow network. +- remove concurrency limit when waiting for connection subscription. +- remove TLS timeout. + # 5.7.1 SMP agent: diff --git a/package.yaml b/package.yaml index ccf0794ae..7a536f7bf 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.1.0 +version: 5.7.2.0 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 9b5fa36b0..7539d675a 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.1.0 +version: 5.7.2.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and