mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-29 22:34:21 +00:00
correct stats for subscriptions
This commit is contained in:
@@ -260,9 +260,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
upsertSubscribedClient qId c queueSubscribers
|
||||
| otherwise = do
|
||||
removeWhenNoSubs c
|
||||
lookupDeleteSubscribedClient qId queueSubscribers
|
||||
lookupRemoveSubscribedClient qId queueSubscribers
|
||||
-- do not insert client if it is already disconnected, but send END to any other client
|
||||
updateSubDisconnected = lookupDeleteSubscribedClient qId queueSubscribers
|
||||
updateSubDisconnected = lookupRemoveSubscribedClient qId queueSubscribers
|
||||
clientToBeNotified ac@(AClient _ _ Client {clientId, connected})
|
||||
| clntId == clientId = pure Nothing
|
||||
| otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar connected
|
||||
@@ -579,12 +579,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
let threadsCount = 0
|
||||
#endif
|
||||
clientsCount <- IM.size <$> getServerClients srv
|
||||
smpSubsCount <- M.size <$> getSubscribedClients (queueSubscribers subscribers)
|
||||
smpSubClientsCount <- IS.size <$> readTVarIO (subClients subscribers)
|
||||
ntfSubsCount <- M.size <$> getSubscribedClients (queueSubscribers ntfSubscribers)
|
||||
ntfSubClientsCount <- IS.size <$> readTVarIO (subClients ntfSubscribers)
|
||||
smpSubs <- getSubscribersMetrics subscribers
|
||||
ntfSubs <- getSubscribersMetrics ntfSubscribers
|
||||
loadedCounts <- loadedQueueCounts ms
|
||||
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts}
|
||||
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubs, ntfSubs, loadedCounts}
|
||||
where
|
||||
getSubscribersMetrics ServerSubscribers {queueSubscribers, subClients} = do
|
||||
(storedSubs, subsCount) <- getSubscribedClients queueSubscribers
|
||||
subClientsCount <- IS.size <$> readTVarIO subClients
|
||||
pure RTSubscriberMetrics {subsCount, subVarsCount = M.size storedSubs, subClientsCount}
|
||||
|
||||
runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
|
||||
runClient signKey tp h = do
|
||||
@@ -779,9 +782,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
where
|
||||
putActiveClientsInfo :: String -> SubscribedClients -> Bool -> IO ()
|
||||
putActiveClientsInfo protoName clients showIds = do
|
||||
activeSubs <- getSubscribedClients clients
|
||||
hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs)
|
||||
clnts <- countSubClients activeSubs
|
||||
(storedSubs, subsCount) <- getSubscribedClients clients
|
||||
hPutStrLn h $ protoName <> " subscription vars: " <> show (M.size storedSubs)
|
||||
hPutStrLn h $ protoName <> " subscriptions: " <> show subsCount
|
||||
clnts <- countSubClients storedSubs
|
||||
hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "")
|
||||
where
|
||||
countSubClients :: M.Map QueueId (TVar (Maybe AClient)) -> IO IS.IntSet
|
||||
@@ -927,7 +931,7 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte
|
||||
where
|
||||
updateSubscribers :: M.Map QueueId a -> ServerSubscribers -> IO ()
|
||||
updateSubscribers subs ServerSubscribers {queueSubscribers, subClients} = do
|
||||
mapM_ (\qId -> deleteSubcribedClient qId c queueSubscribers) (M.keys subs)
|
||||
mapM_ (\qId -> removeSubcribedClient qId c queueSubscribers) (M.keys subs)
|
||||
atomically $ modifyTVar' subClients $ IS.delete clientId
|
||||
|
||||
cancelSub :: Sub -> IO ()
|
||||
@@ -1615,8 +1619,8 @@ client
|
||||
-- lookup can be outside of STM transaction,
|
||||
-- as long as the check that it is the same client is inside.
|
||||
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
|
||||
deliverIfSame rc' = time "deliver" . atomically $
|
||||
whenM (maybe False (sameClientId rc) <$> readTVar rc') $
|
||||
deliverIfSame rcv = time "deliver" . atomically $
|
||||
whenM (sameClient rc rcv) $
|
||||
tryTakeTMVar delivered >>= \case
|
||||
Just _ -> pure () -- if a message was already delivered, should not deliver more
|
||||
Nothing -> do
|
||||
|
||||
@@ -49,9 +49,10 @@ module Simplex.Messaging.Server.Env.STM
|
||||
getSubscribedClients,
|
||||
getSubscribedClient,
|
||||
upsertSubscribedClient,
|
||||
lookupDeleteSubscribedClient,
|
||||
deleteSubcribedClient,
|
||||
lookupRemoveSubscribedClient,
|
||||
removeSubcribedClient,
|
||||
sameClientId,
|
||||
sameClient,
|
||||
clientId',
|
||||
newSubscription,
|
||||
newProhibitedSub,
|
||||
@@ -313,43 +314,57 @@ data ServerSubscribers = ServerSubscribers
|
||||
-- any STM transaction that reads subscribed client will re-evaluate in this case.
|
||||
-- The subscriptions that were made at any point are not removed -
|
||||
-- this is a better trade-off with intermittently connected mobile clients.
|
||||
newtype SubscribedClients = SubscribedClients (TMap EntityId (TVar (Maybe AClient)))
|
||||
data SubscribedClients = SubscribedClients {subscribedClients :: TMap EntityId (TVar (Maybe AClient)), subcribedCount :: TVar Int}
|
||||
|
||||
getSubscribedClients :: SubscribedClients -> IO (Map EntityId (TVar (Maybe AClient)))
|
||||
getSubscribedClients (SubscribedClients cs) = readTVarIO cs
|
||||
{-# INLINE getSubscribedClients #-}
|
||||
getSubscribedClients :: SubscribedClients -> IO (Map EntityId (TVar (Maybe AClient)), Int)
|
||||
getSubscribedClients (SubscribedClients cs cnt) = (,) <$> readTVarIO cs <*> readTVarIO cnt
|
||||
|
||||
getSubscribedClient :: EntityId -> SubscribedClients -> IO (Maybe (TVar (Maybe AClient)))
|
||||
getSubscribedClient entId (SubscribedClients cs) = TM.lookupIO entId cs
|
||||
getSubscribedClient entId (SubscribedClients cs _) = TM.lookupIO entId cs
|
||||
{-# INLINE getSubscribedClient #-}
|
||||
|
||||
-- insert subscribed and current client, return previously subscribed client if it is different
|
||||
upsertSubscribedClient :: EntityId -> AClient -> SubscribedClients -> STM (Maybe AClient)
|
||||
upsertSubscribedClient entId ac@(AClient _ _ c) (SubscribedClients cs) =
|
||||
upsertSubscribedClient entId ac@(AClient _ _ c) (SubscribedClients cs cnt) =
|
||||
TM.lookup entId cs >>= \case
|
||||
Nothing -> Nothing <$ TM.insertM entId (newTVar $ Just ac) cs
|
||||
Nothing -> do
|
||||
TM.insertM entId (newTVar $ Just ac) cs
|
||||
modifyTVar' cnt (+ 1)
|
||||
pure Nothing
|
||||
Just cv ->
|
||||
readTVar cv >>= \case
|
||||
Just c' | sameClientId c c' -> pure Nothing
|
||||
c_ -> c_ <$ writeTVar cv (Just ac)
|
||||
Just c'
|
||||
| sameClientId c c' -> pure Nothing
|
||||
| otherwise -> Just c' <$ writeTVar cv (Just ac)
|
||||
Nothing -> do
|
||||
writeTVar cv (Just ac)
|
||||
modifyTVar' cnt (+ 1)
|
||||
pure Nothing
|
||||
|
||||
-- insert delete subscribed client
|
||||
lookupDeleteSubscribedClient :: EntityId -> SubscribedClients -> STM (Maybe AClient)
|
||||
lookupDeleteSubscribedClient entId (SubscribedClients cs) = TM.lookupDelete entId cs $>>= readTVar
|
||||
{-# INLINE lookupDeleteSubscribedClient #-}
|
||||
-- lookup and delete currently subscribed client
|
||||
lookupRemoveSubscribedClient :: EntityId -> SubscribedClients -> STM (Maybe AClient)
|
||||
lookupRemoveSubscribedClient entId (SubscribedClients cs cnt) =
|
||||
TM.lookup entId cs $>>= (`swapTVar` Nothing) >>= mapM (<$ modifyTVar' cnt (subtract 1))
|
||||
|
||||
deleteSubcribedClient :: EntityId -> Client s -> SubscribedClients -> IO ()
|
||||
deleteSubcribedClient entId c (SubscribedClients cs) =
|
||||
removeSubcribedClient :: EntityId -> Client s -> SubscribedClients -> IO ()
|
||||
removeSubcribedClient entId c (SubscribedClients cs cnt) =
|
||||
-- lookup of the subscribed client TVar can be in separate transaction,
|
||||
-- as long as the client is read in the same transaction -
|
||||
-- it prevents removing the next subscribed client and also avoids STM contention for the Map.
|
||||
TM.lookupIO entId cs >>=
|
||||
mapM_ (\c' -> atomically $ whenM (maybe False (sameClientId c) <$> readTVar c') $ writeTVar c' Nothing)
|
||||
TM.lookupIO entId cs >>= mapM_ (\cv -> atomically $ whenM (sameClient c cv) $ remove cv)
|
||||
where
|
||||
remove cv = do
|
||||
writeTVar cv Nothing
|
||||
modifyTVar' cnt (subtract 1)
|
||||
|
||||
sameClientId :: Client s -> AClient -> Bool
|
||||
sameClientId Client {clientId} ac = clientId == clientId' ac
|
||||
{-# INLINE sameClientId #-}
|
||||
|
||||
sameClient :: Client s -> TVar (Maybe AClient) -> STM Bool
|
||||
sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
|
||||
{-# INLINE sameClient #-}
|
||||
|
||||
newtype ProxyAgent = ProxyAgent
|
||||
{ smpAgent :: SMPClientAgent
|
||||
}
|
||||
@@ -421,7 +436,7 @@ deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClient
|
||||
newServerSubscribers :: IO ServerSubscribers
|
||||
newServerSubscribers = do
|
||||
subQ <- newTQueueIO
|
||||
queueSubscribers <- SubscribedClients <$> TM.emptyIO
|
||||
queueSubscribers <- SubscribedClients <$> TM.emptyIO <*> newTVarIO 0
|
||||
subClients <- newTVarIO IS.empty
|
||||
pendingEvents <- newTVarIO IM.empty
|
||||
pure ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents}
|
||||
|
||||
@@ -33,13 +33,22 @@ data RealTimeMetrics = RealTimeMetrics
|
||||
{ socketStats :: [(ServiceName, SocketStats)],
|
||||
threadsCount :: Int,
|
||||
clientsCount :: Int,
|
||||
smpSubsCount :: Int,
|
||||
smpSubClientsCount :: Int,
|
||||
ntfSubsCount :: Int,
|
||||
ntfSubClientsCount :: Int,
|
||||
smpSubs :: RTSubscriberMetrics,
|
||||
ntfSubs :: RTSubscriberMetrics,
|
||||
|
||||
-- smpSubsCount :: Int,
|
||||
-- smpSubClientsCount :: Int,
|
||||
-- ntfSubsCount :: Int,
|
||||
-- ntfSubClientsCount :: Int,
|
||||
loadedCounts :: LoadedQueueCounts
|
||||
}
|
||||
|
||||
data RTSubscriberMetrics = RTSubscriberMetrics
|
||||
{ subsCount :: Int,
|
||||
subVarsCount :: Int,
|
||||
subClientsCount :: Int
|
||||
}
|
||||
|
||||
{-# FOURMOLU_DISABLE\n#-}
|
||||
prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text
|
||||
prometheusMetrics sm rtm ts =
|
||||
@@ -50,10 +59,12 @@ prometheusMetrics sm rtm ts =
|
||||
{ socketStats,
|
||||
threadsCount,
|
||||
clientsCount,
|
||||
smpSubsCount,
|
||||
smpSubClientsCount,
|
||||
ntfSubsCount,
|
||||
ntfSubClientsCount,
|
||||
smpSubs,
|
||||
ntfSubs,
|
||||
-- smpSubsCount,
|
||||
-- smpSubClientsCount,
|
||||
-- ntfSubsCount,
|
||||
-- ntfSubClientsCount,
|
||||
loadedCounts
|
||||
} = rtm
|
||||
ServerStatsData
|
||||
@@ -367,21 +378,29 @@ prometheusMetrics sm rtm ts =
|
||||
\# TYPE simplex_smp_clients_total gauge\n\
|
||||
\simplex_smp_clients_total " <> mshow clientsCount <> "\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_subscribtion_total Total subscriptions\n\
|
||||
\# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\
|
||||
\# TYPE simplex_smp_subscribtion_total gauge\n\
|
||||
\simplex_smp_subscribtion_total " <> mshow smpSubsCount <> "\n# smpSubs\n\
|
||||
\simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_subscribtion_clients_total Subscribed clients, first counting method\n\
|
||||
\# HELP simplex_smp_subscribtion_vars_total Total SMP subscription vars\n\
|
||||
\# TYPE simplex_smp_subscribtion_vars_total gauge\n\
|
||||
\simplex_smp_subscribtion_vars_total " <> mshow (subVarsCount smpSubs) <> "\n# smp.subVarsCount\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_subscribtion_clients_total Subscribed clients\n\
|
||||
\# TYPE simplex_smp_subscribtion_clients_total gauge\n\
|
||||
\simplex_smp_subscribtion_clients_total " <> mshow smpSubClientsCount <> "\n# smpSubClients\n\
|
||||
\simplex_smp_subscribtion_clients_total " <> mshow (subClientsCount smpSubs) <> "\n# smp.subClientsCount\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\
|
||||
\# TYPE simplex_smp_subscription_ntf_total gauge\n\
|
||||
\simplex_smp_subscription_ntf_total " <> mshow ntfSubsCount <> "\n# ntfSubs\n\
|
||||
\simplex_smp_subscription_ntf_total " <> mshow (subsCount ntfSubs) <> "\n# ntf.subsCount\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\
|
||||
\# HELP simplex_smp_subscription_ntf_vars_total Total notification subscripbtion vars (from ntf server)\n\
|
||||
\# TYPE simplex_smp_subscription_ntf_vars_total gauge\n\
|
||||
\simplex_smp_subscription_ntf_vars_total " <> mshow (subVarsCount ntfSubs) <> "\n# ntf.subVarsCount\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers\n\
|
||||
\# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\
|
||||
\simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n\
|
||||
\simplex_smp_subscription_ntf_clients_total " <> mshow (subClientsCount ntfSubs) <> "\n# ntf.subClientsCount\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\
|
||||
\# TYPE simplex_smp_loaded_queues_queue_count gauge\n\
|
||||
|
||||
Reference in New Issue
Block a user