From efe7ce27e7855d22df65b764f85a1eb89a37942d Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Tue, 26 Dec 2023 22:20:12 +0200 Subject: [PATCH] control: add delete command (#933) * control: add delete command * logDeleteQueue only when found * use default StrEncoding for CPDelete arg * move stats update from main transaction * use size * stabilize AUTH timing tests * more iterations --------- Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> --- src/Simplex/Messaging/Server.hs | 20 +++++++++++-- src/Simplex/Messaging/Server/Control.hs | 4 +++ src/Simplex/Messaging/Server/MsgStore/STM.hs | 4 +++ tests/ServerTests.hs | 30 +++++++++++--------- tests/Test.hs | 9 +++--- 5 files changed, 47 insertions(+), 20 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 7417c948d..9a9ddc482 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -83,7 +83,7 @@ import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util import System.Exit (exitFailure) -import System.IO (hPutStrLn, hSetNewlineMode, universalNewlineMode) +import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO (timeout) import UnliftIO.Concurrent @@ -304,7 +304,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do where putStat :: Show a => String -> TVar a -> IO () putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v - CPStatsRTS -> getRTSStats >>= hPutStrLn h . show + CPStatsRTS -> getRTSStats >>= hPrint h CPThreads -> do #if MIN_VERSION_base(4,18,0) threads <- liftIO listThreads @@ -338,11 +338,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do #else hPutStrLn h "Not available on GHC 8.10" #endif + CPDelete queueId -> unliftIO u $ do + st <- asks queueStore + ms <- asks msgStore + stats <- asks serverStats + r <- atomically $ + deleteQueue st queueId $>>= \() -> + Right <$> delMsgQueueSize ms queueId + case r of + Left e -> liftIO . hPutStrLn h $ "error: " <> show e + Right numDeleted -> do + withLog (`logDeleteQueue` queueId) + atomically $ modifyTVar' (qDeleted stats) (+ 1) + atomically $ modifyTVar' (qCount stats) (subtract 1) + liftIO . hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted" CPSave -> withLock (savingLock srv) "control" $ do hPutStrLn h "saving server state..." unliftIO u $ saveServer True hPutStrLn h "server state saved!" - CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, save, help, quit" + CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, delete, save, help, quit" CPQuit -> pure () CPSkip -> pure () diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs index 2566a6d80..077c8a340 100644 --- a/src/Simplex/Messaging/Server/Control.hs +++ b/src/Simplex/Messaging/Server/Control.hs @@ -4,6 +4,7 @@ module Simplex.Messaging.Server.Control where import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.ByteString (ByteString) import Simplex.Messaging.Encoding.String data ControlProtocol @@ -15,6 +16,7 @@ data ControlProtocol | CPThreads | CPSockets | CPSocketThreads + | CPDelete ByteString | CPSave | CPHelp | CPQuit @@ -30,6 +32,7 @@ instance StrEncoding ControlProtocol where CPThreads -> "threads" CPSockets -> "sockets" CPSocketThreads -> "socket-threads" + CPDelete bs -> "delete " <> strEncode bs CPSave -> "save" CPHelp -> "help" CPQuit -> "quit" @@ -44,6 +47,7 @@ instance StrEncoding ControlProtocol where "threads" -> pure CPThreads "sockets" -> pure CPSockets "socket-threads" -> pure CPSocketThreads + "delete" -> CPDelete <$> (A.space *> strP) "save" -> pure CPSave "help" -> pure CPHelp "quit" -> pure CPQuit diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 74f204103..f5158ecc8 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -12,6 +12,7 @@ module Simplex.Messaging.Server.MsgStore.STM newMsgStore, getMsgQueue, delMsgQueue, + delMsgQueueSize, flushMsgQueue, snapshotMsgQueue, writeMsg, @@ -60,6 +61,9 @@ getMsgQueue st rId quota = maybe newQ pure =<< TM.lookup rId st delMsgQueue :: STMMsgStore -> RecipientId -> STM () delMsgQueue st rId = TM.delete rId st +delMsgQueueSize :: STMMsgStore -> RecipientId -> STM Int +delMsgQueueSize st rId = TM.lookupDelete rId st >>= maybe (pure 0) (\MsgQueue {size} -> readTVar size) + flushMsgQueue :: STMMsgStore -> RecipientId -> STM [Message] flushMsgQueue st rId = TM.lookupDelete rId st >>= maybe (pure []) (flushTQueue . msgQueue) diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index cd9ef27a7..607c9ce9e 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -874,13 +874,13 @@ testTiming (ATransport t) = where timingTests :: [(Int, Int, Int)] timingTests = - [ (32, 32, 200), - (32, 57, 100), - (57, 32, 200), - (57, 57, 100) + [ (32, 32, 300), + (32, 57, 150), + (57, 32, 300), + (57, 57, 150) ] timeRepeat n = fmap fst . timeItT . forM_ (replicate n ()) . const - similarTime t1 t2 = abs (t2 / t1 - 1) < 0.25 `shouldBe` True + similarTime t1 t2 = abs (t2 / t1 - 1) < 0.25 testSameTiming :: Transport c => THandle c -> THandle c -> (Int, Int, Int) -> Expectation testSameTiming rh sh (goodKeySize, badKeySize, n) = do g <- C.newRandom @@ -907,20 +907,24 @@ testTiming (ATransport t) = 57 -> atomically $ C.generateSignatureKeyPair C.SEd448 g _ -> error "unsupported key size" runTimingTest h badKey qId cmd = do + threadDelay 100000 timeWrongKey <- timeRepeat n $ do Resp "cdab" _ (ERR AUTH) <- signSendRecv h badKey ("cdab", qId, cmd) return () + threadDelay 100000 timeNoQueue <- timeRepeat n $ do Resp "dabc" _ (ERR AUTH) <- signSendRecv h badKey ("dabc", "1234", cmd) return () - -- (putStrLn . unwords . map show) - -- [ fromIntegral goodKeySize, - -- fromIntegral badKeySize, - -- timeWrongKey, - -- timeNoQueue, - -- timeWrongKey / timeNoQueue - 1 - -- ] - similarTime timeNoQueue timeWrongKey + let ok = similarTime timeNoQueue timeWrongKey + unless ok $ + (putStrLn . unwords . map show) + [ fromIntegral goodKeySize, + fromIntegral badKeySize, + timeWrongKey, + timeNoQueue, + abs (timeWrongKey / timeNoQueue - 1) + ] + ok `shouldBe` True testMessageNotifications :: ATransport -> Spec testMessageNotifications (ATransport t) = diff --git a/tests/Test.hs b/tests/Test.hs index 926065354..b7fc3e9cb 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -1,5 +1,5 @@ -{-# LANGUAGE TypeApplications #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE TypeApplications #-} import AgentTests (agentTests) import AgentTests.SchemaDump (schemaDumpTest) @@ -68,8 +68,9 @@ main = do eventuallyRemove :: FilePath -> Int -> IO () eventuallyRemove path retries = case retries of 0 -> action - n -> action `E.catch` \ioe@IOError {ioe_type, ioe_filename} -> case ioe_type of - IOException.UnsatisfiedConstraints | ioe_filename == Just path -> threadDelay 1000000 >> eventuallyRemove path (n - 1) - _ -> E.throwIO ioe + n -> + action `E.catch` \ioe@IOError {ioe_type, ioe_filename} -> case ioe_type of + IOException.UnsatisfiedConstraints | ioe_filename == Just path -> threadDelay 1000000 >> eventuallyRemove path (n - 1) + _ -> E.throwIO ioe where action = removeDirectoryRecursive path