control: add delete command (#933)

* control: add delete command

* logDeleteQueue only when found

* use default StrEncoding for CPDelete arg

* move stats update from main transaction

* use size

* stabilize AUTH timing tests

* more iterations

---------

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
Alexander Bondarenko
2023-12-26 22:20:12 +02:00
committed by GitHub
parent 577e3cf14d
commit efe7ce27e7
5 changed files with 47 additions and 20 deletions
+17 -3
View File
@@ -83,7 +83,7 @@ import Simplex.Messaging.Transport.Buffer (trimCR)
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util
import System.Exit (exitFailure)
import System.IO (hPutStrLn, hSetNewlineMode, universalNewlineMode)
import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (timeout)
import UnliftIO.Concurrent
@@ -304,7 +304,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
where
putStat :: Show a => String -> TVar a -> IO ()
putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v
CPStatsRTS -> getRTSStats >>= hPutStrLn h . show
CPStatsRTS -> getRTSStats >>= hPrint h
CPThreads -> do
#if MIN_VERSION_base(4,18,0)
threads <- liftIO listThreads
@@ -338,11 +338,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
#else
hPutStrLn h "Not available on GHC 8.10"
#endif
CPDelete queueId -> unliftIO u $ do
st <- asks queueStore
ms <- asks msgStore
stats <- asks serverStats
r <- atomically $
deleteQueue st queueId $>>= \() ->
Right <$> delMsgQueueSize ms queueId
case r of
Left e -> liftIO . hPutStrLn h $ "error: " <> show e
Right numDeleted -> do
withLog (`logDeleteQueue` queueId)
atomically $ modifyTVar' (qDeleted stats) (+ 1)
atomically $ modifyTVar' (qCount stats) (subtract 1)
liftIO . hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted"
CPSave -> withLock (savingLock srv) "control" $ do
hPutStrLn h "saving server state..."
unliftIO u $ saveServer True
hPutStrLn h "server state saved!"
CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, save, help, quit"
CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, delete, save, help, quit"
CPQuit -> pure ()
CPSkip -> pure ()
+4
View File
@@ -4,6 +4,7 @@
module Simplex.Messaging.Server.Control where
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString (ByteString)
import Simplex.Messaging.Encoding.String
data ControlProtocol
@@ -15,6 +16,7 @@ data ControlProtocol
| CPThreads
| CPSockets
| CPSocketThreads
| CPDelete ByteString
| CPSave
| CPHelp
| CPQuit
@@ -30,6 +32,7 @@ instance StrEncoding ControlProtocol where
CPThreads -> "threads"
CPSockets -> "sockets"
CPSocketThreads -> "socket-threads"
CPDelete bs -> "delete " <> strEncode bs
CPSave -> "save"
CPHelp -> "help"
CPQuit -> "quit"
@@ -44,6 +47,7 @@ instance StrEncoding ControlProtocol where
"threads" -> pure CPThreads
"sockets" -> pure CPSockets
"socket-threads" -> pure CPSocketThreads
"delete" -> CPDelete <$> (A.space *> strP)
"save" -> pure CPSave
"help" -> pure CPHelp
"quit" -> pure CPQuit
@@ -12,6 +12,7 @@ module Simplex.Messaging.Server.MsgStore.STM
newMsgStore,
getMsgQueue,
delMsgQueue,
delMsgQueueSize,
flushMsgQueue,
snapshotMsgQueue,
writeMsg,
@@ -60,6 +61,9 @@ getMsgQueue st rId quota = maybe newQ pure =<< TM.lookup rId st
delMsgQueue :: STMMsgStore -> RecipientId -> STM ()
delMsgQueue st rId = TM.delete rId st
delMsgQueueSize :: STMMsgStore -> RecipientId -> STM Int
delMsgQueueSize st rId = TM.lookupDelete rId st >>= maybe (pure 0) (\MsgQueue {size} -> readTVar size)
flushMsgQueue :: STMMsgStore -> RecipientId -> STM [Message]
flushMsgQueue st rId = TM.lookupDelete rId st >>= maybe (pure []) (flushTQueue . msgQueue)