From 8d5d84b061875efb0bf02895eafd8521fac95c03 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 25 Aug 2024 17:12:38 +0100 Subject: [PATCH] smp server: count queued and sent END events --- src/Simplex/Messaging/Server.hs | 22 +++++++++++++++------- src/Simplex/Messaging/Server/Stats.hs | 18 ++++++++++++++++++ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a70995d5e..d9eecfe48 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -170,9 +170,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do serverThread s label subQ subs clientSubs unsub = do labelMyThread label cls <- asks clients + stats <- asks serverStats forever $ atomically (updateSubscribers cls) - $>>= endPreviousSubscriptions + $>>= endPreviousSubscriptions stats >>= liftIO . mapM_ unsub where updateSubscribers :: TVar (IM.IntMap (Maybe Client)) -> STM (Maybe (QueueId, Client)) @@ -189,10 +190,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do yes <- readTVar $ connected c' pure $ if yes then Just (qId, c') else Nothing updateSub qId (subs s) $>>= clientToBeNotified - endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s) - endPreviousSubscriptions (qId, c) = do - forkClient c (label <> ".endPreviousSubscriptions") $ + endPreviousSubscriptions :: ServerStats -> (QueueId, Client) -> M (Maybe s) + endPreviousSubscriptions stats (qId, c) = do + forkClient c (label <> ".endPreviousSubscriptions") $ do atomically $ writeTBQueue (sndQ c) [(CorrId "", qId, END)] + incStat $ qSubEnd stats atomically $ TM.lookupDelete qId (clientSubs c) receiveFromProxyAgent :: ProxyAgent -> M () @@ -442,6 +444,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do putStat "qSubNoMsg" qSubNoMsg subs <- (,,) <$> getStat qSubAuth <*> getStat qSubDuplicate <*> getStat qSubProhibited hPutStrLn h $ "other SUB events (auth, duplicate, prohibited): " <> show subs + putStat "qSubEnd" qSubEnd + putStat "qSubEndSent" qSubEndSent putStat "msgSent" msgSent putStat "msgRecv" msgRecv putStat "msgRecvGet" msgRecvGet @@ -631,9 +635,10 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio atomically $ modifyTVar' active $ IM.insert clientId $ Just c s <- asks server expCfg <- asks $ inactiveClientExpiration . config + stats <- asks serverStats th <- newMVar h -- put TH under a fair lock to interleave messages and command responses labelMyThread . B.unpack $ "client $" <> encode sessionId - raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg + raceAny_ $ [liftIO $ send th c stats, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)] disconnectThread_ _ _ = [] noSubscriptions c = atomically $ (&&) <$> TM.null (ntfSubscriptions c) <*> (not . hasSubs <$> readTVar (subscriptions c)) @@ -701,8 +706,8 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv pure $ Left (corrId, entId, ERR AUTH) write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty -send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () -send th c@Client {sndQ, msgQ, sessionId} = do +send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> ServerStats -> IO () +send th c@Client {sndQ, msgQ, sessionId} stats = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" forever $ atomically (readTBQueue sndQ) >>= sendTransmissions where @@ -715,6 +720,9 @@ send th c@Client {sndQ, msgQ, sessionId} = do -- this will reply OK to all SUBs in the first batched transmission, -- to reduce client timeouts. tSend th c ts' + case ts' of + [(_, _, END)] -> incStat $ qSubEndSent stats + _ -> pure () -- After that all messages will be sent in separate transmissions, -- without any client response timeouts, and allowing them to interleave -- with other requests responses. diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index f5b430bb6..b3309b002 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -31,6 +31,8 @@ data ServerStats = ServerStats qSubAuth :: TVar Int, qSubDuplicate :: TVar Int, qSubProhibited :: TVar Int, + qSubEnd :: TVar Int, + qSubEndSent :: TVar Int, ntfCreated :: TVar Int, ntfDeleted :: TVar Int, ntfSub :: TVar Int, @@ -77,6 +79,8 @@ data ServerStatsData = ServerStatsData _qSubAuth :: Int, _qSubDuplicate :: Int, _qSubProhibited :: Int, + _qSubEnd :: Int, + _qSubEndSent :: Int, _ntfCreated :: Int, _ntfDeleted :: Int, _ntfSub :: Int, @@ -125,6 +129,8 @@ newServerStats ts = do qSubAuth <- newTVarIO 0 qSubDuplicate <- newTVarIO 0 qSubProhibited <- newTVarIO 0 + qSubEnd <- newTVarIO 0 + qSubEndSent <- newTVarIO 0 ntfCreated <- newTVarIO 0 ntfDeleted <- newTVarIO 0 ntfSub <- newTVarIO 0 @@ -170,6 +176,8 @@ newServerStats ts = do qSubAuth, qSubDuplicate, qSubProhibited, + qSubEnd, + qSubEndSent, ntfCreated, ntfDeleted, ntfSub, @@ -217,6 +225,8 @@ getServerStatsData s = do _qSubAuth <- readTVarIO $ qSubAuth s _qSubDuplicate <- readTVarIO $ qSubDuplicate s _qSubProhibited <- readTVarIO $ qSubProhibited s + _qSubEnd <- readTVarIO $ qSubEnd s + _qSubEndSent <- readTVarIO $ qSubEndSent s _ntfCreated <- readTVarIO $ ntfCreated s _ntfDeleted <- readTVarIO $ ntfDeleted s _ntfSub <- readTVarIO $ ntfSub s @@ -262,6 +272,8 @@ getServerStatsData s = do _qSubAuth, _qSubDuplicate, _qSubProhibited, + _qSubEnd, + _qSubEndSent, _ntfCreated, _ntfDeleted, _ntfSub, @@ -309,6 +321,8 @@ setServerStats s d = do writeTVar (qSubAuth s) $! _qSubAuth d writeTVar (qSubDuplicate s) $! _qSubDuplicate d writeTVar (qSubProhibited s) $! _qSubProhibited d + writeTVar (qSubEnd s) $! _qSubEnd d + writeTVar (qSubEndSent s) $! _qSubEndSent d writeTVar (ntfCreated s) $! _ntfCreated d writeTVar (ntfDeleted s) $! _ntfDeleted d writeTVar (ntfSub s) $! _ntfSub d @@ -408,6 +422,8 @@ instance StrEncoding ServerStatsData where _qSubAuth <- opt "qSubAuth=" _qSubDuplicate <- opt "qSubDuplicate=" _qSubProhibited <- opt "qSubProhibited=" + _qSubEnd <- pure 0 + _qSubEndSent <- pure 0 _ntfCreated <- opt "ntfCreated=" _ntfDeleted <- opt "ntfDeleted=" _ntfSub <- opt "ntfSub=" @@ -464,6 +480,8 @@ instance StrEncoding ServerStatsData where _qSubAuth, _qSubDuplicate, _qSubProhibited, + _qSubEnd, + _qSubEndSent, _ntfCreated, _ntfDeleted, _ntfSub,