mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-25 09:52:18 +00:00
agent: do not start delivery workers when there are no messages to deliver (#1263)
* agent: use weak ThreadId and forkIO in workers instead of async (reduce memory) * agent: do not start and exit delivery workers when there are no messages to deliver (#1264) * agent: exit delivery workers when no messages to deliver * only start delivery workers when there are pending messages * fix * focus test * enable all tests * lift * do not exit workers when there is no work
This commit is contained in:
@@ -959,7 +959,7 @@ subscribeConnections' c connIds = do
|
||||
let (errs, cs) = M.mapEither id conns
|
||||
errs' = M.map (Left . storeError) errs
|
||||
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
|
||||
lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) cs
|
||||
resumeDelivery cs
|
||||
lift $ resumeConnCmds c $ M.keys cs
|
||||
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs)
|
||||
ns <- asks ntfSupervisor
|
||||
@@ -1005,6 +1005,10 @@ subscribeConnections' c connIds = do
|
||||
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete
|
||||
ConnData {connId} = toConnData conn
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
|
||||
resumeDelivery :: Map ConnId SomeConn -> AM ()
|
||||
resumeDelivery conns = do
|
||||
conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery
|
||||
lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) conns'
|
||||
sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue)
|
||||
sndQueue (SomeConn _ conn) = case conn of
|
||||
DuplexConnection cData _ sqs -> Just (cData, sqs)
|
||||
|
||||
@@ -163,7 +163,7 @@ module Simplex.Messaging.Agent.Client
|
||||
where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Concurrent (ThreadId, forkIO)
|
||||
import Control.Concurrent (ThreadId, killThread)
|
||||
import Control.Concurrent.Async (Async, uninterruptibleCancel)
|
||||
import Control.Concurrent.STM (retry)
|
||||
import Control.Exception (AsyncException (..), BlockedIndefinitelyOnSTM (..))
|
||||
@@ -266,10 +266,11 @@ import Simplex.Messaging.Transport (SMPVersion, SessionId, THandleParams (sessio
|
||||
import Simplex.Messaging.Transport.Client (TransportHost (..))
|
||||
import Simplex.Messaging.Util
|
||||
import Simplex.Messaging.Version
|
||||
import System.Mem.Weak (Weak)
|
||||
import System.Mem.Weak (Weak, deRefWeak)
|
||||
import System.Random (randomR)
|
||||
import UnliftIO (mapConcurrently, timeout)
|
||||
import UnliftIO.Async (async)
|
||||
import UnliftIO.Concurrent (forkIO, mkWeakThreadId)
|
||||
import UnliftIO.Directory (doesFileExist, getTemporaryDirectory, removeFile)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
@@ -410,7 +411,7 @@ runWorkerAsync Worker {action} work =
|
||||
(atomically . tryPutTMVar action) -- if it was running (or if start crashes), put it back and unlock (don't lock if it was just started)
|
||||
(\a -> when (isNothing a) start) -- start worker if it's not running
|
||||
where
|
||||
start = atomically . putTMVar action . Just =<< async work
|
||||
start = atomically . putTMVar action . Just =<< mkWeakThreadId =<< forkIO work
|
||||
|
||||
data AgentOperation = AONtfNetwork | AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase
|
||||
deriving (Eq, Show)
|
||||
@@ -905,7 +906,7 @@ closeAgentClient c = do
|
||||
cancelWorker :: Worker -> IO ()
|
||||
cancelWorker Worker {doWork, action} = do
|
||||
noWorkToDo doWork
|
||||
atomically (tryTakeTMVar action) >>= mapM_ (mapM_ uninterruptibleCancel)
|
||||
atomically (tryTakeTMVar action) >>= mapM_ (mapM_ $ deRefWeak >=> mapM_ killThread)
|
||||
|
||||
waitUntilActive :: AgentClient -> IO ()
|
||||
waitUntilActive AgentClient {active} = unlessM (readTVarIO active) $ atomically $ unlessM (readTVar active) retry
|
||||
|
||||
@@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Env.SQLite
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent (ThreadId)
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
@@ -76,8 +77,9 @@ import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport (SMPVersion, TLS, Transport (..))
|
||||
import Simplex.Messaging.Transport.Client (defaultSMPPort)
|
||||
import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors')
|
||||
import System.Mem.Weak (Weak)
|
||||
import System.Random (StdGen, newStdGen)
|
||||
import UnliftIO (Async, SomeException)
|
||||
import UnliftIO (SomeException)
|
||||
import UnliftIO.STM
|
||||
|
||||
type AM' a = ReaderT Env IO a
|
||||
@@ -312,7 +314,7 @@ mkInternal = INTERNAL . show
|
||||
data Worker = Worker
|
||||
{ workerId :: Int,
|
||||
doWork :: TMVar (),
|
||||
action :: TMVar (Maybe (Async ())),
|
||||
action :: TMVar (Maybe (Weak ThreadId)),
|
||||
restarts :: TVar RestartCount
|
||||
}
|
||||
|
||||
|
||||
@@ -110,6 +110,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
getSndMsgViaRcpt,
|
||||
updateSndMsgRcpt,
|
||||
getPendingQueueMsg,
|
||||
getConnectionsForDelivery,
|
||||
updatePendingMsgRIState,
|
||||
deletePendingMsgs,
|
||||
getExpiredSndMessages,
|
||||
@@ -1008,6 +1009,10 @@ updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} =
|
||||
"UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?"
|
||||
(agentMsgId, msgRcptStatus, connId, sndMsgId)
|
||||
|
||||
getConnectionsForDelivery :: DB.Connection -> IO [ConnId]
|
||||
getConnectionsForDelivery db =
|
||||
map fromOnly <$> DB.query_ db "SELECT DISTINCT conn_id FROM snd_message_deliveries WHERE failed = 0"
|
||||
|
||||
getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData)))
|
||||
getPendingQueueMsg db connId SndQueue {dbQueueId} =
|
||||
getWorkItem "message" getMsgId getMsgData markMsgFailed
|
||||
|
||||
@@ -508,6 +508,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} baseId ali
|
||||
suspendAgent alice 0
|
||||
closeSQLiteStore store
|
||||
threadDelay 1000000
|
||||
print "before opening the database from another agent"
|
||||
|
||||
-- aliceNtf client doesn't have subscription and is allowed to get notification message
|
||||
withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> runRight_ $ do
|
||||
@@ -515,6 +516,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} baseId ali
|
||||
pure ()
|
||||
|
||||
threadDelay 1000000
|
||||
print "after closing the database in another agent"
|
||||
reopenSQLiteStore store
|
||||
foregroundAgent alice
|
||||
threadDelay 500000
|
||||
|
||||
Reference in New Issue
Block a user