mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 08:00:09 +00:00
smp server: do not include previously blocked queues in stats, prevent leak of client threads (#1593)
This commit is contained in:
@@ -989,13 +989,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
then liftIO $ hPutStrLn h $ "error: reached limit of " <> show quota <> " queues blocked daily"
|
||||
else do
|
||||
r <- liftIO $ runExceptT $ do
|
||||
q <- ExceptT $ getQueue st SSender sId
|
||||
ExceptT $ blockQueue (queueStore st) q info
|
||||
(q, QueueRec {status}) <- ExceptT $ getQueueRec st SSender sId
|
||||
when (status == EntityActive) $ ExceptT $ blockQueue (queueStore st) q info
|
||||
pure status
|
||||
case r of
|
||||
Left e -> liftIO $ hPutStrLn h $ "error: " <> show e
|
||||
Right () -> do
|
||||
Right EntityActive -> do
|
||||
incStat $ qBlocked stats
|
||||
liftIO $ hPutStrLn h "ok"
|
||||
liftIO $ hPutStrLn h "ok, queue blocked"
|
||||
Right status -> liftIO $ hPutStrLn h $ "ok, already inactive: " <> show status
|
||||
CPUnblock sId -> withUserRole $ unliftIO u $ do
|
||||
st <- asks msgStore
|
||||
r <- liftIO $ runExceptT $ do
|
||||
@@ -1003,7 +1005,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
ExceptT $ unblockQueue (queueStore st) q
|
||||
liftIO $ hPutStrLn h $ case r of
|
||||
Left e -> "error: " <> show e
|
||||
Right () -> "ok"
|
||||
Right () -> "ok, queue unblocked"
|
||||
CPSave -> withAdminRole $ withLock' (savingLock srv) "control" $ do
|
||||
hPutStrLn h "saving server state..."
|
||||
unliftIO u $ saveServer False
|
||||
|
||||
@@ -49,7 +49,7 @@ import Network.Socket
|
||||
import qualified Network.TLS as T
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.Shared
|
||||
import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow)
|
||||
import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow, unlessM)
|
||||
import System.Exit (exitFailure)
|
||||
import System.IO.Error (tryIOError)
|
||||
import System.Mem.Weak (Weak, deRefWeak)
|
||||
@@ -172,12 +172,13 @@ runTCPServerSocket (accepted, gracefullyClosed, clients) started getSocket serve
|
||||
E.bracket getSocket (closeServer started clients) $ \sock ->
|
||||
forever . E.bracketOnError (safeAccept sock) (close . fst) $ \(conn, _peer) -> do
|
||||
cId <- atomically $ stateTVar accepted $ \cId -> let cId' = cId + 1 in cId' `seq` (cId', cId')
|
||||
closed <- newTVarIO False
|
||||
let closeConn _ = do
|
||||
atomically $ modifyTVar' clients $ IM.delete cId
|
||||
atomically $ writeTVar closed True >> modifyTVar' clients (IM.delete cId)
|
||||
gracefulClose conn 5000 `catchAll_` pure () -- catchAll_ is needed here in case the connection was closed earlier
|
||||
atomically $ modifyTVar' gracefullyClosed (+ 1)
|
||||
tId <- mkWeakThreadId =<< server conn `forkFinally` closeConn
|
||||
atomically $ modifyTVar' clients $ IM.insert cId tId
|
||||
atomically $ unlessM (readTVar closed) $ modifyTVar' clients $ IM.insert cId tId
|
||||
|
||||
-- | Recover from errors in `accept` whenever it is safe.
|
||||
-- Some errors are safe to ignore, while blindly restaring `accept` may trigger a busy loop.
|
||||
|
||||
Reference in New Issue
Block a user