From 3d9e5a501ecdc12358a551bbf4f714d7798dc93e Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Mon, 13 May 2024 22:34:33 +0300 Subject: [PATCH] draft SEND stats update --- src/Simplex/Messaging/Server.hs | 49 +++++++++++++++++-- src/Simplex/Messaging/Server/Env/STM.hs | 23 ++++++--- src/Simplex/Messaging/Server/Stats/Client.hs | 19 ++++--- .../Messaging/Server/Stats/Timeline.hs | 11 +++-- 4 files changed, 79 insertions(+), 23 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index d62e79987..bda4a4572 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -80,6 +80,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM as QS import Simplex.Messaging.Server.Stats +import qualified Simplex.Messaging.Server.Stats.Client as CS import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -440,10 +441,14 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M () runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime + statsIds <- asks statsClients active <- asks clients nextClientId <- asks clientSeq + let peerId = getPeerId connection + skipStats = False -- TODO: check peerId c <- atomically $ do - new@Client {clientId} <- newClient (getPeerId connection) nextClientId q thVersion sessionId ts + new@Client {clientId} <- newClient peerId nextClientId q thVersion sessionId ts + unless skipStats $ modifyTVar' statsIds $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id modifyTVar' active $ IM.insert clientId new pure new s <- asks server @@ -631,7 +636,7 @@ dummyKeyX25519 :: C.PublicKey 'C.X25519 dummyKeyX25519 = "MCowBQYDK2VuAyEA4JGSMYht18H4mas/jHeBwfcM7jLwNYJNOAhi2/g4RXg=" client :: Client -> Server -> M () -client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do +client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" forever $ atomically (readTBQueue rcvQ) @@ -887,11 +892,49 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv atomically $ modifyTVar' (msgSent stats) (+ 1) atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) - -- TODO: increment client S counter + + onwers' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) + statIds' <- asks statsClients -- TVar (IntMap ClientStatsId) + stats' <- asks clientStats -- TVar (IntMap ClientStats) + now <- liftIO getCurrentTime + atomically $ case senderKey qr of + Nothing -> do + -- unsecured queue, no merging + currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' + forM_ currentStatsId_ $ \statsId -> do + cs <- getClientStats stats' statsId now + -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions + modifyTVar' (CS.msgSentUnsigned cs) (+ 1) + Just _ -> do + -- secured queue, merging is possible + currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' + forM_ currentStatsId_ $ \currentStatsId -> do + owners <- readTVar onwers' + statsId <- forM (M.lookup (recipientId qr) owners) readTVar >>= \case + Just ownerId | ownerId == currentStatsId -> pure ownerId -- keep going + Just olderSessionId -> do + -- TODO: merge client stats + pure olderSessionId + -- olderSessionId <$ mergeClientStats owners olderSessionId currentStatsId + Nothing -> do -- claim queue ownership (should've happened on NEW instead) + newOwner <- newTVar currentStatsId + writeTVar onwers' $ M.insert (recipientId qr) newOwner owners + pure currentStatsId + cs <- getClientStats stats' statsId now + modifyTVar' (CS.msgSentSigned cs) (+ 1) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok where + getClientStats stats' statsId now = do + stats <- readTVar stats' + case IM.lookup statsId stats of + Nothing -> do + new <- CS.newClientStats newTVar peerId now + writeTVar stats' $ IM.insert statsId new stats + pure new + Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + mkMessage :: C.MaxLenBS MaxMessageLen -> M Message mkMessage body = do msgId <- randomId =<< asks (msgIdBytes . config) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 6ab1b1baa..28792a0b1 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -15,8 +15,10 @@ import qualified Data.IntMap.Strict as IM import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Data.IntPSQ (IntPSQ) +import qualified Data.IntPSQ as IP import Data.Time.Clock (getCurrentTime) -import Data.Time.Clock.POSIX (getPOSIXTime) +import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) import Data.Time.Clock.System (SystemTime, systemToUTCTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) @@ -31,7 +33,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.Stats.Client (ClientStats, newClientStats) +import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId, newClientStats) import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) @@ -116,8 +118,11 @@ data Env = Env storeLog :: Maybe (StoreLog 'WriteMode), tlsServerParams :: T.ServerParams, serverStats :: ServerStats, - qCreatedByIp :: Timeline, - msgSentByIp :: Timeline, + qCreatedByIp :: Timeline Int, + msgSentByIp :: Timeline Int, + clientStats :: TVar (IntMap ClientStats), -- transitive session stats + statsClients :: TVar (IntMap ClientStatsId), -- reverse index from active clients + sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their owners sockets :: SocketState, clientSeq :: TVar Int, clients :: TVar (IntMap Client) @@ -134,7 +139,7 @@ data Server = Server data Client = Client { clientId :: Int, peerId :: PeerId, -- send updates for this Id to time series - clientStats :: ClientStats, -- capture final values on disconnect + -- socketStats :: ClientStats, -- TODO: measure and export histogram on disconnect subscriptions :: TMap RecipientId (TVar Sub), ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), @@ -179,8 +184,7 @@ newClient peerId nextClientId qSize thVersion sessionId createdAt = do connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt - clientStats <- newClientStats newTVar (systemToUTCTime createdAt) - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats} + return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId} newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do @@ -204,7 +208,10 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, now <- getPOSIXTime qCreatedByIp <- atomically $ newTimeline perMinute now msgSentByIp <- atomically $ newTimeline perMinute now - return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, sockets, clientSeq, clients} + clientStats <- newTVarIO mempty + statsClients <- newTVarIO mempty + sendSignedClients <- newTVarIO mempty + return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, sockets, clientSeq, clients} where restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} f = do diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 22a29a5d2..d5badcaca 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -15,6 +15,8 @@ import Data.IntMap (IntMap) import qualified Data.IntMap.Strict as IM import Data.IntPSQ (IntPSQ) import qualified Data.IntPSQ as IP +import Data.IntSet (IntSet) +import qualified Data.IntSet as IS import Data.Monoid (getSum) import Data.Set (Set) import qualified Data.Set as S @@ -28,13 +30,16 @@ import Simplex.Messaging.Protocol (RecipientId) import Simplex.Messaging.Transport (PeerId) import UnliftIO.STM +-- | Ephemeral client ID across reconnects +type ClientStatsId = Int + data ClientStats = ClientStats - { peerAddresses :: TVar (Set PeerId), + { peerAddresses :: TVar IntSet, -- cumulative set of used PeerIds socketCount :: TVar Int, createdAt :: TVar UTCTime, updatedAt :: TVar UTCTime, - qCreated :: TVar (Set RecipientId), - qSentSigned :: TVar (Set RecipientId), + qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs + qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs msgSentSigned :: TVar Int, msgSentUnsigned :: TVar Int, msgSentViaProxy :: TVar Int, @@ -43,7 +48,7 @@ data ClientStats = ClientStats -- may be combined with session duration to produce average rates (q/s, msg/s) data ClientStatsData = ClientStatsData - { _peerAddresses :: Set PeerId, + { _peerAddresses :: IntSet, _socketCount :: Int, _createdAt :: UTCTime, _updatedAt :: UTCTime, @@ -55,9 +60,9 @@ data ClientStatsData = ClientStatsData _msgDeliveredSigned :: Int } -newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> UTCTime -> m ClientStats -newClientStats newF ts = do - peerAddresses <- newF mempty +newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats +newClientStats newF peerId ts = do + peerAddresses <- newF $ IS.singleton peerId socketCount <- newF 0 createdAt <- newF ts updatedAt <- newF ts diff --git a/src/Simplex/Messaging/Server/Stats/Timeline.hs b/src/Simplex/Messaging/Server/Stats/Timeline.hs index 112823f4e..77bf7ba97 100644 --- a/src/Simplex/Messaging/Server/Stats/Timeline.hs +++ b/src/Simplex/Messaging/Server/Stats/Timeline.hs @@ -26,12 +26,13 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM -type Timeline = (TVar SparseSeries, Current) +-- A time series of counters with an active head +type Timeline a = (TVar SparseSeries, Current a) -newTimeline :: QuantFun -> POSIXTime -> STM Timeline +newTimeline :: forall a. QuantFun -> POSIXTime -> STM (Timeline a) newTimeline quantF now = (,current) <$> newTVar IP.empty where - current :: Current + current :: Current a current = (quantF, quantF now, mempty) -- Sparse timeseries with 1 second resolution (or more coarse): @@ -47,7 +48,7 @@ type BucketId = Word32 type QuantFun = POSIXTime -> BucketId -- Current bucket that gets filled -type Current = (QuantFun, BucketId, IntMap (TVar Int)) +type Current a = (QuantFun, BucketId, IntMap (TVar a)) perSecond :: POSIXTime -> BucketId perSecond = truncate @@ -58,7 +59,7 @@ perMinute = (60 `secondsWidth`) secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId secondsWidth w t = truncate $ t / w -finishCurrent :: POSIXTime -> Timeline -> STM Timeline +finishCurrent :: POSIXTime -> Timeline a -> STM (Timeline a) finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope" type WindowData = IntMap Int -- PeerId -> counter