diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a5e58eeb9..7df930378 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -260,9 +260,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt upsertSubscribedClient qId c queueSubscribers | otherwise = do removeWhenNoSubs c - lookupDeleteSubscribedClient qId queueSubscribers + lookupRemoveSubscribedClient qId queueSubscribers -- do not insert client if it is already disconnected, but send END to any other client - updateSubDisconnected = lookupDeleteSubscribedClient qId queueSubscribers + updateSubDisconnected = lookupRemoveSubscribedClient qId queueSubscribers clientToBeNotified ac@(AClient _ _ Client {clientId, connected}) | clntId == clientId = pure Nothing | otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar connected @@ -579,12 +579,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt let threadsCount = 0 #endif clientsCount <- IM.size <$> getServerClients srv - smpSubsCount <- M.size <$> getSubscribedClients (queueSubscribers subscribers) - smpSubClientsCount <- IS.size <$> readTVarIO (subClients subscribers) - ntfSubsCount <- M.size <$> getSubscribedClients (queueSubscribers ntfSubscribers) - ntfSubClientsCount <- IS.size <$> readTVarIO (subClients ntfSubscribers) + smpSubs <- getSubscribersMetrics subscribers + ntfSubs <- getSubscribersMetrics ntfSubscribers loadedCounts <- loadedQueueCounts ms - pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts} + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubs, ntfSubs, loadedCounts} + where + getSubscribersMetrics ServerSubscribers {queueSubscribers, subClients} = do + (storedSubs, subsCount) <- getSubscribedClients queueSubscribers + subClientsCount <- IS.size <$> readTVarIO subClients + pure RTSubscriberMetrics {subsCount, subVarsCount = M.size storedSubs, subClientsCount} runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do @@ -779,9 +782,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt where putActiveClientsInfo :: String -> SubscribedClients -> Bool -> IO () putActiveClientsInfo protoName clients showIds = do - activeSubs <- getSubscribedClients clients - hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs) - clnts <- countSubClients activeSubs + (storedSubs, subsCount) <- getSubscribedClients clients + hPutStrLn h $ protoName <> " subscription vars: " <> show (M.size storedSubs) + hPutStrLn h $ protoName <> " subscriptions: " <> show subsCount + clnts <- countSubClients storedSubs hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "") where countSubClients :: M.Map QueueId (TVar (Maybe AClient)) -> IO IS.IntSet @@ -927,7 +931,7 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte where updateSubscribers :: M.Map QueueId a -> ServerSubscribers -> IO () updateSubscribers subs ServerSubscribers {queueSubscribers, subClients} = do - mapM_ (\qId -> deleteSubcribedClient qId c queueSubscribers) (M.keys subs) + mapM_ (\qId -> removeSubcribedClient qId c queueSubscribers) (M.keys subs) atomically $ modifyTVar' subClients $ IS.delete clientId cancelSub :: Sub -> IO () @@ -1615,8 +1619,8 @@ client -- lookup can be outside of STM transaction, -- as long as the check that it is the same client is inside. getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame - deliverIfSame rc' = time "deliver" . atomically $ - whenM (maybe False (sameClientId rc) <$> readTVar rc') $ + deliverIfSame rcv = time "deliver" . atomically $ + whenM (sameClient rc rcv) $ tryTakeTMVar delivered >>= \case Just _ -> pure () -- if a message was already delivered, should not deliver more Nothing -> do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index f2413c418..d1d7ac063 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -49,9 +49,10 @@ module Simplex.Messaging.Server.Env.STM getSubscribedClients, getSubscribedClient, upsertSubscribedClient, - lookupDeleteSubscribedClient, - deleteSubcribedClient, + lookupRemoveSubscribedClient, + removeSubcribedClient, sameClientId, + sameClient, clientId', newSubscription, newProhibitedSub, @@ -313,43 +314,57 @@ data ServerSubscribers = ServerSubscribers -- any STM transaction that reads subscribed client will re-evaluate in this case. -- The subscriptions that were made at any point are not removed - -- this is a better trade-off with intermittently connected mobile clients. -newtype SubscribedClients = SubscribedClients (TMap EntityId (TVar (Maybe AClient))) +data SubscribedClients = SubscribedClients {subscribedClients :: TMap EntityId (TVar (Maybe AClient)), subcribedCount :: TVar Int} -getSubscribedClients :: SubscribedClients -> IO (Map EntityId (TVar (Maybe AClient))) -getSubscribedClients (SubscribedClients cs) = readTVarIO cs -{-# INLINE getSubscribedClients #-} +getSubscribedClients :: SubscribedClients -> IO (Map EntityId (TVar (Maybe AClient)), Int) +getSubscribedClients (SubscribedClients cs cnt) = (,) <$> readTVarIO cs <*> readTVarIO cnt getSubscribedClient :: EntityId -> SubscribedClients -> IO (Maybe (TVar (Maybe AClient))) -getSubscribedClient entId (SubscribedClients cs) = TM.lookupIO entId cs +getSubscribedClient entId (SubscribedClients cs _) = TM.lookupIO entId cs {-# INLINE getSubscribedClient #-} -- insert subscribed and current client, return previously subscribed client if it is different upsertSubscribedClient :: EntityId -> AClient -> SubscribedClients -> STM (Maybe AClient) -upsertSubscribedClient entId ac@(AClient _ _ c) (SubscribedClients cs) = +upsertSubscribedClient entId ac@(AClient _ _ c) (SubscribedClients cs cnt) = TM.lookup entId cs >>= \case - Nothing -> Nothing <$ TM.insertM entId (newTVar $ Just ac) cs + Nothing -> do + TM.insertM entId (newTVar $ Just ac) cs + modifyTVar' cnt (+ 1) + pure Nothing Just cv -> readTVar cv >>= \case - Just c' | sameClientId c c' -> pure Nothing - c_ -> c_ <$ writeTVar cv (Just ac) + Just c' + | sameClientId c c' -> pure Nothing + | otherwise -> Just c' <$ writeTVar cv (Just ac) + Nothing -> do + writeTVar cv (Just ac) + modifyTVar' cnt (+ 1) + pure Nothing --- insert delete subscribed client -lookupDeleteSubscribedClient :: EntityId -> SubscribedClients -> STM (Maybe AClient) -lookupDeleteSubscribedClient entId (SubscribedClients cs) = TM.lookupDelete entId cs $>>= readTVar -{-# INLINE lookupDeleteSubscribedClient #-} +-- lookup and delete currently subscribed client +lookupRemoveSubscribedClient :: EntityId -> SubscribedClients -> STM (Maybe AClient) +lookupRemoveSubscribedClient entId (SubscribedClients cs cnt) = + TM.lookup entId cs $>>= (`swapTVar` Nothing) >>= mapM (<$ modifyTVar' cnt (subtract 1)) -deleteSubcribedClient :: EntityId -> Client s -> SubscribedClients -> IO () -deleteSubcribedClient entId c (SubscribedClients cs) = +removeSubcribedClient :: EntityId -> Client s -> SubscribedClients -> IO () +removeSubcribedClient entId c (SubscribedClients cs cnt) = -- lookup of the subscribed client TVar can be in separate transaction, -- as long as the client is read in the same transaction - -- it prevents removing the next subscribed client and also avoids STM contention for the Map. - TM.lookupIO entId cs >>= - mapM_ (\c' -> atomically $ whenM (maybe False (sameClientId c) <$> readTVar c') $ writeTVar c' Nothing) + TM.lookupIO entId cs >>= mapM_ (\cv -> atomically $ whenM (sameClient c cv) $ remove cv) + where + remove cv = do + writeTVar cv Nothing + modifyTVar' cnt (subtract 1) sameClientId :: Client s -> AClient -> Bool sameClientId Client {clientId} ac = clientId == clientId' ac {-# INLINE sameClientId #-} +sameClient :: Client s -> TVar (Maybe AClient) -> STM Bool +sameClient c cv = maybe False (sameClientId c) <$> readTVar cv +{-# INLINE sameClient #-} + newtype ProxyAgent = ProxyAgent { smpAgent :: SMPClientAgent } @@ -421,7 +436,7 @@ deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClient newServerSubscribers :: IO ServerSubscribers newServerSubscribers = do subQ <- newTQueueIO - queueSubscribers <- SubscribedClients <$> TM.emptyIO + queueSubscribers <- SubscribedClients <$> TM.emptyIO <*> newTVarIO 0 subClients <- newTVarIO IS.empty pendingEvents <- newTVarIO IM.empty pure ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents} diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 39dbc854f..194313227 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -33,13 +33,22 @@ data RealTimeMetrics = RealTimeMetrics { socketStats :: [(ServiceName, SocketStats)], threadsCount :: Int, clientsCount :: Int, - smpSubsCount :: Int, - smpSubClientsCount :: Int, - ntfSubsCount :: Int, - ntfSubClientsCount :: Int, + smpSubs :: RTSubscriberMetrics, + ntfSubs :: RTSubscriberMetrics, + + -- smpSubsCount :: Int, + -- smpSubClientsCount :: Int, + -- ntfSubsCount :: Int, + -- ntfSubClientsCount :: Int, loadedCounts :: LoadedQueueCounts } +data RTSubscriberMetrics = RTSubscriberMetrics + { subsCount :: Int, + subVarsCount :: Int, + subClientsCount :: Int + } + {-# FOURMOLU_DISABLE\n#-} prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text prometheusMetrics sm rtm ts = @@ -50,10 +59,12 @@ prometheusMetrics sm rtm ts = { socketStats, threadsCount, clientsCount, - smpSubsCount, - smpSubClientsCount, - ntfSubsCount, - ntfSubClientsCount, + smpSubs, + ntfSubs, + -- smpSubsCount, + -- smpSubClientsCount, + -- ntfSubsCount, + -- ntfSubClientsCount, loadedCounts } = rtm ServerStatsData @@ -367,21 +378,29 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_clients_total gauge\n\ \simplex_smp_clients_total " <> mshow clientsCount <> "\n\ \\n\ - \# HELP simplex_smp_subscribtion_total Total subscriptions\n\ + \# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\ \# TYPE simplex_smp_subscribtion_total gauge\n\ - \simplex_smp_subscribtion_total " <> mshow smpSubsCount <> "\n# smpSubs\n\ + \simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\ \\n\ - \# HELP simplex_smp_subscribtion_clients_total Subscribed clients, first counting method\n\ + \# HELP simplex_smp_subscribtion_vars_total Total SMP subscription vars\n\ + \# TYPE simplex_smp_subscribtion_vars_total gauge\n\ + \simplex_smp_subscribtion_vars_total " <> mshow (subVarsCount smpSubs) <> "\n# smp.subVarsCount\n\ + \\n\ + \# HELP simplex_smp_subscribtion_clients_total Subscribed clients\n\ \# TYPE simplex_smp_subscribtion_clients_total gauge\n\ - \simplex_smp_subscribtion_clients_total " <> mshow smpSubClientsCount <> "\n# smpSubClients\n\ + \simplex_smp_subscribtion_clients_total " <> mshow (subClientsCount smpSubs) <> "\n# smp.subClientsCount\n\ \\n\ \# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\ \# TYPE simplex_smp_subscription_ntf_total gauge\n\ - \simplex_smp_subscription_ntf_total " <> mshow ntfSubsCount <> "\n# ntfSubs\n\ + \simplex_smp_subscription_ntf_total " <> mshow (subsCount ntfSubs) <> "\n# ntf.subsCount\n\ \\n\ - \# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\ + \# HELP simplex_smp_subscription_ntf_vars_total Total notification subscripbtion vars (from ntf server)\n\ + \# TYPE simplex_smp_subscription_ntf_vars_total gauge\n\ + \simplex_smp_subscription_ntf_vars_total " <> mshow (subVarsCount ntfSubs) <> "\n# ntf.subVarsCount\n\ + \\n\ + \# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers\n\ \# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\ - \simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n\ + \simplex_smp_subscription_ntf_clients_total " <> mshow (subClientsCount ntfSubs) <> "\n# ntf.subClientsCount\n\ \\n\ \# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\ \# TYPE simplex_smp_loaded_queues_queue_count gauge\n\