From 4647d69d4b29f06684703917b8e3ff06c8e65a13 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Wed, 30 Jul 2025 11:12:57 +0100 Subject: [PATCH] smp server: do not include previously blocked queues in stats, prevent leak of client threads (#1593) --- src/Simplex/Messaging/Server.hs | 12 +++++++----- src/Simplex/Messaging/Transport/Server.hs | 7 ++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 801597b48..60c05a7e8 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -989,13 +989,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt then liftIO $ hPutStrLn h $ "error: reached limit of " <> show quota <> " queues blocked daily" else do r <- liftIO $ runExceptT $ do - q <- ExceptT $ getQueue st SSender sId - ExceptT $ blockQueue (queueStore st) q info + (q, QueueRec {status}) <- ExceptT $ getQueueRec st SSender sId + when (status == EntityActive) $ ExceptT $ blockQueue (queueStore st) q info + pure status case r of Left e -> liftIO $ hPutStrLn h $ "error: " <> show e - Right () -> do + Right EntityActive -> do incStat $ qBlocked stats - liftIO $ hPutStrLn h "ok" + liftIO $ hPutStrLn h "ok, queue blocked" + Right status -> liftIO $ hPutStrLn h $ "ok, already inactive: " <> show status CPUnblock sId -> withUserRole $ unliftIO u $ do st <- asks msgStore r <- liftIO $ runExceptT $ do @@ -1003,7 +1005,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt ExceptT $ unblockQueue (queueStore st) q liftIO $ hPutStrLn h $ case r of Left e -> "error: " <> show e - Right () -> "ok" + Right () -> "ok, queue unblocked" CPSave -> withAdminRole $ withLock' (savingLock srv) "control" $ do hPutStrLn h "saving server state..." unliftIO u $ saveServer False diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index 89a3bf063..00b94ddc5 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -49,7 +49,7 @@ import Network.Socket import qualified Network.TLS as T import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Shared -import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow) +import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow, unlessM) import System.Exit (exitFailure) import System.IO.Error (tryIOError) import System.Mem.Weak (Weak, deRefWeak) @@ -172,12 +172,13 @@ runTCPServerSocket (accepted, gracefullyClosed, clients) started getSocket serve E.bracket getSocket (closeServer started clients) $ \sock -> forever . E.bracketOnError (safeAccept sock) (close . fst) $ \(conn, _peer) -> do cId <- atomically $ stateTVar accepted $ \cId -> let cId' = cId + 1 in cId' `seq` (cId', cId') + closed <- newTVarIO False let closeConn _ = do - atomically $ modifyTVar' clients $ IM.delete cId + atomically $ writeTVar closed True >> modifyTVar' clients (IM.delete cId) gracefulClose conn 5000 `catchAll_` pure () -- catchAll_ is needed here in case the connection was closed earlier atomically $ modifyTVar' gracefullyClosed (+ 1) tId <- mkWeakThreadId =<< server conn `forkFinally` closeConn - atomically $ modifyTVar' clients $ IM.insert cId tId + atomically $ unlessM (readTVar closed) $ modifyTVar' clients $ IM.insert cId tId -- | Recover from errors in `accept` whenever it is safe. -- Some errors are safe to ignore, while blindly restaring `accept` may trigger a busy loop.