From 571d148bdf9e29403b9bcec6f7daeff815a4db38 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sun, 18 Aug 2024 21:30:06 +0100 Subject: [PATCH] agent: do not start delivery workers when there are no messages to deliver (#1263) * agent: use weak ThreadId and forkIO in workers instead of async (reduce memory) * agent: do not start and exit delivery workers when there are no messages to deliver (#1264) * agent: exit delivery workers when no messages to deliver * only start delivery workers when there are pending messages * fix * focus test * enable all tests * lift * do not exit workers when there is no work --- src/Simplex/Messaging/Agent.hs | 6 +++++- src/Simplex/Messaging/Agent/Client.hs | 9 +++++---- src/Simplex/Messaging/Agent/Env/SQLite.hs | 6 ++++-- src/Simplex/Messaging/Agent/Store/SQLite.hs | 5 +++++ tests/AgentTests/NotificationTests.hs | 2 ++ 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 6ae73061a..f93a0b1a6 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -959,7 +959,7 @@ subscribeConnections' c connIds = do let (errs, cs) = M.mapEither id conns errs' = M.map (Left . storeError) errs (subRs, rcvQs) = M.mapEither rcvQueueOrResult cs - lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) cs + resumeDelivery cs lift $ resumeConnCmds c $ M.keys cs rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs) ns <- asks ntfSupervisor @@ -1005,6 +1005,10 @@ subscribeConnections' c connIds = do let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete ConnData {connId} = toConnData conn atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) + resumeDelivery :: Map ConnId SomeConn -> AM () + resumeDelivery conns = do + conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery + lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) conns' sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue) sndQueue (SomeConn _ conn) = case conn of DuplexConnection cData _ sqs -> Just (cData, sqs) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 02b31cb95..fbdb53548 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -163,7 +163,7 @@ module Simplex.Messaging.Agent.Client where import Control.Applicative ((<|>)) -import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent (ThreadId, killThread) import Control.Concurrent.Async (Async, uninterruptibleCancel) import Control.Concurrent.STM (retry) import Control.Exception (AsyncException (..), BlockedIndefinitelyOnSTM (..)) @@ -266,10 +266,11 @@ import Simplex.Messaging.Transport (SMPVersion, SessionId, THandleParams (sessio import Simplex.Messaging.Transport.Client (TransportHost (..)) import Simplex.Messaging.Util import Simplex.Messaging.Version -import System.Mem.Weak (Weak) +import System.Mem.Weak (Weak, deRefWeak) import System.Random (randomR) import UnliftIO (mapConcurrently, timeout) import UnliftIO.Async (async) +import UnliftIO.Concurrent (forkIO, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, getTemporaryDirectory, removeFile) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -410,7 +411,7 @@ runWorkerAsync Worker {action} work = (atomically . tryPutTMVar action) -- if it was running (or if start crashes), put it back and unlock (don't lock if it was just started) (\a -> when (isNothing a) start) -- start worker if it's not running where - start = atomically . putTMVar action . Just =<< async work + start = atomically . putTMVar action . Just =<< mkWeakThreadId =<< forkIO work data AgentOperation = AONtfNetwork | AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase deriving (Eq, Show) @@ -905,7 +906,7 @@ closeAgentClient c = do cancelWorker :: Worker -> IO () cancelWorker Worker {doWork, action} = do noWorkToDo doWork - atomically (tryTakeTMVar action) >>= mapM_ (mapM_ uninterruptibleCancel) + atomically (tryTakeTMVar action) >>= mapM_ (mapM_ $ deRefWeak >=> mapM_ killThread) waitUntilActive :: AgentClient -> IO () waitUntilActive AgentClient {active} = unlessM (readTVarIO active) $ atomically $ unlessM (readTVar active) retry diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index f57cf91e9..bc5e800c5 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Env.SQLite ) where +import Control.Concurrent (ThreadId) import Control.Monad.Except import Control.Monad.IO.Unlift import Control.Monad.Reader @@ -76,8 +77,9 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPVersion, TLS, Transport (..)) import Simplex.Messaging.Transport.Client (defaultSMPPort) import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors') +import System.Mem.Weak (Weak) import System.Random (StdGen, newStdGen) -import UnliftIO (Async, SomeException) +import UnliftIO (SomeException) import UnliftIO.STM type AM' a = ReaderT Env IO a @@ -312,7 +314,7 @@ mkInternal = INTERNAL . show data Worker = Worker { workerId :: Int, doWork :: TMVar (), - action :: TMVar (Maybe (Async ())), + action :: TMVar (Maybe (Weak ThreadId)), restarts :: TVar RestartCount } diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index e0e4fc58f..fd29bd743 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -110,6 +110,7 @@ module Simplex.Messaging.Agent.Store.SQLite getSndMsgViaRcpt, updateSndMsgRcpt, getPendingQueueMsg, + getConnectionsForDelivery, updatePendingMsgRIState, deletePendingMsgs, getExpiredSndMessages, @@ -1008,6 +1009,10 @@ updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} = "UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?" (agentMsgId, msgRcptStatus, connId, sndMsgId) +getConnectionsForDelivery :: DB.Connection -> IO [ConnId] +getConnectionsForDelivery db = + map fromOnly <$> DB.query_ db "SELECT DISTINCT conn_id FROM snd_message_deliveries WHERE failed = 0" + getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData))) getPendingQueueMsg db connId SndQueue {dbQueueId} = getWorkItem "message" getMsgId getMsgData markMsgFailed diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index cc79faeca..012da704d 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -508,6 +508,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} baseId ali suspendAgent alice 0 closeSQLiteStore store threadDelay 1000000 + print "before opening the database from another agent" -- aliceNtf client doesn't have subscription and is allowed to get notification message withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> runRight_ $ do @@ -515,6 +516,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} baseId ali pure () threadDelay 1000000 + print "after closing the database in another agent" reopenSQLiteStore store foregroundAgent alice threadDelay 500000