hide clients IntMap

This commit is contained in:
Evgeny Poberezkin
2025-05-22 10:51:40 +01:00
parent 43ef908309
commit 67f5f2ea8f
5 changed files with 102 additions and 51 deletions

View File

@@ -53,7 +53,6 @@ data NtfServerConfig = NtfServerConfig
subIdBytes :: Int,
regCodeBytes :: Int,
clientQSize :: Natural,
subQSize :: Natural,
pushQSize :: Natural,
smpAgentCfg :: SMPClientAgentConfig,
apnsConfig :: APNSPushClientConfig,

View File

@@ -233,7 +233,6 @@ ntfServerCLI cfgPath logPath =
subIdBytes = 24,
regCodeBytes = 32,
clientQSize = 64,
subQSize = 2048,
pushQSize = 32768,
smpAgentCfg =
defaultSMPClientAgentConfig

View File

@@ -162,8 +162,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
stopServer s
liftIO $ exitSuccess
raceAny_
( serverThread "server subscribers" (subscribers s) subscriptions cancelSub
: serverThread "server notifiers" (ntfSubscribers s) ntfSubscriptions (\_ -> pure ())
( serverThread "server subscribers" s subscribers subscriptions cancelSub
: serverThread "server notifiers" s ntfSubscribers ntfSubscriptions (\_ -> pure ())
: deliverNtfsThread s
: sendPendingEvtsThread s
: receiveFromProxyAgent pa
@@ -230,28 +230,29 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
serverThread ::
forall s.
String ->
ServerSubscribers ->
Server ->
(Server -> ServerSubscribers) ->
(forall st. Client st -> TMap QueueId s) ->
(s -> IO ()) ->
M ()
serverThread label ServerSubscribers {subQ, queueSubscribers = ss, subClients, pendingEvents} clientSubs unsub = do
serverThread label srv srvSubscribers clientSubs unsub = do
labelMyThread label
cls <- asks clients
liftIO . forever $ do
-- Reading clients outside of `updateSubscribers` transaction to avoid transaction re-evaluation on each new connected client.
-- In case client disconnects during the transaction (its `connected` property is read),
-- the transaction will still be re-evaluated, and the client won't be stored as subscribed.
sub@(_, clntId, _) <- atomically $ readTQueue subQ
c_ <- IM.lookup clntId <$> readTVarIO cls
c_ <- getServerClient clntId srv
atomically (updateSubscribers c_ sub)
$>>= endPreviousSubscriptions
>>= mapM_ unsub
where
ServerSubscribers {subQ, queueSubscribers = ss, subClients, pendingEvents} = srvSubscribers srv
updateSubscribers :: Maybe AClient -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, BrokerMsg), AClient))
updateSubscribers c_ (qId, clntId, subscribed) = updateSub $>>= clientToBeNotified
where
updateSub = case c_ of
Just c@(AClient _ _ c') -> ifM (readTVar $ connected c') (updateSubConnected c) updateSubDisconnected
Just c@(AClient _ _ Client {connected}) -> ifM (readTVar connected) (updateSubConnected c) updateSubDisconnected
Nothing -> updateSubDisconnected
updateSubConnected c
| subscribed = do
@@ -264,9 +265,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
removeWhenNoSubs c
TM.lookupDelete qId ss >>= mapM readTVar
updateSubDisconnected = TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client
clientToBeNotified ac@(AClient _ _ c')
| clntId == clientId c' = pure Nothing
| otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar (connected c')
clientToBeNotified ac@(AClient _ _ Client {clientId, connected})
| clntId == clientId = pure Nothing
| otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar connected
where
subEvt = if subscribed then END else DELD
endPreviousSubscriptions :: ((QueueId, BrokerMsg), AClient) -> IO (Maybe s)
@@ -279,15 +280,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' subClients $ IS.delete (clientId c)
deliverNtfsThread :: Server -> M ()
deliverNtfsThread Server {ntfSubscribers} = do
deliverNtfsThread srv@Server {ntfSubscribers} = do
ntfInt <- asks $ ntfDeliveryInterval . config
NtfStore ns <- asks ntfStore
stats <- asks serverStats
cls <- asks clients
liftIO $ forever $ do
threadDelay ntfInt
cIds <- IS.toList <$> readTVarIO (subClients ntfSubscribers)
forM_ cIds $ \cId -> mapM_ (deliverNtfs ns stats) . IM.lookup cId =<< readTVarIO cls
forM_ cIds $ \cId -> getServerClient cId srv >>= mapM_ (deliverNtfs ns stats)
where
deliverNtfs ns stats (AClient _ _ Client {clientId, ntfSubscriptions, sndQ, connected}) =
whenM (currentClient readTVarIO) $ do
@@ -323,18 +323,17 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
sendPendingEvtsThread :: Server -> M ()
sendPendingEvtsThread Server {subscribers, ntfSubscribers} = do
sendPendingEvtsThread srv@Server {subscribers, ntfSubscribers} = do
endInt <- asks $ pendingENDInterval . config
cls <- asks clients
forever $ do
threadDelay endInt
sendPending cls subscribers
sendPending cls ntfSubscribers
sendPending subscribers
sendPending ntfSubscribers
where
sendPending cls ServerSubscribers {pendingEvents} = do
ends <- atomically $ swapTVar pendingEvents IM.empty
unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, evts) ->
mapM_ (enqueueEvts evts) . IM.lookup cId =<< readTVarIO cls
sendPending ServerSubscribers {pendingEvents} = do
pending <- atomically $ swapTVar pendingEvents IM.empty
unless (null pending) $ forM_ (IM.assocs pending) $ \(cId, evts) ->
liftIO (getServerClient cId srv) >>= mapM_ (enqueueEvts evts)
enqueueEvts evts (AClient _ _ c@Client {connected, sndQ = q}) =
whenM (readTVarIO connected) $ do
sent <- atomically $ tryWriteTBQueue q ts
@@ -578,14 +577,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions}
getRealTimeMetrics :: Env -> IO RealTimeMetrics
getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, ntfSubscribers}} = do
getRealTimeMetrics Env {sockets, msgStore = AMS _ _ ms, server = srv@Server {subscribers, ntfSubscribers}} = do
socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets
#if MIN_VERSION_base(4,18,0)
threadsCount <- length <$> listThreads
#else
let threadsCount = 0
#endif
clientsCount <- IM.size <$> readTVarIO clients
clientsCount <- IM.size <$> getServerClients srv
smpSubsCount <- M.size <$> readTVarIO (queueSubscribers subscribers)
smpSubClientsCount <- IS.size <$> readTVarIO (subClients subscribers)
ntfSubsCount <- M.size <$> readTVarIO (queueSubscribers ntfSubscribers)
@@ -650,9 +649,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
CPSuspend -> withAdminRole $ hPutStrLn h "suspend not implemented"
CPResume -> withAdminRole $ hPutStrLn h "resume not implemented"
CPClients -> withAdminRole $ do
active <- unliftIO u (asks clients) >>= readTVarIO
cls <- getServerClients srv
hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions"
forM_ (IM.toList active) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do
forM_ (IM.toList cls) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do
connected' <- bshow <$> readTVarIO connected
rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt
sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt
@@ -764,8 +763,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
#else
hPutStrLn h "Threads: not available on GHC 8.10"
#endif
Env {clients, server = Server {subscribers, ntfSubscribers}} <- unliftIO u ask
activeClients <- readTVarIO clients
let Server {subscribers, ntfSubscribers} = srv
activeClients <- getServerClients srv
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
when (r == CPRAdmin) $ do
clQs <- clientTBQueueLengths' activeClients
@@ -896,19 +895,12 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio
clientId <- atomically $ stateTVar nextClientId $ \next -> (next, next + 1)
AMS qt mt ms <- asks msgStore
c <- liftIO $ newClient qt mt clientId q thVersion sessionId ts
runClientThreads qt mt ms c clientId `finally` clientDisconnected c
runClientThreads qt mt ms c `finally` clientDisconnected c
where
runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> IS.Key -> M ()
runClientThreads qt mt ms c clientId = do
cls <- asks clients
ok <-
atomically $ do
ifM
(readTVar $ connected c)
(True <$ modifyTVar' cls (IM.insert clientId $ AClient qt mt c))
(pure False)
when ok $ do
s <- asks server
runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> M ()
runClientThreads qt mt ms c = do
s <- asks server
whenM (liftIO $ insertServerClient (AClient qt mt c) s) $ do
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
@@ -932,10 +924,10 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte
ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty
liftIO $ mapM_ cancelSub subs
whenM (asks serverActive >>= readTVarIO) $ do
Server {subscribers, ntfSubscribers} <- asks server
srv@Server {subscribers, ntfSubscribers} <- asks server
liftIO $ updateSubscribers subs $ queueSubscribers subscribers
liftIO $ updateSubscribers ntfSubs $ queueSubscribers ntfSubscribers
asks clients >>= atomically . (`modifyTVar'` IM.delete clientId)
liftIO $ deleteServerClient clientId srv
atomically $ modifyTVar' (subClients subscribers) $ IS.delete clientId
atomically $ modifyTVar' (subClients ntfSubscribers) $ IS.delete clientId
tIds <- atomically $ swapTVar endThreads IM.empty

View File

@@ -41,6 +41,10 @@ module Simplex.Messaging.Server.Env.STM
newEnv,
mkJournalStoreConfig,
newClient,
getServerClients,
getServerClient,
insertServerClient,
deleteServerClient,
clientId',
newSubscription,
newProhibitedSub,
@@ -108,6 +112,7 @@ import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP)
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util (ifM)
import System.Directory (doesFileExist)
import System.Exit (exitFailure)
import System.IO (IOMode (..))
@@ -245,7 +250,6 @@ data Env = Env
serverStats :: ServerStats,
sockets :: TVar [(ServiceName, SocketState)],
clientSeq :: TVar ClientId,
clients :: TVar (IntMap AClient),
proxyAgent :: ProxyAgent -- senders served on this proxy
}
@@ -278,11 +282,15 @@ data AMsgStore =
type Subscribed = Bool
data Server = Server
{ subscribers :: ServerSubscribers,
{ clients :: ServerClients,
subscribers :: ServerSubscribers,
ntfSubscribers :: ServerSubscribers,
savingLock :: Lock
}
-- not exported, to prevent accidental concurrent IntMap lookups inside STM transactions.
newtype ServerClients = ServerClients {serverClients :: TVar (IntMap AClient)}
data ServerSubscribers = ServerSubscribers
{ subQ :: TQueue (QueueId, ClientId, Subscribed),
queueSubscribers :: TMap QueueId (TVar AClient),
@@ -331,10 +339,32 @@ data Sub = Sub
newServer :: IO Server
newServer = do
clients <- newTVarIO mempty
subscribers <- newServerSubscribers
ntfSubscribers <- newServerSubscribers
savingLock <- createLockIO
return Server {subscribers, ntfSubscribers, savingLock}
return Server {clients = ServerClients clients, subscribers, ntfSubscribers, savingLock}
getServerClients :: Server -> IO (IntMap AClient)
getServerClients = readTVarIO . serverClients . clients
{-# INLINE getServerClients #-}
getServerClient :: ClientId -> Server -> IO (Maybe AClient)
getServerClient cId s = IM.lookup cId <$> getServerClients s
{-# INLINE getServerClient #-}
insertServerClient :: AClient -> Server -> IO Bool
insertServerClient ac@(AClient _ _ Client {clientId, connected}) Server {clients} =
atomically $
ifM
(readTVar connected)
(True <$ modifyTVar' (serverClients clients) (IM.insert clientId ac))
(pure False)
{-# INLINE insertServerClient #-}
deleteServerClient :: ClientId -> Server -> IO ()
deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClients clients) $ IM.delete cId
{-# INLINE deleteServerClient #-}
newServerSubscribers :: IO ServerSubscribers
newServerSubscribers = do
@@ -357,7 +387,24 @@ newClient _ _ clientId qSize thVersion sessionId createdAt = do
connected <- newTVarIO True
rcvActiveAt <- newTVarIO createdAt
sndActiveAt <- newTVarIO createdAt
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
return
Client
{ clientId,
subscriptions,
ntfSubscriptions,
rcvQ,
sndQ,
msgQ,
procThreads,
endThreads,
endThreadSeq,
thVersion,
sessionId,
connected,
createdAt,
rcvActiveAt,
sndActiveAt
}
newSubscription :: SubscriptionThread -> STM Sub
newSubscription st = do
@@ -407,9 +454,24 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
serverStats <- newServerStats =<< getCurrentTime
sockets <- newTVarIO []
clientSeq <- newTVarIO 0
clients <- newTVarIO mempty
proxyAgent <- newSMPProxyAgent smpAgentCfg random
pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
pure
Env
{ serverActive,
config,
serverInfo,
server,
serverIdentity,
msgStore,
ntfStore,
random,
tlsServerCreds,
httpServerCreds,
serverStats,
sockets,
clientSeq,
proxyAgent
}
where
loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO ()
loadStoreLog mkQ f st = do

View File

@@ -83,7 +83,7 @@ ntfTestPrometheusMetricsFile :: FilePath
ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt"
ntfTestStoreDBOpts :: DBOpts
ntfTestStoreDBOpts =
ntfTestStoreDBOpts =
DBOpts
{ connstr = ntfTestServerDBConnstr,
schema = "ntf_server",
@@ -134,7 +134,6 @@ ntfServerCfg =
subIdBytes = 24,
regCodeBytes = 32,
clientQSize = 2,
subQSize = 2,
pushQSize = 2,
smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 0},
apnsConfig =