dont store queues in memory

This commit is contained in:
Evgeny Poberezkin
2025-10-06 11:17:18 +01:00
parent 2ff25f5321
commit a7762726e2
5 changed files with 113 additions and 62 deletions

View File

@@ -1,7 +1,7 @@
cabal-version: 1.12
name: simplexmq
version: 6.5.0.210
version: 6.5.0.211
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@@ -202,7 +202,6 @@ import qualified Data.ByteString.Base64 as B64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Composition ((.:))
import Data.Containers.ListUtils (nubOrd)
import Data.Either (isRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
@@ -301,8 +300,11 @@ import UnliftIO.Concurrent (forkIO, mkWeakThreadId)
import UnliftIO.Directory (doesFileExist, getTemporaryDirectory, removeFile)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
#if !defined(dbPostgres)
#if defined(dbPostgres)
import Simplex.Messaging.Agent.Store.AgentStore (getExistingRcvQueueSubs)
#else
import qualified Database.SQLite.Simple as SQL
import Simplex.Messaging.Agent.Store.AgentStore (getRcvQueueSubs)
#endif
type ClientVar msg = SessionVar (Either (AgentErrorType, Maybe UTCTime) (Client msg))
@@ -713,32 +715,31 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
-- we make active subscriptions pending only if the client for tSess was current (in the map) and active,
-- because we can have a race condition when a new current client could have already
-- made subscriptions active, and the old client would be processing diconnection later.
removeClientAndSubs :: IO ([RcvQueueSub], [ConnId])
removeClientAndSubs :: IO [(SMP.RecipientId, ConnId)]
removeClientAndSubs = atomically $ do
removeSessVar v tSess smpClients
ifM (readTVar active) removeSubs (pure ([], []))
ifM (readTVar active) removeSubs (pure [])
where
sessId = sessionId $ thParams client
removeSubs = do
mode <- getSessionMode c
subs <- SS.setSubsPending mode tSess sessId $ currentSubs c
let qs = M.elems subs
cs = nubOrd $ map qConnId qs
-- this removes proxied relays that this client created sessions to
destSrvs <- M.keys <$> readTVar prs
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays
pure (qs, cs)
pure subs
serverDown :: ([RcvQueueSub], [ConnId]) -> IO ()
serverDown (qs, conns) = whenM (readTVarIO active) $ do
serverDown :: [(SMP.RecipientId, ConnId)] -> IO ()
serverDown subs = whenM (readTVarIO active) $ do
notifySub c "" $ hostEvent' DISCONNECT client
unless (null conns) $ notifySub c "" $ DOWN srv conns
unless (null qs) $ do
releaseGetLocksIO c qs
unless (null subs) $ do
let (rIds, cIds) = unzip subs
notifySub c "" $ DOWN srv cIds
releaseGetLocksIO c srv rIds
mode <- getSessionModeIO c
let resubscribe
| (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess
| otherwise = void $ subscribeQueues c qs True
| otherwise = resubscribeQueues c tSess subs `catchAllErrors'` (notifySub c "" . ERR . INTERNAL . show)
runReaderT resubscribe env
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
@@ -758,10 +759,10 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
ri <- asks $ reconnectInterval . config
withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do
pending <- atomically $ SS.getPendingSubs tSess $ currentSubs c
unless (M.null pending) $ do
unless (null pending) $ do
liftIO $ waitUntilForeground c
liftIO $ waitForUserNetwork c
handleNotify $ resubscribeSessQueues c tSess $ M.elems pending
resubscribeSessQueues c tSess pending `catchAllErrors'` (notifySub c "" . ERR . INTERNAL . show)
loop
isForeground = (ASForeground ==) <$> readTVar (agentState c)
cleanup :: SessionVar (Async ()) -> STM ()
@@ -770,8 +771,6 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
-- Not waiting may result in terminated worker remaining in the map.
whenM (isEmptyTMVar $ sessionVar v) retry
removeSessVar v tSess smpSubWorkers
handleNotify :: AM' () -> AM' ()
handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show
notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> AEvent e -> m ()
notifySub c connId cmd = liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd)
@@ -1529,13 +1528,42 @@ checkQueues c = fmap partitionEithers . mapM checkQueue
prohibited <- liftIO $ hasGetLock c rq
pure $ if prohibited then Left (rq, CMD PROHIBITED "checkQueues") else Right rq
checkQueues_ :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM' ([(ConnId, AgentErrorType)], [(SMP.RecipientId, ConnId)])
checkQueues_ c (_, srv, _) = fmap partitionEithers . mapM checkQueue
where
checkQueue q@(rId, cId) = do
prohibited <- liftIO $ hasGetLock_ c srv rId
pure $ if prohibited then Left (cId, CMD PROHIBITED "checkQueues") else Right q
resubscribeQueues :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM ()
resubscribeQueues c tSess subs = do
qs <- getQueues c tSess subs
void $ lift $ subscribeQueues c qs True
-- This function expects that all queues belong to one transport session,
-- and that they are already added to pending subscriptions.
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
resubscribeSessQueues c tSess qs = do
(errs, qs_) <- checkQueues c qs
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True
unless (null errs) $ notifySub c "" $ ERRS $ map (first qConnId) errs
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM ()
resubscribeSessQueues c tSess subs = do
(errs, subs') <- lift $ checkQueues_ c tSess subs
qs_ <- getQueues c tSess subs'
forM_ (L.nonEmpty qs_) $ \qs' -> void $ lift $ subscribeSessQueues_ c (tSess, qs') True
unless (null errs) $ notifySub c "" $ ERRS errs
getQueues :: AgentClient -> SMPTransportSession -> [(SMP.RecipientId, ConnId)] -> AM [RcvQueueSub]
getQueues c (_, srv, _) subs = do
#if defined(dbPostgres)
let rIds = map fst subs
qs <- M.fromList . map (\rq -> (queueId rq, rq)) <$> withStore' c (\db -> getExistingRcvQueueSubs db srv rIds)
let addQueue (rId, cId) (es, rqs) = maybe (((cId, CONN NOT_FOUND "") : es, rqs)) (\rq -> (es, rq : rqs)) $ M.lookup rId qs
(errs, qs') = foldr addQueue ([], []) subs
#else
let (rIds, connIds) = unzip subs
qs <- withStore' c $ \db -> getRcvQueueSubs db srv rIds
let (errs, qs') = partitionEithers $ zipWith (\cId -> first ((cId,) . storeError)) connIds qs
#endif
-- TODO [subs] remove missing from pending, add to removed
unless (null errs) $ notifySub c "" $ ERRS errs
pure qs'
subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs
@@ -1547,7 +1575,7 @@ subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQue
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
cs_ <-
if withEvents
then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c)
then Just <$> atomically (SS.getActiveConns tSess $ currentSubs c)
else pure Nothing
active <-
atomically $
@@ -1818,20 +1846,23 @@ sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId =
ackSMPMessage smp rcvPrivateKey rcvId msgId
hasGetLock :: SomeRcvQueue q => AgentClient -> q -> IO Bool
hasGetLock c rq =
TM.memberIO (qServer rq, queueId rq) $ getMsgLocks c
hasGetLock c rq = TM.memberIO (qServer rq, queueId rq) $ getMsgLocks c
{-# INLINE hasGetLock #-}
releaseGetLock :: SomeRcvQueue q => AgentClient -> q -> STM ()
releaseGetLock c rq =
TM.lookup (qServer rq, queueId rq) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ())
hasGetLock_ :: AgentClient -> SMPServer -> SMP.RecipientId -> IO Bool
hasGetLock_ c srv rId = TM.memberIO (srv, rId) $ getMsgLocks c
{-# INLINE hasGetLock_ #-}
releaseGetLock :: AgentClient -> RcvQueue -> STM ()
releaseGetLock c RcvQueue {server, rcvId} =
TM.lookup (server, rcvId) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ())
{-# INLINE releaseGetLock #-}
releaseGetLocksIO :: SomeRcvQueue q => AgentClient -> [q] -> IO ()
releaseGetLocksIO c rqs = do
releaseGetLocksIO :: AgentClient -> SMPServer -> [SMP.RecipientId] -> IO ()
releaseGetLocksIO c srv rIds = do
locks <- readTVarIO $ getMsgLocks c
forM_ rqs $ \rq ->
forM_ (M.lookup ((qServer rq, queueId rq)) locks) $ \lock ->
forM_ rIds $ \rId ->
forM_ (M.lookup (srv, rId) locks) $ \lock ->
atomically $ tryPutTMVar lock ()
suspendQueue :: AgentClient -> NetworkRequestMode -> RcvQueue -> AM ()

View File

@@ -45,6 +45,11 @@ module Simplex.Messaging.Agent.Store.AgentStore
getConnSubs,
getDeletedConns,
getConnsData,
#if defined(dbPostgres)
getExistingRcvQueueSubs,
#else
getRcvQueueSubs,
#endif
setConnDeleted,
setConnUserId,
setConnAgentVersion,
@@ -2072,6 +2077,11 @@ getConnSubs :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConnSub]
getConnSubs = getAnyConns_ getRcvQueueSubsByConnIds_ getSndQueuesByConnIds_ False
{-# INLINE getConnSubs #-}
getExistingRcvQueueSubs :: DB.Connection -> SMPServer -> [RecipientId] -> IO [RcvQueueSub]
getExistingRcvQueueSubs db srv rIds =
map toRcvQueueSub <$>
DB.query db (rcvQueueSubQuery <> " WHERE q.host = ? AND q.port = ? AND q.rcv_id IN ?") (host srv, port srv, In rIds)
getAnyConns_ ::
forall rq sq.
(DB.Connection -> [ConnId] -> IO (Map ConnId (NonEmpty rq))) ->
@@ -2137,6 +2147,11 @@ getConnSubs :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConnSub]
getConnSubs = getAnyConns_ getRcvQueueSubsByConnId_ getSndQueuesByConnId_ False
{-# INLINE getConnSubs #-}
getRcvQueueSubs :: DB.Connection -> SMPServer -> [RecipientId] -> IO [Either StoreError RcvQueueSub]
getRcvQueueSubs db srv =
mapM $ \rId -> firstRow toRcvQueueSub SEConnNotFound $
DB.query db (rcvQueueSubQuery <> " WHERE q.host = ? AND q.port = ? AND q.rcv_id = ?") (host srv, port srv, rId)
getAnyConns_ ::
(DB.Connection -> ConnId -> IO (Maybe (NonEmpty rq))) ->
(DB.Connection -> ConnId -> IO (Maybe (NonEmpty sq))) ->

View File

@@ -1,5 +1,6 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Agent.TSessionSubs
@@ -18,7 +19,7 @@ module Simplex.Messaging.Agent.TSessionSubs
batchDeleteSubs,
hasPendingSubs,
getPendingSubs,
getActiveSubs,
getActiveConns,
setSubsPending,
foldSessionSubs,
mapSubs,
@@ -31,7 +32,7 @@ import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust)
import qualified Data.Set as S
import Simplex.Messaging.Agent.Protocol (SMPQueue (..))
import Simplex.Messaging.Agent.Protocol (ConnId, SMPQueue (..))
import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SomeRcvQueue)
import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..))
import Simplex.Messaging.Protocol (RecipientId)
@@ -48,8 +49,8 @@ data TSessionSubs = TSessionSubs
data SessSubs = SessSubs
{ subsSessId :: TVar (Maybe SessionId),
activeSubs :: TMap RecipientId RcvQueueSub,
pendingSubs :: TMap RecipientId RcvQueueSub
activeSubs :: TMap RecipientId ConnId,
pendingSubs :: TMap RecipientId ConnId
}
emptyIO :: IO TSessionSubs
@@ -80,12 +81,15 @@ hasPendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
hasPendingSub = hasQueue_ pendingSubs
{-# INLINE hasPendingSub #-}
hasQueue_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
hasQueue_ :: (SessSubs -> TMap RecipientId ConnId) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
hasQueue_ subs rId tSess ss = isJust <$> (lookupSubs tSess ss $>>= TM.lookup rId . subs)
{-# INLINE hasQueue_ #-}
addPendingSub :: RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
addPendingSub rq tSess ss = getSessSubs tSess ss >>= TM.insert (rcvId rq) rq . pendingSubs
addPendingSub RcvQueueSub {rcvId, connId} = addPendingSub_ rcvId connId
{-# INLINE addPendingSub #-}
addPendingSub_ :: RecipientId -> ConnId -> SMPTransportSession -> TSessionSubs -> STM ()
addPendingSub_ rId cId tSess ss = getSessSubs tSess ss >>= TM.insert rId cId . pendingSubs
setSessionId :: SessionId -> SMPTransportSession -> TSessionSubs -> STM ()
setSessionId sessId tSess ss = do
@@ -95,20 +99,19 @@ setSessionId sessId tSess ss = do
Just sessId' -> unless (sessId == sessId') $ void $ setSubsPending_ s $ Just sessId
addActiveSub :: SessionId -> RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
addActiveSub sessId rq tSess ss = do
addActiveSub sessId RcvQueueSub {rcvId, connId} tSess ss = do
s <- getSessSubs tSess ss
sessId' <- readTVar $ subsSessId s
let rId = rcvId rq
if Just sessId == sessId'
then do
TM.insert rId rq $ activeSubs s
TM.delete rId $ pendingSubs s
else TM.insert rId rq $ pendingSubs s
TM.insert rcvId connId $ activeSubs s
TM.delete rcvId $ pendingSubs s
else TM.insert rcvId connId $ pendingSubs s
batchAddPendingSubs :: [RcvQueueSub] -> SMPTransportSession -> TSessionSubs -> STM ()
batchAddPendingSubs rqs tSess ss = do
s <- getSessSubs tSess ss
modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, rq)) rqs
modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, connId rq)) rqs
deletePendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM ()
deletePendingSub rId tSess = lookupSubs tSess >=> mapM_ (TM.delete rId . pendingSubs)
@@ -125,18 +128,18 @@ batchDeleteSubs rqs tSess = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs
hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool
hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (fmap (not . null) . readTVar . pendingSubs)
getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getPendingSubs = getSubs_ pendingSubs
getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM [(RecipientId, ConnId)]
getPendingSubs tSess = fmap M.assocs . getSubs_ pendingSubs tSess
{-# INLINE getPendingSubs #-}
getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getActiveSubs = getSubs_ activeSubs
{-# INLINE getActiveSubs #-}
getActiveConns :: SMPTransportSession -> TSessionSubs -> STM (S.Set ConnId)
getActiveConns tSess = fmap (S.fromList . M.elems) . getSubs_ activeSubs tSess
{-# INLINE getActiveConns #-}
getSubs_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getSubs_ :: (SessSubs -> TMap RecipientId ConnId) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId ConnId)
getSubs_ subs tSess = lookupSubs tSess >=> maybe (pure M.empty) (readTVar . subs)
setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM [(RecipientId, ConnId)]
setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss)
| entitySession == isJust connId_ =
TM.lookup tSess ss >>= withSessSubs (`setSubsPending_` Nothing)
@@ -146,30 +149,31 @@ setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss)
entitySession = mode == TSMEntity
sessEntId = if entitySession then Just else const Nothing
withSessSubs run = \case
Nothing -> pure M.empty
Nothing -> pure []
Just s -> do
sessId' <- readTVar $ subsSessId s
if Just sessId == sessId' then run s else pure M.empty
if Just sessId == sessId' then run s else pure []
setPendingChangeMode s = do
subs <- M.union <$> readTVar (activeSubs s) <*> readTVar (pendingSubs s)
unless (null subs) $
forM_ subs $ \rq -> addPendingSub rq (uId, srv, sessEntId (connId rq)) tss
pure subs
let subs' = M.assocs subs
unless (null subs') $
forM_ subs' $ \(rId, cId) -> addPendingSub_ rId cId (uId, srv, sessEntId cId) tss
pure subs'
setSubsPending_ :: SessSubs -> Maybe SessionId -> STM (Map RecipientId RcvQueueSub)
setSubsPending_ :: SessSubs -> Maybe SessionId -> STM [(RecipientId, ConnId)]
setSubsPending_ s sessId_ = do
writeTVar (subsSessId s) sessId_
let as = activeSubs s
subs <- readTVar as
unless (null subs) $ do
unless (M.null subs) $ do
writeTVar as M.empty
modifyTVar' (pendingSubs s) $ M.union subs
pure subs
pure $ M.assocs subs
foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a
foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs
mapSubs :: (Map RecipientId RcvQueueSub -> a) -> SessSubs -> IO (a, a)
mapSubs :: (Map RecipientId ConnId -> a) -> SessSubs -> IO (a, a)
mapSubs f s = do
active <- readTVarIO $ activeSubs s
pending <- readTVarIO $ pendingSubs s

View File

@@ -140,6 +140,7 @@ main = do
describe "SMP proxy, postgres-only message store" $
before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests
#endif
-- TODO [subs] change to memory store
describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
describe "SMP proxy, jornal message store" $
before (pure $ ASType SQSMemory SMSJournal) smpProxyTests