agent: only retry connecting SMP clients when there are pending subscriptions (#981)

* agent: only retry connecting SMP clients when there are pending subscriptions

* fix

* remove retry on creating clients

* simplify
This commit is contained in:
Evgeny Poberezkin
2024-02-01 16:17:37 +00:00
committed by GitHub
parent 24b84106a6
commit 2ae1c9f79d
6 changed files with 38 additions and 96 deletions
-1
View File
@@ -102,7 +102,6 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_delivery_indexes
Simplex.Messaging.Agent.TAsyncs
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent
+25 -57
View File
@@ -172,7 +172,6 @@ import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction)
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.TAsyncs
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues))
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import Simplex.Messaging.Client
@@ -278,7 +277,6 @@ data AgentClient = AgentClient
deleteLock :: Lock,
-- smpSubWorkers for SMP servers sessions
smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())),
asyncClients :: TAsyncs,
agentStats :: TMap AgentStatsKey (TVar Int),
clientId :: Int,
agentEnv :: Env
@@ -418,7 +416,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
invLocks <- TM.empty
deleteLock <- createLock
smpSubWorkers <- TM.empty
asyncClients <- newTAsyncs
agentStats <- TM.empty
return
AgentClient
@@ -452,7 +449,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
invLocks,
deleteLock,
smpSubWorkers,
asyncClients,
agentStats,
clientId,
agentEnv
@@ -503,10 +499,11 @@ instance ProtocolServerClient XFTPErrorType FileResponse where
getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient
getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do
unlessM (readTVarIO active) . throwError $ INACTIVE
atomically (getTSessVar c tSess smpClients)
>>= either newClient (waitForProtocolClient c tSess)
v <- atomically (getTSessVar c tSess smpClients)
either newClient (waitForProtocolClient c tSess) v
`catchAgentError` \e -> resubscribeSMPSession c tSess >> throwError e
where
newClient = newProtocolClient c tSess smpClients connectClient resubscribeSMPSession
newClient = newProtocolClient c tSess smpClients connectClient
connectClient :: SMPClientVar -> m SMPClient
connectClient v = do
cfg <- getClientConfig c smpCfg
@@ -541,8 +538,13 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv,
resubscribeSMPSession :: AgentMonad' m => AgentClient -> SMPTransportSession -> m ()
resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess =
atomically (getTSessVar c tSess smpSubWorkers) >>= either newSubWorker (\_ -> pure ())
atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ()))
where
getWorkerVar =
ifM
(null <$> getPending)
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
(Just <$> getTSessVar c tSess smpSubWorkers)
newSubWorker v = do
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
atomically $ putTMVar (sessionVar v) a
@@ -550,10 +552,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess =
ri <- asks $ reconnectInterval . config
timeoutCounts <- newTVarIO 0
withRetryInterval ri $ \_ loop -> do
pending <- atomically . RQ.getSessQueues tSess $ pendingSubs c
pending <- atomically getPending
forM_ (L.nonEmpty pending) $ \qs -> do
void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs
loop
getPending = RQ.getSessQueues tSess $ pendingSubs c
cleanup :: SessionVar (Async ()) -> STM ()
cleanup v = do
-- Here we wait until TMVar is not empty to prevent worker cleanup happening before worker is added to TMVar.
@@ -597,7 +600,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
unlessM (readTVarIO active) . throwError $ INACTIVE
atomically (getTSessVar c tSess ntfClients)
>>= either
(newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ())
(newProtocolClient c tSess ntfClients connectClient)
(waitForProtocolClient c tSess)
where
connectClient :: NtfClientVar -> m NtfClient
@@ -617,7 +620,7 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@
unlessM (readTVarIO active) . throwError $ INACTIVE
atomically (getTSessVar c tSess xftpClients)
>>= either
(newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ())
(newProtocolClient c tSess xftpClients connectClient)
(waitForProtocolClient c tSess)
where
connectClient :: XFTPClientVar -> m XFTPClient
@@ -666,47 +669,22 @@ newProtocolClient ::
TransportSession msg ->
TMap (TransportSession msg) (ClientVar msg) ->
(ClientVar msg -> m (Client msg)) ->
(AgentClient -> TransportSession msg -> m ()) ->
ClientVar msg ->
m (Client msg)
newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient clientConnectedAsync v =
-- attempt sync connect first
newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient v =
tryAgentError (connectClient v) >>= \case
Right client -> putClient client $> client
Left e -> do
handleErr e $ newAsyncAction asyncConnectLoop (asyncClients c) -- initiate reconnect loop for temporary errors
throwError e -- signal error to caller
where
putClient :: Client msg -> m ()
putClient client = do
Right client -> do
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv <> " (user " <> bshow userId <> maybe "" (" for entity " <>) entityId_ <> ")"
-- tryPutTMVar is a precaution, it always succeeds here
r <- atomically $ tryPutTMVar (sessionVar v) (Right client)
unless r $ logError "newProtocolClient: cannot put connected client"
atomically $ putTMVar (sessionVar v) (Right client)
liftIO $ incClientStat c userId client "CLIENT" "OK"
atomically $ writeTBQueue (subQ c) ("", "", APC SAENone $ hostEvent CONNECT client)
handleErr :: AgentErrorType -> m () -> m ()
handleErr e handleTmp = do
pure client
Left e -> do
liftIO $ incServerStat c userId srv "CLIENT" $ strEncode e
if temporaryAgentError e
then handleTmp
else do
r <- atomically $ do
removeTSessVar v tSess clients
-- tryPutTMVar is a precaution, it always succeeds here
tryPutTMVar (sessionVar v) (Left e)
unless r $ logError "newProtocolClient: cannot put client error"
asyncConnectLoop :: Int -> m ()
asyncConnectLoop aId = do
ri <- asks $ reconnectInterval . config
withRetryInterval ri (const retryConnectClient)
`E.finally` atomically (removeAsyncAction aId $ asyncClients c)
where
-- does not return anything, restarts instead of throwing errors
retryConnectClient loop =
tryAgentError (connectClient v) >>= \case
Right client -> putClient client >> clientConnectedAsync c tSess
Left e -> handleErr e loop
atomically $ do
removeTSessVar v tSess clients
putTMVar (sessionVar v) (Left e)
throwError e -- signal error to caller
hostEvent :: forall err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent 'AENone) -> Client msg -> ACommand 'Agent 'AENone
hostEvent event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost
@@ -724,7 +702,6 @@ closeAgentClient c = liftIO $ do
closeProtocolServerClients c ntfClients
closeProtocolServerClients c xftpClients
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
cancelActions . actions $ asyncClients c
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
clear connCmdsQueued
@@ -776,9 +753,6 @@ closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> XFTPServer ->
closeXFTPServerClient c userId server (FileDigest chunkDigest) =
mkTransportSession c userId server chunkDigest >>= liftIO . closeClient c xftpClients
cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO ()
cancelActions as = atomically (swapTVar as mempty) >>= mapM_ (forkIO . uninterruptibleCancel)
withConnLock :: MonadUnliftIO m => AgentClient -> ConnId -> String -> m a -> m a
withConnLock _ "" _ = id
withConnLock AgentClient {connLocks} connId name = withLockMap_ connLocks connId name
@@ -1572,7 +1546,6 @@ data AgentWorkersDetails = AgentWorkersDetails
smpDeliveryWorkers_ :: Map Text WorkersDetails,
asyncCmdWorkers_ :: Map Text WorkersDetails,
smpSubWorkers_ :: [Text],
asyncCients_ :: [Int],
ntfWorkers_ :: Map Text WorkersDetails,
ntfSMPWorkers_ :: Map Text WorkersDetails,
xftpRcvWorkers_ :: Map Text WorkersDetails,
@@ -1589,14 +1562,13 @@ data WorkersDetails = WorkersDetails
deriving (Show)
getAgentWorkersDetails :: MonadIO m => AgentClient -> m AgentWorkersDetails
getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, asyncClients = TAsyncs {actions}, agentEnv} = do
getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, agentEnv} = do
smpClients_ <- textKeys <$> readTVarIO smpClients
ntfClients_ <- textKeys <$> readTVarIO ntfClients
xftpClients_ <- textKeys <$> readTVarIO xftpClients
smpDeliveryWorkers_ <- workerStats . fmap fst =<< readTVarIO smpDeliveryWorkers
asyncCmdWorkers_ <- workerStats =<< readTVarIO asyncCmdWorkers
smpSubWorkers_ <- textKeys <$> readTVarIO smpSubWorkers
asyncCients_ <- M.keys <$> readTVarIO actions
ntfWorkers_ <- workerStats =<< readTVarIO ntfWorkers
ntfSMPWorkers_ <- workerStats =<< readTVarIO ntfSMPWorkers
xftpRcvWorkers_ <- workerStats =<< readTVarIO xftpRcvWorkers
@@ -1610,7 +1582,6 @@ getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeli
smpDeliveryWorkers_,
asyncCmdWorkers_,
smpSubWorkers_,
asyncCients_,
ntfWorkers_,
ntfSMPWorkers_,
xftpRcvWorkers_,
@@ -1639,7 +1610,6 @@ data AgentWorkersSummary = AgentWorkersSummary
smpDeliveryWorkersCount :: Int,
asyncCmdWorkersCount :: Int,
smpSubWorkersCount :: Int,
asyncCientsCount :: Int,
ntfWorkersCount :: Int,
ntfSMPWorkersCount :: Int,
xftpRcvWorkersCount :: Int,
@@ -1649,14 +1619,13 @@ data AgentWorkersSummary = AgentWorkersSummary
deriving (Show)
getAgentWorkersSummary :: MonadIO m => AgentClient -> m AgentWorkersSummary
getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, asyncClients = TAsyncs {actions}, agentEnv} = do
getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, agentEnv} = do
smpClientsCount <- M.size <$> readTVarIO smpClients
ntfClientsCount <- M.size <$> readTVarIO ntfClients
xftpClientsCount <- M.size <$> readTVarIO xftpClients
smpDeliveryWorkersCount <- M.size <$> readTVarIO smpDeliveryWorkers
asyncCmdWorkersCount <- M.size <$> readTVarIO asyncCmdWorkers
smpSubWorkersCount <- M.size <$> readTVarIO smpSubWorkers
asyncCientsCount <- M.size <$> readTVarIO actions
ntfWorkersCount <- M.size <$> readTVarIO ntfWorkers
ntfSMPWorkersCount <- M.size <$> readTVarIO ntfSMPWorkers
xftpRcvWorkersCount <- M.size <$> readTVarIO xftpRcvWorkers
@@ -1670,7 +1639,6 @@ getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeli
smpDeliveryWorkersCount,
asyncCmdWorkersCount,
smpSubWorkersCount,
asyncCientsCount,
ntfWorkersCount,
ntfSMPWorkersCount,
xftpRcvWorkersCount,
+4 -4
View File
@@ -34,21 +34,21 @@ import UnliftIO.STM
-- See a full agent executable here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-agent/Main.hs
runSMPAgent :: (MonadRandom m, MonadUnliftIO m) => ATransport -> AgentConfig -> InitialAgentServers -> SQLiteStore -> m ()
runSMPAgent t cfg initServers store =
runSMPAgentBlocking t cfg initServers store =<< newEmptyTMVarIO
runSMPAgentBlocking t cfg initServers store 0 =<< newEmptyTMVarIO
-- | Runs an SMP agent as a TCP service using passed configuration with signalling.
--
-- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True)
-- and when it is disconnected from the TCP socket once the server thread is killed (False).
runSMPAgentBlocking :: (MonadRandom m, MonadUnliftIO m) => ATransport -> AgentConfig -> InitialAgentServers -> SQLiteStore -> TMVar Bool -> m ()
runSMPAgentBlocking (ATransport t) cfg@AgentConfig {tcpPort, caCertificateFile, certificateFile, privateKeyFile} initServers store started = do
runSMPAgentBlocking :: (MonadRandom m, MonadUnliftIO m) => ATransport -> AgentConfig -> InitialAgentServers -> SQLiteStore -> Int -> TMVar Bool -> m ()
runSMPAgentBlocking (ATransport t) cfg@AgentConfig {tcpPort, caCertificateFile, certificateFile, privateKeyFile} initServers store initClientId started = do
liftIO (newSMPAgentEnv cfg store) >>= runReaderT (smpAgent t)
where
smpAgent :: forall c m'. (Transport c, MonadUnliftIO m', MonadReader Env m') => TProxy c -> m' ()
smpAgent _ = do
-- tlsServerParams is not in Env to avoid breaking functional API w/t key and certificate generation
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile
clientId <- newTVarIO 0
clientId <- newTVarIO initClientId
runTransportServer started tcpPort tlsServerParams defaultTransportServerConfig $ \(h :: c) -> do
liftIO . putLn h $ "Welcome to SMP agent v" <> B.pack simplexMQVersion
cId <- atomically $ stateTVar clientId $ \i -> (i + 1, i + 1)
-24
View File
@@ -1,24 +0,0 @@
module Simplex.Messaging.Agent.TAsyncs where
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import UnliftIO.Async (Async, async)
import UnliftIO.STM
data TAsyncs = TAsyncs
{ actionId :: TVar Int,
actions :: TMap Int (Async ())
}
newTAsyncs :: STM TAsyncs
newTAsyncs = TAsyncs <$> newTVar 0 <*> TM.empty
newAsyncAction :: MonadUnliftIO m => (Int -> m ()) -> TAsyncs -> m ()
newAsyncAction action as = do
aId <- atomically $ stateTVar (actionId as) $ \i -> (i + 1, i + 1)
a <- async $ action aId
atomically $ TM.insert aId a $ actions as
removeAsyncAction :: Int -> TAsyncs -> STM ()
removeAsyncAction aId = TM.delete aId . actions
+5 -6
View File
@@ -359,7 +359,6 @@ testServerConnectionAfterError t _ = do
withAgent2 $ \alice -> do
withServer $ do
connect (bob, "bob") (alice, "alice")
bob <#. ("", "", DOWN server ["alice"])
alice <#. ("", "", DOWN server ["bob"])
alice #: ("1", "bob", "SEND F 5\nhello") #> ("1", "bob", MID 4)
@@ -386,10 +385,10 @@ testServerConnectionAfterError t _ = do
where
server = SMPServer "localhost" testPort2 testKeyHash
withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` ()
withAgent1 = withAgent agentTestPort testDB
withAgent2 = withAgent agentTestPort2 testDB2
withAgent :: String -> FilePath -> (c -> IO a) -> IO a
withAgent agentPort agentDB = withSmpAgentThreadOn_ (ATransport t) (agentPort, testPort2, agentDB) (pure ()) . const . testSMPAgentClientOn agentPort
withAgent1 = withAgent agentTestPort testDB 0
withAgent2 = withAgent agentTestPort2 testDB2 10
withAgent :: String -> FilePath -> Int -> (c -> IO a) -> IO a
withAgent agentPort agentDB initClientId = withSmpAgentThreadOn_ (ATransport t) (agentPort, testPort2, agentDB) initClientId (pure ()) . const . testSMPAgentClientOn agentPort
testMsgDeliveryAgentRestart :: Transport c => TProxy c -> c -> IO ()
testMsgDeliveryAgentRestart t bob = do
@@ -424,7 +423,7 @@ testMsgDeliveryAgentRestart t bob = do
removeFile testDB
where
withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` ()
withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) (pure ()) . const . testSMPAgentClientOn agentTestPort
withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) 0 (pure ()) . const . testSMPAgentClientOn agentTestPort
testConcurrentMsgDelivery :: Transport c => TProxy c -> c -> c -> IO ()
testConcurrentMsgDelivery _ alice bob = do
+4 -4
View File
@@ -225,15 +225,15 @@ fastMessageRetryInterval = RetryInterval2 {riFast = fastRetryInterval, riSlow =
type AgentTestMonad m = (MonadUnliftIO m, MonadRandom m, MonadFail m)
withSmpAgentThreadOn_ :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> m () -> (ThreadId -> m a) -> m a
withSmpAgentThreadOn_ t (port', smpPort', db') afterProcess =
withSmpAgentThreadOn_ :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> Int -> m () -> (ThreadId -> m a) -> m a
withSmpAgentThreadOn_ t (port', smpPort', db') initClientId afterProcess =
let cfg' = agentCfg {tcpPort = port'}
initServers' = initAgentServers {smp = userServers [ProtoServerWithAuth (SMPServer "localhost" smpPort' testKeyHash) Nothing]}
in serverBracket
( \started -> do
Right st <- liftIO $ createAgentStore db' "" False MCError
when (dbNew st) . liftIO $ withTransaction' st (`SQL.execute_` "INSERT INTO users (user_id) VALUES (1)")
runSMPAgentBlocking t cfg' initServers' st started
runSMPAgentBlocking t cfg' initServers' st initClientId started
)
afterProcess
@@ -241,7 +241,7 @@ userServers :: NonEmpty (ProtoServerWithAuth p) -> Map UserId (NonEmpty (ProtoSe
userServers srvs = M.fromList [(1, srvs)]
withSmpAgentThreadOn :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> (ThreadId -> m a) -> m a
withSmpAgentThreadOn t a@(_, _, db') = withSmpAgentThreadOn_ t a $ removeFile db'
withSmpAgentThreadOn t a@(_, _, db') = withSmpAgentThreadOn_ t a 0 $ removeFile db'
withSmpAgentOn :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> m a -> m a
withSmpAgentOn t (port', smpPort', db') = withSmpAgentThreadOn t (port', smpPort', db') . const