From 67f5f2ea8f85d596f73d873e6395bfcd05369e4c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 22 May 2025 10:51:40 +0100 Subject: [PATCH] hide clients IntMap --- .../Messaging/Notifications/Server/Env.hs | 1 - .../Messaging/Notifications/Server/Main.hs | 1 - src/Simplex/Messaging/Server.hs | 74 +++++++++---------- src/Simplex/Messaging/Server/Env/STM.hs | 74 +++++++++++++++++-- tests/NtfClient.hs | 3 +- 5 files changed, 102 insertions(+), 51 deletions(-) diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index b513a3399..a287a065b 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -53,7 +53,6 @@ data NtfServerConfig = NtfServerConfig subIdBytes :: Int, regCodeBytes :: Int, clientQSize :: Natural, - subQSize :: Natural, pushQSize :: Natural, smpAgentCfg :: SMPClientAgentConfig, apnsConfig :: APNSPushClientConfig, diff --git a/src/Simplex/Messaging/Notifications/Server/Main.hs b/src/Simplex/Messaging/Notifications/Server/Main.hs index a073eee18..23954506a 100644 --- a/src/Simplex/Messaging/Notifications/Server/Main.hs +++ b/src/Simplex/Messaging/Notifications/Server/Main.hs @@ -233,7 +233,6 @@ ntfServerCLI cfgPath logPath = subIdBytes = 24, regCodeBytes = 32, clientQSize = 64, - subQSize = 2048, pushQSize = 32768, smpAgentCfg = defaultSMPClientAgentConfig diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 61e7d1981..509a85cb6 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -162,8 +162,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt stopServer s liftIO $ exitSuccess raceAny_ - ( serverThread "server subscribers" (subscribers s) subscriptions cancelSub - : serverThread "server notifiers" (ntfSubscribers s) ntfSubscriptions (\_ -> pure ()) + ( serverThread "server subscribers" s subscribers subscriptions cancelSub + : serverThread "server notifiers" s ntfSubscribers ntfSubscriptions (\_ -> pure ()) : deliverNtfsThread s : sendPendingEvtsThread s : receiveFromProxyAgent pa @@ -230,28 +230,29 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt serverThread :: forall s. String -> - ServerSubscribers -> + Server -> + (Server -> ServerSubscribers) -> (forall st. Client st -> TMap QueueId s) -> (s -> IO ()) -> M () - serverThread label ServerSubscribers {subQ, queueSubscribers = ss, subClients, pendingEvents} clientSubs unsub = do + serverThread label srv srvSubscribers clientSubs unsub = do labelMyThread label - cls <- asks clients liftIO . forever $ do -- Reading clients outside of `updateSubscribers` transaction to avoid transaction re-evaluation on each new connected client. -- In case client disconnects during the transaction (its `connected` property is read), -- the transaction will still be re-evaluated, and the client won't be stored as subscribed. sub@(_, clntId, _) <- atomically $ readTQueue subQ - c_ <- IM.lookup clntId <$> readTVarIO cls + c_ <- getServerClient clntId srv atomically (updateSubscribers c_ sub) $>>= endPreviousSubscriptions >>= mapM_ unsub where + ServerSubscribers {subQ, queueSubscribers = ss, subClients, pendingEvents} = srvSubscribers srv updateSubscribers :: Maybe AClient -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, BrokerMsg), AClient)) updateSubscribers c_ (qId, clntId, subscribed) = updateSub $>>= clientToBeNotified where updateSub = case c_ of - Just c@(AClient _ _ c') -> ifM (readTVar $ connected c') (updateSubConnected c) updateSubDisconnected + Just c@(AClient _ _ Client {connected}) -> ifM (readTVar connected) (updateSubConnected c) updateSubDisconnected Nothing -> updateSubDisconnected updateSubConnected c | subscribed = do @@ -264,9 +265,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt removeWhenNoSubs c TM.lookupDelete qId ss >>= mapM readTVar updateSubDisconnected = TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client - clientToBeNotified ac@(AClient _ _ c') - | clntId == clientId c' = pure Nothing - | otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar (connected c') + clientToBeNotified ac@(AClient _ _ Client {clientId, connected}) + | clntId == clientId = pure Nothing + | otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar connected where subEvt = if subscribed then END else DELD endPreviousSubscriptions :: ((QueueId, BrokerMsg), AClient) -> IO (Maybe s) @@ -279,15 +280,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' subClients $ IS.delete (clientId c) deliverNtfsThread :: Server -> M () - deliverNtfsThread Server {ntfSubscribers} = do + deliverNtfsThread srv@Server {ntfSubscribers} = do ntfInt <- asks $ ntfDeliveryInterval . config NtfStore ns <- asks ntfStore stats <- asks serverStats - cls <- asks clients liftIO $ forever $ do threadDelay ntfInt cIds <- IS.toList <$> readTVarIO (subClients ntfSubscribers) - forM_ cIds $ \cId -> mapM_ (deliverNtfs ns stats) . IM.lookup cId =<< readTVarIO cls + forM_ cIds $ \cId -> getServerClient cId srv >>= mapM_ (deliverNtfs ns stats) where deliverNtfs ns stats (AClient _ _ Client {clientId, ntfSubscriptions, sndQ, connected}) = whenM (currentClient readTVarIO) $ do @@ -323,18 +323,17 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch sendPendingEvtsThread :: Server -> M () - sendPendingEvtsThread Server {subscribers, ntfSubscribers} = do + sendPendingEvtsThread srv@Server {subscribers, ntfSubscribers} = do endInt <- asks $ pendingENDInterval . config - cls <- asks clients forever $ do threadDelay endInt - sendPending cls subscribers - sendPending cls ntfSubscribers + sendPending subscribers + sendPending ntfSubscribers where - sendPending cls ServerSubscribers {pendingEvents} = do - ends <- atomically $ swapTVar pendingEvents IM.empty - unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, evts) -> - mapM_ (enqueueEvts evts) . IM.lookup cId =<< readTVarIO cls + sendPending ServerSubscribers {pendingEvents} = do + pending <- atomically $ swapTVar pendingEvents IM.empty + unless (null pending) $ forM_ (IM.assocs pending) $ \(cId, evts) -> + liftIO (getServerClient cId srv) >>= mapM_ (enqueueEvts evts) enqueueEvts evts (AClient _ _ c@Client {connected, sndQ = q}) = whenM (readTVarIO connected) $ do sent <- atomically $ tryWriteTBQueue q ts @@ -578,14 +577,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions} getRealTimeMetrics :: Env -> IO RealTimeMetrics - getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, ntfSubscribers}} = do + getRealTimeMetrics Env {sockets, msgStore = AMS _ _ ms, server = srv@Server {subscribers, ntfSubscribers}} = do socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets #if MIN_VERSION_base(4,18,0) threadsCount <- length <$> listThreads #else let threadsCount = 0 #endif - clientsCount <- IM.size <$> readTVarIO clients + clientsCount <- IM.size <$> getServerClients srv smpSubsCount <- M.size <$> readTVarIO (queueSubscribers subscribers) smpSubClientsCount <- IS.size <$> readTVarIO (subClients subscribers) ntfSubsCount <- M.size <$> readTVarIO (queueSubscribers ntfSubscribers) @@ -650,9 +649,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt CPSuspend -> withAdminRole $ hPutStrLn h "suspend not implemented" CPResume -> withAdminRole $ hPutStrLn h "resume not implemented" CPClients -> withAdminRole $ do - active <- unliftIO u (asks clients) >>= readTVarIO + cls <- getServerClients srv hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions" - forM_ (IM.toList active) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do + forM_ (IM.toList cls) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do connected' <- bshow <$> readTVarIO connected rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt @@ -764,8 +763,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt #else hPutStrLn h "Threads: not available on GHC 8.10" #endif - Env {clients, server = Server {subscribers, ntfSubscribers}} <- unliftIO u ask - activeClients <- readTVarIO clients + let Server {subscribers, ntfSubscribers} = srv + activeClients <- getServerClients srv hPutStrLn h $ "Clients: " <> show (IM.size activeClients) when (r == CPRAdmin) $ do clQs <- clientTBQueueLengths' activeClients @@ -896,19 +895,12 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio clientId <- atomically $ stateTVar nextClientId $ \next -> (next, next + 1) AMS qt mt ms <- asks msgStore c <- liftIO $ newClient qt mt clientId q thVersion sessionId ts - runClientThreads qt mt ms c clientId `finally` clientDisconnected c + runClientThreads qt mt ms c `finally` clientDisconnected c where - runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> IS.Key -> M () - runClientThreads qt mt ms c clientId = do - cls <- asks clients - ok <- - atomically $ do - ifM - (readTVar $ connected c) - (True <$ modifyTVar' cls (IM.insert clientId $ AClient qt mt c)) - (pure False) - when ok $ do - s <- asks server + runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> M () + runClientThreads qt mt ms c = do + s <- asks server + whenM (liftIO $ insertServerClient (AClient qt mt c) s) $ do expCfg <- asks $ inactiveClientExpiration . config th <- newMVar h -- put TH under a fair lock to interleave messages and command responses labelMyThread . B.unpack $ "client $" <> encode sessionId @@ -932,10 +924,10 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty liftIO $ mapM_ cancelSub subs whenM (asks serverActive >>= readTVarIO) $ do - Server {subscribers, ntfSubscribers} <- asks server + srv@Server {subscribers, ntfSubscribers} <- asks server liftIO $ updateSubscribers subs $ queueSubscribers subscribers liftIO $ updateSubscribers ntfSubs $ queueSubscribers ntfSubscribers - asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) + liftIO $ deleteServerClient clientId srv atomically $ modifyTVar' (subClients subscribers) $ IS.delete clientId atomically $ modifyTVar' (subClients ntfSubscribers) $ IS.delete clientId tIds <- atomically $ swapTVar endThreads IM.empty diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index e1e241dd1..f541b02eb 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -41,6 +41,10 @@ module Simplex.Messaging.Server.Env.STM newEnv, mkJournalStoreConfig, newClient, + getServerClients, + getServerClient, + insertServerClient, + deleteServerClient, clientId', newSubscription, newProhibitedSub, @@ -108,6 +112,7 @@ import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP) import Simplex.Messaging.Transport.Server +import Simplex.Messaging.Util (ifM) import System.Directory (doesFileExist) import System.Exit (exitFailure) import System.IO (IOMode (..)) @@ -245,7 +250,6 @@ data Env = Env serverStats :: ServerStats, sockets :: TVar [(ServiceName, SocketState)], clientSeq :: TVar ClientId, - clients :: TVar (IntMap AClient), proxyAgent :: ProxyAgent -- senders served on this proxy } @@ -278,11 +282,15 @@ data AMsgStore = type Subscribed = Bool data Server = Server - { subscribers :: ServerSubscribers, + { clients :: ServerClients, + subscribers :: ServerSubscribers, ntfSubscribers :: ServerSubscribers, savingLock :: Lock } +-- not exported, to prevent accidental concurrent IntMap lookups inside STM transactions. +newtype ServerClients = ServerClients {serverClients :: TVar (IntMap AClient)} + data ServerSubscribers = ServerSubscribers { subQ :: TQueue (QueueId, ClientId, Subscribed), queueSubscribers :: TMap QueueId (TVar AClient), @@ -331,10 +339,32 @@ data Sub = Sub newServer :: IO Server newServer = do + clients <- newTVarIO mempty subscribers <- newServerSubscribers ntfSubscribers <- newServerSubscribers savingLock <- createLockIO - return Server {subscribers, ntfSubscribers, savingLock} + return Server {clients = ServerClients clients, subscribers, ntfSubscribers, savingLock} + +getServerClients :: Server -> IO (IntMap AClient) +getServerClients = readTVarIO . serverClients . clients +{-# INLINE getServerClients #-} + +getServerClient :: ClientId -> Server -> IO (Maybe AClient) +getServerClient cId s = IM.lookup cId <$> getServerClients s +{-# INLINE getServerClient #-} + +insertServerClient :: AClient -> Server -> IO Bool +insertServerClient ac@(AClient _ _ Client {clientId, connected}) Server {clients} = + atomically $ + ifM + (readTVar connected) + (True <$ modifyTVar' (serverClients clients) (IM.insert clientId ac)) + (pure False) +{-# INLINE insertServerClient #-} + +deleteServerClient :: ClientId -> Server -> IO () +deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClients clients) $ IM.delete cId +{-# INLINE deleteServerClient #-} newServerSubscribers :: IO ServerSubscribers newServerSubscribers = do @@ -357,7 +387,24 @@ newClient _ _ clientId qSize thVersion sessionId createdAt = do connected <- newTVarIO True rcvActiveAt <- newTVarIO createdAt sndActiveAt <- newTVarIO createdAt - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} + return + Client + { clientId, + subscriptions, + ntfSubscriptions, + rcvQ, + sndQ, + msgQ, + procThreads, + endThreads, + endThreadSeq, + thVersion, + sessionId, + connected, + createdAt, + rcvActiveAt, + sndActiveAt + } newSubscription :: SubscriptionThread -> STM Sub newSubscription st = do @@ -407,9 +454,24 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp serverStats <- newServerStats =<< getCurrentTime sockets <- newTVarIO [] clientSeq <- newTVarIO 0 - clients <- newTVarIO mempty proxyAgent <- newSMPProxyAgent smpAgentCfg random - pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent} + pure + Env + { serverActive, + config, + serverInfo, + server, + serverIdentity, + msgStore, + ntfStore, + random, + tlsServerCreds, + httpServerCreds, + serverStats, + sockets, + clientSeq, + proxyAgent + } where loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO () loadStoreLog mkQ f st = do diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index f20264cb8..e68095873 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -83,7 +83,7 @@ ntfTestPrometheusMetricsFile :: FilePath ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt" ntfTestStoreDBOpts :: DBOpts -ntfTestStoreDBOpts = +ntfTestStoreDBOpts = DBOpts { connstr = ntfTestServerDBConnstr, schema = "ntf_server", @@ -134,7 +134,6 @@ ntfServerCfg = subIdBytes = 24, regCodeBytes = 32, clientQSize = 2, - subQSize = 2, pushQSize = 2, smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 0}, apnsConfig =