diff --git a/package.yaml b/package.yaml index 610266e82..26936b4af 100644 --- a/package.yaml +++ b/package.yaml @@ -47,6 +47,7 @@ dependencies: - direct-sqlcipher == 2.3.* - directory == 1.3.* - filepath == 1.4.* + - hashable == 1.4.* - hourglass == 0.2.* - http-types == 0.12.* - http2 >= 4.2.2 && < 4.3 diff --git a/simplexmq.cabal b/simplexmq.cabal index 5426aa4ca..90b507f3c 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -236,6 +236,7 @@ library , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -310,6 +311,7 @@ executable ntf-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -389,6 +391,7 @@ executable smp-server , directory ==1.3.* , file-embed , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -467,6 +470,7 @@ executable xftp , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -542,6 +546,7 @@ executable xftp-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -653,6 +658,7 @@ test-suite simplexmq-test , directory ==1.3.* , filepath ==1.4.* , generic-random ==1.5.* + , hashable ==1.4.* , hourglass ==0.2.* , hspec ==2.11.* , hspec-core ==2.11.* diff --git a/src/Simplex/FileTransfer/Server/Stats.hs b/src/Simplex/FileTransfer/Server/Stats.hs index 8737e6f78..a7951a65a 100644 --- a/src/Simplex/FileTransfer/Server/Stats.hs +++ b/src/Simplex/FileTransfer/Server/Stats.hs @@ -11,7 +11,6 @@ import Data.IORef import Data.Int (Int64) import Data.Time.Clock (UTCTime) import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (SenderId) import Simplex.Messaging.Server.Stats (PeriodStats, PeriodStatsData, getPeriodStatsData, newPeriodStats, setPeriodStats) data FileServerStats = FileServerStats @@ -21,7 +20,7 @@ data FileServerStats = FileServerStats filesUploaded :: IORef Int, filesExpired :: IORef Int, filesDeleted :: IORef Int, - filesDownloaded :: PeriodStats SenderId, + filesDownloaded :: PeriodStats, fileDownloads :: IORef Int, fileDownloadAcks :: IORef Int, filesCount :: IORef Int, @@ -35,7 +34,7 @@ data FileServerStatsData = FileServerStatsData _filesUploaded :: Int, _filesExpired :: Int, _filesDeleted :: Int, - _filesDownloaded :: PeriodStatsData SenderId, + _filesDownloaded :: PeriodStatsData, _fileDownloads :: Int, _fileDownloadAcks :: Int, _filesCount :: Int, diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index 8995b9679..97e7d087b 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -28,6 +28,8 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Char (isAlphaNum) import Data.Int (Int64) +import Data.IntSet (IntSet) +import qualified Data.IntSet as IS import qualified Data.List.NonEmpty as L import Data.Set (Set) import qualified Data.Set as S @@ -39,7 +41,7 @@ import Data.Time.Format.ISO8601 import Data.Word (Word16, Word32) import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (parseAll) -import Simplex.Messaging.Util ((<$?>)) +import Simplex.Messaging.Util (bshow, (<$?>)) class TextEncoding a where textEncode :: a -> Text @@ -125,15 +127,15 @@ instance StrEncoding Bool where {-# INLINE strP #-} instance StrEncoding Int where - strEncode = B.pack . show + strEncode = bshow {-# INLINE strEncode #-} - strP = A.decimal + strP = A.signed A.decimal {-# INLINE strP #-} instance StrEncoding Int64 where - strEncode = B.pack . show + strEncode = bshow {-# INLINE strEncode #-} - strP = A.decimal + strP = A.signed A.decimal {-# INLINE strP #-} instance StrEncoding SystemTime where @@ -160,6 +162,10 @@ instance (StrEncoding a, Ord a) => StrEncoding (Set a) where strEncode = strEncodeList . S.toList strP = S.fromList <$> listItem `A.sepBy'` A.char ',' +instance StrEncoding IntSet where + strEncode = strEncodeList . IS.toList + strP = IS.fromList <$> listItem `A.sepBy'` A.char ',' + listItem :: StrEncoding a => Parser a listItem = parseAll strP <$?> A.takeTill (\c -> c == ',' || c == ' ' || c == '\n') diff --git a/src/Simplex/Messaging/Notifications/Server/Stats.hs b/src/Simplex/Messaging/Notifications/Server/Stats.hs index 2c469e335..d05257664 100644 --- a/src/Simplex/Messaging/Notifications/Server/Stats.hs +++ b/src/Simplex/Messaging/Notifications/Server/Stats.hs @@ -10,8 +10,6 @@ import qualified Data.ByteString.Char8 as B import Data.IORef import Data.Time.Clock (UTCTime) import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Notifications.Protocol (NtfTokenId) -import Simplex.Messaging.Protocol (NotifierId) import Simplex.Messaging.Server.Stats data NtfServerStats = NtfServerStats @@ -23,8 +21,8 @@ data NtfServerStats = NtfServerStats subDeleted :: IORef Int, ntfReceived :: IORef Int, ntfDelivered :: IORef Int, - activeTokens :: PeriodStats NtfTokenId, - activeSubs :: PeriodStats NotifierId + activeTokens :: PeriodStats, + activeSubs :: PeriodStats } data NtfServerStatsData = NtfServerStatsData @@ -36,8 +34,8 @@ data NtfServerStatsData = NtfServerStatsData _subDeleted :: Int, _ntfReceived :: Int, _ntfDelivered :: Int, - _activeTokens :: PeriodStatsData NtfTokenId, - _activeSubs :: PeriodStatsData NotifierId + _activeTokens :: PeriodStatsData, + _activeSubs :: PeriodStatsData } newNtfServerStats :: UTCTime -> IO NtfServerStats diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e84f26a5a..d5ba6695f 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -62,7 +62,6 @@ import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) -import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) @@ -472,7 +471,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do putStat "qDeletedAllB" qDeletedAllB putStat "qDeletedNew" qDeletedNew putStat "qDeletedSecured" qDeletedSecured - getStat (day . activeQueues) >>= \v -> hPutStrLn h $ "daily active queues: " <> show (S.size v) + getStat (day . activeQueues) >>= \v -> hPutStrLn h $ "daily active queues: " <> show (IS.size v) -- removed to reduce memory usage -- getStat (day . subscribedQueues) >>= \v -> hPutStrLn h $ "daily subscribed queues: " <> show (S.size v) putStat "qSub" qSub diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 60f55100e..097ddb607 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -10,8 +10,12 @@ module Simplex.Messaging.Server.Stats where import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.Hashable (hash) import Data.IORef +import Data.IntSet (IntSet) +import qualified Data.IntSet as IS import Data.Set (Set) import qualified Data.Set as S import Data.Time.Calendar.Month (pattern MonthDay) @@ -19,7 +23,7 @@ import Data.Time.Calendar.OrdinalDate (mondayStartWeek) import Data.Time.Clock (UTCTime (..)) import GHC.IORef (atomicSwapIORef) import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (RecipientId) +import Simplex.Messaging.Protocol (EntityId (..)) import Simplex.Messaging.Util (atomicModifyIORef'_, unlessM) data ServerStats = ServerStats @@ -57,11 +61,11 @@ data ServerStats = ServerStats msgGetDuplicate :: IORef Int, msgGetProhibited :: IORef Int, msgExpired :: IORef Int, - activeQueues :: PeriodStats RecipientId, - -- subscribedQueues :: PeriodStats RecipientId, -- this stat uses too much memory + activeQueues :: PeriodStats, + -- subscribedQueues :: PeriodStats, -- this stat uses too much memory msgSentNtf :: IORef Int, -- sent messages with NTF flag msgRecvNtf :: IORef Int, -- received messages with NTF flag - activeQueuesNtf :: PeriodStats RecipientId, + activeQueuesNtf :: PeriodStats, msgNtfs :: IORef Int, -- messages notications delivered to NTF server (<= msgSentNtf) msgNtfNoSub :: IORef Int, -- no subscriber to notifications (e.g., NTF server not connected) msgNtfLost :: IORef Int, -- notification is lost because NTF delivery queue is full @@ -108,10 +112,10 @@ data ServerStatsData = ServerStatsData _msgGetDuplicate :: Int, _msgGetProhibited :: Int, _msgExpired :: Int, - _activeQueues :: PeriodStatsData RecipientId, + _activeQueues :: PeriodStatsData, _msgSentNtf :: Int, _msgRecvNtf :: Int, - _activeQueuesNtf :: PeriodStatsData RecipientId, + _activeQueuesNtf :: PeriodStatsData, _msgNtfs :: Int, _msgNtfNoSub :: Int, _msgNtfLost :: Int, @@ -483,7 +487,7 @@ instance StrEncoding ServerStatsData where pure PeriodStatsData {_day, _week, _month} _subscribedQueues <- optional ("subscribedQueues:" <* A.endOfLine) >>= \case - Just _ -> newPeriodStatsData <$ (strP @(PeriodStatsData RecipientId) <* optional A.endOfLine) + Just _ -> newPeriodStatsData <$ (strP @PeriodStatsData <* optional A.endOfLine) _ -> pure newPeriodStatsData _activeQueuesNtf <- optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case @@ -552,30 +556,30 @@ instance StrEncoding ServerStatsData where Just _ -> strP <* optional A.endOfLine _ -> pure newProxyStatsData -data PeriodStats a = PeriodStats - { day :: IORef (Set a), - week :: IORef (Set a), - month :: IORef (Set a) +data PeriodStats = PeriodStats + { day :: IORef IntSet, + week :: IORef IntSet, + month :: IORef IntSet } -newPeriodStats :: IO (PeriodStats a) +newPeriodStats :: IO PeriodStats newPeriodStats = do - day <- newIORef S.empty - week <- newIORef S.empty - month <- newIORef S.empty + day <- newIORef IS.empty + week <- newIORef IS.empty + month <- newIORef IS.empty pure PeriodStats {day, week, month} -data PeriodStatsData a = PeriodStatsData - { _day :: Set a, - _week :: Set a, - _month :: Set a +data PeriodStatsData = PeriodStatsData + { _day :: IntSet, + _week :: IntSet, + _month :: IntSet } deriving (Show) -newPeriodStatsData :: PeriodStatsData a -newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty} +newPeriodStatsData :: PeriodStatsData +newPeriodStatsData = PeriodStatsData {_day = IS.empty, _week = IS.empty, _month = IS.empty} -getPeriodStatsData :: PeriodStats a -> IO (PeriodStatsData a) +getPeriodStatsData :: PeriodStats -> IO PeriodStatsData getPeriodStatsData s = do _day <- readIORef $ day s _week <- readIORef $ week s @@ -583,20 +587,22 @@ getPeriodStatsData s = do pure PeriodStatsData {_day, _week, _month} -- this function is not thread safe, it is used on server start only -setPeriodStats :: PeriodStats a -> PeriodStatsData a -> IO () +setPeriodStats :: PeriodStats -> PeriodStatsData -> IO () setPeriodStats s d = do writeIORef (day s) $! _day d writeIORef (week s) $! _week d writeIORef (month s) $! _month d -instance (Ord a, StrEncoding a) => StrEncoding (PeriodStatsData a) where +instance StrEncoding PeriodStatsData where strEncode PeriodStatsData {_day, _week, _month} = - "day=" <> strEncode _day <> "\nweek=" <> strEncode _week <> "\nmonth=" <> strEncode _month + "dayHashes=" <> strEncode _day <> "\nweekHashes=" <> strEncode _week <> "\nmonthHashes=" <> strEncode _month strP = do - _day <- "day=" *> strP <* A.endOfLine - _week <- "week=" *> strP <* A.endOfLine - _month <- "month=" *> strP + _day <- ("day=" *> bsSetP <|> "dayHashes=" *> strP) <* A.endOfLine + _week <- ("week=" *> bsSetP <|> "weekHashes=" *> strP) <* A.endOfLine + _month <- "month=" *> bsSetP <|> "monthHashes=" *> strP pure PeriodStatsData {_day, _week, _month} + where + bsSetP = S.foldl' (\s -> (`IS.insert` s) . hash) IS.empty <$> strP @(Set ByteString) data PeriodStatCounts = PeriodStatCounts { dayCount :: String, @@ -604,7 +610,7 @@ data PeriodStatCounts = PeriodStatCounts monthCount :: String } -periodStatCounts :: forall a. PeriodStats a -> UTCTime -> IO PeriodStatCounts +periodStatCounts :: PeriodStats -> UTCTime -> IO PeriodStatCounts periodStatCounts ps ts = do let d = utctDay ts (_, wDay) = mondayStartWeek d @@ -614,17 +620,18 @@ periodStatCounts ps ts = do monthCount <- periodCount mDay $ month ps pure PeriodStatCounts {dayCount, weekCount, monthCount} where - periodCount :: Int -> IORef (Set a) -> IO String - periodCount 1 ref = show . S.size <$> atomicSwapIORef ref S.empty + periodCount :: Int -> IORef IntSet -> IO String + periodCount 1 ref = show . IS.size <$> atomicSwapIORef ref IS.empty periodCount _ _ = pure "" -updatePeriodStats :: Ord a => PeriodStats a -> a -> IO () -updatePeriodStats ps pId = do +updatePeriodStats :: PeriodStats -> EntityId -> IO () +updatePeriodStats ps (EntityId pId) = do updatePeriod $ day ps updatePeriod $ week ps updatePeriod $ month ps where - updatePeriod ref = unlessM (S.member pId <$> readIORef ref) $ atomicModifyIORef'_ ref $ S.insert pId + ph = hash pId + updatePeriod ref = unlessM (IS.member ph <$> readIORef ref) $ atomicModifyIORef'_ ref $ IS.insert ph data ProxyStats = ProxyStats { pRequests :: IORef Int, diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index d3154e228..63508a033 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -25,7 +25,8 @@ import Data.Bifunctor (first) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import qualified Data.Set as S +import Data.Hashable (hash) +import qualified Data.IntSet as IS import Data.Type.Equality import GHC.Stack (withFrozenCallStack) import SMPClient @@ -675,9 +676,9 @@ checkStats s qs sent received = do _msgSentNtf s `shouldBe` 0 _msgRecvNtf s `shouldBe` 0 let PeriodStatsData {_day, _week, _month} = _activeQueues s - S.toList _day `shouldBe` qs - S.toList _week `shouldBe` qs - S.toList _month `shouldBe` qs + IS.toList _day `shouldBe` map (hash . unEntityId) qs + IS.toList _week `shouldBe` map (hash . unEntityId) qs + IS.toList _month `shouldBe` map (hash . unEntityId) qs testRestoreExpireMessages :: ATransport -> Spec testRestoreExpireMessages at@(ATransport t) =