mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
wait for user network availability (#1085)
* ghc-options * wait for user network availability * test * update * comment * refactor * slow config * line * waitForUserNetwork in xftp and ntf workers * refactor * refactor with registerDelay --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
91cf6841e0
commit
8fa9ed6317
@@ -179,7 +179,8 @@ runXFTPRcvWorker c srv Worker {doWork} = do
|
||||
RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
|
||||
fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop ->
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
|
||||
lift $ waitForUserNetwork c
|
||||
downloadFileChunk fc replica
|
||||
`catchAgentError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e
|
||||
where
|
||||
@@ -422,7 +423,8 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
|
||||
where
|
||||
tryCreate = do
|
||||
usedSrvs <- newTVarIO ([] :: [XFTPServer])
|
||||
withRetryInterval (riFast ri) $ \_ loop ->
|
||||
withRetryInterval (riFast ri) $ \_ loop -> do
|
||||
lift $ waitForUserNetwork c
|
||||
createWithNextSrv usedSrvs
|
||||
`catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e
|
||||
where
|
||||
@@ -454,7 +456,8 @@ runXFTPSndWorker c srv Worker {doWork} = do
|
||||
SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
|
||||
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop ->
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
|
||||
lift $ waitForUserNetwork c
|
||||
uploadFileChunk cfg fc replica
|
||||
`catchAgentError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e
|
||||
where
|
||||
@@ -620,7 +623,8 @@ runXFTPDelWorker c srv Worker {doWork} = do
|
||||
where
|
||||
processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop ->
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
|
||||
lift $ waitForUserNetwork c
|
||||
deleteChunkReplica
|
||||
`catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e
|
||||
where
|
||||
|
||||
@@ -27,7 +27,6 @@ import qualified Data.X509 as X
|
||||
import qualified Data.X509.Validation as XV
|
||||
import qualified Network.HTTP.Types as N
|
||||
import qualified Network.HTTP2.Client as H
|
||||
import Simplex.FileTransfer.Description (mb)
|
||||
import Simplex.FileTransfer.Protocol
|
||||
import Simplex.FileTransfer.Transport
|
||||
import Simplex.Messaging.Client
|
||||
@@ -70,7 +69,6 @@ data XFTPClient = XFTPClient
|
||||
|
||||
data XFTPClientConfig = XFTPClientConfig
|
||||
{ xftpNetworkConfig :: NetworkConfig,
|
||||
uploadTimeoutPerMb :: Int64,
|
||||
serverVRange :: VersionRangeXFTP
|
||||
}
|
||||
|
||||
@@ -93,7 +91,6 @@ defaultXFTPClientConfig :: XFTPClientConfig
|
||||
defaultXFTPClientConfig =
|
||||
XFTPClientConfig
|
||||
{ xftpNetworkConfig = defaultNetworkConfig,
|
||||
uploadTimeoutPerMb = 10000000, -- 10 seconds
|
||||
serverVRange = supportedFileServerVRange
|
||||
}
|
||||
|
||||
@@ -190,8 +187,8 @@ sendXFTPCommand c@XFTPClient {thParams} pKey fId cmd chunkSpec_ = do
|
||||
sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
|
||||
sendXFTPTransmission XFTPClient {config, thParams, http2Client} t chunkSpec_ = do
|
||||
let req = H.requestStreaming N.methodPost "/" [] streamBody
|
||||
reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_
|
||||
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- withExceptT xftpClientError . ExceptT $ sendRequest http2Client req reqTimeout
|
||||
reqTimeout = xftpReqTimeout config $ (\XFTPChunkSpec {chunkSize} -> chunkSize) <$> chunkSpec_
|
||||
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- withExceptT xftpClientError . ExceptT $ sendRequest http2Client req (Just reqTimeout)
|
||||
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
|
||||
-- TODO validate that the file ID is the same as in the request?
|
||||
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission thParams bodyHead
|
||||
@@ -251,8 +248,13 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {
|
||||
_ -> throwError $ PCEResponseError NO_FILE
|
||||
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
xftpReqTimeout :: XFTPClientConfig -> Maybe Word32 -> Int
|
||||
xftpReqTimeout cfg@XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout}} chunkSize_ =
|
||||
maybe tcpTimeout (chunkTimeout cfg) chunkSize_
|
||||
|
||||
chunkTimeout :: XFTPClientConfig -> Word32 -> Int
|
||||
chunkTimeout config chunkSize = fromIntegral $ (fromIntegral chunkSize * uploadTimeoutPerMb config) `div` mb 1
|
||||
chunkTimeout XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout, tcpTimeoutPerKb}} sz =
|
||||
tcpTimeout + fromIntegral (min ((fromIntegral sz `div` 1024) * tcpTimeoutPerKb) (fromIntegral (maxBound :: Int)))
|
||||
|
||||
deleteXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> SenderId -> ExceptT XFTPClientError IO ()
|
||||
deleteXFTPChunk c spKey sId = sendXFTPCommand c spKey sId FDEL Nothing >>= okResponse
|
||||
|
||||
@@ -82,6 +82,7 @@ module Simplex.Messaging.Agent
|
||||
setNtfServers,
|
||||
setNetworkConfig,
|
||||
getNetworkConfig,
|
||||
setUserNetworkInfo,
|
||||
reconnectAllServers,
|
||||
registerNtfToken,
|
||||
verifyNtfToken,
|
||||
@@ -402,17 +403,32 @@ testProtocolServer c userId srv = withAgentEnv' c $ case protocolTypeI @p of
|
||||
SPXFTP -> runXFTPServerTest c userId srv
|
||||
SPNTF -> runNTFServerTest c userId srv
|
||||
|
||||
-- | set SOCKS5 proxy on/off and optionally set TCP timeout
|
||||
-- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network
|
||||
setNetworkConfig :: AgentClient -> NetworkConfig -> IO ()
|
||||
setNetworkConfig c cfg' = do
|
||||
cfg <- atomically $ do
|
||||
swapTVar (useNetworkConfig c) cfg'
|
||||
when (cfg /= cfg') $ reconnectAllServers c
|
||||
setNetworkConfig c@AgentClient {useNetworkConfig} cfg' = do
|
||||
changed <- atomically $ do
|
||||
(_, cfg) <- readTVar useNetworkConfig
|
||||
if cfg == cfg'
|
||||
then pure False
|
||||
else True <$ (writeTVar useNetworkConfig $! (slowNetworkConfig cfg', cfg'))
|
||||
when changed $ reconnectAllServers c
|
||||
|
||||
-- returns fast network config
|
||||
getNetworkConfig :: AgentClient -> IO NetworkConfig
|
||||
getNetworkConfig = readTVarIO . useNetworkConfig
|
||||
getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig
|
||||
{-# INLINE getNetworkConfig #-}
|
||||
|
||||
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()
|
||||
setUserNetworkInfo c@AgentClient {userNetworkState} UserNetworkInfo {networkType = nt'} = withAgentEnv' c $ do
|
||||
d <- asks $ initialInterval . userNetworkInterval . config
|
||||
ts <- liftIO getCurrentTime
|
||||
atomically $ do
|
||||
ns@UserNetworkState {networkType = nt} <- readTVar userNetworkState
|
||||
when (nt' /= nt) $
|
||||
writeTVar userNetworkState $! case nt' of
|
||||
UNNone -> ns {networkType = nt', offline = Just UNSOffline {offlineDelay = d, offlineFrom = ts}}
|
||||
_ -> ns {networkType = nt', offline = Nothing}
|
||||
|
||||
reconnectAllServers :: AgentClient -> IO ()
|
||||
reconnectAllServers c = do
|
||||
reconnectServerClients c smpClients
|
||||
@@ -1267,6 +1283,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq (Worker {doWork
|
||||
let mId = unId msgId
|
||||
ri' = maybe id updateRetryInterval2 msgRetryState ri
|
||||
withRetryLock2 ri' qLock $ \riState loop -> do
|
||||
lift $ waitForUserNetwork c
|
||||
resp <- tryError $ case msgType of
|
||||
AM_CONN_INFO -> sendConfirmation c sq msgBody
|
||||
AM_CONN_INFO_REPLY -> sendConfirmation c sq msgBody
|
||||
|
||||
@@ -81,6 +81,7 @@ module Simplex.Messaging.Agent.Client
|
||||
agentClientStore,
|
||||
agentDRG,
|
||||
getAgentSubscriptions,
|
||||
slowNetworkConfig,
|
||||
Worker (..),
|
||||
SessionVar (..),
|
||||
SubscriptionsInfo (..),
|
||||
@@ -100,6 +101,11 @@ module Simplex.Messaging.Agent.Client
|
||||
agentOperations,
|
||||
agentOperationBracket,
|
||||
waitUntilActive,
|
||||
UserNetworkInfo (..),
|
||||
UserNetworkType (..),
|
||||
UserNetworkState (..),
|
||||
UNSOffline (..),
|
||||
waitForUserNetwork,
|
||||
throwWhenInactive,
|
||||
throwWhenNoDelivery,
|
||||
beginAgentOperation,
|
||||
@@ -149,6 +155,7 @@ import qualified Data.ByteString.Char8 as B
|
||||
import Data.Composition ((.:.))
|
||||
import Data.Either (lefts, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (deleteFirstsBy, foldl', partition, (\\))
|
||||
import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
@@ -159,7 +166,7 @@ import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import Data.Text.Encoding
|
||||
import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime)
|
||||
import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime)
|
||||
import Data.Time.Clock.System (getSystemTime)
|
||||
import Data.Word (Word16)
|
||||
import Network.Socket (HostName)
|
||||
@@ -265,7 +272,8 @@ data AgentClient = AgentClient
|
||||
ntfClients :: TMap NtfTransportSession NtfClientVar,
|
||||
xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth),
|
||||
xftpClients :: TMap XFTPTransportSession XFTPClientVar,
|
||||
useNetworkConfig :: TVar NetworkConfig,
|
||||
useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks
|
||||
userNetworkState :: TVar UserNetworkState,
|
||||
subscrConns :: TVar (Set ConnId),
|
||||
activeSubs :: TRcvQueues,
|
||||
pendingSubs :: TRcvQueues,
|
||||
@@ -396,6 +404,23 @@ data AgentStatsKey = AgentStatsKey
|
||||
}
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
data UserNetworkInfo = UserNetworkInfo
|
||||
{ networkType :: UserNetworkType
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther
|
||||
deriving (Eq, Show)
|
||||
|
||||
data UserNetworkState = UserNetworkState
|
||||
{ networkType :: UserNetworkType,
|
||||
offline :: Maybe UNSOffline
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data UNSOffline = UNSOffline {offlineDelay :: Int64, offlineFrom :: UTCTime}
|
||||
deriving (Show)
|
||||
|
||||
-- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's.
|
||||
newAgentClient :: Int -> InitialAgentServers -> Env -> STM AgentClient
|
||||
newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do
|
||||
@@ -411,7 +436,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
|
||||
ntfClients <- TM.empty
|
||||
xftpServers <- newTVar xftp
|
||||
xftpClients <- TM.empty
|
||||
useNetworkConfig <- newTVar netCfg
|
||||
useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg)
|
||||
userNetworkState <- newTVar $ UserNetworkState UNOther Nothing
|
||||
subscrConns <- newTVar S.empty
|
||||
activeSubs <- RQ.empty
|
||||
pendingSubs <- RQ.empty
|
||||
@@ -446,6 +472,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
|
||||
xftpServers,
|
||||
xftpClients,
|
||||
useNetworkConfig,
|
||||
userNetworkState,
|
||||
subscrConns,
|
||||
activeSubs,
|
||||
pendingSubs,
|
||||
@@ -470,6 +497,13 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
|
||||
agentEnv
|
||||
}
|
||||
|
||||
slowNetworkConfig :: NetworkConfig -> NetworkConfig
|
||||
slowNetworkConfig cfg@NetworkConfig {tcpConnectTimeout, tcpTimeout, tcpTimeoutPerKb} =
|
||||
cfg {tcpConnectTimeout = slow tcpConnectTimeout, tcpTimeout = slow tcpTimeout, tcpTimeoutPerKb = slow tcpTimeoutPerKb}
|
||||
where
|
||||
slow :: Integral a => a -> a
|
||||
slow t = (t * 3) `div` 2
|
||||
|
||||
agentClientStore :: AgentClient -> SQLiteStore
|
||||
agentClientStore AgentClient {agentEnv = Env {store}} = store
|
||||
{-# INLINE agentClientStore #-}
|
||||
@@ -582,6 +616,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess =
|
||||
withRetryInterval ri $ \_ loop -> do
|
||||
pending <- atomically getPending
|
||||
forM_ (L.nonEmpty pending) $ \qs -> do
|
||||
waitForUserNetwork c
|
||||
void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs
|
||||
loop
|
||||
getPending = RQ.getSessQueues tSess $ pendingSubs c
|
||||
@@ -594,7 +629,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess =
|
||||
|
||||
reconnectSMPClient :: TVar Int -> AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM ()
|
||||
reconnectSMPClient tc c tSess@(_, srv, _) qs = do
|
||||
NetworkConfig {tcpTimeout} <- readTVarIO $ useNetworkConfig c
|
||||
NetworkConfig {tcpTimeout} <- atomically $ getNetworkConfig c
|
||||
-- this allows 3x of timeout per batch of subscription (90 queues per batch empirically)
|
||||
let t = (length qs `div` 90 + 1) * tcpTimeout * 3
|
||||
ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case
|
||||
@@ -647,7 +682,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
|
||||
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
|
||||
|
||||
getXFTPServerClient :: AgentClient -> XFTPTransportSession -> AM XFTPClient
|
||||
getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do
|
||||
getXFTPServerClient c@AgentClient {active, xftpClients} tSess@(userId, srv, _) = do
|
||||
unlessM (readTVarIO active) . throwError $ INACTIVE
|
||||
atomically (getTSessVar c tSess xftpClients)
|
||||
>>= either
|
||||
@@ -658,7 +693,7 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@
|
||||
connectClient v = do
|
||||
cfg <- asks $ xftpCfg . config
|
||||
g <- asks random
|
||||
xftpNetworkConfig <- readTVarIO useNetworkConfig
|
||||
xftpNetworkConfig <- atomically $ getNetworkConfig c
|
||||
liftError' (protocolClientError XFTP $ B.unpack $ strEncode srv) $
|
||||
X.getXFTPClient g tSess cfg {xftpNetworkConfig} $
|
||||
clientDisconnected v
|
||||
@@ -693,7 +728,7 @@ removeTSessVar' v tSess vs =
|
||||
|
||||
waitForProtocolClient :: ProtocolTypeI (ProtoType msg) => AgentClient -> TransportSession msg -> ClientVar msg -> AM (Client msg)
|
||||
waitForProtocolClient c (_, srv, _) v = do
|
||||
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
|
||||
NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c
|
||||
client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v)
|
||||
liftEither $ case client_ of
|
||||
Just (Right smpClient) -> Right smpClient
|
||||
@@ -729,11 +764,51 @@ hostEvent :: forall v err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerCli
|
||||
hostEvent event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost
|
||||
|
||||
getClientConfig :: AgentClient -> (AgentConfig -> ProtocolClientConfig v) -> AM' (ProtocolClientConfig v)
|
||||
getClientConfig AgentClient {useNetworkConfig} cfgSel = do
|
||||
getClientConfig c cfgSel = do
|
||||
cfg <- asks $ cfgSel . config
|
||||
networkConfig <- readTVarIO useNetworkConfig
|
||||
networkConfig <- atomically $ getNetworkConfig c
|
||||
pure cfg {networkConfig}
|
||||
|
||||
getNetworkConfig :: AgentClient -> STM NetworkConfig
|
||||
getNetworkConfig c = do
|
||||
(slowCfg, fastCfg) <- readTVar (useNetworkConfig c)
|
||||
UserNetworkState {networkType} <- readTVar (userNetworkState c)
|
||||
pure $ case networkType of
|
||||
UNCellular -> slowCfg
|
||||
UNNone -> slowCfg
|
||||
_ -> fastCfg
|
||||
|
||||
waitForUserNetwork :: AgentClient -> AM' ()
|
||||
waitForUserNetwork AgentClient {userNetworkState} =
|
||||
(offline <$> readTVarIO userNetworkState) >>= mapM_ waitWhileOffline
|
||||
where
|
||||
waitWhileOffline UNSOffline {offlineDelay = d} =
|
||||
unlessM (liftIO $ waitOnline d False) $ do -- network delay reached, increase delay
|
||||
ts' <- liftIO getCurrentTime
|
||||
ni <- asks $ userNetworkInterval . config
|
||||
atomically $ do
|
||||
ns@UserNetworkState {offline} <- readTVar userNetworkState
|
||||
forM_ offline $ \UNSOffline {offlineDelay = d', offlineFrom = ts} ->
|
||||
-- Using `min` to avoid multiple updates in a short period of time
|
||||
-- and to reset `offlineDelay` if network went `on` and `off` again.
|
||||
writeTVar userNetworkState $!
|
||||
let d'' = nextRetryDelay (diffToMicroseconds $ diffUTCTime ts' ts) (min d d') ni
|
||||
in ns {offline = Just UNSOffline {offlineDelay = d'', offlineFrom = ts}}
|
||||
waitOnline :: Int64 -> Bool -> IO Bool
|
||||
waitOnline t online'
|
||||
| t <= 0 = pure online'
|
||||
| otherwise =
|
||||
registerDelay (fromIntegral maxWait)
|
||||
>>= atomically . onlineOrDelay
|
||||
>>= waitOnline (t - maxWait)
|
||||
where
|
||||
maxWait = min t $ fromIntegral (maxBound :: Int)
|
||||
onlineOrDelay delay = do
|
||||
online <- isNothing . offline <$> readTVar userNetworkState
|
||||
expired <- readTVar delay
|
||||
unless (online || expired) retry
|
||||
pure online
|
||||
|
||||
closeAgentClient :: AgentClient -> IO ()
|
||||
closeAgentClient c = do
|
||||
atomically $ writeTVar (active c) False
|
||||
@@ -789,7 +864,7 @@ closeClient c clientSel tSess =
|
||||
|
||||
closeClient_ :: ProtocolServerClient v err msg => AgentClient -> ClientVar msg -> IO ()
|
||||
closeClient_ c v = do
|
||||
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
|
||||
NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c
|
||||
E.handle (\BlockedIndefinitelyOnSTM -> pure ()) $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) >>= \case
|
||||
Just (Right client) -> closeProtocolServerClient client `catchAll_` pure ()
|
||||
_ -> pure ()
|
||||
@@ -950,7 +1025,7 @@ runXFTPServerTest :: AgentClient -> UserId -> XFTPServerWithAuth -> AM' (Maybe P
|
||||
runXFTPServerTest c userId (ProtoServerWithAuth srv auth) = do
|
||||
cfg <- asks $ xftpCfg . config
|
||||
g <- asks random
|
||||
xftpNetworkConfig <- readTVarIO $ useNetworkConfig c
|
||||
xftpNetworkConfig <- atomically $ getNetworkConfig c
|
||||
workDir <- getXFTPWorkPath
|
||||
filePath <- getTempFilePath workDir
|
||||
rcvPath <- getTempFilePath workDir
|
||||
@@ -1040,7 +1115,7 @@ mkSMPTSession q = mkTSession (qUserId q) (qServer q) (qConnId q)
|
||||
{-# INLINE mkSMPTSession #-}
|
||||
|
||||
getSessionMode :: AgentClient -> IO TransportSessionMode
|
||||
getSessionMode = fmap sessionMode . readTVarIO . useNetworkConfig
|
||||
getSessionMode = atomically . fmap sessionMode . getNetworkConfig
|
||||
{-# INLINE getSessionMode #-}
|
||||
|
||||
newRcvQueue :: AgentClient -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri)
|
||||
@@ -1132,7 +1207,7 @@ sendTSessionBatches statCmd statBatchSize toRQ action c qs =
|
||||
where
|
||||
batchQueues :: AM' [(SMPTransportSession, NonEmpty q)]
|
||||
batchQueues = do
|
||||
mode <- sessionMode <$> readTVarIO (useNetworkConfig c)
|
||||
mode <- atomically $ sessionMode <$> getNetworkConfig c
|
||||
pure . M.assocs $ foldl' (batch mode) M.empty qs
|
||||
where
|
||||
batch mode m q =
|
||||
@@ -1775,3 +1850,7 @@ $(J.deriveJSON defaultJSON ''WorkersSummary)
|
||||
$(J.deriveJSON defaultJSON {J.fieldLabelModifier = takeWhile (/= '_')} ''AgentWorkersDetails)
|
||||
|
||||
$(J.deriveJSON defaultJSON ''AgentWorkersSummary)
|
||||
|
||||
$(J.deriveJSON (enumJSON $ dropPrefix "UN") ''UserNetworkType)
|
||||
|
||||
$(J.deriveJSON defaultJSON ''UserNetworkInfo)
|
||||
|
||||
@@ -92,6 +92,7 @@ data AgentConfig = AgentConfig
|
||||
xftpCfg :: XFTPClientConfig,
|
||||
reconnectInterval :: RetryInterval,
|
||||
messageRetryInterval :: RetryInterval2,
|
||||
userNetworkInterval :: RetryInterval,
|
||||
messageTimeout :: NominalDiffTime,
|
||||
connDeleteDeliveryTimeout :: NominalDiffTime,
|
||||
helloTimeout :: NominalDiffTime,
|
||||
@@ -126,7 +127,7 @@ defaultReconnectInterval =
|
||||
RetryInterval
|
||||
{ initialInterval = 2_000000,
|
||||
increaseAfter = 10_000000,
|
||||
maxInterval = 180_000000
|
||||
maxInterval = 60_000000
|
||||
}
|
||||
|
||||
defaultMessageRetryInterval :: RetryInterval2
|
||||
@@ -134,18 +135,26 @@ defaultMessageRetryInterval =
|
||||
RetryInterval2
|
||||
{ riFast =
|
||||
RetryInterval
|
||||
{ initialInterval = 1_000000,
|
||||
{ initialInterval = 2_000000,
|
||||
increaseAfter = 10_000000,
|
||||
maxInterval = 60_000000
|
||||
},
|
||||
riSlow =
|
||||
RetryInterval
|
||||
{ initialInterval = 180_000000, -- 3 minutes
|
||||
{ initialInterval = 300_000000, -- 5 minutes
|
||||
increaseAfter = 60_000000,
|
||||
maxInterval = 3 * 3600_000000 -- 3 hours
|
||||
maxInterval = 6 * 3600_000000 -- 6 hours
|
||||
}
|
||||
}
|
||||
|
||||
defaultUserNetworkInterval :: RetryInterval
|
||||
defaultUserNetworkInterval =
|
||||
RetryInterval
|
||||
{ initialInterval = 1200_000000, -- 20 minutes
|
||||
increaseAfter = 0,
|
||||
maxInterval = 7200_000000 -- 2 hours
|
||||
}
|
||||
|
||||
defaultAgentConfig :: AgentConfig
|
||||
defaultAgentConfig =
|
||||
AgentConfig
|
||||
@@ -161,6 +170,7 @@ defaultAgentConfig =
|
||||
xftpCfg = defaultXFTPClientConfig,
|
||||
reconnectInterval = defaultReconnectInterval,
|
||||
messageRetryInterval = defaultMessageRetryInterval,
|
||||
userNetworkInterval = defaultUserNetworkInterval,
|
||||
messageTimeout = 2 * nominalDay,
|
||||
connDeleteDeliveryTimeout = 2 * nominalDay,
|
||||
helloTimeout = 2 * nominalDay,
|
||||
|
||||
@@ -160,7 +160,8 @@ runNtfWorker c srv Worker {doWork} = do
|
||||
\nextSub@(NtfSubscription {connId}, _, _) -> do
|
||||
logInfo $ "runNtfWorker, nextSub " <> tshow nextSub
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \_ loop ->
|
||||
withRetryInterval ri $ \_ loop -> do
|
||||
lift $ waitForUserNetwork c
|
||||
processSub nextSub
|
||||
`catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show)
|
||||
processSub :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> AM ()
|
||||
@@ -243,7 +244,8 @@ runNtfSMPWorker c srv Worker {doWork} = do
|
||||
\nextSub@(NtfSubscription {connId}, _, _) -> do
|
||||
logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \_ loop ->
|
||||
withRetryInterval ri $ \_ loop -> do
|
||||
lift $ waitForUserNetwork c
|
||||
processSub nextSub
|
||||
`catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show)
|
||||
processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> AM ()
|
||||
|
||||
@@ -11,6 +11,7 @@ module Simplex.Messaging.Agent.RetryInterval
|
||||
withRetryIntervalCount,
|
||||
withRetryLock2,
|
||||
updateRetryInterval2,
|
||||
nextRetryDelay,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -60,7 +61,7 @@ withRetryIntervalCount ri action = callAction 0 0 $ initialInterval ri
|
||||
loop = do
|
||||
liftIO $ threadDelay' delay
|
||||
let elapsed' = elapsed + delay
|
||||
callAction (n + 1) elapsed' $ nextDelay elapsed' delay ri
|
||||
callAction (n + 1) elapsed' $ nextRetryDelay elapsed' delay ri
|
||||
|
||||
-- This function allows action to toggle between slow and fast retry intervals.
|
||||
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State -> (RetryIntervalMode -> m ()) -> m ()) -> m ()
|
||||
@@ -76,7 +77,7 @@ withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =
|
||||
run (elapsed, delay) ri call = do
|
||||
wait delay
|
||||
let elapsed' = elapsed + delay
|
||||
delay' = nextDelay elapsed' delay ri
|
||||
delay' = nextRetryDelay elapsed' delay ri
|
||||
call (elapsed', delay')
|
||||
wait delay = do
|
||||
waiting <- newTVarIO True
|
||||
@@ -87,8 +88,8 @@ withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =
|
||||
takeTMVar lock
|
||||
writeTVar waiting False
|
||||
|
||||
nextDelay :: Int64 -> Int64 -> RetryInterval -> Int64
|
||||
nextDelay elapsed delay RetryInterval {increaseAfter, maxInterval} =
|
||||
nextRetryDelay :: Int64 -> Int64 -> RetryInterval -> Int64
|
||||
nextRetryDelay elapsed delay RetryInterval {increaseAfter, maxInterval} =
|
||||
if elapsed < increaseAfter || delay == maxInterval
|
||||
then delay
|
||||
else min (delay * 3 `div` 2) maxInterval
|
||||
|
||||
@@ -207,7 +207,7 @@ data NetworkConfig = NetworkConfig
|
||||
-- | timeout of protocol commands (microseconds)
|
||||
tcpTimeout :: Int,
|
||||
-- | additional timeout per kilobyte (1024 bytes) to be sent
|
||||
tcpTimeoutPerKb :: Int,
|
||||
tcpTimeoutPerKb :: Int64,
|
||||
-- | TCP keep-alive options, Nothing to skip enabling keep-alive
|
||||
tcpKeepAlive :: Maybe KeepAliveOpts,
|
||||
-- | period for SMP ping commands (microseconds, 0 to disable)
|
||||
@@ -230,7 +230,7 @@ defaultNetworkConfig =
|
||||
sessionMode = TSMUser,
|
||||
tcpConnectTimeout = 20_000_000,
|
||||
tcpTimeout = 15_000_000,
|
||||
tcpTimeoutPerKb = 45_000, -- 45ms, should be less than 130ms to avoid Int overflow on 32 bit systems
|
||||
tcpTimeoutPerKb = 5_000,
|
||||
tcpKeepAlive = Just defaultKeepAliveOpts,
|
||||
smpPingInterval = 600_000_000, -- 10min
|
||||
smpPingCount = 3,
|
||||
|
||||
@@ -72,10 +72,11 @@ import SMPAgentClient
|
||||
import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, withSmpServerV7)
|
||||
import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage)
|
||||
import qualified Simplex.Messaging.Agent as A
|
||||
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
|
||||
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), UserNetworkInfo (..), UserNetworkType (..), waitForUserNetwork)
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore)
|
||||
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ)
|
||||
import qualified Simplex.Messaging.Agent.Protocol as A
|
||||
import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..))
|
||||
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew))
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction')
|
||||
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultSMPClientConfig)
|
||||
@@ -89,6 +90,7 @@ import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Server.Env.STM (ServerConfig (..))
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Transport (ATransport (..), SMPVersion, VersionSMP, authCmdsSMPVersion, basicAuthSMPVersion, batchCmdsSMPVersion, currentServerSMPRelayVersion)
|
||||
import Simplex.Messaging.Util (diffToMicroseconds)
|
||||
import Simplex.Messaging.Version (VersionRange (..))
|
||||
import qualified Simplex.Messaging.Version as V
|
||||
import Simplex.Messaging.Version.Internal (Version (..))
|
||||
@@ -423,6 +425,8 @@ functionalAPITests t = do
|
||||
it "should send and receive delivery receipt" $ withSmpServer t testDeliveryReceipts
|
||||
it "should send delivery receipt only in connection v3+" $ testDeliveryReceiptsVersion t
|
||||
it "send delivery receipts concurrently with messages" $ testDeliveryReceiptsConcurrent t
|
||||
describe "user network info" $ do
|
||||
it "should wait for user network" testWaitForUserNetwork
|
||||
|
||||
testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int
|
||||
testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do
|
||||
@@ -2639,6 +2643,33 @@ testServerMultipleIdentities =
|
||||
}
|
||||
testE2ERatchetParams12
|
||||
|
||||
testWaitForUserNetwork :: HasCallStack => IO ()
|
||||
testWaitForUserNetwork = do
|
||||
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
|
||||
noNetworkDelay a
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNNone
|
||||
networkDelay a 100000
|
||||
networkDelay a 150000
|
||||
networkDelay a 200000
|
||||
networkDelay a 200000
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNCellular
|
||||
noNetworkDelay a
|
||||
setUserNetworkInfo a $ UserNetworkInfo UNNone
|
||||
networkDelay a 100000
|
||||
concurrently_
|
||||
(threadDelay 50000 >> setUserNetworkInfo a (UserNetworkInfo UNCellular))
|
||||
(networkDelay a 50000)
|
||||
noNetworkDelay a
|
||||
where
|
||||
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}}
|
||||
noNetworkDelay a = (10000 >) <$> waitNetwork a `shouldReturn` True
|
||||
networkDelay a d' = (\d -> d' < d && d < d' + 15000) <$> waitNetwork a `shouldReturn` True
|
||||
waitNetwork a = do
|
||||
t <- getCurrentTime
|
||||
waitForUserNetwork a `runReaderT` agentEnv a
|
||||
t' <- getCurrentTime
|
||||
pure $ diffToMicroseconds $ diffUTCTime t' t
|
||||
|
||||
exchangeGreetings :: HasCallStack => AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO ()
|
||||
exchangeGreetings = exchangeGreetings_ PQEncOn
|
||||
|
||||
|
||||
Reference in New Issue
Block a user