Merge branch 'stable'

This commit is contained in:
Evgeny Poberezkin
2024-05-17 15:38:06 +01:00
6 changed files with 116 additions and 89 deletions
+4 -4
View File
@@ -180,7 +180,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
lift $ waitForUserNetwork c
liftIO $ waitForUserNetwork c
downloadFileChunk fc replica
`catchAgentError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e
where
@@ -425,7 +425,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
tryCreate = do
usedSrvs <- newTVarIO ([] :: [XFTPServer])
withRetryInterval (riFast ri) $ \_ loop -> do
lift $ waitForUserNetwork c
liftIO $ waitForUserNetwork c
createWithNextSrv usedSrvs
`catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e
where
@@ -458,7 +458,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
lift $ waitForUserNetwork c
liftIO $ waitForUserNetwork c
uploadFileChunk cfg fc replica
`catchAgentError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e
where
@@ -625,7 +625,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
lift $ waitForUserNetwork c
liftIO $ waitForUserNetwork c
deleteChunkReplica
`catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e
where
+20 -13
View File
@@ -430,22 +430,29 @@ getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig
{-# INLINE getNetworkConfig #-}
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()
setUserNetworkInfo c@AgentClient {userNetworkState} UserNetworkInfo {networkType = nt', online} = withAgentEnv' c $ do
d <- asks $ initialInterval . userNetworkInterval . config
ts <- liftIO getCurrentTime
atomically $ do
ns@UserNetworkState {networkType = nt, offline} <- readTVar userNetworkState
when (nt' /= nt || online /= isNothing offline) $
writeTVar userNetworkState $!
let offline'
| nt' /= UNNone && online = Nothing
| isJust offline = offline
| otherwise = Just UNSOffline {offlineDelay = d, offlineFrom = ts}
in ns {networkType = nt', offline = offline'}
setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkDelay} netInfo = withAgentEnv' c $ do
ni <- asks $ userNetworkInterval . config
let d = initialInterval ni
off <- atomically $ do
wasOnline <- isOnline <$> swapTVar userNetworkInfo netInfo
let off = wasOnline && not (isOnline netInfo)
when off $ writeTVar userNetworkDelay d
pure off
liftIO . when off . void . forkIO $
growOfflineDelay 0 d ni
where
growOfflineDelay elapsed d ni = do
online <- waitOnlineOrDelay c d
unless online $ do
let elapsed' = elapsed + d
d' = nextRetryDelay elapsed' d ni
atomically $ writeTVar userNetworkDelay d'
growOfflineDelay elapsed' d' ni
reconnectAllServers :: AgentClient -> IO ()
reconnectAllServers c = do
reconnectServerClients c smpClients
reconnectServerClients c xftpClients
reconnectServerClients c ntfClients
-- | Register device notifications token
@@ -1317,7 +1324,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq (Worker {doWork
let mId = unId msgId
ri' = maybe id updateRetryInterval2 msgRetryState ri
withRetryLock2 ri' qLock $ \riState loop -> do
lift $ waitForUserNetwork c
liftIO $ waitForUserNetwork c
resp <- tryError $ case msgType of
AM_CONN_INFO -> sendConfirmation c sq msgBody
AM_CONN_INFO_REPLY -> sendConfirmation c sq msgBody
+49 -60
View File
@@ -108,9 +108,10 @@ module Simplex.Messaging.Agent.Client
waitUntilActive,
UserNetworkInfo (..),
UserNetworkType (..),
UserNetworkState (..),
UNSOffline (..),
waitForUserNetwork,
waitOnlineOrDelay,
isNetworkOnline,
isOnline,
throwWhenInactive,
throwWhenNoDelivery,
beginAgentOperation,
@@ -173,7 +174,7 @@ import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Text.Encoding
import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime)
import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
import qualified Database.SQLite.Simple as SQL
@@ -281,7 +282,8 @@ data AgentClient = AgentClient
xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth),
xftpClients :: TMap XFTPTransportSession XFTPClientVar,
useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks
userNetworkState :: TVar UserNetworkState,
userNetworkInfo :: TVar UserNetworkInfo,
userNetworkDelay :: TVar Int64,
subscrConns :: TVar (Set ConnId),
activeSubs :: TRcvQueues,
pendingSubs :: TRcvQueues,
@@ -426,22 +428,20 @@ data UserNetworkInfo = UserNetworkInfo
}
deriving (Show)
isNetworkOnline :: AgentClient -> STM Bool
isNetworkOnline c = isOnline <$> readTVar (userNetworkInfo c)
isOnline :: UserNetworkInfo -> Bool
isOnline UserNetworkInfo {networkType, online} = networkType /= UNNone && online
data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther
deriving (Eq, Show)
data UserNetworkState = UserNetworkState
{ networkType :: UserNetworkType,
offline :: Maybe UNSOffline
}
deriving (Show)
data UNSOffline = UNSOffline {offlineDelay :: Int64, offlineFrom :: UTCTime}
deriving (Show)
-- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's.
newAgentClient :: Int -> InitialAgentServers -> Env -> STM AgentClient
newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do
let qSize = tbqSize $ config agentEnv
let cfg = config agentEnv
qSize = tbqSize cfg
acThread <- newTVar Nothing
active <- newTVar True
rcvQ <- newTBQueue qSize
@@ -455,7 +455,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
xftpServers <- newTVar xftp
xftpClients <- TM.empty
useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg)
userNetworkState <- newTVar $ UserNetworkState UNOther Nothing
userNetworkInfo <- newTVar $ UserNetworkInfo UNOther True
userNetworkDelay <- newTVar $ initialInterval $ userNetworkInterval cfg
subscrConns <- newTVar S.empty
activeSubs <- RQ.empty
pendingSubs <- RQ.empty
@@ -492,7 +493,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
xftpServers,
xftpClients,
useNetworkConfig,
userNetworkState,
userNetworkInfo,
userNetworkDelay,
subscrConns,
activeSubs,
pendingSubs,
@@ -703,7 +705,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess =
withRetryInterval ri $ \_ loop -> do
pending <- atomically getPending
forM_ (L.nonEmpty pending) $ \qs -> do
waitForUserNetwork c
lift $ waitForUserNetwork c
void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs
loop
getPending = RQ.getSessQueues tSess $ pendingSubs c
@@ -720,18 +722,17 @@ reconnectSMPClient tc c tSess@(_, srv, _) qs = do
-- this allows 3x of timeout per batch of subscription (90 queues per batch empirically)
let t = (length qs `div` 90 + 1) * tcpTimeout * 3
ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case
Just _ -> atomically $ writeTVar tc 0
Nothing ->
(offline <$> readTVarIO (userNetworkState c)) >>= \case
-- reset and do not report consequitive timeouts while offline
Just _ -> atomically $ writeTVar tc 0
Nothing -> do
tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1)
maxTC <- asks $ maxSubscriptionTimeouts . config
when (tc' >= maxTC) $ do
let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess
atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg)
Just _ -> resetTimeouts
-- reset and do not report consecutive timeouts while offline
Nothing -> ifM (atomically $ isNetworkOnline c) notifyTimeout resetTimeouts
where
resetTimeouts = atomically $ writeTVar tc 0
notifyTimeout = do
tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1)
maxTC <- asks $ maxSubscriptionTimeouts . config
when (tc' >= maxTC) $ do
let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess
atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg)
resubscribe :: AM ()
resubscribe = do
cs <- readTVarIO $ RQ.getConnections $ activeSubs c
@@ -848,43 +849,31 @@ getClientConfig c cfgSel = do
getNetworkConfig :: AgentClient -> STM NetworkConfig
getNetworkConfig c = do
(slowCfg, fastCfg) <- readTVar (useNetworkConfig c)
UserNetworkState {networkType} <- readTVar (userNetworkState c)
UserNetworkInfo {networkType} <- readTVar $ userNetworkInfo c
pure $ case networkType of
UNCellular -> slowCfg
UNNone -> slowCfg
_ -> fastCfg
waitForUserNetwork :: AgentClient -> AM' ()
waitForUserNetwork AgentClient {userNetworkState} =
readTVarIO userNetworkState >>= mapM_ waitWhileOffline . offline
where
waitWhileOffline UNSOffline {offlineDelay = d} =
unlessM (liftIO $ waitOnline d False) $ do
-- network delay reached, increase delay
ts' <- liftIO getCurrentTime
ni <- asks $ userNetworkInterval . config
atomically $ do
ns@UserNetworkState {offline} <- readTVar userNetworkState
forM_ offline $ \UNSOffline {offlineDelay = d', offlineFrom = ts} ->
-- Using `min` to avoid multiple updates in a short period of time
-- and to reset `offlineDelay` if network went `on` and `off` again.
writeTVar userNetworkState $!
let d'' = nextRetryDelay (diffToMicroseconds $ diffUTCTime ts' ts) (min d d') ni
in ns {offline = Just UNSOffline {offlineDelay = d'', offlineFrom = ts}}
waitOnline :: Int64 -> Bool -> IO Bool
waitOnline t online'
| t <= 0 = pure online'
| otherwise =
registerDelay (fromIntegral maxWait)
>>= atomically . onlineOrDelay
>>= waitOnline (t - maxWait)
where
maxWait = min t $ fromIntegral (maxBound :: Int)
onlineOrDelay delay = do
online <- isNothing . offline <$> readTVar userNetworkState
expired <- readTVar delay
unless (online || expired) retry
pure online
waitForUserNetwork :: AgentClient -> IO ()
waitForUserNetwork c =
unlessM (atomically $ isNetworkOnline c) $
readTVarIO (userNetworkDelay c) >>= void . waitOnlineOrDelay c
waitOnlineOrDelay :: AgentClient -> Int64 -> IO Bool
waitOnlineOrDelay c t = do
let maxWait = min t $ fromIntegral (maxBound :: Int)
t' = t - maxWait
delay <- registerDelay $ fromIntegral maxWait
online <-
atomically $ do
expired <- readTVar delay
online <- isNetworkOnline c
unless (expired || online) retry
pure online
if online || t' <= 0
then pure online
else waitOnlineOrDelay c t'
closeAgentClient :: AgentClient -> IO ()
closeAgentClient c = do
@@ -161,7 +161,7 @@ runNtfWorker c srv Worker {doWork} = do
logInfo $ "runNtfWorker, nextSub " <> tshow nextSub
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop -> do
lift $ waitForUserNetwork c
liftIO $ waitForUserNetwork c
processSub nextSub
`catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show)
processSub :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> AM ()
@@ -245,7 +245,7 @@ runNtfSMPWorker c srv Worker {doWork} = do
logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop -> do
lift $ waitForUserNetwork c
liftIO $ waitForUserNetwork c
processSub nextSub
`catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show)
processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> AM ()
+8 -6
View File
@@ -152,12 +152,14 @@ timeoutThrow :: MonadUnliftIO m => e -> Int -> ExceptT e m a -> ExceptT e m a
timeoutThrow e ms action = ExceptT (sequence <$> (ms `timeout` runExceptT action)) >>= maybe (throwError e) pure
threadDelay' :: Int64 -> IO ()
threadDelay' time
| time <= 0 = pure ()
threadDelay' time = do
let maxWait = min time $ fromIntegral (maxBound :: Int)
threadDelay $ fromIntegral maxWait
when (maxWait /= time) $ threadDelay' (time - maxWait)
threadDelay' = loop
where
loop time
| time <= 0 = pure ()
| otherwise = do
let maxWait = min time $ fromIntegral (maxBound :: Int)
threadDelay $ fromIntegral maxWait
loop $ time - maxWait
diffToMicroseconds :: NominalDiffTime -> Int64
diffToMicroseconds diff = fromIntegral ((truncate $ diff * 1000000) :: Integer)
+33 -4
View File
@@ -51,7 +51,7 @@ module AgentTests.FunctionalAPITests
where
import AgentTests.ConnectionRequestTests (connReqData, queueAddr, testE2ERatchetParams12)
import Control.Concurrent (killThread, threadDelay)
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
@@ -440,6 +440,7 @@ functionalAPITests t = do
describe "user network info" $ do
it "should wait for user network" testWaitForUserNetwork
it "should not reset offline interval while offline" testDoNotResetOfflineInterval
it "should resume multiple threads" testResumeMultipleThreads
testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int
testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do
@@ -2709,6 +2710,7 @@ testWaitForUserNetwork = do
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
noNetworkDelay a
setUserNetworkInfo a $ UserNetworkInfo UNNone False
threadDelay 5000
networkDelay a 100000
networkDelay a 150000
networkDelay a 200000
@@ -2729,6 +2731,7 @@ testDoNotResetOfflineInterval = do
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
noNetworkDelay a
setUserNetworkInfo a $ UserNetworkInfo UNWifi False
threadDelay 5000
networkDelay a 100000
networkDelay a 150000
setUserNetworkInfo a $ UserNetworkInfo UNCellular False
@@ -2742,16 +2745,42 @@ testDoNotResetOfflineInterval = do
where
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}}
testResumeMultipleThreads :: IO ()
testResumeMultipleThreads = do
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
noNetworkDelay a
setUserNetworkInfo a $ UserNetworkInfo UNNone False
vs <-
replicateM 50000 $ do
v <- newEmptyTMVarIO
void . forkIO $ waitNetwork a >>= atomically . putTMVar v
pure v
threadDelay 1000000
setUserNetworkInfo a $ UserNetworkInfo UNCellular True
ts <- mapM (atomically . readTMVar) vs
print $ minimum ts
print $ maximum ts
print $ sum ts `div` fromIntegral (length ts)
let average = sum ts `div` fromIntegral (length ts)
average < 3000000 `shouldBe` True
maximum ts < 4000000 `shouldBe` True
where
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 1000000, increaseAfter = 0, maxInterval = 3600_000_000}}
noNetworkDelay :: AgentClient -> IO ()
noNetworkDelay a = (10000 >) <$> waitNetwork a `shouldReturn` True
noNetworkDelay a = do
d <- waitNetwork a
unless (d < 10000) $ expectationFailure $ "expected no delay, d = " <> show d
networkDelay :: AgentClient -> Int64 -> IO ()
networkDelay a d' = (\d -> d' < d && d < d' + 15000) <$> waitNetwork a `shouldReturn` True
networkDelay a d' = do
d <- waitNetwork a
unless (d' < d && d < d' + 15000) $ expectationFailure $ "expected delay " <> show d' <> ", d = " <> show d
waitNetwork :: AgentClient -> IO Int64
waitNetwork a = do
t <- getCurrentTime
waitForUserNetwork a `runReaderT` agentEnv a
waitForUserNetwork a
t' <- getCurrentTime
pure $ diffToMicroseconds $ diffUTCTime t' t