From f50589b31a5d25032a0c8ad7e3a25010d4757b3d Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 21 May 2024 22:52:22 +0100 Subject: [PATCH 1/2] agent: remove external timeout to resubscribe (#1164) * agent: remove external timeout to resubscribe * liftIO * fix tests --- src/Simplex/Messaging/Agent/Client.hs | 52 ++++++++--------------- src/Simplex/Messaging/Agent/Env/SQLite.hs | 4 -- tests/AgentTests.hs | 28 ++++++------ tests/AgentTests/FunctionalAPITests.hs | 1 + tests/AgentTests/NotificationTests.hs | 17 ++++---- 5 files changed, 44 insertions(+), 58 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 2299b2183..30e1f0aa8 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -608,12 +608,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = atomically $ putTMVar (sessionVar v) a runSubWorker = do ri <- asks $ reconnectInterval . config - timeoutCounts <- newTVarIO 0 withRetryInterval ri $ \_ loop -> do pending <- atomically getPending forM_ (L.nonEmpty pending) $ \qs -> do - lift $ waitForUserNetwork c - void . 
tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs + liftIO $ waitForUserNetwork c + reconnectSMPClient c tSess qs loop getPending = RQ.getSessQueues tSess $ pendingSubs c cleanup :: SessionVar (Async ()) -> STM () @@ -623,38 +622,23 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = whenM (isEmptyTMVar $ sessionVar v) retry removeSessVar v tSess smpSubWorkers -reconnectSMPClient :: TVar Int -> AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM () -reconnectSMPClient tc c tSess@(_, srv, _) qs = do - NetworkConfig {tcpTimeout} <- atomically $ getNetworkConfig c - -- this allows 3x of timeout per batch of subscription (90 queues per batch empirically) - let t = (length qs `div` 90 + 1) * tcpTimeout * 3 - ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case - Just _ -> resetTimeouts - -- reset and do not report consecutive timeouts while offline - Nothing -> ifM (atomically $ isNetworkOnline c) notifyTimeout resetTimeouts +reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM' () +reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do + cs <- readTVarIO $ RQ.getConnections $ activeSubs c + rs <- subscribeQueues c $ L.toList qs + let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs + conns = filter (`M.notMember` cs) okConns + unless (null conns) $ notifySub "" $ UP srv conns + let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs + mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs + forM_ (listToMaybe tempErrs) $ \(connId, e) -> do + when (null okConns && M.null cs && null finalErrs) . liftIO $ + closeClient c smpClients tSess + notifySub connId $ ERR e where - resetTimeouts = atomically $ writeTVar tc 0 - notifyTimeout = do - tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) - maxTC <- asks $ maxSubscriptionTimeouts . 
config - when (tc' >= maxTC) $ do - let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess - atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg) - resubscribe :: AM () - resubscribe = do - cs <- readTVarIO $ RQ.getConnections $ activeSubs c - rs <- lift . subscribeQueues c $ L.toList qs - let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs - liftIO $ do - let conns = filter (`M.notMember` cs) okConns - unless (null conns) $ notifySub "" $ UP srv conns - let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs - forM_ (listToMaybe tempErrs) $ \(_, err) -> do - when (null okConns && M.null cs && null finalErrs) . liftIO $ - closeClient c smpClients tSess - throwError err - notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO () + handleNotify :: AM' () -> AM' () + handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show + notifySub :: forall e. 
AEntityI e => ConnId -> ACommand 'Agent e -> AM' () notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 9613adf3c..b753a4226 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -102,7 +102,6 @@ data AgentConfig = AgentConfig cleanupInterval :: Int64, cleanupStepInterval :: Int, maxWorkerRestartsPerMin :: Int, - maxSubscriptionTimeouts :: Int, storedMsgDataTTL :: NominalDiffTime, rcvFilesTTL :: NominalDiffTime, sndFilesTTL :: NominalDiffTime, @@ -173,9 +172,6 @@ defaultAgentConfig = cleanupInterval = 30 * 60 * 1000000, -- 30 minutes cleanupStepInterval = 200000, -- 200ms maxWorkerRestartsPerMin = 5, - -- 5 consecutive subscription timeouts will result in alert to the user - -- this is a fallback, as the timeout set to 3x of expected timeout, to avoid potential locking. - maxSubscriptionTimeouts = 5, storedMsgDataTTL = 21 * nominalDay, rcvFilesTTL = 2 * nominalDay, sndFilesTTL = nominalDay, diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 32610b54e..9572aed2f 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -95,23 +95,24 @@ type AEntityTransmission p e = (ACorrId, ConnId, ACommand p e) type AEntityTransmissionOrError p e = (ACorrId, ConnId, Either AgentErrorType (ACommand p e)) tGetAgent :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AEConn) -tGetAgent = tGetAgent' +tGetAgent = tGetAgent' True -tGetAgent' :: forall c e. (Transport c, AEntityI e) => c -> IO (AEntityTransmissionOrError 'Agent e) -tGetAgent' h = do - (corrId, connId, cmdOrErr) <- pGetAgent h +tGetAgent' :: forall c e. 
(Transport c, AEntityI e) => Bool -> c -> IO (AEntityTransmissionOrError 'Agent e) +tGetAgent' skipErr h = do + (corrId, connId, cmdOrErr) <- pGetAgent skipErr h case cmdOrErr of Right (APC e cmd) -> case testEquality e (sAEntity @e) of Just Refl -> pure (corrId, connId, Right cmd) _ -> error $ "unexpected command " <> show cmd Left err -> pure (corrId, connId, Left err) -pGetAgent :: forall c. Transport c => c -> IO (ATransmissionOrError 'Agent) -pGetAgent h = do +pGetAgent :: forall c. Transport c => Bool -> c -> IO (ATransmissionOrError 'Agent) +pGetAgent skipErr h = do (corrId, connId, cmdOrErr) <- tGet SAgent h case cmdOrErr of - Right (APC _ CONNECT {}) -> pGetAgent h - Right (APC _ DISCONNECT {}) -> pGetAgent h + Right (APC _ CONNECT {}) -> pGetAgent skipErr h + Right (APC _ DISCONNECT {}) -> pGetAgent skipErr h + Right (APC _ (ERR (BROKER _ NETWORK))) | skipErr -> pGetAgent skipErr h cmd -> pure (corrId, connId, cmd) -- | receive message to handle `h` @@ -119,15 +120,18 @@ pGetAgent h = do (<#:) = tGetAgent (<#:?) :: Transport c => c -> IO (ATransmissionOrError 'Agent) -(<#:?) = pGetAgent +(<#:?) = pGetAgent True (<#:.) :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AENone) -(<#:.) = tGetAgent' +(<#:.) = tGetAgent' True -- | send transmission `t` to handle `h` and get response (#:) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn) h #: t = tPutRaw h t >> (<#:) h +(#:!) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn) +h #:! 
t = tPutRaw h t >> tGetAgent' False h + -- | action and expected response -- `h #:t #> r` is the test that sends `t` to `h` and validates that the response is `r` (#>) :: IO (AEntityTransmissionOrError 'Agent 'AEConn) -> AEntityTransmission 'Agent 'AEConn -> Expectation @@ -426,8 +430,8 @@ testServerConnectionAfterError t _ = do withAgent1 $ \bob -> do withAgent2 $ \alice -> do - bob #: ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT - alice #: ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT + bob #:! ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT + alice #:! ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT withServer $ do alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 7cf1ab00a..bb1d34b10 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -144,6 +144,7 @@ pGet c = do case cmd of CONNECT {} -> pGet c DISCONNECT {} -> pGet c + ERR (BROKER _ NETWORK) -> pGet c _ -> pure t pattern CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> ACommand 'Agent e diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 2c1045791..cf1c4783a 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -6,8 +6,8 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE TypeApplications #-} -{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} +{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-} module AgentTests.NotificationTests where @@ -17,10 
+17,6 @@ import AgentTests.FunctionalAPITests createConnection, exchangeGreetingsMsgId, get, - withAgent, - withAgentClients2, - withAgentClientsCfgServers2, - withAgentClients3, joinConnection, makeConnection, nGet, @@ -29,7 +25,11 @@ import AgentTests.FunctionalAPITests sendMessage, switchComplete, testServerMatrix2, + withAgent, + withAgentClients2, + withAgentClients3, withAgentClientsCfg2, + withAgentClientsCfgServers2, (##>), (=##>), pattern CON, @@ -59,8 +59,8 @@ import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO) import Simplex.Messaging.Agent.Store.SQLite (getSavedNtfToken) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..)) import Simplex.Messaging.Notifications.Protocol +import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..)) import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Types (NtfToken (..)) import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..)) @@ -151,7 +151,8 @@ testNtfMatrix t runTest = do it "next servers: SMP v7, NTF v2; curr clients: v6/v1" $ runNtfTestCfg t cfgV7 ntfServerCfgV2 agentCfg agentCfg runTest it "curr servers: SMP v6, NTF v1; curr clients: v6/v1" $ runNtfTestCfg t cfg ntfServerCfg agentCfg agentCfg runTest skip "this case cannot be supported - see RFC" $ - it "servers: SMP v6, NTF v1; clients: v7/v2 (not supported)" $ runNtfTestCfg t cfg ntfServerCfg agentCfgV7 agentCfgV7 runTest + it "servers: SMP v6, NTF v1; clients: v7/v2 (not supported)" $ + runNtfTestCfg t cfg ntfServerCfg agentCfgV7 agentCfgV7 runTest -- servers can be migrated in any order it "servers: next SMP v7, curr NTF v1; curr clients: v6/v1" $ runNtfTestCfg t cfgV7 ntfServerCfg agentCfg agentCfg runTest it "servers: curr SMP v6, next NTF v2; curr clients: v6/v1" $ runNtfTestCfg t cfg 
ntfServerCfgV2 agentCfg agentCfg runTest @@ -258,7 +259,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do atomically $ readTBQueue apnsQ liftIO $ sendApnsResponse APNSRespOk pure ntfData - -- the new agent is created as otherwise when running the tests in CI the old agent was keeping the connection to the server + -- the new agent is created as otherwise when running the tests in CI the old agent was keeping the connection to the server threadDelay 1000000 withAgent 2 agentCfg initAgentServers testDB $ \a' -> -- server stopped before token is verified, so now the attempt to verify it will return AUTH error but re-register token, From 769e54db76fc33a04085410cdf78ae77ed9717b6 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 21 May 2024 22:58:58 +0100 Subject: [PATCH 2/2] 5.7.4.1 --- CHANGELOG.md | 7 +++++++ package.yaml | 2 +- simplexmq.cabal | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0093ba92..e2a7bb5b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# 5.7.4 + +SMP agent: +- remove re-subscription timeouts (as they are tracked per operation, and could cause failed subscriptions). +- reconnect XFTP clients when network settings change. +- fix lock contention resulting in stuck subscriptions on network change.
+ # 5.7.3 SMP/NTF protocol: diff --git a/package.yaml b/package.yaml index 7ba64f531..a1f4a225a 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.4.0 +version: 5.7.4.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 19c99acc4..9bfa146c1 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.4.0 +version: 5.7.4.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and