mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 18:35:59 +00:00
servers: reduce memory used for period stats (#1298)
This commit is contained in:
@@ -47,6 +47,7 @@ dependencies:
|
||||
- direct-sqlcipher == 2.3.*
|
||||
- directory == 1.3.*
|
||||
- filepath == 1.4.*
|
||||
- hashable == 1.4.*
|
||||
- hourglass == 0.2.*
|
||||
- http-types == 0.12.*
|
||||
- http2 >= 4.2.2 && < 4.3
|
||||
|
||||
@@ -236,6 +236,7 @@ library
|
||||
, direct-sqlcipher ==2.3.*
|
||||
, directory ==1.3.*
|
||||
, filepath ==1.4.*
|
||||
, hashable ==1.4.*
|
||||
, hourglass ==0.2.*
|
||||
, http-types ==0.12.*
|
||||
, http2 >=4.2.2 && <4.3
|
||||
@@ -310,6 +311,7 @@ executable ntf-server
|
||||
, direct-sqlcipher ==2.3.*
|
||||
, directory ==1.3.*
|
||||
, filepath ==1.4.*
|
||||
, hashable ==1.4.*
|
||||
, hourglass ==0.2.*
|
||||
, http-types ==0.12.*
|
||||
, http2 >=4.2.2 && <4.3
|
||||
@@ -389,6 +391,7 @@ executable smp-server
|
||||
, directory ==1.3.*
|
||||
, file-embed
|
||||
, filepath ==1.4.*
|
||||
, hashable ==1.4.*
|
||||
, hourglass ==0.2.*
|
||||
, http-types ==0.12.*
|
||||
, http2 >=4.2.2 && <4.3
|
||||
@@ -467,6 +470,7 @@ executable xftp
|
||||
, direct-sqlcipher ==2.3.*
|
||||
, directory ==1.3.*
|
||||
, filepath ==1.4.*
|
||||
, hashable ==1.4.*
|
||||
, hourglass ==0.2.*
|
||||
, http-types ==0.12.*
|
||||
, http2 >=4.2.2 && <4.3
|
||||
@@ -542,6 +546,7 @@ executable xftp-server
|
||||
, direct-sqlcipher ==2.3.*
|
||||
, directory ==1.3.*
|
||||
, filepath ==1.4.*
|
||||
, hashable ==1.4.*
|
||||
, hourglass ==0.2.*
|
||||
, http-types ==0.12.*
|
||||
, http2 >=4.2.2 && <4.3
|
||||
@@ -653,6 +658,7 @@ test-suite simplexmq-test
|
||||
, directory ==1.3.*
|
||||
, filepath ==1.4.*
|
||||
, generic-random ==1.5.*
|
||||
, hashable ==1.4.*
|
||||
, hourglass ==0.2.*
|
||||
, hspec ==2.11.*
|
||||
, hspec-core ==2.11.*
|
||||
|
||||
@@ -11,7 +11,6 @@ import Data.IORef
|
||||
import Data.Int (Int64)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (SenderId)
|
||||
import Simplex.Messaging.Server.Stats (PeriodStats, PeriodStatsData, getPeriodStatsData, newPeriodStats, setPeriodStats)
|
||||
|
||||
data FileServerStats = FileServerStats
|
||||
@@ -21,7 +20,7 @@ data FileServerStats = FileServerStats
|
||||
filesUploaded :: IORef Int,
|
||||
filesExpired :: IORef Int,
|
||||
filesDeleted :: IORef Int,
|
||||
filesDownloaded :: PeriodStats SenderId,
|
||||
filesDownloaded :: PeriodStats,
|
||||
fileDownloads :: IORef Int,
|
||||
fileDownloadAcks :: IORef Int,
|
||||
filesCount :: IORef Int,
|
||||
@@ -35,7 +34,7 @@ data FileServerStatsData = FileServerStatsData
|
||||
_filesUploaded :: Int,
|
||||
_filesExpired :: Int,
|
||||
_filesDeleted :: Int,
|
||||
_filesDownloaded :: PeriodStatsData SenderId,
|
||||
_filesDownloaded :: PeriodStatsData,
|
||||
_fileDownloads :: Int,
|
||||
_fileDownloadAcks :: Int,
|
||||
_filesCount :: Int,
|
||||
|
||||
@@ -28,6 +28,8 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Char (isAlphaNum)
|
||||
import Data.Int (Int64)
|
||||
import Data.IntSet (IntSet)
|
||||
import qualified Data.IntSet as IS
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
@@ -39,7 +41,7 @@ import Data.Time.Format.ISO8601
|
||||
import Data.Word (Word16, Word32)
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Util ((<$?>))
|
||||
import Simplex.Messaging.Util (bshow, (<$?>))
|
||||
|
||||
class TextEncoding a where
|
||||
textEncode :: a -> Text
|
||||
@@ -125,15 +127,15 @@ instance StrEncoding Bool where
|
||||
{-# INLINE strP #-}
|
||||
|
||||
instance StrEncoding Int where
|
||||
strEncode = B.pack . show
|
||||
strEncode = bshow
|
||||
{-# INLINE strEncode #-}
|
||||
strP = A.decimal
|
||||
strP = A.signed A.decimal
|
||||
{-# INLINE strP #-}
|
||||
|
||||
instance StrEncoding Int64 where
|
||||
strEncode = B.pack . show
|
||||
strEncode = bshow
|
||||
{-# INLINE strEncode #-}
|
||||
strP = A.decimal
|
||||
strP = A.signed A.decimal
|
||||
{-# INLINE strP #-}
|
||||
|
||||
instance StrEncoding SystemTime where
|
||||
@@ -160,6 +162,10 @@ instance (StrEncoding a, Ord a) => StrEncoding (Set a) where
|
||||
strEncode = strEncodeList . S.toList
|
||||
strP = S.fromList <$> listItem `A.sepBy'` A.char ','
|
||||
|
||||
instance StrEncoding IntSet where
|
||||
strEncode = strEncodeList . IS.toList
|
||||
strP = IS.fromList <$> listItem `A.sepBy'` A.char ','
|
||||
|
||||
listItem :: StrEncoding a => Parser a
|
||||
listItem = parseAll strP <$?> A.takeTill (\c -> c == ',' || c == ' ' || c == '\n')
|
||||
|
||||
|
||||
@@ -10,8 +10,6 @@ import qualified Data.ByteString.Char8 as B
|
||||
import Data.IORef
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Protocol (NtfTokenId)
|
||||
import Simplex.Messaging.Protocol (NotifierId)
|
||||
import Simplex.Messaging.Server.Stats
|
||||
|
||||
data NtfServerStats = NtfServerStats
|
||||
@@ -23,8 +21,8 @@ data NtfServerStats = NtfServerStats
|
||||
subDeleted :: IORef Int,
|
||||
ntfReceived :: IORef Int,
|
||||
ntfDelivered :: IORef Int,
|
||||
activeTokens :: PeriodStats NtfTokenId,
|
||||
activeSubs :: PeriodStats NotifierId
|
||||
activeTokens :: PeriodStats,
|
||||
activeSubs :: PeriodStats
|
||||
}
|
||||
|
||||
data NtfServerStatsData = NtfServerStatsData
|
||||
@@ -36,8 +34,8 @@ data NtfServerStatsData = NtfServerStatsData
|
||||
_subDeleted :: Int,
|
||||
_ntfReceived :: Int,
|
||||
_ntfDelivered :: Int,
|
||||
_activeTokens :: PeriodStatsData NtfTokenId,
|
||||
_activeSubs :: PeriodStatsData NotifierId
|
||||
_activeTokens :: PeriodStatsData,
|
||||
_activeSubs :: PeriodStatsData
|
||||
}
|
||||
|
||||
newNtfServerStats :: UTCTime -> IO NtfServerStats
|
||||
|
||||
@@ -62,7 +62,6 @@ import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
|
||||
@@ -472,7 +471,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
putStat "qDeletedAllB" qDeletedAllB
|
||||
putStat "qDeletedNew" qDeletedNew
|
||||
putStat "qDeletedSecured" qDeletedSecured
|
||||
getStat (day . activeQueues) >>= \v -> hPutStrLn h $ "daily active queues: " <> show (S.size v)
|
||||
getStat (day . activeQueues) >>= \v -> hPutStrLn h $ "daily active queues: " <> show (IS.size v)
|
||||
-- removed to reduce memory usage
|
||||
-- getStat (day . subscribedQueues) >>= \v -> hPutStrLn h $ "daily subscribed queues: " <> show (S.size v)
|
||||
putStat "qSub" qSub
|
||||
|
||||
@@ -10,8 +10,12 @@ module Simplex.Messaging.Server.Stats where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Hashable (hash)
|
||||
import Data.IORef
|
||||
import Data.IntSet (IntSet)
|
||||
import qualified Data.IntSet as IS
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Time.Calendar.Month (pattern MonthDay)
|
||||
@@ -19,7 +23,7 @@ import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
|
||||
import Data.Time.Clock (UTCTime (..))
|
||||
import GHC.IORef (atomicSwapIORef)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (RecipientId)
|
||||
import Simplex.Messaging.Protocol (EntityId (..))
|
||||
import Simplex.Messaging.Util (atomicModifyIORef'_, unlessM)
|
||||
|
||||
data ServerStats = ServerStats
|
||||
@@ -57,11 +61,11 @@ data ServerStats = ServerStats
|
||||
msgGetDuplicate :: IORef Int,
|
||||
msgGetProhibited :: IORef Int,
|
||||
msgExpired :: IORef Int,
|
||||
activeQueues :: PeriodStats RecipientId,
|
||||
-- subscribedQueues :: PeriodStats RecipientId, -- this stat uses too much memory
|
||||
activeQueues :: PeriodStats,
|
||||
-- subscribedQueues :: PeriodStats, -- this stat uses too much memory
|
||||
msgSentNtf :: IORef Int, -- sent messages with NTF flag
|
||||
msgRecvNtf :: IORef Int, -- received messages with NTF flag
|
||||
activeQueuesNtf :: PeriodStats RecipientId,
|
||||
activeQueuesNtf :: PeriodStats,
|
||||
msgNtfs :: IORef Int, -- messages notications delivered to NTF server (<= msgSentNtf)
|
||||
msgNtfNoSub :: IORef Int, -- no subscriber to notifications (e.g., NTF server not connected)
|
||||
msgNtfLost :: IORef Int, -- notification is lost because NTF delivery queue is full
|
||||
@@ -108,10 +112,10 @@ data ServerStatsData = ServerStatsData
|
||||
_msgGetDuplicate :: Int,
|
||||
_msgGetProhibited :: Int,
|
||||
_msgExpired :: Int,
|
||||
_activeQueues :: PeriodStatsData RecipientId,
|
||||
_activeQueues :: PeriodStatsData,
|
||||
_msgSentNtf :: Int,
|
||||
_msgRecvNtf :: Int,
|
||||
_activeQueuesNtf :: PeriodStatsData RecipientId,
|
||||
_activeQueuesNtf :: PeriodStatsData,
|
||||
_msgNtfs :: Int,
|
||||
_msgNtfNoSub :: Int,
|
||||
_msgNtfLost :: Int,
|
||||
@@ -483,7 +487,7 @@ instance StrEncoding ServerStatsData where
|
||||
pure PeriodStatsData {_day, _week, _month}
|
||||
_subscribedQueues <-
|
||||
optional ("subscribedQueues:" <* A.endOfLine) >>= \case
|
||||
Just _ -> newPeriodStatsData <$ (strP @(PeriodStatsData RecipientId) <* optional A.endOfLine)
|
||||
Just _ -> newPeriodStatsData <$ (strP @PeriodStatsData <* optional A.endOfLine)
|
||||
_ -> pure newPeriodStatsData
|
||||
_activeQueuesNtf <-
|
||||
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
|
||||
@@ -552,30 +556,30 @@ instance StrEncoding ServerStatsData where
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
_ -> pure newProxyStatsData
|
||||
|
||||
data PeriodStats a = PeriodStats
|
||||
{ day :: IORef (Set a),
|
||||
week :: IORef (Set a),
|
||||
month :: IORef (Set a)
|
||||
data PeriodStats = PeriodStats
|
||||
{ day :: IORef IntSet,
|
||||
week :: IORef IntSet,
|
||||
month :: IORef IntSet
|
||||
}
|
||||
|
||||
newPeriodStats :: IO (PeriodStats a)
|
||||
newPeriodStats :: IO PeriodStats
|
||||
newPeriodStats = do
|
||||
day <- newIORef S.empty
|
||||
week <- newIORef S.empty
|
||||
month <- newIORef S.empty
|
||||
day <- newIORef IS.empty
|
||||
week <- newIORef IS.empty
|
||||
month <- newIORef IS.empty
|
||||
pure PeriodStats {day, week, month}
|
||||
|
||||
data PeriodStatsData a = PeriodStatsData
|
||||
{ _day :: Set a,
|
||||
_week :: Set a,
|
||||
_month :: Set a
|
||||
data PeriodStatsData = PeriodStatsData
|
||||
{ _day :: IntSet,
|
||||
_week :: IntSet,
|
||||
_month :: IntSet
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
newPeriodStatsData :: PeriodStatsData a
|
||||
newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty}
|
||||
newPeriodStatsData :: PeriodStatsData
|
||||
newPeriodStatsData = PeriodStatsData {_day = IS.empty, _week = IS.empty, _month = IS.empty}
|
||||
|
||||
getPeriodStatsData :: PeriodStats a -> IO (PeriodStatsData a)
|
||||
getPeriodStatsData :: PeriodStats -> IO PeriodStatsData
|
||||
getPeriodStatsData s = do
|
||||
_day <- readIORef $ day s
|
||||
_week <- readIORef $ week s
|
||||
@@ -583,20 +587,22 @@ getPeriodStatsData s = do
|
||||
pure PeriodStatsData {_day, _week, _month}
|
||||
|
||||
-- this function is not thread safe, it is used on server start only
|
||||
setPeriodStats :: PeriodStats a -> PeriodStatsData a -> IO ()
|
||||
setPeriodStats :: PeriodStats -> PeriodStatsData -> IO ()
|
||||
setPeriodStats s d = do
|
||||
writeIORef (day s) $! _day d
|
||||
writeIORef (week s) $! _week d
|
||||
writeIORef (month s) $! _month d
|
||||
|
||||
instance (Ord a, StrEncoding a) => StrEncoding (PeriodStatsData a) where
|
||||
instance StrEncoding PeriodStatsData where
|
||||
strEncode PeriodStatsData {_day, _week, _month} =
|
||||
"day=" <> strEncode _day <> "\nweek=" <> strEncode _week <> "\nmonth=" <> strEncode _month
|
||||
"dayHashes=" <> strEncode _day <> "\nweekHashes=" <> strEncode _week <> "\nmonthHashes=" <> strEncode _month
|
||||
strP = do
|
||||
_day <- "day=" *> strP <* A.endOfLine
|
||||
_week <- "week=" *> strP <* A.endOfLine
|
||||
_month <- "month=" *> strP
|
||||
_day <- ("day=" *> bsSetP <|> "dayHashes=" *> strP) <* A.endOfLine
|
||||
_week <- ("week=" *> bsSetP <|> "weekHashes=" *> strP) <* A.endOfLine
|
||||
_month <- "month=" *> bsSetP <|> "monthHashes=" *> strP
|
||||
pure PeriodStatsData {_day, _week, _month}
|
||||
where
|
||||
bsSetP = S.foldl' (\s -> (`IS.insert` s) . hash) IS.empty <$> strP @(Set ByteString)
|
||||
|
||||
data PeriodStatCounts = PeriodStatCounts
|
||||
{ dayCount :: String,
|
||||
@@ -604,7 +610,7 @@ data PeriodStatCounts = PeriodStatCounts
|
||||
monthCount :: String
|
||||
}
|
||||
|
||||
periodStatCounts :: forall a. PeriodStats a -> UTCTime -> IO PeriodStatCounts
|
||||
periodStatCounts :: PeriodStats -> UTCTime -> IO PeriodStatCounts
|
||||
periodStatCounts ps ts = do
|
||||
let d = utctDay ts
|
||||
(_, wDay) = mondayStartWeek d
|
||||
@@ -614,17 +620,18 @@ periodStatCounts ps ts = do
|
||||
monthCount <- periodCount mDay $ month ps
|
||||
pure PeriodStatCounts {dayCount, weekCount, monthCount}
|
||||
where
|
||||
periodCount :: Int -> IORef (Set a) -> IO String
|
||||
periodCount 1 ref = show . S.size <$> atomicSwapIORef ref S.empty
|
||||
periodCount :: Int -> IORef IntSet -> IO String
|
||||
periodCount 1 ref = show . IS.size <$> atomicSwapIORef ref IS.empty
|
||||
periodCount _ _ = pure ""
|
||||
|
||||
updatePeriodStats :: Ord a => PeriodStats a -> a -> IO ()
|
||||
updatePeriodStats ps pId = do
|
||||
updatePeriodStats :: PeriodStats -> EntityId -> IO ()
|
||||
updatePeriodStats ps (EntityId pId) = do
|
||||
updatePeriod $ day ps
|
||||
updatePeriod $ week ps
|
||||
updatePeriod $ month ps
|
||||
where
|
||||
updatePeriod ref = unlessM (S.member pId <$> readIORef ref) $ atomicModifyIORef'_ ref $ S.insert pId
|
||||
ph = hash pId
|
||||
updatePeriod ref = unlessM (IS.member ph <$> readIORef ref) $ atomicModifyIORef'_ ref $ IS.insert ph
|
||||
|
||||
data ProxyStats = ProxyStats
|
||||
{ pRequests :: IORef Int,
|
||||
|
||||
@@ -25,7 +25,8 @@ import Data.Bifunctor (first)
|
||||
import Data.ByteString.Base64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.Set as S
|
||||
import Data.Hashable (hash)
|
||||
import qualified Data.IntSet as IS
|
||||
import Data.Type.Equality
|
||||
import GHC.Stack (withFrozenCallStack)
|
||||
import SMPClient
|
||||
@@ -675,9 +676,9 @@ checkStats s qs sent received = do
|
||||
_msgSentNtf s `shouldBe` 0
|
||||
_msgRecvNtf s `shouldBe` 0
|
||||
let PeriodStatsData {_day, _week, _month} = _activeQueues s
|
||||
S.toList _day `shouldBe` qs
|
||||
S.toList _week `shouldBe` qs
|
||||
S.toList _month `shouldBe` qs
|
||||
IS.toList _day `shouldBe` map (hash . unEntityId) qs
|
||||
IS.toList _week `shouldBe` map (hash . unEntityId) qs
|
||||
IS.toList _month `shouldBe` map (hash . unEntityId) qs
|
||||
|
||||
testRestoreExpireMessages :: ATransport -> Spec
|
||||
testRestoreExpireMessages at@(ATransport t) =
|
||||
|
||||
Reference in New Issue
Block a user