From 8fa9ed63171df4a4e7a76ee8b8e7c1c4465c16bb Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 10 Apr 2024 21:50:05 +0100 Subject: [PATCH] wait for user network availability (#1085) * ghc-options * wait for user network availability * test * update * comment * refactor * slow config * line * waitForUserNetwork in xftp and ntf workers * refactor * refactor with registerDelay --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> --- src/Simplex/FileTransfer/Agent.hs | 12 +- src/Simplex/FileTransfer/Client.hs | 14 ++- src/Simplex/Messaging/Agent.hs | 29 ++++- src/Simplex/Messaging/Agent/Client.hs | 105 +++++++++++++++--- src/Simplex/Messaging/Agent/Env/SQLite.hs | 18 ++- .../Messaging/Agent/NtfSubSupervisor.hs | 6 +- src/Simplex/Messaging/Agent/RetryInterval.hs | 9 +- src/Simplex/Messaging/Client.hs | 4 +- tests/AgentTests/FunctionalAPITests.hs | 33 +++++- 9 files changed, 188 insertions(+), 42 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index ed52dea75..d04117942 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -179,7 +179,8 @@ runXFTPRcvWorker c srv Worker {doWork} = do RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay - withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> + withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do + lift $ waitForUserNetwork c downloadFileChunk fc replica `catchAgentError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e where @@ -422,7 +423,8 @@ runXFTPSndPrepareWorker c Worker {doWork} = do where tryCreate = do usedSrvs <- newTVarIO ([] :: [XFTPServer]) - withRetryInterval (riFast ri) $ \_ loop -> + withRetryInterval (riFast ri) $ \_ loop -> do + lift $ waitForUserNetwork c createWithNextSrv usedSrvs `catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e where @@ -454,7 +456,8 @@ runXFTPSndWorker c srv Worker {doWork} = do SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas" fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay - withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> + withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do + lift $ waitForUserNetwork c uploadFileChunk cfg fc replica `catchAgentError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e where @@ -620,7 +623,8 @@ runXFTPDelWorker c srv Worker {doWork} = do where processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay - withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> + withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do + lift $ waitForUserNetwork c deleteChunkReplica `catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e where diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 370edafd8..642a3e73b 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -27,7 +27,6 @@ import qualified Data.X509 as X import qualified Data.X509.Validation as XV import qualified Network.HTTP.Types as N import qualified Network.HTTP2.Client as H -import Simplex.FileTransfer.Description (mb) import Simplex.FileTransfer.Protocol import Simplex.FileTransfer.Transport import Simplex.Messaging.Client @@ -70,7 +69,6 @@ data XFTPClient = XFTPClient data XFTPClientConfig = XFTPClientConfig { xftpNetworkConfig :: NetworkConfig, - uploadTimeoutPerMb :: Int64, serverVRange :: VersionRangeXFTP } @@ -93,7 +91,6 @@ defaultXFTPClientConfig :: XFTPClientConfig defaultXFTPClientConfig = XFTPClientConfig { xftpNetworkConfig = defaultNetworkConfig, - uploadTimeoutPerMb = 10000000, -- 10 seconds serverVRange = supportedFileServerVRange } @@ -190,8 +187,8 @@ sendXFTPCommand c@XFTPClient {thParams} pKey fId cmd chunkSpec_ = do sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body) sendXFTPTransmission XFTPClient {config, thParams, http2Client} t chunkSpec_ = do let req = H.requestStreaming N.methodPost "/" [] streamBody - reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_ - HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- withExceptT xftpClientError . ExceptT $ sendRequest http2Client req reqTimeout + reqTimeout = xftpReqTimeout config $ (\XFTPChunkSpec {chunkSize} -> chunkSize) <$> chunkSpec_ + HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- withExceptT xftpClientError . ExceptT $ sendRequest http2Client req (Just reqTimeout) when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK -- TODO validate that the file ID is the same as in the request? (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission thParams bodyHead @@ -251,8 +248,13 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec { _ -> throwError $ PCEResponseError NO_FILE (r, _) -> throwError . PCEUnexpectedResponse $ bshow r +xftpReqTimeout :: XFTPClientConfig -> Maybe Word32 -> Int +xftpReqTimeout cfg@XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout}} chunkSize_ = + maybe tcpTimeout (chunkTimeout cfg) chunkSize_ + chunkTimeout :: XFTPClientConfig -> Word32 -> Int -chunkTimeout config chunkSize = fromIntegral $ (fromIntegral chunkSize * uploadTimeoutPerMb config) `div` mb 1 +chunkTimeout XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout, tcpTimeoutPerKb}} sz = + tcpTimeout + fromIntegral (min ((fromIntegral sz `div` 1024) * tcpTimeoutPerKb) (fromIntegral (maxBound :: Int))) deleteXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> SenderId -> ExceptT XFTPClientError IO () deleteXFTPChunk c spKey sId = sendXFTPCommand c spKey sId FDEL Nothing >>= okResponse diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index ffb967e0c..2149e1e27 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -82,6 +82,7 @@ module Simplex.Messaging.Agent setNtfServers, setNetworkConfig, getNetworkConfig, + setUserNetworkInfo, reconnectAllServers, registerNtfToken, verifyNtfToken, @@ -402,17 +403,32 @@ testProtocolServer c userId srv = withAgentEnv' c $ case protocolTypeI @p of SPXFTP -> runXFTPServerTest c userId srv SPNTF -> runNTFServerTest c userId srv --- | set SOCKS5 proxy on/off and optionally set TCP timeout +-- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network setNetworkConfig :: AgentClient -> NetworkConfig -> IO () -setNetworkConfig c cfg' = do - cfg <- atomically $ do - swapTVar (useNetworkConfig c) cfg' - when (cfg /= cfg') $ reconnectAllServers c +setNetworkConfig c@AgentClient {useNetworkConfig} cfg' = do + changed <- atomically $ do + (_, cfg) <- readTVar useNetworkConfig + if cfg == cfg' + then pure False + else True <$ (writeTVar useNetworkConfig $! (slowNetworkConfig cfg', cfg')) + when changed $ reconnectAllServers c +-- returns fast network config getNetworkConfig :: AgentClient -> IO NetworkConfig -getNetworkConfig = readTVarIO . useNetworkConfig +getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig {-# INLINE getNetworkConfig #-} +setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO () +setUserNetworkInfo c@AgentClient {userNetworkState} UserNetworkInfo {networkType = nt'} = withAgentEnv' c $ do + d <- asks $ initialInterval . userNetworkInterval . config + ts <- liftIO getCurrentTime + atomically $ do + ns@UserNetworkState {networkType = nt} <- readTVar userNetworkState + when (nt' /= nt) $ + writeTVar userNetworkState $! case nt' of + UNNone -> ns {networkType = nt', offline = Just UNSOffline {offlineDelay = d, offlineFrom = ts}} + _ -> ns {networkType = nt', offline = Nothing} + reconnectAllServers :: AgentClient -> IO () reconnectAllServers c = do reconnectServerClients c smpClients @@ -1267,6 +1283,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq (Worker {doWork let mId = unId msgId ri' = maybe id updateRetryInterval2 msgRetryState ri withRetryLock2 ri' qLock $ \riState loop -> do + lift $ waitForUserNetwork c resp <- tryError $ case msgType of AM_CONN_INFO -> sendConfirmation c sq msgBody AM_CONN_INFO_REPLY -> sendConfirmation c sq msgBody diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index cd68cd158..3091fba87 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -81,6 +81,7 @@ module Simplex.Messaging.Agent.Client agentClientStore, agentDRG, getAgentSubscriptions, + slowNetworkConfig, Worker (..), SessionVar (..), SubscriptionsInfo (..), @@ -100,6 +101,11 @@ module Simplex.Messaging.Agent.Client agentOperations, agentOperationBracket, waitUntilActive, + UserNetworkInfo (..), + UserNetworkType (..), + UserNetworkState (..), + UNSOffline (..), + waitForUserNetwork, throwWhenInactive, throwWhenNoDelivery, beginAgentOperation, @@ -149,6 +155,7 @@ import qualified Data.ByteString.Char8 as B import Data.Composition ((.:.)) import Data.Either (lefts, partitionEithers) import Data.Functor (($>)) +import Data.Int (Int64) import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L @@ -159,7 +166,7 @@ import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) import Data.Text.Encoding -import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime) +import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) import Network.Socket (HostName) @@ -265,7 +272,8 @@ data AgentClient = AgentClient ntfClients :: TMap NtfTransportSession NtfClientVar, xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth), xftpClients :: TMap XFTPTransportSession XFTPClientVar, - useNetworkConfig :: TVar NetworkConfig, + useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks + userNetworkState :: TVar UserNetworkState, subscrConns :: TVar (Set ConnId), activeSubs :: TRcvQueues, pendingSubs :: TRcvQueues, @@ -396,6 +404,23 @@ data AgentStatsKey = AgentStatsKey } deriving (Eq, Ord, Show) +data UserNetworkInfo = UserNetworkInfo + { networkType :: UserNetworkType + } + deriving (Show) + +data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther + deriving (Eq, Show) + +data UserNetworkState = UserNetworkState + { networkType :: UserNetworkType, + offline :: Maybe UNSOffline + } + deriving (Show) + +data UNSOffline = UNSOffline {offlineDelay :: Int64, offlineFrom :: UTCTime} + deriving (Show) + -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. newAgentClient :: Int -> InitialAgentServers -> Env -> STM AgentClient newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do @@ -411,7 +436,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = ntfClients <- TM.empty xftpServers <- newTVar xftp xftpClients <- TM.empty - useNetworkConfig <- newTVar netCfg + useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg) + userNetworkState <- newTVar $ UserNetworkState UNOther Nothing subscrConns <- newTVar S.empty activeSubs <- RQ.empty pendingSubs <- RQ.empty @@ -446,6 +472,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = xftpServers, xftpClients, useNetworkConfig, + userNetworkState, subscrConns, activeSubs, pendingSubs, @@ -470,6 +497,13 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = agentEnv } +slowNetworkConfig :: NetworkConfig -> NetworkConfig +slowNetworkConfig cfg@NetworkConfig {tcpConnectTimeout, tcpTimeout, tcpTimeoutPerKb} = + cfg {tcpConnectTimeout = slow tcpConnectTimeout, tcpTimeout = slow tcpTimeout, tcpTimeoutPerKb = slow tcpTimeoutPerKb} + where + slow :: Integral a => a -> a + slow t = (t * 3) `div` 2 + agentClientStore :: AgentClient -> SQLiteStore agentClientStore AgentClient {agentEnv = Env {store}} = store {-# INLINE agentClientStore #-} @@ -582,6 +616,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess = withRetryInterval ri $ \_ loop -> do pending <- atomically getPending forM_ (L.nonEmpty pending) $ \qs -> do + waitForUserNetwork c void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs loop getPending = RQ.getSessQueues tSess $ pendingSubs c @@ -594,7 +629,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess = reconnectSMPClient :: TVar Int -> AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM () reconnectSMPClient tc c tSess@(_, srv, _) qs = do - NetworkConfig {tcpTimeout} <- readTVarIO $ useNetworkConfig c + NetworkConfig {tcpTimeout} <- atomically $ getNetworkConfig c -- this allows 3x of timeout per batch of subscription (90 queues per batch empirically) let t = (length qs `div` 90 + 1) * tcpTimeout * 3 ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case @@ -647,7 +682,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv getXFTPServerClient :: AgentClient -> XFTPTransportSession -> AM XFTPClient -getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do +getXFTPServerClient c@AgentClient {active, xftpClients} tSess@(userId, srv, _) = do unlessM (readTVarIO active) . throwError $ INACTIVE atomically (getTSessVar c tSess xftpClients) >>= either @@ -658,7 +693,7 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@ connectClient v = do cfg <- asks $ xftpCfg . config g <- asks random - xftpNetworkConfig <- readTVarIO useNetworkConfig + xftpNetworkConfig <- atomically $ getNetworkConfig c liftError' (protocolClientError XFTP $ B.unpack $ strEncode srv) $ X.getXFTPClient g tSess cfg {xftpNetworkConfig} $ clientDisconnected v @@ -693,7 +728,7 @@ removeTSessVar' v tSess vs = waitForProtocolClient :: ProtocolTypeI (ProtoType msg) => AgentClient -> TransportSession msg -> ClientVar msg -> AM (Client msg) waitForProtocolClient c (_, srv, _) v = do - NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c + NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) liftEither $ case client_ of Just (Right smpClient) -> Right smpClient @@ -729,11 +764,51 @@ hostEvent :: forall v err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerCli hostEvent event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost getClientConfig :: AgentClient -> (AgentConfig -> ProtocolClientConfig v) -> AM' (ProtocolClientConfig v) -getClientConfig AgentClient {useNetworkConfig} cfgSel = do +getClientConfig c cfgSel = do cfg <- asks $ cfgSel . config - networkConfig <- readTVarIO useNetworkConfig + networkConfig <- atomically $ getNetworkConfig c pure cfg {networkConfig} +getNetworkConfig :: AgentClient -> STM NetworkConfig +getNetworkConfig c = do + (slowCfg, fastCfg) <- readTVar (useNetworkConfig c) + UserNetworkState {networkType} <- readTVar (userNetworkState c) + pure $ case networkType of + UNCellular -> slowCfg + UNNone -> slowCfg + _ -> fastCfg + +waitForUserNetwork :: AgentClient -> AM' () +waitForUserNetwork AgentClient {userNetworkState} = + (offline <$> readTVarIO userNetworkState) >>= mapM_ waitWhileOffline + where + waitWhileOffline UNSOffline {offlineDelay = d} = + unlessM (liftIO $ waitOnline d False) $ do -- network delay reached, increase delay + ts' <- liftIO getCurrentTime + ni <- asks $ userNetworkInterval . config + atomically $ do + ns@UserNetworkState {offline} <- readTVar userNetworkState + forM_ offline $ \UNSOffline {offlineDelay = d', offlineFrom = ts} -> + -- Using `min` to avoid multiple updates in a short period of time + -- and to reset `offlineDelay` if network went `on` and `off` again. + writeTVar userNetworkState $! + let d'' = nextRetryDelay (diffToMicroseconds $ diffUTCTime ts' ts) (min d d') ni + in ns {offline = Just UNSOffline {offlineDelay = d'', offlineFrom = ts}} + waitOnline :: Int64 -> Bool -> IO Bool + waitOnline t online' + | t <= 0 = pure online' + | otherwise = + registerDelay (fromIntegral maxWait) + >>= atomically . onlineOrDelay + >>= waitOnline (t - maxWait) + where + maxWait = min t $ fromIntegral (maxBound :: Int) + onlineOrDelay delay = do + online <- isNothing . offline <$> readTVar userNetworkState + expired <- readTVar delay + unless (online || expired) retry + pure online + closeAgentClient :: AgentClient -> IO () closeAgentClient c = do atomically $ writeTVar (active c) False @@ -789,7 +864,7 @@ closeClient c clientSel tSess = closeClient_ :: ProtocolServerClient v err msg => AgentClient -> ClientVar msg -> IO () closeClient_ c v = do - NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c + NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c E.handle (\BlockedIndefinitelyOnSTM -> pure ()) $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) >>= \case Just (Right client) -> closeProtocolServerClient client `catchAll_` pure () _ -> pure () @@ -950,7 +1025,7 @@ runXFTPServerTest :: AgentClient -> UserId -> XFTPServerWithAuth -> AM' (Maybe P runXFTPServerTest c userId (ProtoServerWithAuth srv auth) = do cfg <- asks $ xftpCfg . config g <- asks random - xftpNetworkConfig <- readTVarIO $ useNetworkConfig c + xftpNetworkConfig <- atomically $ getNetworkConfig c workDir <- getXFTPWorkPath filePath <- getTempFilePath workDir rcvPath <- getTempFilePath workDir @@ -1040,7 +1115,7 @@ mkSMPTSession q = mkTSession (qUserId q) (qServer q) (qConnId q) {-# INLINE mkSMPTSession #-} getSessionMode :: AgentClient -> IO TransportSessionMode -getSessionMode = fmap sessionMode . readTVarIO . useNetworkConfig +getSessionMode = atomically . fmap sessionMode . getNetworkConfig {-# INLINE getSessionMode #-} newRcvQueue :: AgentClient -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri) @@ -1132,7 +1207,7 @@ sendTSessionBatches statCmd statBatchSize toRQ action c qs = where batchQueues :: AM' [(SMPTransportSession, NonEmpty q)] batchQueues = do - mode <- sessionMode <$> readTVarIO (useNetworkConfig c) + mode <- atomically $ sessionMode <$> getNetworkConfig c pure . M.assocs $ foldl' (batch mode) M.empty qs where batch mode m q = @@ -1775,3 +1850,7 @@ $(J.deriveJSON defaultJSON ''WorkersSummary) $(J.deriveJSON defaultJSON {J.fieldLabelModifier = takeWhile (/= '_')} ''AgentWorkersDetails) $(J.deriveJSON defaultJSON ''AgentWorkersSummary) + +$(J.deriveJSON (enumJSON $ dropPrefix "UN") ''UserNetworkType) + +$(J.deriveJSON defaultJSON ''UserNetworkInfo) diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index a1d060586..02a28ba95 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -92,6 +92,7 @@ data AgentConfig = AgentConfig xftpCfg :: XFTPClientConfig, reconnectInterval :: RetryInterval, messageRetryInterval :: RetryInterval2, + userNetworkInterval :: RetryInterval, messageTimeout :: NominalDiffTime, connDeleteDeliveryTimeout :: NominalDiffTime, helloTimeout :: NominalDiffTime, @@ -126,7 +127,7 @@ defaultReconnectInterval = RetryInterval { initialInterval = 2_000000, increaseAfter = 10_000000, - maxInterval = 180_000000 + maxInterval = 60_000000 } defaultMessageRetryInterval :: RetryInterval2 @@ -134,18 +135,26 @@ defaultMessageRetryInterval = RetryInterval2 { riFast = RetryInterval - { initialInterval = 1_000000, + { initialInterval = 2_000000, increaseAfter = 10_000000, maxInterval = 60_000000 }, riSlow = RetryInterval - { initialInterval = 180_000000, -- 3 minutes + { initialInterval = 300_000000, -- 5 minutes increaseAfter = 60_000000, - maxInterval = 3 * 3600_000000 -- 3 hours + maxInterval = 6 * 3600_000000 -- 6 hours } } +defaultUserNetworkInterval :: RetryInterval +defaultUserNetworkInterval = + RetryInterval + { initialInterval = 1200_000000, -- 20 minutes + increaseAfter = 0, + maxInterval = 7200_000000 -- 2 hours + } + defaultAgentConfig :: AgentConfig defaultAgentConfig = AgentConfig @@ -161,6 +170,7 @@ defaultAgentConfig = xftpCfg = defaultXFTPClientConfig, reconnectInterval = defaultReconnectInterval, messageRetryInterval = defaultMessageRetryInterval, + userNetworkInterval = defaultUserNetworkInterval, messageTimeout = 2 * nominalDay, connDeleteDeliveryTimeout = 2 * nominalDay, helloTimeout = 2 * nominalDay, diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 7e47b5ba6..ae0066328 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -160,7 +160,8 @@ runNtfWorker c srv Worker {doWork} = do \nextSub@(NtfSubscription {connId}, _, _) -> do logInfo $ "runNtfWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \_ loop -> + withRetryInterval ri $ \_ loop -> do + lift $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show) processSub :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> AM () @@ -243,7 +244,8 @@ runNtfSMPWorker c srv Worker {doWork} = do \nextSub@(NtfSubscription {connId}, _, _) -> do logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \_ loop -> + withRetryInterval ri $ \_ loop -> do + lift $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show) processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> AM () diff --git a/src/Simplex/Messaging/Agent/RetryInterval.hs b/src/Simplex/Messaging/Agent/RetryInterval.hs index b75fd9a60..00fe4039e 100644 --- a/src/Simplex/Messaging/Agent/RetryInterval.hs +++ b/src/Simplex/Messaging/Agent/RetryInterval.hs @@ -11,6 +11,7 @@ module Simplex.Messaging.Agent.RetryInterval withRetryIntervalCount, withRetryLock2, updateRetryInterval2, + nextRetryDelay, ) where @@ -60,7 +61,7 @@ withRetryIntervalCount ri action = callAction 0 0 $ initialInterval ri loop = do liftIO $ threadDelay' delay let elapsed' = elapsed + delay - callAction (n + 1) elapsed' $ nextDelay elapsed' delay ri + callAction (n + 1) elapsed' $ nextRetryDelay elapsed' delay ri -- This function allows action to toggle between slow and fast retry intervals. withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State -> (RetryIntervalMode -> m ()) -> m ()) -> m () @@ -76,7 +77,7 @@ withRetryLock2 RetryInterval2 {riSlow, riFast} lock action = run (elapsed, delay) ri call = do wait delay let elapsed' = elapsed + delay - delay' = nextDelay elapsed' delay ri + delay' = nextRetryDelay elapsed' delay ri call (elapsed', delay') wait delay = do waiting <- newTVarIO True @@ -87,8 +88,8 @@ withRetryLock2 RetryInterval2 {riSlow, riFast} lock action = takeTMVar lock writeTVar waiting False -nextDelay :: Int64 -> Int64 -> RetryInterval -> Int64 -nextDelay elapsed delay RetryInterval {increaseAfter, maxInterval} = +nextRetryDelay :: Int64 -> Int64 -> RetryInterval -> Int64 +nextRetryDelay elapsed delay RetryInterval {increaseAfter, maxInterval} = if elapsed < increaseAfter || delay == maxInterval then delay else min (delay * 3 `div` 2) maxInterval diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index d02cb7bb4..e0591b14d 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -207,7 +207,7 @@ data NetworkConfig = NetworkConfig -- | timeout of protocol commands (microseconds) tcpTimeout :: Int, -- | additional timeout per kilobyte (1024 bytes) to be sent - tcpTimeoutPerKb :: Int, + tcpTimeoutPerKb :: Int64, -- | TCP keep-alive options, Nothing to skip enabling keep-alive tcpKeepAlive :: Maybe KeepAliveOpts, -- | period for SMP ping commands (microseconds, 0 to disable) @@ -230,7 +230,7 @@ defaultNetworkConfig = sessionMode = TSMUser, tcpConnectTimeout = 20_000_000, tcpTimeout = 15_000_000, - tcpTimeoutPerKb = 45_000, -- 45ms, should be less than 130ms to avoid Int overflow on 32 bit systems + tcpTimeoutPerKb = 5_000, tcpKeepAlive = Just defaultKeepAliveOpts, smpPingInterval = 600_000_000, -- 10min smpPingCount = 3, diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 28cef8417..03567b367 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -72,10 +72,11 @@ import SMPAgentClient import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, withSmpServerV7) import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage) import qualified Simplex.Messaging.Agent as A -import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..)) +import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), UserNetworkInfo (..), UserNetworkType (..), waitForUserNetwork) import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore) import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ) import qualified Simplex.Messaging.Agent.Protocol as A +import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..)) import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew)) import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction') import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultSMPClientConfig) @@ -89,6 +90,7 @@ import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server.Env.STM (ServerConfig (..)) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Transport (ATransport (..), SMPVersion, VersionSMP, authCmdsSMPVersion, basicAuthSMPVersion, batchCmdsSMPVersion, currentServerSMPRelayVersion) +import Simplex.Messaging.Util (diffToMicroseconds) import Simplex.Messaging.Version (VersionRange (..)) import qualified Simplex.Messaging.Version as V import Simplex.Messaging.Version.Internal (Version (..)) @@ -423,6 +425,8 @@ functionalAPITests t = do it "should send and receive delivery receipt" $ withSmpServer t testDeliveryReceipts it "should send delivery receipt only in connection v3+" $ testDeliveryReceiptsVersion t it "send delivery receipts concurrently with messages" $ testDeliveryReceiptsConcurrent t + describe "user network info" $ do + it "should wait for user network" testWaitForUserNetwork testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do @@ -2639,6 +2643,33 @@ testServerMultipleIdentities = } testE2ERatchetParams12 +testWaitForUserNetwork :: HasCallStack => IO () +testWaitForUserNetwork = do + a <- getSMPAgentClient' 1 aCfg initAgentServers testDB + noNetworkDelay a + setUserNetworkInfo a $ UserNetworkInfo UNNone + networkDelay a 100000 + networkDelay a 150000 + networkDelay a 200000 + networkDelay a 200000 + setUserNetworkInfo a $ UserNetworkInfo UNCellular + noNetworkDelay a + setUserNetworkInfo a $ UserNetworkInfo UNNone + networkDelay a 100000 + concurrently_ + (threadDelay 50000 >> setUserNetworkInfo a (UserNetworkInfo UNCellular)) + (networkDelay a 50000) + noNetworkDelay a + where + aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}} + noNetworkDelay a = (10000 >) <$> waitNetwork a `shouldReturn` True + networkDelay a d' = (\d -> d' < d && d < d' + 15000) <$> waitNetwork a `shouldReturn` True + waitNetwork a = do + t <- getCurrentTime + waitForUserNetwork a `runReaderT` agentEnv a + t' <- getCurrentTime + pure $ diffToMicroseconds $ diffUTCTime t' t + exchangeGreetings :: HasCallStack => AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO () exchangeGreetings = exchangeGreetings_ PQEncOn