diff --git a/simplexmq.cabal b/simplexmq.cabal index a0d620957..9a3411610 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -102,7 +102,6 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_delivery_indexes - Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client Simplex.Messaging.Client.Agent diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 85ac38c2d..9c6571f14 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -172,7 +172,6 @@ import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction) import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB -import Simplex.Messaging.Agent.TAsyncs import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues)) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import Simplex.Messaging.Client @@ -278,7 +277,6 @@ data AgentClient = AgentClient deleteLock :: Lock, -- smpSubWorkers for SMP servers sessions smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())), - asyncClients :: TAsyncs, agentStats :: TMap AgentStatsKey (TVar Int), clientId :: Int, agentEnv :: Env @@ -418,7 +416,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = invLocks <- TM.empty deleteLock <- createLock smpSubWorkers <- TM.empty - asyncClients <- newTAsyncs agentStats <- TM.empty return AgentClient @@ -452,7 +449,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = invLocks, deleteLock, smpSubWorkers, - asyncClients, agentStats, clientId, agentEnv @@ -503,10 +499,11 @@ instance ProtocolServerClient XFTPErrorType FileResponse where getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do unlessM (readTVarIO active) . throwError $ INACTIVE - atomically (getTSessVar c tSess smpClients) - >>= either newClient (waitForProtocolClient c tSess) + v <- atomically (getTSessVar c tSess smpClients) + either newClient (waitForProtocolClient c tSess) v + `catchAgentError` \e -> resubscribeSMPSession c tSess >> throwError e where - newClient = newProtocolClient c tSess smpClients connectClient resubscribeSMPSession + newClient = newProtocolClient c tSess smpClients connectClient connectClient :: SMPClientVar -> m SMPClient connectClient v = do cfg <- getClientConfig c smpCfg @@ -541,8 +538,13 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, resubscribeSMPSession :: AgentMonad' m => AgentClient -> SMPTransportSession -> m () resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess = - atomically (getTSessVar c tSess smpSubWorkers) >>= either newSubWorker (\_ -> pure ()) + atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ())) where + getWorkerVar = + ifM + (null <$> getPending) + (pure Nothing) -- prevent race with cleanup and adding pending queues in another call + (Just <$> getTSessVar c tSess smpSubWorkers) newSubWorker v = do a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v) atomically $ putTMVar (sessionVar v) a @@ -550,10 +552,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess = ri <- asks $ reconnectInterval . config timeoutCounts <- newTVarIO 0 withRetryInterval ri $ \_ loop -> do - pending <- atomically . RQ.getSessQueues tSess $ pendingSubs c + pending <- atomically getPending forM_ (L.nonEmpty pending) $ \qs -> do void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs loop + getPending = RQ.getSessQueues tSess $ pendingSubs c cleanup :: SessionVar (Async ()) -> STM () cleanup v = do -- Here we wait until TMVar is not empty to prevent worker cleanup happening before worker is added to TMVar. @@ -597,7 +600,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d unlessM (readTVarIO active) . throwError $ INACTIVE atomically (getTSessVar c tSess ntfClients) >>= either - (newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ()) + (newProtocolClient c tSess ntfClients connectClient) (waitForProtocolClient c tSess) where connectClient :: NtfClientVar -> m NtfClient @@ -617,7 +620,7 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@ unlessM (readTVarIO active) . throwError $ INACTIVE atomically (getTSessVar c tSess xftpClients) >>= either - (newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ()) + (newProtocolClient c tSess xftpClients connectClient) (waitForProtocolClient c tSess) where connectClient :: XFTPClientVar -> m XFTPClient @@ -666,47 +669,22 @@ newProtocolClient :: TransportSession msg -> TMap (TransportSession msg) (ClientVar msg) -> (ClientVar msg -> m (Client msg)) -> - (AgentClient -> TransportSession msg -> m ()) -> ClientVar msg -> m (Client msg) -newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient clientConnectedAsync v = - -- attempt sync connect first +newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient v = tryAgentError (connectClient v) >>= \case - Right client -> putClient client $> client - Left e -> do - handleErr e $ newAsyncAction asyncConnectLoop (asyncClients c) -- initiate reconnect loop for temporary errors - throwError e -- signal error to caller - where - putClient :: Client msg -> m () - putClient client = do + Right client -> do logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv <> " (user " <> bshow userId <> maybe "" (" for entity " <>) entityId_ <> ")" - -- tryPutTMVar is a precaution, it always succeeds here - r <- atomically $ tryPutTMVar (sessionVar v) (Right client) - unless r $ logError "newProtocolClient: cannot put connected client" + atomically $ putTMVar (sessionVar v) (Right client) liftIO $ incClientStat c userId client "CLIENT" "OK" atomically $ writeTBQueue (subQ c) ("", "", APC SAENone $ hostEvent CONNECT client) - handleErr :: AgentErrorType -> m () -> m () - handleErr e handleTmp = do + pure client + Left e -> do liftIO $ incServerStat c userId srv "CLIENT" $ strEncode e - if temporaryAgentError e - then handleTmp - else do - r <- atomically $ do - removeTSessVar v tSess clients - -- tryPutTMVar is a precaution, it always succeeds here - tryPutTMVar (sessionVar v) (Left e) - unless r $ logError "newProtocolClient: cannot put client error" - asyncConnectLoop :: Int -> m () - asyncConnectLoop aId = do - ri <- asks $ reconnectInterval . config - withRetryInterval ri (const retryConnectClient) - `E.finally` atomically (removeAsyncAction aId $ asyncClients c) - where - -- does not return anything, restarts instead of throwing errors - retryConnectClient loop = - tryAgentError (connectClient v) >>= \case - Right client -> putClient client >> clientConnectedAsync c tSess - Left e -> handleErr e loop + atomically $ do + removeTSessVar v tSess clients + putTMVar (sessionVar v) (Left e) + throwError e -- signal error to caller hostEvent :: forall err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent 'AENone) -> Client msg -> ACommand 'Agent 'AENone hostEvent event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost @@ -724,7 +702,6 @@ closeAgentClient c = liftIO $ do closeProtocolServerClients c ntfClients closeProtocolServerClients c xftpClients atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect - cancelActions . actions $ asyncClients c clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst) clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker clear connCmdsQueued @@ -776,9 +753,6 @@ closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> XFTPServer -> closeXFTPServerClient c userId server (FileDigest chunkDigest) = mkTransportSession c userId server chunkDigest >>= liftIO . closeClient c xftpClients -cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO () -cancelActions as = atomically (swapTVar as mempty) >>= mapM_ (forkIO . uninterruptibleCancel) - withConnLock :: MonadUnliftIO m => AgentClient -> ConnId -> String -> m a -> m a withConnLock _ "" _ = id withConnLock AgentClient {connLocks} connId name = withLockMap_ connLocks connId name @@ -1572,7 +1546,6 @@ data AgentWorkersDetails = AgentWorkersDetails smpDeliveryWorkers_ :: Map Text WorkersDetails, asyncCmdWorkers_ :: Map Text WorkersDetails, smpSubWorkers_ :: [Text], - asyncCients_ :: [Int], ntfWorkers_ :: Map Text WorkersDetails, ntfSMPWorkers_ :: Map Text WorkersDetails, xftpRcvWorkers_ :: Map Text WorkersDetails, @@ -1589,14 +1562,13 @@ data WorkersDetails = WorkersDetails deriving (Show) getAgentWorkersDetails :: MonadIO m => AgentClient -> m AgentWorkersDetails -getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, asyncClients = TAsyncs {actions}, agentEnv} = do +getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, agentEnv} = do smpClients_ <- textKeys <$> readTVarIO smpClients ntfClients_ <- textKeys <$> readTVarIO ntfClients xftpClients_ <- textKeys <$> readTVarIO xftpClients smpDeliveryWorkers_ <- workerStats . fmap fst =<< readTVarIO smpDeliveryWorkers asyncCmdWorkers_ <- workerStats =<< readTVarIO asyncCmdWorkers smpSubWorkers_ <- textKeys <$> readTVarIO smpSubWorkers - asyncCients_ <- M.keys <$> readTVarIO actions ntfWorkers_ <- workerStats =<< readTVarIO ntfWorkers ntfSMPWorkers_ <- workerStats =<< readTVarIO ntfSMPWorkers xftpRcvWorkers_ <- workerStats =<< readTVarIO xftpRcvWorkers @@ -1610,7 +1582,6 @@ getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeli smpDeliveryWorkers_, asyncCmdWorkers_, smpSubWorkers_, - asyncCients_, ntfWorkers_, ntfSMPWorkers_, xftpRcvWorkers_, @@ -1639,7 +1610,6 @@ data AgentWorkersSummary = AgentWorkersSummary smpDeliveryWorkersCount :: Int, asyncCmdWorkersCount :: Int, smpSubWorkersCount :: Int, - asyncCientsCount :: Int, ntfWorkersCount :: Int, ntfSMPWorkersCount :: Int, xftpRcvWorkersCount :: Int, @@ -1649,14 +1619,13 @@ data AgentWorkersSummary = AgentWorkersSummary deriving (Show) getAgentWorkersSummary :: MonadIO m => AgentClient -> m AgentWorkersSummary -getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, asyncClients = TAsyncs {actions}, agentEnv} = do +getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeliveryWorkers, asyncCmdWorkers, smpSubWorkers, agentEnv} = do smpClientsCount <- M.size <$> readTVarIO smpClients ntfClientsCount <- M.size <$> readTVarIO ntfClients xftpClientsCount <- M.size <$> readTVarIO xftpClients smpDeliveryWorkersCount <- M.size <$> readTVarIO smpDeliveryWorkers asyncCmdWorkersCount <- M.size <$> readTVarIO asyncCmdWorkers smpSubWorkersCount <- M.size <$> readTVarIO smpSubWorkers - asyncCientsCount <- M.size <$> readTVarIO actions ntfWorkersCount <- M.size <$> readTVarIO ntfWorkers ntfSMPWorkersCount <- M.size <$> readTVarIO ntfSMPWorkers xftpRcvWorkersCount <- M.size <$> readTVarIO xftpRcvWorkers @@ -1670,7 +1639,6 @@ getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeli smpDeliveryWorkersCount, asyncCmdWorkersCount, smpSubWorkersCount, - asyncCientsCount, ntfWorkersCount, ntfSMPWorkersCount, xftpRcvWorkersCount, diff --git a/src/Simplex/Messaging/Agent/Server.hs b/src/Simplex/Messaging/Agent/Server.hs index 4ac53f4e5..a6e15dcc4 100644 --- a/src/Simplex/Messaging/Agent/Server.hs +++ b/src/Simplex/Messaging/Agent/Server.hs @@ -34,21 +34,21 @@ import UnliftIO.STM -- See a full agent executable here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-agent/Main.hs runSMPAgent :: (MonadRandom m, MonadUnliftIO m) => ATransport -> AgentConfig -> InitialAgentServers -> SQLiteStore -> m () runSMPAgent t cfg initServers store = - runSMPAgentBlocking t cfg initServers store =<< newEmptyTMVarIO + runSMPAgentBlocking t cfg initServers store 0 =<< newEmptyTMVarIO -- | Runs an SMP agent as a TCP service using passed configuration with signalling. -- -- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True) -- and when it is disconnected from the TCP socket once the server thread is killed (False). -runSMPAgentBlocking :: (MonadRandom m, MonadUnliftIO m) => ATransport -> AgentConfig -> InitialAgentServers -> SQLiteStore -> TMVar Bool -> m () -runSMPAgentBlocking (ATransport t) cfg@AgentConfig {tcpPort, caCertificateFile, certificateFile, privateKeyFile} initServers store started = do +runSMPAgentBlocking :: (MonadRandom m, MonadUnliftIO m) => ATransport -> AgentConfig -> InitialAgentServers -> SQLiteStore -> Int -> TMVar Bool -> m () +runSMPAgentBlocking (ATransport t) cfg@AgentConfig {tcpPort, caCertificateFile, certificateFile, privateKeyFile} initServers store initClientId started = do liftIO (newSMPAgentEnv cfg store) >>= runReaderT (smpAgent t) where smpAgent :: forall c m'. (Transport c, MonadUnliftIO m', MonadReader Env m') => TProxy c -> m' () smpAgent _ = do -- tlsServerParams is not in Env to avoid breaking functional API w/t key and certificate generation tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile - clientId <- newTVarIO 0 + clientId <- newTVarIO initClientId runTransportServer started tcpPort tlsServerParams defaultTransportServerConfig $ \(h :: c) -> do liftIO . putLn h $ "Welcome to SMP agent v" <> B.pack simplexMQVersion cId <- atomically $ stateTVar clientId $ \i -> (i + 1, i + 1) diff --git a/src/Simplex/Messaging/Agent/TAsyncs.hs b/src/Simplex/Messaging/Agent/TAsyncs.hs deleted file mode 100644 index d2e2ea1f5..000000000 --- a/src/Simplex/Messaging/Agent/TAsyncs.hs +++ /dev/null @@ -1,24 +0,0 @@ -module Simplex.Messaging.Agent.TAsyncs where - -import Control.Monad.IO.Unlift (MonadUnliftIO) -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM -import UnliftIO.Async (Async, async) -import UnliftIO.STM - -data TAsyncs = TAsyncs - { actionId :: TVar Int, - actions :: TMap Int (Async ()) - } - -newTAsyncs :: STM TAsyncs -newTAsyncs = TAsyncs <$> newTVar 0 <*> TM.empty - -newAsyncAction :: MonadUnliftIO m => (Int -> m ()) -> TAsyncs -> m () -newAsyncAction action as = do - aId <- atomically $ stateTVar (actionId as) $ \i -> (i + 1, i + 1) - a <- async $ action aId - atomically $ TM.insert aId a $ actions as - -removeAsyncAction :: Int -> TAsyncs -> STM () -removeAsyncAction aId = TM.delete aId . actions diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 1f048f7d0..589683a78 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -359,7 +359,6 @@ testServerConnectionAfterError t _ = do withAgent2 $ \alice -> do withServer $ do connect (bob, "bob") (alice, "alice") - bob <#. ("", "", DOWN server ["alice"]) alice <#. ("", "", DOWN server ["bob"]) alice #: ("1", "bob", "SEND F 5\nhello") #> ("1", "bob", MID 4) @@ -386,10 +385,10 @@ testServerConnectionAfterError t _ = do where server = SMPServer "localhost" testPort2 testKeyHash withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` () - withAgent1 = withAgent agentTestPort testDB - withAgent2 = withAgent agentTestPort2 testDB2 - withAgent :: String -> FilePath -> (c -> IO a) -> IO a - withAgent agentPort agentDB = withSmpAgentThreadOn_ (ATransport t) (agentPort, testPort2, agentDB) (pure ()) . const . testSMPAgentClientOn agentPort + withAgent1 = withAgent agentTestPort testDB 0 + withAgent2 = withAgent agentTestPort2 testDB2 10 + withAgent :: String -> FilePath -> Int -> (c -> IO a) -> IO a + withAgent agentPort agentDB initClientId = withSmpAgentThreadOn_ (ATransport t) (agentPort, testPort2, agentDB) initClientId (pure ()) . const . testSMPAgentClientOn agentPort testMsgDeliveryAgentRestart :: Transport c => TProxy c -> c -> IO () testMsgDeliveryAgentRestart t bob = do @@ -424,7 +423,7 @@ testMsgDeliveryAgentRestart t bob = do removeFile testDB where withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` () - withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) (pure ()) . const . testSMPAgentClientOn agentTestPort + withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) 0 (pure ()) . const . testSMPAgentClientOn agentTestPort testConcurrentMsgDelivery :: Transport c => TProxy c -> c -> c -> IO () testConcurrentMsgDelivery _ alice bob = do diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index 3a3c69f8c..1a70f6588 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -225,15 +225,15 @@ fastMessageRetryInterval = RetryInterval2 {riFast = fastRetryInterval, riSlow = type AgentTestMonad m = (MonadUnliftIO m, MonadRandom m, MonadFail m) -withSmpAgentThreadOn_ :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> m () -> (ThreadId -> m a) -> m a -withSmpAgentThreadOn_ t (port', smpPort', db') afterProcess = +withSmpAgentThreadOn_ :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> Int -> m () -> (ThreadId -> m a) -> m a +withSmpAgentThreadOn_ t (port', smpPort', db') initClientId afterProcess = let cfg' = agentCfg {tcpPort = port'} initServers' = initAgentServers {smp = userServers [ProtoServerWithAuth (SMPServer "localhost" smpPort' testKeyHash) Nothing]} in serverBracket ( \started -> do Right st <- liftIO $ createAgentStore db' "" False MCError when (dbNew st) . liftIO $ withTransaction' st (`SQL.execute_` "INSERT INTO users (user_id) VALUES (1)") - runSMPAgentBlocking t cfg' initServers' st started + runSMPAgentBlocking t cfg' initServers' st initClientId started ) afterProcess @@ -241,7 +241,7 @@ userServers :: NonEmpty (ProtoServerWithAuth p) -> Map UserId (NonEmpty (ProtoSe userServers srvs = M.fromList [(1, srvs)] withSmpAgentThreadOn :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> (ThreadId -> m a) -> m a -withSmpAgentThreadOn t a@(_, _, db') = withSmpAgentThreadOn_ t a $ removeFile db' +withSmpAgentThreadOn t a@(_, _, db') = withSmpAgentThreadOn_ t a 0 $ removeFile db' withSmpAgentOn :: AgentTestMonad m => ATransport -> (ServiceName, ServiceName, FilePath) -> m a -> m a withSmpAgentOn t (port', smpPort', db') = withSmpAgentThreadOn t (port', smpPort', db') . const