From bc26dc1d682a56597c899e5363d4290ef5bd4950 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 4 Jul 2022 10:45:35 +0100 Subject: [PATCH] save and restore server stats on restart (#460) --- apps/smp-server/Main.hs | 1 + simplexmq.cabal | 1 + src/Simplex/Messaging/Encoding/String.hs | 22 ++++- src/Simplex/Messaging/Server.hs | 45 ++++++++-- src/Simplex/Messaging/Server/Env/STM.hs | 32 +------ src/Simplex/Messaging/Server/Stats.hs | 101 +++++++++++++++++++++++ tests/SMPClient.hs | 8 +- 7 files changed, 171 insertions(+), 39 deletions(-) create mode 100644 src/Simplex/Messaging/Server/Stats.hs diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index 2e6661802..e5a9dbc9d 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -103,6 +103,7 @@ smpServerCLIConfig = else Nothing, logStatsInterval = Just 86400, -- seconds logStatsStartTime = 0, -- seconds from 00:00 UTC + serverStatsFile = Just $ combine logPath "smp-server-stats.log", smpServerVRange = supportedSMPServerVRange } } diff --git a/simplexmq.cabal b/simplexmq.cabal index 977d122e0..aa1b64e13 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -75,6 +75,7 @@ library Simplex.Messaging.Server.MsgStore.STM Simplex.Messaging.Server.QueueStore Simplex.Messaging.Server.QueueStore.STM + Simplex.Messaging.Server.Stats Simplex.Messaging.Server.StoreLog Simplex.Messaging.TMap Simplex.Messaging.Transport diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index ed9473983..2ff374e46 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -28,9 +28,13 @@ import qualified Data.ByteString.Char8 as B import Data.Char (isAlphaNum) import Data.Int (Int64) import qualified Data.List.NonEmpty as L +import Data.Set (Set) +import qualified Data.Set as S import Data.Text (Text) import Data.Text.Encoding (decodeLatin1, encodeUtf8) +import Data.Time.Clock (UTCTime) import Data.Time.Clock.System (SystemTime (..)) +import Data.Time.Format.ISO8601 import Data.Word (Word16) import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (parseAll) @@ -101,14 +105,26 @@ instance StrEncoding Bool where strP = smpP {-# INLINE strP #-} +instance StrEncoding Int where + strEncode = B.pack . show + {-# INLINE strEncode #-} + strP = A.decimal + {-# INLINE strP #-} + instance StrEncoding Int64 where strEncode = B.pack . show + {-# INLINE strEncode #-} strP = A.decimal + {-# INLINE strP #-} instance StrEncoding SystemTime where strEncode = strEncode . systemSeconds strP = MkSystemTime <$> strP <*> pure 0 +instance StrEncoding UTCTime where + strEncode = B.pack . iso8601Show + strP = maybe (Left "bad UTCTime") Right . iso8601ParseM . B.unpack <$?> A.takeTill (\c -> c == ' ' || c == '\n') + -- lists encode/parse as comma-separated strings strEncodeList :: StrEncoding a => [a] -> ByteString strEncodeList = B.intercalate "," . map strEncode @@ -121,8 +137,12 @@ instance StrEncoding a => StrEncoding (L.NonEmpty a) where strEncode = strEncodeList . L.toList strP = L.fromList <$> listItem `A.sepBy1'` A.char ',' +instance (StrEncoding a, Ord a) => StrEncoding (Set a) where + strEncode = strEncodeList . S.toList + strP = S.fromList <$> listItem `A.sepBy'` A.char ',' + listItem :: StrEncoding a => Parser a -listItem = parseAll strP <$?> A.takeTill (== ',') +listItem = parseAll strP <$?> A.takeTill (\c -> c == ',' || c == ' ' || c == '\n') instance (StrEncoding a, StrEncoding b) => StrEncoding (a, b) where strEncode (a, b) = B.unwords [strEncode a, strEncode b] diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 8de05d83f..9d82ec6d7 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -50,10 +50,12 @@ import Data.Maybe (isNothing) import Data.Set (Set) import qualified Data.Set as S import qualified Data.Text as T +import Data.Text.Encoding (decodeLatin1) import Data.Time.Calendar.Month.Compat (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) +import Data.Time.Format.ISO8601 (iso8601Show) import Data.Type.Equality import Network.Socket (ServiceName) import qualified Simplex.Messaging.Crypto as C @@ -66,6 +68,7 @@ import Simplex.Messaging.Server.MsgStore import Simplex.Messaging.Server.MsgStore.STM (MsgQueue) import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM (QueueStore) +import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -98,13 +101,14 @@ smpServer :: forall m. (MonadUnliftIO m, MonadReader Env m) => TMVar Bool -> m ( smpServer started = do s <- asks server cfg@ServerConfig {transports} <- asks config + restoreServerStats restoreServerMessages raceAny_ ( serverThread s subscribedQ subscribers subscriptions cancelSub : serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg ) - `finally` (withLog closeStoreLog >> saveServerMessages) + `finally` (withLog closeStoreLog >> saveServerMessages >> saveServerStats) where runServer :: (ServiceName, ATransport) -> m () runServer (tcpPort, ATransport t) = do @@ -182,7 +186,7 @@ smpServer started = do MonthDay _ mDay = day (dayMsgQueues', weekMsgQueues', monthMsgQueues') <- atomically $ (,,) <$> periodCount 1 dayMsgQueues <*> periodCount wDay weekMsgQueues <*> periodCount mDay monthMsgQueues - logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show dayMsgQueues', weekMsgQueues', monthMsgQueues'] + logInfo . T.pack $ intercalate "," [iso8601Show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show dayMsgQueues', weekMsgQueues', monthMsgQueues'] threadDelay interval where periodCount :: Int -> TVar (Set RecipientId) -> STM String @@ -638,33 +642,58 @@ randomId n = do gVar <- asks idsDrg atomically (C.pseudoRandomBytes n gVar) -saveServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m () +saveServerMessages :: (MonadUnliftIO m, MonadReader Env m) => m () saveServerMessages = asks (storeMsgsFile . config) >>= mapM_ saveMessages where saveMessages f = do - liftIO $ putStrLn $ "saving messages to file " <> f + logInfo $ "saving messages to file " <> T.pack f ms <- asks msgStore liftIO . withFile f WriteMode $ \h -> readTVarIO ms >>= mapM_ (saveQueueMsgs ms h) . M.keys + logInfo "messages saved" where saveQueueMsgs ms h rId = atomically (flushMsgQueue ms rId) >>= mapM_ (B.hPutStrLn h . strEncode . MsgLogRecord rId) -restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m () +restoreServerMessages :: (MonadUnliftIO m, MonadReader Env m) => m () restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages where restoreMessages f = whenM (doesFileExist f) $ do - liftIO $ putStrLn $ "restoring messages from file " <> f + logInfo $ "restoring messages from file " <> T.pack f ms <- asks msgStore quota <- asks $ msgQueueQuota . config liftIO $ mapM_ (restoreMsg ms quota) . B.lines =<< B.readFile f renameFile f $ f <> ".bak" + logInfo $ "messages restored" where restoreMsg ms quota s = case strDecode s of - Left e -> B.putStrLn $ "message parsing error (" <> B.pack e <> "): " <> B.take 100 s + Left e -> logError . decodeLatin1 $ "message parsing error (" <> B.pack e <> "): " <> B.take 100 s Right (MsgLogRecord rId msg) -> do full <- atomically $ do q <- getMsgQueue ms rId quota ifM (isFull q) (pure True) (writeMsg q msg $> False) - when full . B.putStrLn $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message)) + when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message)) + +saveServerStats :: (MonadUnliftIO m, MonadReader Env m) => m () +saveServerStats = + asks (serverStatsFile . config) + >>= mapM_ (\f -> asks serverStats >>= atomically . getServerStatsData >>= liftIO . saveStats f) + where + saveStats f stats = do + logInfo $ "saving server stats to file " <> T.pack f + B.writeFile f $ strEncode stats + logInfo $ "server stats saved" + +restoreServerStats :: (MonadUnliftIO m, MonadReader Env m) => m () +restoreServerStats = asks (serverStatsFile . config) >>= mapM_ restoreStats + where + restoreStats f = whenM (doesFileExist f) $ do + logInfo $ "restoring server stats from file " <> T.pack f + liftIO (strDecode <$> B.readFile f) >>= \case + Right d -> do + s <- asks serverStats + atomically $ setServerStatsData s d + renameFile f $ f <> ".bak" + logInfo "server stats restored" + Left e -> logInfo $ "error restoring server stats: " <> T.pack e diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index b64495e0e..565438eab 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -12,9 +12,7 @@ import Crypto.Random import Data.ByteString.Char8 (ByteString) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Set (Set) -import qualified Data.Set as S -import Data.Time.Clock (UTCTime, getCurrentTime) +import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.System (SystemTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) @@ -26,6 +24,7 @@ import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM +import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -57,6 +56,8 @@ data ServerConfig = ServerConfig -- | time of the day when the stats are logged first, to log at consistent times, -- irrespective of when the server is started (seconds from 00:00 UTC) logStatsStartTime :: Int, + -- | file to save and restore stats + serverStatsFile :: Maybe FilePath, -- | CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -108,18 +109,6 @@ data Client = Client activeAt :: TVar SystemTime } -data ServerStats = ServerStats - { qCreated :: TVar Int, - qSecured :: TVar Int, - qDeleted :: TVar Int, - msgSent :: TVar Int, - msgRecv :: TVar Int, - dayMsgQueues :: TVar (Set RecipientId), - weekMsgQueues :: TVar (Set RecipientId), - monthMsgQueues :: TVar (Set RecipientId), - fromTime :: TVar UTCTime - } - data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) | ProhibitSub data Sub = Sub @@ -145,19 +134,6 @@ newClient qSize sessionId ts = do activeAt <- newTVar ts return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, connected, activeAt} -newServerStats :: UTCTime -> STM ServerStats -newServerStats ts = do - qCreated <- newTVar 0 - qSecured <- newTVar 0 - qDeleted <- newTVar 0 - msgSent <- newTVar 0 - msgRecv <- newTVar 0 - dayMsgQueues <- newTVar S.empty - weekMsgQueues <- newTVar S.empty - monthMsgQueues <- newTVar S.empty - fromTime <- newTVar ts - pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues, fromTime} - newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do delivered <- newEmptyTMVar diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs new file mode 100644 index 000000000..f99dc135e --- /dev/null +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -0,0 +1,101 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.Messaging.Server.Stats where + +import Control.Applicative (optional) +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Char8 as B +import Data.Set (Set) +import qualified Data.Set as S +import Data.Time.Clock (UTCTime) +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Protocol (RecipientId) +import UnliftIO.STM + +data ServerStats = ServerStats + { fromTime :: TVar UTCTime, + qCreated :: TVar Int, + qSecured :: TVar Int, + qDeleted :: TVar Int, + msgSent :: TVar Int, + msgRecv :: TVar Int, + dayMsgQueues :: TVar (Set RecipientId), + weekMsgQueues :: TVar (Set RecipientId), + monthMsgQueues :: TVar (Set RecipientId) + } + +data ServerStatsData = ServerStatsData + { _fromTime :: UTCTime, + _qCreated :: Int, + _qSecured :: Int, + _qDeleted :: Int, + _msgSent :: Int, + _msgRecv :: Int, + _dayMsgQueues :: Set RecipientId, + _weekMsgQueues :: Set RecipientId, + _monthMsgQueues :: Set RecipientId + } + +newServerStats :: UTCTime -> STM ServerStats +newServerStats ts = do + fromTime <- newTVar ts + qCreated <- newTVar 0 + qSecured <- newTVar 0 + qDeleted <- newTVar 0 + msgSent <- newTVar 0 + msgRecv <- newTVar 0 + dayMsgQueues <- newTVar S.empty + weekMsgQueues <- newTVar S.empty + monthMsgQueues <- newTVar S.empty + pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues} + +getServerStatsData :: ServerStats -> STM ServerStatsData +getServerStatsData s = do + _fromTime <- readTVar $ fromTime s + _qCreated <- readTVar $ qCreated s + _qSecured <- readTVar $ qSecured s + _qDeleted <- readTVar $ qDeleted s + _msgSent <- readTVar $ msgSent s + _msgRecv <- readTVar $ msgRecv s + _dayMsgQueues <- readTVar $ dayMsgQueues s + _weekMsgQueues <- readTVar $ weekMsgQueues s + _monthMsgQueues <- readTVar $ monthMsgQueues s + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues} + +setServerStatsData :: ServerStats -> ServerStatsData -> STM () +setServerStatsData s d = do + writeTVar (fromTime s) (_fromTime d) + writeTVar (qCreated s) (_qCreated d) + writeTVar (qSecured s) (_qSecured d) + writeTVar (qDeleted s) (_qDeleted d) + writeTVar (msgSent s) (_msgSent d) + writeTVar (msgRecv s) (_msgRecv d) + writeTVar (dayMsgQueues s) (_dayMsgQueues d) + writeTVar (weekMsgQueues s) (_weekMsgQueues d) + writeTVar (monthMsgQueues s) (_monthMsgQueues d) + +instance StrEncoding ServerStatsData where + strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues} = + B.unlines + [ "fromTime=" <> strEncode _fromTime, + "qCreated=" <> strEncode _qCreated, + "qSecured=" <> strEncode _qSecured, + "qDeleted=" <> strEncode _qDeleted, + "msgSent=" <> strEncode _msgSent, + "msgRecv=" <> strEncode _msgRecv, + "dayMsgQueues=" <> strEncode _dayMsgQueues, + "weekMsgQueues=" <> strEncode _weekMsgQueues, + "monthMsgQueues=" <> strEncode _monthMsgQueues + ] + strP = do + _fromTime <- "fromTime=" *> strP <* A.endOfLine + _qCreated <- "qCreated=" *> strP <* A.endOfLine + _qSecured <- "qSecured=" *> strP <* A.endOfLine + _qDeleted <- "qDeleted=" *> strP <* A.endOfLine + _msgSent <- "msgSent=" *> strP <* A.endOfLine + _msgRecv <- "msgRecv=" *> strP <* A.endOfLine + _dayMsgQueues <- "dayMsgQueues=" *> strP <* A.endOfLine + _weekMsgQueues <- "weekMsgQueues=" *> strP <* A.endOfLine + _monthMsgQueues <- "monthMsgQueues=" *> strP <* optional A.endOfLine + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues} diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 4761e2dae..c3927fa27 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -46,6 +46,9 @@ testStoreLogFile = "tests/tmp/smp-server-store.log" testStoreMsgsFile :: FilePath testStoreMsgsFile = "tests/tmp/smp-server-messages.log" +testServerStatsFile :: FilePath +testServerStatsFile = "tests/tmp/smp-server-stats.log" + testSMPClient :: (Transport c, MonadUnliftIO m) => (THandle c -> m a) -> m a testSMPClient client = runTransportClient testHost testPort (Just testKeyHash) (Just defaultKeepAliveOpts) $ \h -> @@ -69,6 +72,7 @@ cfg = inactiveClientExpiration = Just defaultInactiveClientExpiration, logStatsInterval = Nothing, logStatsStartTime = 0, + serverStatsFile = Nothing, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt", @@ -76,10 +80,10 @@ cfg = } withSmpServerStoreMsgLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a -withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile} +withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, serverStatsFile = Just testServerStatsFile} withSmpServerStoreLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a -withSmpServerStoreLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile} +withSmpServerStoreLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, serverStatsFile = Just testServerStatsFile} withSmpServerConfigOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServerConfig -> ServiceName -> (ThreadId -> m a) -> m a withSmpServerConfigOn t cfg' port' =