From 5e31c60c35163f90d48a0683fc9c2cd137929f9a Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 12 Jan 2023 23:26:21 +0000 Subject: [PATCH 1/3] do not show warning on server restart when restoring "quota" message and quota is exceeded (#603) * do not show warning on server restart when restoring "quota" message and quota is exceeded * complete case * line break --- src/Simplex/Messaging/Server.hs | 5 +++- tests/ServerTests.hs | 18 ++++++++++---- tests/Test.hs | 42 ++++++++++++++++----------------- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 9f5c24a9f..2ae1bfdb5 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -726,7 +726,10 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages full <- atomically $ do q <- getMsgQueue ms rId quota isNothing <$> writeMsg q msg - when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message)) + case msg of + Message {} -> + when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message)) + MessageQuota {} -> pure () updateMsgV1toV3 QueueRec {rcvDhSecret} RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} = do let nonce = C.cbNonce msgId msgBody <- liftEither . first (msgErr "v1 message decryption") $ C.maxLenBS =<< C.cbDecrypt rcvDhSecret nonce body diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index cf3208775..66315336a 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -52,7 +52,7 @@ serverTests t@(ATransport t') = do describe "Exceeding queue quota" $ testExceedQueueQuota t' describe "Store log" $ testWithStoreLog t describe "Restore messages" $ testRestoreMessages t - describe "Restore messages (v2)" $ testRestoreMessagesV2 t + describe "Restore messages (old / v2)" $ testRestoreMessagesV2 t describe "Timing of AUTH error" $ testTiming t describe "Message notifications" $ testMessageNotifications t describe "Message expiration" $ do @@ -628,10 +628,12 @@ testRestoreMessages at@(ATransport t) = Resp "2" _ OK <- signSendRecv h sKey ("2", sId, _SEND "hello 2") Resp "3" _ OK <- signSendRecv h sKey ("3", sId, _SEND "hello 3") Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4") + Resp "5" _ OK <- signSendRecv h sKey ("5", sId, _SEND "hello 5") + Resp "6" _ (ERR QUOTA) <- signSendRecv h sKey ("6", sId, _SEND "hello 6") pure () logSize testStoreLogFile `shouldReturn` 2 - logSize testStoreMsgsFile `shouldReturn` 3 + logSize testStoreMsgsFile `shouldReturn` 5 withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do rId <- readTVarIO recipientId @@ -647,15 +649,21 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd - logSize testStoreMsgsFile `shouldReturn` 1 + logSize testStoreMsgsFile `shouldReturn` 3 withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do rId <- readTVarIO recipientId Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared + let dec = decryptMsgV3 dh Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB) - Resp "5" _ OK <- signSendRecv h rKey ("5", rId, ACK mId4) - (decryptMsgV3 dh mId4 msg4, Right "hello 4") #== "restored message delivered" + (dec mId4 msg4, Right "hello 4") #== "restored message delivered" + Resp "5" _ (Msg mId5 msg5) <- signSendRecv h rKey ("5", rId, ACK mId4) + (dec mId5 msg5, Right "hello 5") #== "restored message delivered" + Resp "6" _ (Msg mId6 msg6) <- signSendRecv h rKey ("6", rId, ACK mId5) + (dec mId6 msg6, Left "ClientRcvMsgQuota") #== "restored message delivered" + Resp "7" _ OK <- signSendRecv h rKey ("7", rId, ACK mId6) + pure () logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 diff --git a/tests/Test.hs b/tests/Test.hs index d074d9c86..c710a58df 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -1,8 +1,8 @@ {-# LANGUAGE TypeApplications #-} import AgentTests (agentTests) --- import Control.Logger.Simple import CLITests +import Control.Logger.Simple import CoreTests.CryptoTests import CoreTests.EncodingTests import CoreTests.ProtocolErrorTests @@ -16,26 +16,26 @@ import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) import System.Environment (setEnv) import Test.Hspec --- logCfg :: LogConfig --- logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} +logCfg :: LogConfig +logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} main :: IO () main = do - -- setLogLevel LogInfo -- LogError - -- withGlobalLogging logCfg $ do - createDirectoryIfMissing False "tests/tmp" - setEnv "APNS_KEY_ID" "H82WD9K9AQ" - setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8" - hspec $ do - describe "Core tests" $ do - describe "Encoding tests" encodingTests - describe "Protocol error tests" protocolErrorTests - describe "Version range" versionRangeTests - describe "Encryption tests" cryptoTests - describe "Retry interval tests" retryIntervalTests - describe "SMP server via TLS" $ serverTests (transport @TLS) - describe "SMP server via WebSockets" $ serverTests (transport @WS) - describe "Notifications server" $ ntfServerTests (transport @TLS) - describe "SMP client agent" $ agentTests (transport @TLS) - describe "Server CLIs" cliTests - removeDirectoryRecursive "tests/tmp" + setLogLevel LogError -- LogInfo + withGlobalLogging logCfg $ do + createDirectoryIfMissing False "tests/tmp" + setEnv "APNS_KEY_ID" "H82WD9K9AQ" + setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8" + hspec $ do + describe "Core tests" $ do + describe "Encoding tests" encodingTests + describe "Protocol error tests" protocolErrorTests + describe "Version range" versionRangeTests + describe "Encryption tests" cryptoTests + describe "Retry interval tests" retryIntervalTests + describe "SMP server via TLS" $ serverTests (transport @TLS) + describe "SMP server via WebSockets" $ serverTests (transport @WS) + describe "Notifications server" $ ntfServerTests (transport @TLS) + describe "SMP client agent" $ agentTests (transport @TLS) + describe "Server CLIs" cliTests + removeDirectoryRecursive "tests/tmp" From f47e7bf3c53181643e27e481a32b2a5407e6431e Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 12 Jan 2023 23:27:35 +0000 Subject: [PATCH 2/3] 4.2.3 --- package.yaml | 2 +- simplexmq.cabal | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.yaml b/package.yaml index ac4c85600..b5d443eaf 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 4.2.2 +version: 4.2.3 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 91ddf8394..8903f41e1 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 4.2.2 +version: 4.2.3 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and From 56cc2bc71f3afd54e49892fcfbcb56e910f5b66c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sat, 14 Jan 2023 13:23:37 +0000 Subject: [PATCH 3/3] additional SMP server stats (#605) * additional SMP server stats * refactor --- src/Simplex/Messaging/Server.hs | 67 ++++++++++++++---- src/Simplex/Messaging/Server/MsgStore/STM.hs | 20 +++--- src/Simplex/Messaging/Server/Stats.hs | 73 +++++++++++++++----- 3 files changed, 118 insertions(+), 42 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 2ae1bfdb5..e5cb7f919 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -104,8 +104,8 @@ type M a = ReaderT Env IO a smpServer :: TMVar Bool -> ServerConfig -> M () smpServer started cfg@ServerConfig {transports, logTLSErrors} = do s <- asks server - restoreServerStats restoreServerMessages + restoreServerStats raceAny_ ( serverThread s subscribedQ subscribers subscriptions cancelSub : serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) : @@ -174,7 +174,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats let interval = 1000000 * logInterval withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering @@ -187,7 +187,31 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do msgSent' <- atomically $ swapTVar msgSent 0 msgRecv' <- atomically $ swapTVar msgRecv 0 ps <- atomically $ periodStatCounts activeQueues ts - hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayCount ps, weekCount ps, monthCount ps] + msgSentNtf' <- atomically $ swapTVar msgSentNtf 0 + msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0 + psNtf <- atomically $ periodStatCounts activeQueuesNtf ts + qCount' <- readTVarIO qCount + msgCount' <- readTVarIO msgCount + hPutStrLn h $ + intercalate + "," + [ iso8601Show $ utctDay fromTime', + show qCreated', + show qSecured', + show qDeleted', + show msgSent', + show msgRecv', + dayCount ps, + weekCount ps, + monthCount ps, + show msgSentNtf', + show msgRecvNtf', + dayCount psNtf, + weekCount psNtf, + monthCount psNtf, + show qCount', + show msgCount' + ] threadDelay interval runClient :: Transport c => TProxy c -> c -> M () @@ -387,6 +411,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv withLog (`logCreateById` rId) stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) + atomically $ modifyTVar' (qCount stats) (+ 1) subscribeQueue qr rId $> IDS (qik ids) logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO () @@ -509,12 +534,12 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv q <- getStoreMsgQueue "ACK" queueId case s of Sub {subThread = ProhibitSub} -> do - msgDeleted <- atomically $ tryDelMsg q msgId - when msgDeleted updateStats + deletedMsg_ <- atomically $ tryDelMsg q msgId + mapM_ updateStats deletedMsg_ pure ok _ -> do - (msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId - when msgDeleted updateStats + (deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId + mapM_ updateStats deletedMsg_ deliverMessage "ACK" qr queueId sub q msg_ _ -> pure $ err NO_MSG where @@ -525,11 +550,17 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv if msgId == msgId' || B.null msgId then pure $ Just s else putTMVar delivered msgId' $> Nothing - updateStats :: m () - updateStats = do - stats <- asks serverStats - atomically $ modifyTVar' (msgRecv stats) (+ 1) - atomically $ updatePeriodStats (activeQueues stats) queueId + updateStats :: Message -> m () + updateStats = \case + MessageQuota {} -> pure () + Message {msgFlags} -> do + stats <- asks serverStats + atomically $ modifyTVar' (msgRecv stats) (+ 1) + atomically $ modifyTVar' (msgCount stats) (+ 1) + atomically $ updatePeriodStats (activeQueues stats) queueId + when (notification msgFlags) $ do + atomically $ modifyTVar' (msgRecvNtf stats) (+ 1) + atomically $ updatePeriodStats (activeQueuesNtf stats) queueId sendMessage :: QueueRec -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg) sendMessage qr msgFlags msgBody @@ -547,10 +578,13 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv case msg_ of Nothing -> pure $ err QUOTA Just msg -> time "SEND ok" $ do - when (notification msgFlags) $ - atomically . trySendNotification msg =<< asks idsDrg stats <- asks serverStats + when (notification msgFlags) $ do + atomically . trySendNotification msg =<< asks idsDrg + atomically $ modifyTVar' (msgSentNtf stats) (+ 1) + atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) atomically $ modifyTVar' (msgSent stats) (+ 1) + atomically $ modifyTVar' (msgCount stats) (subtract 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) pure ok where @@ -647,6 +681,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv ms <- asks msgStore stats <- asks serverStats atomically $ modifyTVar' (qDeleted stats) (+ 1) + atomically $ modifyTVar' (qCount stats) (subtract 1) atomically $ deleteQueue st queueId >>= \case Left e -> pure $ err e @@ -755,7 +790,9 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat liftIO (strDecode <$> B.readFile f) >>= \case Right d -> do s <- asks serverStats - atomically $ setServerStats s d + _qCount <- fmap (length . M.keys) . readTVarIO . queues =<< asks queueStore + _msgCount <- foldM (\n q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore + atomically $ setServerStats s d {_qCount, _msgCount} renameFile f $ f <> ".bak" logInfo "server stats restored" Left e -> do diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 4ecf6d152..e9dd95eec 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -8,7 +8,7 @@ module Simplex.Messaging.Server.MsgStore.STM ( STMMsgStore, - MsgQueue, + MsgQueue (..), newMsgStore, getMsgQueue, delMsgQueue, @@ -86,22 +86,22 @@ peekMsg :: MsgQueue -> STM Message peekMsg = peekTQueue . msgQueue {-# INLINE peekMsg #-} -tryDelMsg :: MsgQueue -> MsgId -> STM Bool +tryDelMsg :: MsgQueue -> MsgId -> STM (Maybe Message) tryDelMsg mq msgId' = tryPeekMsg mq >>= \case - Just msg - | msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure True - | otherwise -> pure False - _ -> pure False + msg_@(Just msg) + | msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure msg_ + | otherwise -> pure Nothing + _ -> pure Nothing -- atomic delete (== read) last and peek next message if available -tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Bool, Maybe Message) +tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Maybe Message, Maybe Message) tryDelPeekMsg mq msgId' = tryPeekMsg mq >>= \case msg_@(Just msg) - | msgId msg == msgId' || B.null msgId' -> (True,) <$> (tryDeleteMsg mq >> tryPeekMsg mq) - | otherwise -> pure (False, msg_) - _ -> pure (False, Nothing) + | msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg mq >> tryPeekMsg mq) + | otherwise -> pure (Nothing, msg_) + _ -> pure (Nothing, Nothing) deleteExpiredMsgs :: MsgQueue -> Int64 -> STM () deleteExpiredMsgs mq old = loop diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 3f08d5cb8..82170e90f 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} @@ -5,7 +6,7 @@ module Simplex.Messaging.Server.Stats where -import Control.Applicative (optional) +import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B import Data.Set (Set) @@ -24,7 +25,12 @@ data ServerStats = ServerStats qDeleted :: TVar Int, msgSent :: TVar Int, msgRecv :: TVar Int, - activeQueues :: PeriodStats RecipientId + activeQueues :: PeriodStats RecipientId, + msgSentNtf :: TVar Int, + msgRecvNtf :: TVar Int, + activeQueuesNtf :: PeriodStats RecipientId, + qCount :: TVar Int, + msgCount :: TVar Int } data ServerStatsData = ServerStatsData @@ -34,7 +40,12 @@ data ServerStatsData = ServerStatsData _qDeleted :: Int, _msgSent :: Int, _msgRecv :: Int, - _activeQueues :: PeriodStatsData RecipientId + _activeQueues :: PeriodStatsData RecipientId, + _msgSentNtf :: Int, + _msgRecvNtf :: Int, + _activeQueuesNtf :: PeriodStatsData RecipientId, + _qCount :: Int, + _msgCount :: Int } newServerStats :: UTCTime -> STM ServerStats @@ -46,7 +57,12 @@ newServerStats ts = do msgSent <- newTVar 0 msgRecv <- newTVar 0 activeQueues <- newPeriodStats - pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} + msgSentNtf <- newTVar 0 + msgRecvNtf <- newTVar 0 + activeQueuesNtf <- newPeriodStats + qCount <- newTVar 0 + msgCount <- newTVar 0 + pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -57,7 +73,12 @@ getServerStatsData s = do _msgSent <- readTVar $ msgSent s _msgRecv <- readTVar $ msgRecv s _activeQueues <- getPeriodStatsData $ activeQueues s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} + _msgSentNtf <- readTVar $ msgSentNtf s + _msgRecvNtf <- readTVar $ msgRecvNtf s + _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s + _qCount <- readTVar $ qCount s + _msgCount <- readTVar $ msgCount s + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do @@ -67,10 +88,15 @@ setServerStats s d = do writeTVar (qDeleted s) $! _qDeleted d writeTVar (msgSent s) $! _msgSent d writeTVar (msgRecv s) $! _msgRecv d - setPeriodStats (activeQueues s) (_activeQueues d) + setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + writeTVar (msgSentNtf s) $! _msgSentNtf d + writeTVar (msgRecvNtf s) $! _msgRecvNtf d + setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + writeTVar (qCount s) $! _qCount d + writeTVar (msgCount s) $! _qCount d instance StrEncoding ServerStatsData where - strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} = + strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf} = B.unlines [ "fromTime=" <> strEncode _fromTime, "qCreated=" <> strEncode _qCreated, @@ -78,8 +104,12 @@ instance StrEncoding ServerStatsData where "qDeleted=" <> strEncode _qDeleted, "msgSent=" <> strEncode _msgSent, "msgRecv=" <> strEncode _msgRecv, + "msgSentNtf=" <> strEncode _msgSentNtf, + "msgRecvNtf=" <> strEncode _msgRecvNtf, "activeQueues:", - strEncode _activeQueues + strEncode _activeQueues, + "activeQueuesNtf:", + strEncode _activeQueuesNtf ] strP = do _fromTime <- "fromTime=" *> strP <* A.endOfLine @@ -88,15 +118,21 @@ instance StrEncoding ServerStatsData where _qDeleted <- "qDeleted=" *> strP <* A.endOfLine _msgSent <- "msgSent=" *> strP <* A.endOfLine _msgRecv <- "msgRecv=" *> strP <* A.endOfLine - r <- optional ("activeQueues:" <* A.endOfLine) - _activeQueues <- case r of - Just _ -> strP <* optional A.endOfLine - _ -> do - _day <- "dayMsgQueues=" *> strP <* A.endOfLine - _week <- "weekMsgQueues=" *> strP <* A.endOfLine - _month <- "monthMsgQueues=" *> strP <* optional A.endOfLine - pure PeriodStatsData {_day, _week, _month} - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} + _msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0 + _msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0 + _activeQueues <- + optional ("activeQueues:" <* A.endOfLine) >>= \case + Just _ -> strP <* optional A.endOfLine + _ -> do + _day <- "dayMsgQueues=" *> strP <* A.endOfLine + _week <- "weekMsgQueues=" *> strP <* A.endOfLine + _month <- "monthMsgQueues=" *> strP <* optional A.endOfLine + pure PeriodStatsData {_day, _week, _month} + _activeQueuesNtf <- + optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case + Just _ -> strP <* optional A.endOfLine + _ -> pure newPeriodStatsData + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount = 0, _msgCount = 0} data PeriodStats a = PeriodStats { day :: TVar (Set a), @@ -117,6 +153,9 @@ data PeriodStatsData a = PeriodStatsData _month :: Set a } +newPeriodStatsData :: PeriodStatsData a +newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty} + getPeriodStatsData :: PeriodStats a -> STM (PeriodStatsData a) getPeriodStatsData s = do _day <- readTVar $ day s