From a7762726e2d091e82ef17ca3c0beed08cf3511ca Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 6 Oct 2025 11:17:18 +0100 Subject: [PATCH] dont store queues in memory --- simplexmq.cabal | 2 +- src/Simplex/Messaging/Agent/Client.hs | 95 ++++++++++++------- .../Messaging/Agent/Store/AgentStore.hs | 15 +++ src/Simplex/Messaging/Agent/TSessionSubs.hs | 62 ++++++------ tests/Test.hs | 1 + 5 files changed, 113 insertions(+), 62 deletions(-) diff --git a/simplexmq.cabal b/simplexmq.cabal index d69392352..82e17b4b0 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -1,7 +1,7 @@ cabal-version: 1.12 name: simplexmq -version: 6.5.0.210 +version: 6.5.0.211 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index dc0f4ca0d..055c86a53 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -202,7 +202,6 @@ import qualified Data.ByteString.Base64 as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Composition ((.:)) -import Data.Containers.ListUtils (nubOrd) import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) @@ -301,8 +300,11 @@ import UnliftIO.Concurrent (forkIO, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, getTemporaryDirectory, removeFile) import qualified UnliftIO.Exception as E import UnliftIO.STM -#if !defined(dbPostgres) +#if defined(dbPostgres) +import Simplex.Messaging.Agent.Store.AgentStore (getExistingRcvQueueSubs) +#else import qualified Database.SQLite.Simple as SQL +import Simplex.Messaging.Agent.Store.AgentStore (getRcvQueueSubs) #endif type ClientVar msg = SessionVar (Either (AgentErrorType, Maybe UTCTime) (Client msg)) @@ -713,32 +715,31 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess -- we make active subscriptions pending only if the client for tSess was current (in the map) and active, -- because we can have a race condition when a new current client could have already -- made subscriptions active, and the old client would be processing diconnection later. - removeClientAndSubs :: IO ([RcvQueueSub], [ConnId]) + removeClientAndSubs :: IO [(SMP.RecipientId, ConnId)] removeClientAndSubs = atomically $ do removeSessVar v tSess smpClients - ifM (readTVar active) removeSubs (pure ([], [])) + ifM (readTVar active) removeSubs (pure []) where sessId = sessionId $ thParams client removeSubs = do mode <- getSessionMode c subs <- SS.setSubsPending mode tSess sessId $ currentSubs c - let qs = M.elems subs - cs = nubOrd $ map qConnId qs -- this removes proxied relays that this client created sessions to destSrvs <- M.keys <$> readTVar prs forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays - pure (qs, cs) + pure subs - serverDown :: ([RcvQueueSub], [ConnId]) -> IO () - serverDown (qs, conns) = whenM (readTVarIO active) $ do + serverDown :: [(SMP.RecipientId, ConnId)] -> IO () + serverDown subs = whenM (readTVarIO active) $ do notifySub c "" $ hostEvent' DISCONNECT client - unless (null conns) $ notifySub c "" $ DOWN srv conns - unless (null qs) $ do - releaseGetLocksIO c qs + unless (null subs) $ do + let (rIds, cIds) = unzip subs + notifySub c "" $ DOWN srv cIds + releaseGetLocksIO c srv rIds mode <- getSessionModeIO c let resubscribe | (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess - | otherwise = void $ subscribeQueues c qs True + | otherwise = resubscribeQueues c tSess subs `catchAllErrors'` (notifySub c "" . ERR . INTERNAL . show) runReaderT resubscribe env resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () @@ -758,10 +759,10 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do ri <- asks $ reconnectInterval . config withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do pending <- atomically $ SS.getPendingSubs tSess $ currentSubs c - unless (M.null pending) $ do + unless (null pending) $ do liftIO $ waitUntilForeground c liftIO $ waitForUserNetwork c - handleNotify $ resubscribeSessQueues c tSess $ M.elems pending + resubscribeSessQueues c tSess pending `catchAllErrors'` (notifySub c "" . ERR . INTERNAL . show) loop isForeground = (ASForeground ==) <$> readTVar (agentState c) cleanup :: SessionVar (Async ()) -> STM () @@ -770,8 +771,6 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do -- Not waiting may result in terminated worker remaining in the map. whenM (isEmptyTMVar $ sessionVar v) retry removeSessVar v tSess smpSubWorkers - handleNotify :: AM' () -> AM' () - handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> AEvent e -> m () notifySub c connId cmd = liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) @@ -1529,13 +1528,42 @@ checkQueues c = fmap partitionEithers . mapM checkQueue prohibited <- liftIO $ hasGetLock c rq pure $ if prohibited then Left (rq, CMD PROHIBITED "checkQueues") else Right rq +checkQueues_ :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM' ([(ConnId, AgentErrorType)], [(SMP.RecipientId, ConnId)]) +checkQueues_ c (_, srv, _) = fmap partitionEithers . mapM checkQueue + where + checkQueue q@(rId, cId) = do + prohibited <- liftIO $ hasGetLock_ c srv rId + pure $ if prohibited then Left (cId, CMD PROHIBITED "checkQueues") else Right q + +resubscribeQueues :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM () +resubscribeQueues c tSess subs = do + qs <- getQueues c tSess subs + void $ lift $ subscribeQueues c qs True + -- This function expects that all queues belong to one transport session, -- and that they are already added to pending subscriptions. -resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' () -resubscribeSessQueues c tSess qs = do - (errs, qs_) <- checkQueues c qs - forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True - unless (null errs) $ notifySub c "" $ ERRS $ map (first qConnId) errs +resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM () +resubscribeSessQueues c tSess subs = do + (errs, subs') <- lift $ checkQueues_ c tSess subs + qs_ <- getQueues c tSess subs' + forM_ (L.nonEmpty qs_) $ \qs' -> void $ lift $ subscribeSessQueues_ c (tSess, qs') True + unless (null errs) $ notifySub c "" $ ERRS errs + +getQueues :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM [RcvQueueSub] +getQueues c (_, srv, _) subs = do +#if defined(dbPostgres) + let rIds = map fst subs + qs <- M.fromList . map (\rq -> (queueId rq, rq)) <$> withStore' c (\db -> getExistingRcvQueueSubs db srv rIds) + let addQueue (rId, cId) (es, rqs) = maybe (((cId, CONN NOT_FOUND "") : es, rqs)) (\rq -> (es, rq : rqs)) $ M.lookup rId qs + (errs, qs') = foldr addQueue ([], []) subs +#else + let (rIds, connIds) = unzip subs + qs <- withStore' c $ \db -> getRcvQueueSubs db srv rIds + let (errs, qs') = partitionEithers $ zipWith (\cId -> first ((cId,) . storeError)) connIds qs +#endif + -- TODO [subs] remove missing from pending, add to removed + unless (null errs) $ notifySub c "" $ ERRS errs + pure qs' subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool) subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs @@ -1547,7 +1575,7 @@ subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQue rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs' cs_ <- if withEvents - then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c) + then Just <$> atomically (SS.getActiveConns tSess $ currentSubs c) else pure Nothing active <- atomically $ @@ -1818,20 +1846,23 @@ sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId = ackSMPMessage smp rcvPrivateKey rcvId msgId hasGetLock :: SomeRcvQueue q => AgentClient -> q -> IO Bool -hasGetLock c rq = - TM.memberIO (qServer rq, queueId rq) $ getMsgLocks c +hasGetLock c rq = TM.memberIO (qServer rq, queueId rq) $ getMsgLocks c {-# INLINE hasGetLock #-} -releaseGetLock :: SomeRcvQueue q => AgentClient -> q -> STM () -releaseGetLock c rq = - TM.lookup (qServer rq, queueId rq) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ()) +hasGetLock_ :: AgentClient -> SMPServer -> SMP.RecipientId -> IO Bool +hasGetLock_ c srv rId = TM.memberIO (srv, rId) $ getMsgLocks c +{-# INLINE hasGetLock_ #-} + +releaseGetLock :: AgentClient -> RcvQueue -> STM () +releaseGetLock c RcvQueue {server, rcvId} = + TM.lookup (server, rcvId) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ()) {-# INLINE releaseGetLock #-} -releaseGetLocksIO :: SomeRcvQueue q => AgentClient -> [q] -> IO () -releaseGetLocksIO c rqs = do +releaseGetLocksIO :: AgentClient -> SMPServer -> [SMP.RecipientId] -> IO () +releaseGetLocksIO c srv rIds = do locks <- readTVarIO $ getMsgLocks c - forM_ rqs $ \rq -> - forM_ (M.lookup ((qServer rq, queueId rq)) locks) $ \lock -> + forM_ rIds $ \rId -> + forM_ (M.lookup (srv, rId) locks) $ \lock -> atomically $ tryPutTMVar lock () suspendQueue :: AgentClient -> NetworkRequestMode -> RcvQueue -> AM () diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index b24ceaf74..d4b0da61a 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -45,6 +45,11 @@ module Simplex.Messaging.Agent.Store.AgentStore getConnSubs, getDeletedConns, getConnsData, +#if defined(dbPostgres) + getExistingRcvQueueSubs, +#else + getRcvQueueSubs, +#endif setConnDeleted, setConnUserId, setConnAgentVersion, @@ -2072,6 +2077,11 @@ getConnSubs :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConnSub] getConnSubs = getAnyConns_ getRcvQueueSubsByConnIds_ getSndQueuesByConnIds_ False {-# INLINE getConnSubs #-} +getExistingRcvQueueSubs :: DB.Connection -> SMPServer -> [RecipientId] -> IO [RcvQueueSub] +getExistingRcvQueueSubs db srv rIds = + map toRcvQueueSub <$> + DB.query db (rcvQueueSubQuery <> " WHERE q.host = ? AND q.port = ? AND q.rcv_id IN ?") (host srv, port srv, In rIds) + getAnyConns_ :: forall rq sq. (DB.Connection -> [ConnId] -> IO (Map ConnId (NonEmpty rq))) -> @@ -2137,6 +2147,11 @@ getConnSubs :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConnSub] getConnSubs = getAnyConns_ getRcvQueueSubsByConnId_ getSndQueuesByConnId_ False {-# INLINE getConnSubs #-} +getRcvQueueSubs :: DB.Connection -> SMPServer -> [RecipientId] -> IO [Either StoreError RcvQueueSub] +getRcvQueueSubs db srv = + mapM $ \rId -> firstRow toRcvQueueSub SEConnNotFound $ + DB.query db (rcvQueueSubQuery <> " WHERE q.host = ? AND q.port = ? AND q.rcv_id = ?") (host srv, port srv, rId) + getAnyConns_ :: (DB.Connection -> ConnId -> IO (Maybe (NonEmpty rq))) -> (DB.Connection -> ConnId -> IO (Maybe (NonEmpty sq))) -> diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs index 8c0cd05de..c73088187 100644 --- a/src/Simplex/Messaging/Agent/TSessionSubs.hs +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -1,5 +1,6 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} module Simplex.Messaging.Agent.TSessionSubs @@ -18,7 +19,7 @@ module Simplex.Messaging.Agent.TSessionSubs batchDeleteSubs, hasPendingSubs, getPendingSubs, - getActiveSubs, + getActiveConns, setSubsPending, foldSessionSubs, mapSubs, @@ -31,7 +32,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (isJust) import qualified Data.Set as S -import Simplex.Messaging.Agent.Protocol (SMPQueue (..)) +import Simplex.Messaging.Agent.Protocol (ConnId, SMPQueue (..)) import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SomeRcvQueue) import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..)) import Simplex.Messaging.Protocol (RecipientId) @@ -48,8 +49,8 @@ data TSessionSubs = TSessionSubs data SessSubs = SessSubs { subsSessId :: TVar (Maybe SessionId), - activeSubs :: TMap RecipientId RcvQueueSub, - pendingSubs :: TMap RecipientId RcvQueueSub + activeSubs :: TMap RecipientId ConnId, + pendingSubs :: TMap RecipientId ConnId } emptyIO :: IO TSessionSubs @@ -80,12 +81,15 @@ hasPendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool hasPendingSub = hasQueue_ pendingSubs {-# INLINE hasPendingSub #-} -hasQueue_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool +hasQueue_ :: (SessSubs -> TMap RecipientId ConnId) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool hasQueue_ subs rId tSess ss = isJust <$> (lookupSubs tSess ss $>>= TM.lookup rId . subs) -{-# INLINE hasQueue_ #-} addPendingSub :: RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM () -addPendingSub rq tSess ss = getSessSubs tSess ss >>= TM.insert (rcvId rq) rq . pendingSubs +addPendingSub RcvQueueSub {rcvId, connId} = addPendingSub_ rcvId connId +{-# INLINE addPendingSub #-} + +addPendingSub_ :: RecipientId -> ConnId -> SMPTransportSession -> TSessionSubs -> STM () +addPendingSub_ rId cId tSess ss = getSessSubs tSess ss >>= TM.insert rId cId . pendingSubs setSessionId :: SessionId -> SMPTransportSession -> TSessionSubs -> STM () setSessionId sessId tSess ss = do @@ -95,20 +99,19 @@ setSessionId sessId tSess ss = do Just sessId' -> unless (sessId == sessId') $ void $ setSubsPending_ s $ Just sessId addActiveSub :: SessionId -> RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM () -addActiveSub sessId rq tSess ss = do +addActiveSub sessId RcvQueueSub {rcvId, connId} tSess ss = do s <- getSessSubs tSess ss sessId' <- readTVar $ subsSessId s - let rId = rcvId rq if Just sessId == sessId' then do - TM.insert rId rq $ activeSubs s - TM.delete rId $ pendingSubs s - else TM.insert rId rq $ pendingSubs s + TM.insert rcvId connId $ activeSubs s + TM.delete rcvId $ pendingSubs s + else TM.insert rcvId connId $ pendingSubs s batchAddPendingSubs :: [RcvQueueSub] -> SMPTransportSession -> TSessionSubs -> STM () batchAddPendingSubs rqs tSess ss = do s <- getSessSubs tSess ss - modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, rq)) rqs + modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, connId rq)) rqs deletePendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM () deletePendingSub rId tSess = lookupSubs tSess >=> mapM_ (TM.delete rId . pendingSubs) @@ -125,18 +128,18 @@ batchDeleteSubs rqs tSess = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (fmap (not . null) . readTVar . pendingSubs) -getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) -getPendingSubs = getSubs_ pendingSubs +getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM [(RecipientId, ConnId)] +getPendingSubs tSess = fmap M.assocs . getSubs_ pendingSubs tSess {-# INLINE getPendingSubs #-} -getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) -getActiveSubs = getSubs_ activeSubs -{-# INLINE getActiveSubs #-} +getActiveConns :: SMPTransportSession -> TSessionSubs -> STM (S.Set ConnId) +getActiveConns tSess = fmap (S.fromList . M.elems) . getSubs_ activeSubs tSess +{-# INLINE getActiveConns #-} -getSubs_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getSubs_ :: (SessSubs -> TMap RecipientId ConnId) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId ConnId) getSubs_ subs tSess = lookupSubs tSess >=> maybe (pure M.empty) (readTVar . subs) -setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM [(RecipientId, ConnId)] setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss) | entitySession == isJust connId_ = TM.lookup tSess ss >>= withSessSubs (`setSubsPending_` Nothing) @@ -146,30 +149,31 @@ setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss) entitySession = mode == TSMEntity sessEntId = if entitySession then Just else const Nothing withSessSubs run = \case - Nothing -> pure M.empty + Nothing -> pure [] Just s -> do sessId' <- readTVar $ subsSessId s - if Just sessId == sessId' then run s else pure M.empty + if Just sessId == sessId' then run s else pure [] setPendingChangeMode s = do subs <- M.union <$> readTVar (activeSubs s) <*> readTVar (pendingSubs s) - unless (null subs) $ - forM_ subs $ \rq -> addPendingSub rq (uId, srv, sessEntId (connId rq)) tss - pure subs + let subs' = M.assocs subs + unless (null subs') $ + forM_ subs' $ \(rId, cId) -> addPendingSub_ rId cId (uId, srv, sessEntId cId) tss + pure subs' -setSubsPending_ :: SessSubs -> Maybe SessionId -> STM (Map RecipientId RcvQueueSub) +setSubsPending_ :: SessSubs -> Maybe SessionId -> STM [(RecipientId, ConnId)] setSubsPending_ s sessId_ = do writeTVar (subsSessId s) sessId_ let as = activeSubs s subs <- readTVar as - unless (null subs) $ do + unless (M.null subs) $ do writeTVar as M.empty modifyTVar' (pendingSubs s) $ M.union subs - pure subs + pure $ M.assocs subs foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs -mapSubs :: (Map RecipientId RcvQueueSub -> a) -> SessSubs -> IO (a, a) +mapSubs :: (Map RecipientId ConnId -> a) -> SessSubs -> IO (a, a) mapSubs f s = do active <- readTVarIO $ activeSubs s pending <- readTVarIO $ pendingSubs s diff --git a/tests/Test.hs b/tests/Test.hs index 4fb4d5702..94867f00e 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -140,6 +140,7 @@ main = do describe "SMP proxy, postgres-only message store" $ before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests #endif + -- TODO [subs] change to memory store describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) describe "SMP proxy, jornal message store" $ before (pure $ ASType SQSMemory SMSJournal) smpProxyTests