mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-31 01:05:57 +00:00
agent: do not increase network activity interval while offline (#1159)
* agent: do not increase network activity interval while offline * test
This commit is contained in:
committed by
GitHub
parent
33f6d2f1da
commit
1bb6a5c43b
@@ -429,24 +429,16 @@ getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig
|
||||
{-# INLINE getNetworkConfig #-}
|
||||
|
||||
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()
|
||||
setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkDelay} netInfo = withAgentEnv' c $ do
|
||||
ni <- asks $ userNetworkInterval . config
|
||||
let d = initialInterval ni
|
||||
off <- atomically $ do
|
||||
wasOnline <- isOnline <$> swapTVar userNetworkInfo netInfo
|
||||
let off = wasOnline && not (isOnline netInfo)
|
||||
when off $ writeTVar userNetworkDelay d
|
||||
pure off
|
||||
liftIO . when off . void . forkIO $
|
||||
growOfflineDelay 0 d ni
|
||||
setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkUpdated} ni = withAgentEnv' c $ do
|
||||
ts' <- liftIO getCurrentTime
|
||||
i <- asks $ userOfflineDelay . config
|
||||
-- if network offline event happens in less than `userOfflineDelay` after the previous event, it is ignored
|
||||
atomically . whenM ((isOnline ni ||) <$> notRecentlyChanged ts' i) $ do
|
||||
writeTVar userNetworkInfo ni
|
||||
writeTVar userNetworkUpdated $ Just ts'
|
||||
where
|
||||
growOfflineDelay elapsed d ni = do
|
||||
online <- waitOnlineOrDelay c d
|
||||
unless online $ do
|
||||
let elapsed' = elapsed + d
|
||||
d' = nextRetryDelay elapsed' d ni
|
||||
atomically $ writeTVar userNetworkDelay d'
|
||||
growOfflineDelay elapsed' d' ni
|
||||
notRecentlyChanged ts' i =
|
||||
maybe True (\ts -> diffUTCTime ts' ts > i) <$> readTVar userNetworkUpdated
|
||||
|
||||
reconnectAllServers :: AgentClient -> IO ()
|
||||
reconnectAllServers c = do
|
||||
|
||||
@@ -104,7 +104,6 @@ module Simplex.Messaging.Agent.Client
|
||||
UserNetworkInfo (..),
|
||||
UserNetworkType (..),
|
||||
waitForUserNetwork,
|
||||
waitOnlineOrDelay,
|
||||
isNetworkOnline,
|
||||
isOnline,
|
||||
throwWhenInactive,
|
||||
@@ -155,7 +154,6 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either (lefts, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (deleteFirstsBy, foldl', partition, (\\))
|
||||
import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
@@ -270,7 +268,7 @@ data AgentClient = AgentClient
|
||||
xftpClients :: TMap XFTPTransportSession XFTPClientVar,
|
||||
useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks
|
||||
userNetworkInfo :: TVar UserNetworkInfo,
|
||||
userNetworkDelay :: TVar Int64,
|
||||
userNetworkUpdated :: TVar (Maybe UTCTime),
|
||||
subscrConns :: TVar (Set ConnId),
|
||||
activeSubs :: TRcvQueues,
|
||||
pendingSubs :: TRcvQueues,
|
||||
@@ -434,7 +432,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
|
||||
xftpClients <- TM.empty
|
||||
useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg)
|
||||
userNetworkInfo <- newTVar $ UserNetworkInfo UNOther True
|
||||
userNetworkDelay <- newTVar $ initialInterval $ userNetworkInterval cfg
|
||||
userNetworkUpdated <- newTVar Nothing
|
||||
subscrConns <- newTVar S.empty
|
||||
activeSubs <- RQ.empty
|
||||
pendingSubs <- RQ.empty
|
||||
@@ -470,7 +468,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
|
||||
xftpClients,
|
||||
useNetworkConfig,
|
||||
userNetworkInfo,
|
||||
userNetworkDelay,
|
||||
userNetworkUpdated,
|
||||
subscrConns,
|
||||
activeSubs,
|
||||
pendingSubs,
|
||||
@@ -759,23 +757,9 @@ getNetworkConfig c = do
|
||||
|
||||
waitForUserNetwork :: AgentClient -> IO ()
|
||||
waitForUserNetwork c =
|
||||
unlessM (atomically $ isNetworkOnline c) $
|
||||
readTVarIO (userNetworkDelay c) >>= void . waitOnlineOrDelay c
|
||||
|
||||
waitOnlineOrDelay :: AgentClient -> Int64 -> IO Bool
|
||||
waitOnlineOrDelay c t = do
|
||||
let maxWait = min t $ fromIntegral (maxBound :: Int)
|
||||
t' = t - maxWait
|
||||
delay <- registerDelay $ fromIntegral maxWait
|
||||
online <-
|
||||
atomically $ do
|
||||
expired <- readTVar delay
|
||||
online <- isNetworkOnline c
|
||||
unless (expired || online) retry
|
||||
pure online
|
||||
if online || t' <= 0
|
||||
then pure online
|
||||
else waitOnlineOrDelay c t'
|
||||
unlessM (atomically $ isNetworkOnline c) $ do
|
||||
delay <- registerDelay $ userNetworkInterval $ config $ agentEnv c
|
||||
atomically $ unlessM (isNetworkOnline c) $ unlessM (readTVar delay) retry
|
||||
|
||||
closeAgentClient :: AgentClient -> IO ()
|
||||
closeAgentClient c = do
|
||||
|
||||
@@ -92,7 +92,8 @@ data AgentConfig = AgentConfig
|
||||
xftpCfg :: XFTPClientConfig,
|
||||
reconnectInterval :: RetryInterval,
|
||||
messageRetryInterval :: RetryInterval2,
|
||||
userNetworkInterval :: RetryInterval,
|
||||
userNetworkInterval :: Int,
|
||||
userOfflineDelay :: NominalDiffTime,
|
||||
messageTimeout :: NominalDiffTime,
|
||||
connDeleteDeliveryTimeout :: NominalDiffTime,
|
||||
helloTimeout :: NominalDiffTime,
|
||||
@@ -147,14 +148,6 @@ defaultMessageRetryInterval =
|
||||
}
|
||||
}
|
||||
|
||||
defaultUserNetworkInterval :: RetryInterval
|
||||
defaultUserNetworkInterval =
|
||||
RetryInterval
|
||||
{ initialInterval = 1200_000000, -- 20 minutes
|
||||
increaseAfter = 0,
|
||||
maxInterval = 7200_000000 -- 2 hours
|
||||
}
|
||||
|
||||
defaultAgentConfig :: AgentConfig
|
||||
defaultAgentConfig =
|
||||
AgentConfig
|
||||
@@ -170,7 +163,8 @@ defaultAgentConfig =
|
||||
xftpCfg = defaultXFTPClientConfig,
|
||||
reconnectInterval = defaultReconnectInterval,
|
||||
messageRetryInterval = defaultMessageRetryInterval,
|
||||
userNetworkInterval = defaultUserNetworkInterval,
|
||||
userNetworkInterval = 1800_000000, -- 30 minutes, should be less than Int32 max value
|
||||
userOfflineDelay = 2, -- if network offline event happens in less than 2 seconds after it was set online, it is ignored
|
||||
messageTimeout = 2 * nominalDay,
|
||||
connDeleteDeliveryTimeout = 2 * nominalDay,
|
||||
helloTimeout = 2 * nominalDay,
|
||||
@@ -179,7 +173,7 @@ defaultAgentConfig =
|
||||
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
|
||||
cleanupStepInterval = 200000, -- 200ms
|
||||
maxWorkerRestartsPerMin = 5,
|
||||
-- 3 consecutive subscription timeouts will result in alert to the user
|
||||
-- 5 consecutive subscription timeouts will result in alert to the user
|
||||
-- this is a fallback, as the timeout set to 3x of expected timeout, to avoid potential locking.
|
||||
maxSubscriptionTimeouts = 5,
|
||||
storedMsgDataTTL = 21 * nominalDay,
|
||||
|
||||
@@ -77,7 +77,6 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore)
|
||||
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ)
|
||||
import qualified Simplex.Messaging.Agent.Protocol as A
|
||||
import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..))
|
||||
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew))
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction')
|
||||
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultClientConfig)
|
||||
@@ -435,7 +434,7 @@ functionalAPITests t = do
|
||||
it "send delivery receipts concurrently with messages" $ testDeliveryReceiptsConcurrent t
|
||||
describe "user network info" $ do
|
||||
it "should wait for user network" testWaitForUserNetwork
|
||||
it "should not reset offline interval while offline" testDoNotResetOfflineInterval
|
||||
it "should not reset online to offline if happens too quickly" testDoNotResetOnlineToOffline
|
||||
it "should resume multiple threads" testResumeMultipleThreads
|
||||
|
||||
testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int
|
||||
@@ -2698,11 +2697,8 @@ testWaitForUserNetwork = do
|
||||
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
|
||||
noNetworkDelay a
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNNone False
|
||||
threadDelay 5000
|
||||
networkDelay a 100000
|
||||
networkDelay a 150000
|
||||
networkDelay a 200000
|
||||
networkDelay a 200000
|
||||
networkDelay a 100000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNCellular True
|
||||
noNetworkDelay a
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNCellular False
|
||||
@@ -2712,26 +2708,29 @@ testWaitForUserNetwork = do
|
||||
(networkDelay a 50000)
|
||||
noNetworkDelay a
|
||||
where
|
||||
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}}
|
||||
aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0}
|
||||
|
||||
testDoNotResetOfflineInterval :: IO ()
|
||||
testDoNotResetOfflineInterval = do
|
||||
testDoNotResetOnlineToOffline :: IO ()
|
||||
testDoNotResetOnlineToOffline = do
|
||||
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
|
||||
noNetworkDelay a
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNWifi False
|
||||
threadDelay 5000
|
||||
networkDelay a 100000
|
||||
networkDelay a 150000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNCellular False
|
||||
networkDelay a 200000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNNone False
|
||||
networkDelay a 200000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNCellular True
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNWifi False
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNWifi True
|
||||
noNetworkDelay a
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNCellular False
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNWifi False -- ingnored
|
||||
noNetworkDelay a
|
||||
threadDelay 100000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNWifi False
|
||||
networkDelay a 100000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNNone False
|
||||
networkDelay a 100000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNWifi True
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNNone False -- ingnored
|
||||
noNetworkDelay a
|
||||
where
|
||||
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}}
|
||||
aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0.1}
|
||||
|
||||
testResumeMultipleThreads :: IO ()
|
||||
testResumeMultipleThreads = do
|
||||
@@ -2746,14 +2745,14 @@ testResumeMultipleThreads = do
|
||||
threadDelay 1000000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNCellular True
|
||||
ts <- mapM (atomically . readTMVar) vs
|
||||
print $ minimum ts
|
||||
print $ maximum ts
|
||||
print $ sum ts `div` fromIntegral (length ts)
|
||||
-- print $ minimum ts
|
||||
-- print $ maximum ts
|
||||
-- print $ sum ts `div` fromIntegral (length ts)
|
||||
let average = sum ts `div` fromIntegral (length ts)
|
||||
average < 3000000 `shouldBe` True
|
||||
maximum ts < 4000000 `shouldBe` True
|
||||
where
|
||||
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 1000000, increaseAfter = 0, maxInterval = 3600_000_000}}
|
||||
aCfg = agentCfg {userOfflineDelay = 0}
|
||||
|
||||
noNetworkDelay :: AgentClient -> IO ()
|
||||
noNetworkDelay a = do
|
||||
|
||||
Reference in New Issue
Block a user