diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 12663362d..652e2d58d 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -429,24 +429,16 @@ getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig {-# INLINE getNetworkConfig #-} setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO () -setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkDelay} netInfo = withAgentEnv' c $ do - ni <- asks $ userNetworkInterval . config - let d = initialInterval ni - off <- atomically $ do - wasOnline <- isOnline <$> swapTVar userNetworkInfo netInfo - let off = wasOnline && not (isOnline netInfo) - when off $ writeTVar userNetworkDelay d - pure off - liftIO . when off . void . forkIO $ - growOfflineDelay 0 d ni +setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkUpdated} ni = withAgentEnv' c $ do + ts' <- liftIO getCurrentTime + i <- asks $ userOfflineDelay . config + -- if network offline event happens in less than `userOfflineDelay` after the previous event, it is ignored + atomically . whenM ((isOnline ni ||) <$> notRecentlyChanged ts' i) $ do + writeTVar userNetworkInfo ni + writeTVar userNetworkUpdated $ Just ts' where - growOfflineDelay elapsed d ni = do - online <- waitOnlineOrDelay c d - unless online $ do - let elapsed' = elapsed + d - d' = nextRetryDelay elapsed' d ni - atomically $ writeTVar userNetworkDelay d' - growOfflineDelay elapsed' d' ni + notRecentlyChanged ts' i = + maybe True (\ts -> diffUTCTime ts' ts > i) <$> readTVar userNetworkUpdated reconnectAllServers :: AgentClient -> IO () reconnectAllServers c = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1720b2c0f..2299b2183 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -104,7 +104,6 @@ module Simplex.Messaging.Agent.Client UserNetworkInfo (..), UserNetworkType (..), waitForUserNetwork, - waitOnlineOrDelay, isNetworkOnline, isOnline, throwWhenInactive, @@ -155,7 +154,6 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (lefts, partitionEithers) import Data.Functor (($>)) -import Data.Int (Int64) import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L @@ -270,7 +268,7 @@ data AgentClient = AgentClient xftpClients :: TMap XFTPTransportSession XFTPClientVar, useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks userNetworkInfo :: TVar UserNetworkInfo, - userNetworkDelay :: TVar Int64, + userNetworkUpdated :: TVar (Maybe UTCTime), subscrConns :: TVar (Set ConnId), activeSubs :: TRcvQueues, pendingSubs :: TRcvQueues, @@ -434,7 +432,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = xftpClients <- TM.empty useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg) userNetworkInfo <- newTVar $ UserNetworkInfo UNOther True - userNetworkDelay <- newTVar $ initialInterval $ userNetworkInterval cfg + userNetworkUpdated <- newTVar Nothing subscrConns <- newTVar S.empty activeSubs <- RQ.empty pendingSubs <- RQ.empty @@ -470,7 +468,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = xftpClients, useNetworkConfig, userNetworkInfo, - userNetworkDelay, + userNetworkUpdated, subscrConns, activeSubs, pendingSubs, @@ -759,23 +757,9 @@ getNetworkConfig c = do waitForUserNetwork :: AgentClient -> IO () waitForUserNetwork c = - unlessM (atomically $ isNetworkOnline c) $ - readTVarIO (userNetworkDelay c) >>= void . waitOnlineOrDelay c - -waitOnlineOrDelay :: AgentClient -> Int64 -> IO Bool -waitOnlineOrDelay c t = do - let maxWait = min t $ fromIntegral (maxBound :: Int) - t' = t - maxWait - delay <- registerDelay $ fromIntegral maxWait - online <- - atomically $ do - expired <- readTVar delay - online <- isNetworkOnline c - unless (expired || online) retry - pure online - if online || t' <= 0 - then pure online - else waitOnlineOrDelay c t' + unlessM (atomically $ isNetworkOnline c) $ do + delay <- registerDelay $ userNetworkInterval $ config $ agentEnv c + atomically $ unlessM (isNetworkOnline c) $ unlessM (readTVar delay) retry closeAgentClient :: AgentClient -> IO () closeAgentClient c = do diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 07d3f29a8..9613adf3c 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -92,7 +92,8 @@ data AgentConfig = AgentConfig xftpCfg :: XFTPClientConfig, reconnectInterval :: RetryInterval, messageRetryInterval :: RetryInterval2, - userNetworkInterval :: RetryInterval, + userNetworkInterval :: Int, + userOfflineDelay :: NominalDiffTime, messageTimeout :: NominalDiffTime, connDeleteDeliveryTimeout :: NominalDiffTime, helloTimeout :: NominalDiffTime, @@ -147,14 +148,6 @@ defaultMessageRetryInterval = } } -defaultUserNetworkInterval :: RetryInterval -defaultUserNetworkInterval = - RetryInterval - { initialInterval = 1200_000000, -- 20 minutes - increaseAfter = 0, - maxInterval = 7200_000000 -- 2 hours - } - defaultAgentConfig :: AgentConfig defaultAgentConfig = AgentConfig @@ -170,7 +163,8 @@ defaultAgentConfig = xftpCfg = defaultXFTPClientConfig, reconnectInterval = defaultReconnectInterval, messageRetryInterval = defaultMessageRetryInterval, - userNetworkInterval = defaultUserNetworkInterval, + userNetworkInterval = 1800_000000, -- 30 minutes, should be less than Int32 max value + userOfflineDelay = 2, -- if network offline event happens in less than 2 seconds after it was set online, it is ignored messageTimeout = 2 * nominalDay, connDeleteDeliveryTimeout = 2 * nominalDay, helloTimeout = 2 * nominalDay, @@ -179,7 +173,7 @@ defaultAgentConfig = cleanupInterval = 30 * 60 * 1000000, -- 30 minutes cleanupStepInterval = 200000, -- 200ms maxWorkerRestartsPerMin = 5, - -- 3 consecutive subscription timeouts will result in alert to the user + -- 5 consecutive subscription timeouts will result in alert to the user -- this is a fallback, as the timeout set to 3x of expected timeout, to avoid potential locking. maxSubscriptionTimeouts = 5, storedMsgDataTTL = 21 * nominalDay, diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 16f96bee4..7cf1ab00a 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -77,7 +77,6 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore) import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ) import qualified Simplex.Messaging.Agent.Protocol as A -import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..)) import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew)) import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction') import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultClientConfig) @@ -435,7 +434,7 @@ functionalAPITests t = do it "send delivery receipts concurrently with messages" $ testDeliveryReceiptsConcurrent t describe "user network info" $ do it "should wait for user network" testWaitForUserNetwork - it "should not reset offline interval while offline" testDoNotResetOfflineInterval + it "should not reset online to offline if happens too quickly" testDoNotResetOnlineToOffline it "should resume multiple threads" testResumeMultipleThreads testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int @@ -2698,11 +2697,8 @@ testWaitForUserNetwork = do a <- getSMPAgentClient' 1 aCfg initAgentServers testDB noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNNone False - threadDelay 5000 networkDelay a 100000 - networkDelay a 150000 - networkDelay a 200000 - networkDelay a 200000 + networkDelay a 100000 setUserNetworkInfo a $ UserNetworkInfo UNCellular True noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNCellular False @@ -2712,26 +2708,29 @@ testWaitForUserNetwork = do (networkDelay a 50000) noNetworkDelay a where - aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}} + aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0} -testDoNotResetOfflineInterval :: IO () -testDoNotResetOfflineInterval = do +testDoNotResetOnlineToOffline :: IO () +testDoNotResetOnlineToOffline = do a <- getSMPAgentClient' 1 aCfg initAgentServers testDB noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNWifi False - threadDelay 5000 networkDelay a 100000 - networkDelay a 150000 - setUserNetworkInfo a $ UserNetworkInfo UNCellular False - networkDelay a 200000 - setUserNetworkInfo a $ UserNetworkInfo UNNone False - networkDelay a 200000 - setUserNetworkInfo a $ UserNetworkInfo UNCellular True + setUserNetworkInfo a $ UserNetworkInfo UNWifi False + setUserNetworkInfo a $ UserNetworkInfo UNWifi True noNetworkDelay a - setUserNetworkInfo a $ UserNetworkInfo UNCellular False + setUserNetworkInfo a $ UserNetworkInfo UNWifi False -- ingnored + noNetworkDelay a + threadDelay 100000 + setUserNetworkInfo a $ UserNetworkInfo UNWifi False networkDelay a 100000 + setUserNetworkInfo a $ UserNetworkInfo UNNone False + networkDelay a 100000 + setUserNetworkInfo a $ UserNetworkInfo UNWifi True + setUserNetworkInfo a $ UserNetworkInfo UNNone False -- ingnored + noNetworkDelay a where - aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}} + aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0.1} testResumeMultipleThreads :: IO () testResumeMultipleThreads = do @@ -2746,14 +2745,14 @@ testResumeMultipleThreads = do threadDelay 1000000 setUserNetworkInfo a $ UserNetworkInfo UNCellular True ts <- mapM (atomically . readTMVar) vs - print $ minimum ts - print $ maximum ts - print $ sum ts `div` fromIntegral (length ts) + -- print $ minimum ts + -- print $ maximum ts + -- print $ sum ts `div` fromIntegral (length ts) let average = sum ts `div` fromIntegral (length ts) average < 3000000 `shouldBe` True maximum ts < 4000000 `shouldBe` True where - aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 1000000, increaseAfter = 0, maxInterval = 3600_000_000}} + aCfg = agentCfg {userOfflineDelay = 0} noNetworkDelay :: AgentClient -> IO () noNetworkDelay a = do