diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 415ead6c0..a5ba40f4a 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -180,7 +180,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c downloadFileChunk fc replica `catchAgentError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e where @@ -425,7 +425,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do tryCreate = do usedSrvs <- newTVarIO ([] :: [XFTPServer]) withRetryInterval (riFast ri) $ \_ loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c createWithNextSrv usedSrvs `catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e where @@ -458,7 +458,7 @@ runXFTPSndWorker c srv Worker {doWork} = do fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c uploadFileChunk cfg fc replica `catchAgentError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e where @@ -625,7 +625,7 @@ runXFTPDelWorker c srv Worker {doWork} = do processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c deleteChunkReplica `catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e where diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index d4fb11dd5..b1aa5070a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -430,22 +430,29 @@ getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig {-# INLINE getNetworkConfig #-} setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO () -setUserNetworkInfo c@AgentClient {userNetworkState} UserNetworkInfo {networkType = nt', online} = withAgentEnv' c $ do - d <- asks $ initialInterval . userNetworkInterval . config - ts <- liftIO getCurrentTime - atomically $ do - ns@UserNetworkState {networkType = nt, offline} <- readTVar userNetworkState - when (nt' /= nt || online /= isNothing offline) $ - writeTVar userNetworkState $! - let offline' - | nt' /= UNNone && online = Nothing - | isJust offline = offline - | otherwise = Just UNSOffline {offlineDelay = d, offlineFrom = ts} - in ns {networkType = nt', offline = offline'} +setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkDelay} netInfo = withAgentEnv' c $ do + ni <- asks $ userNetworkInterval . config + let d = initialInterval ni + off <- atomically $ do + wasOnline <- isOnline <$> swapTVar userNetworkInfo netInfo + let off = wasOnline && not (isOnline netInfo) + when off $ writeTVar userNetworkDelay d + pure off + liftIO . when off . void . forkIO $ + growOfflineDelay 0 d ni + where + growOfflineDelay elapsed d ni = do + online <- waitOnlineOrDelay c d + unless online $ do + let elapsed' = elapsed + d + d' = nextRetryDelay elapsed' d ni + atomically $ writeTVar userNetworkDelay d' + growOfflineDelay elapsed' d' ni reconnectAllServers :: AgentClient -> IO () reconnectAllServers c = do reconnectServerClients c smpClients + reconnectServerClients c xftpClients reconnectServerClients c ntfClients -- | Register device notifications token @@ -1317,7 +1324,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq (Worker {doWork let mId = unId msgId ri' = maybe id updateRetryInterval2 msgRetryState ri withRetryLock2 ri' qLock $ \riState loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c resp <- tryError $ case msgType of AM_CONN_INFO -> sendConfirmation c sq msgBody AM_CONN_INFO_REPLY -> sendConfirmation c sq msgBody diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 13adb9fdc..fb595f1c0 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -108,9 +108,10 @@ module Simplex.Messaging.Agent.Client waitUntilActive, UserNetworkInfo (..), UserNetworkType (..), - UserNetworkState (..), - UNSOffline (..), waitForUserNetwork, + waitOnlineOrDelay, + isNetworkOnline, + isOnline, throwWhenInactive, throwWhenNoDelivery, beginAgentOperation, @@ -173,7 +174,7 @@ import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) import Data.Text.Encoding -import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime) +import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) import qualified Database.SQLite.Simple as SQL @@ -281,7 +282,8 @@ data AgentClient = AgentClient xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth), xftpClients :: TMap XFTPTransportSession XFTPClientVar, useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks - userNetworkState :: TVar UserNetworkState, + userNetworkInfo :: TVar UserNetworkInfo, + userNetworkDelay :: TVar Int64, subscrConns :: TVar (Set ConnId), activeSubs :: TRcvQueues, pendingSubs :: TRcvQueues, @@ -426,22 +428,20 @@ data UserNetworkInfo = UserNetworkInfo } deriving (Show) +isNetworkOnline :: AgentClient -> STM Bool +isNetworkOnline c = isOnline <$> readTVar (userNetworkInfo c) + +isOnline :: UserNetworkInfo -> Bool +isOnline UserNetworkInfo {networkType, online} = networkType /= UNNone && online + data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther deriving (Eq, Show) -data UserNetworkState = UserNetworkState - { networkType :: UserNetworkType, - offline :: Maybe UNSOffline - } - deriving (Show) - -data UNSOffline = UNSOffline {offlineDelay :: Int64, offlineFrom :: UTCTime} - deriving (Show) - -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. newAgentClient :: Int -> InitialAgentServers -> Env -> STM AgentClient newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do - let qSize = tbqSize $ config agentEnv + let cfg = config agentEnv + qSize = tbqSize cfg acThread <- newTVar Nothing active <- newTVar True rcvQ <- newTBQueue qSize @@ -455,7 +455,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = xftpServers <- newTVar xftp xftpClients <- TM.empty useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg) - userNetworkState <- newTVar $ UserNetworkState UNOther Nothing + userNetworkInfo <- newTVar $ UserNetworkInfo UNOther True + userNetworkDelay <- newTVar $ initialInterval $ userNetworkInterval cfg subscrConns <- newTVar S.empty activeSubs <- RQ.empty pendingSubs <- RQ.empty @@ -492,7 +493,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = xftpServers, xftpClients, useNetworkConfig, - userNetworkState, + userNetworkInfo, + userNetworkDelay, subscrConns, activeSubs, pendingSubs, @@ -703,7 +705,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = withRetryInterval ri $ \_ loop -> do pending <- atomically getPending forM_ (L.nonEmpty pending) $ \qs -> do - waitForUserNetwork c + lift $ waitForUserNetwork c void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs loop getPending = RQ.getSessQueues tSess $ pendingSubs c @@ -720,18 +722,17 @@ reconnectSMPClient tc c tSess@(_, srv, _) qs = do -- this allows 3x of timeout per batch of subscription (90 queues per batch empirically) let t = (length qs `div` 90 + 1) * tcpTimeout * 3 ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case - Just _ -> atomically $ writeTVar tc 0 - Nothing -> - (offline <$> readTVarIO (userNetworkState c)) >>= \case - -- reset and do not report consequitive timeouts while offline - Just _ -> atomically $ writeTVar tc 0 - Nothing -> do - tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) - maxTC <- asks $ maxSubscriptionTimeouts . config - when (tc' >= maxTC) $ do - let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess - atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg) + Just _ -> resetTimeouts + -- reset and do not report consecutive timeouts while offline + Nothing -> ifM (atomically $ isNetworkOnline c) notifyTimeout resetTimeouts where + resetTimeouts = atomically $ writeTVar tc 0 + notifyTimeout = do + tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) + maxTC <- asks $ maxSubscriptionTimeouts . config + when (tc' >= maxTC) $ do + let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess + atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg) resubscribe :: AM () resubscribe = do cs <- readTVarIO $ RQ.getConnections $ activeSubs c @@ -848,43 +849,31 @@ getClientConfig c cfgSel = do getNetworkConfig :: AgentClient -> STM NetworkConfig getNetworkConfig c = do (slowCfg, fastCfg) <- readTVar (useNetworkConfig c) - UserNetworkState {networkType} <- readTVar (userNetworkState c) + UserNetworkInfo {networkType} <- readTVar $ userNetworkInfo c pure $ case networkType of UNCellular -> slowCfg UNNone -> slowCfg _ -> fastCfg -waitForUserNetwork :: AgentClient -> AM' () -waitForUserNetwork AgentClient {userNetworkState} = - readTVarIO userNetworkState >>= mapM_ waitWhileOffline . offline - where - waitWhileOffline UNSOffline {offlineDelay = d} = - unlessM (liftIO $ waitOnline d False) $ do - -- network delay reached, increase delay - ts' <- liftIO getCurrentTime - ni <- asks $ userNetworkInterval . config - atomically $ do - ns@UserNetworkState {offline} <- readTVar userNetworkState - forM_ offline $ \UNSOffline {offlineDelay = d', offlineFrom = ts} -> - -- Using `min` to avoid multiple updates in a short period of time - -- and to reset `offlineDelay` if network went `on` and `off` again. - writeTVar userNetworkState $! - let d'' = nextRetryDelay (diffToMicroseconds $ diffUTCTime ts' ts) (min d d') ni - in ns {offline = Just UNSOffline {offlineDelay = d'', offlineFrom = ts}} - waitOnline :: Int64 -> Bool -> IO Bool - waitOnline t online' - | t <= 0 = pure online' - | otherwise = - registerDelay (fromIntegral maxWait) - >>= atomically . onlineOrDelay - >>= waitOnline (t - maxWait) - where - maxWait = min t $ fromIntegral (maxBound :: Int) - onlineOrDelay delay = do - online <- isNothing . offline <$> readTVar userNetworkState - expired <- readTVar delay - unless (online || expired) retry - pure online +waitForUserNetwork :: AgentClient -> IO () +waitForUserNetwork c = + unlessM (atomically $ isNetworkOnline c) $ + readTVarIO (userNetworkDelay c) >>= void . waitOnlineOrDelay c + +waitOnlineOrDelay :: AgentClient -> Int64 -> IO Bool +waitOnlineOrDelay c t = do + let maxWait = min t $ fromIntegral (maxBound :: Int) + t' = t - maxWait + delay <- registerDelay $ fromIntegral maxWait + online <- + atomically $ do + expired <- readTVar delay + online <- isNetworkOnline c + unless (expired || online) retry + pure online + if online || t' <= 0 + then pure online + else waitOnlineOrDelay c t' closeAgentClient :: AgentClient -> IO () closeAgentClient c = do diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index ae0066328..4aaa5f278 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -161,7 +161,7 @@ runNtfWorker c srv Worker {doWork} = do logInfo $ "runNtfWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show) processSub :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> AM () @@ -245,7 +245,7 @@ runNtfSMPWorker c srv Worker {doWork} = do logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show) processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> AM () diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index a880cfaad..b42d5b378 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -152,12 +152,14 @@ timeoutThrow :: MonadUnliftIO m => e -> Int -> ExceptT e m a -> ExceptT e m a timeoutThrow e ms action = ExceptT (sequence <$> (ms `timeout` runExceptT action)) >>= maybe (throwError e) pure threadDelay' :: Int64 -> IO () -threadDelay' time - | time <= 0 = pure () -threadDelay' time = do - let maxWait = min time $ fromIntegral (maxBound :: Int) - threadDelay $ fromIntegral maxWait - when (maxWait /= time) $ threadDelay' (time - maxWait) +threadDelay' = loop + where + loop time + | time <= 0 = pure () + | otherwise = do + let maxWait = min time $ fromIntegral (maxBound :: Int) + threadDelay $ fromIntegral maxWait + loop $ time - maxWait diffToMicroseconds :: NominalDiffTime -> Int64 diffToMicroseconds diff = fromIntegral ((truncate $ diff * 1000000) :: Integer) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 6ef2e00aa..54a0cd6aa 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -51,7 +51,7 @@ module AgentTests.FunctionalAPITests where import AgentTests.ConnectionRequestTests (connReqData, queueAddr, testE2ERatchetParams12) -import Control.Concurrent (killThread, threadDelay) +import Control.Concurrent (forkIO, killThread, threadDelay) import Control.Monad import Control.Monad.Except import Control.Monad.Reader @@ -440,6 +440,7 @@ functionalAPITests t = do describe "user network info" $ do it "should wait for user network" testWaitForUserNetwork it "should not reset offline interval while offline" testDoNotResetOfflineInterval + it "should resume multiple threads" testResumeMultipleThreads testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do @@ -2709,6 +2710,7 @@ testWaitForUserNetwork = do a <- getSMPAgentClient' 1 aCfg initAgentServers testDB noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNNone False + threadDelay 5000 networkDelay a 100000 networkDelay a 150000 networkDelay a 200000 @@ -2729,6 +2731,7 @@ testDoNotResetOfflineInterval = do a <- getSMPAgentClient' 1 aCfg initAgentServers testDB noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNWifi False + threadDelay 5000 networkDelay a 100000 networkDelay a 150000 setUserNetworkInfo a $ UserNetworkInfo UNCellular False @@ -2742,16 +2745,42 @@ testDoNotResetOfflineInterval = do where aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}} +testResumeMultipleThreads :: IO () +testResumeMultipleThreads = do + a <- getSMPAgentClient' 1 aCfg initAgentServers testDB + noNetworkDelay a + setUserNetworkInfo a $ UserNetworkInfo UNNone False + vs <- + replicateM 50000 $ do + v <- newEmptyTMVarIO + void . forkIO $ waitNetwork a >>= atomically . putTMVar v + pure v + threadDelay 1000000 + setUserNetworkInfo a $ UserNetworkInfo UNCellular True + ts <- mapM (atomically . readTMVar) vs + print $ minimum ts + print $ maximum ts + print $ sum ts `div` fromIntegral (length ts) + let average = sum ts `div` fromIntegral (length ts) + average < 3000000 `shouldBe` True + maximum ts < 4000000 `shouldBe` True + where + aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 1000000, increaseAfter = 0, maxInterval = 3600_000_000}} + noNetworkDelay :: AgentClient -> IO () -noNetworkDelay a = (10000 >) <$> waitNetwork a `shouldReturn` True +noNetworkDelay a = do + d <- waitNetwork a + unless (d < 10000) $ expectationFailure $ "expected no delay, d = " <> show d networkDelay :: AgentClient -> Int64 -> IO () -networkDelay a d' = (\d -> d' < d && d < d' + 15000) <$> waitNetwork a `shouldReturn` True +networkDelay a d' = do + d <- waitNetwork a + unless (d' < d && d < d' + 15000) $ expectationFailure $ "expected delay " <> show d' <> ", d = " <> show d waitNetwork :: AgentClient -> IO Int64 waitNetwork a = do t <- getCurrentTime - waitForUserNetwork a `runReaderT` agentEnv a + waitForUserNetwork a t' <- getCurrentTime pure $ diffToMicroseconds $ diffUTCTime t' t