diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index d7eb652ce..cfd05b87f 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -680,8 +680,13 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth getRelay = do + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysRequested cs) (+ 1) ProxyAgent {smpAgent} <- asks proxyAgent - liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError) + r <- liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError) + case r of + PKEY {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysConnected cs) (+ 1) + _ -> pure () + pure r where proxyResp = \case Left err -> ERR $ smpProxyError err @@ -698,9 +703,13 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, ProxyAgent {smpAgent} <- asks proxyAgent atomically (lookupSMPServerClient smpAgent sessId) >>= \case Just smp - | v >= sendingProxySMPVersion -> - liftIO $ either (ERR . smpProxyError) PRES <$> + | v >= sendingProxySMPVersion -> do + r <- liftIO $ either (ERR . smpProxyError) PRES <$> runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catchError` (pure . Left . PCEIOError) + case r of + PRES {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentViaProxy cs) (+ 1) + _ -> pure () + pure r | otherwise -> pure . ERR $ transportErr TEVersion where THandleParams {thVersion = v} = thParams smp @@ -773,13 +782,7 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) atomically $ modifyTVar' (qCount stats) (+ 1) - - now <- liftIO getCurrentTime - statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) - stats' <- asks clientStats -- TVar (IntMap ClientStats) - atomically $ withClientStatId statsIds' $ \statsId -> do - cs <- getClientStats stats' statsId now - modifyTVar' (CS.qCreated cs) $ S.insert rId + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.qCreated cs) $ S.insert rId -- TODO: increment current Q counter in IP timeline -- TODO: increment current Q counter in server timeline case subMode of @@ -970,36 +973,12 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, atomically $ modifyTVar' (msgSent stats) (+ 1) atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) - - logDebug $ "Senders gonna send..." - senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) - statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) - stats' <- asks clientStats -- TVar (IntMap ClientStats) - now <- liftIO getCurrentTime - atomically $ case senderKey qr of - Nothing -> withClientStatId statsIds' $ \statsId -> do - -- unsecured queue, no merging - cs <- getClientStats stats' statsId now - -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions - modifyTVar' (CS.msgSentUnsigned cs) (+ 1) - Just _secured -> withClientStatId statsIds' $ \currentStatsId -> do - -- secured queue, merging is possible - senders <- readTVar senders' - statsId <- case M.lookup (recipientId qr) senders of - Nothing -> do - newOwner <- newTVar currentStatsId - writeTVar senders' $ M.insert (recipientId qr) newOwner senders - pure currentStatsId - Just sender -> do - prevStatsId <- readTVar sender - unless (prevStatsId == currentStatsId) $ do - modifyTVar' statsIds' $ IM.insert clientId prevStatsId - qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId - unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer - pure prevStatsId - cs <- getClientStats stats' statsId now - modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr) - modifyTVar' (CS.msgSentSigned cs) (+ 1) + case senderKey qr of + Nothing -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentUnsigned cs) (+ 1) + Just _secured -> do + withMergedClientStatsId qr $ \cs -> do + atomically $ modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr) + atomically $ modifyTVar' (CS.msgSentSigned cs) (+ 1) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok @@ -1158,32 +1137,61 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, okResp :: Either ErrorType () -> Transmission BrokerMsg okResp = either err $ const ok - -- missing clientId entry means the client is exempt from stats - withClientStatId statsIds' action = readTVar statsIds' >>= mapM_ action . IM.lookup clientId + -- missing clientId entry means the client is exempt from stats + withClientStatsId_ statsIds' getCS = IM.lookup clientId <$> readTVar statsIds' >>= mapM getCS - getClientStats stats' statsId now = do - stats <- readTVar stats' - case IM.lookup statsId stats of - Nothing -> do - new <- CS.newClientStats newTVar peerId now - writeTVar stats' $ IM.insert statsId new stats - pure new - Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + withClientStatsId updateCS = do + statsIds' <- asks statsClients + stats' <- asks clientStats + now <- liftIO getCurrentTime + atomically (withClientStatsId_ statsIds' $ getClientStats stats' now) >>= mapM_ updateCS - mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) - mergeClientStats stats' prevId curId = do - stats <- readTVar stats' - case (IM.lookup prevId stats, IM.lookup curId stats) of - (_, Nothing) -> pure mempty - (Nothing, Just cur@CS.ClientStats {qCreated}) -> do - writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) - readTVar qCreated - (Just prev, Just cur) -> do - curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur - prevData <- CS.readClientStatsData readTVar prev - CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData - writeTVar stats' $ IM.delete curId stats - pure _qCreated + getClientStats stats' now statsId = do + stats <- readTVar stats' + case IM.lookup statsId stats of + Nothing -> do + new <- CS.newClientStats newTVar peerId now + writeTVar stats' $ IM.insert statsId new stats + pure new + Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + + withMergedClientStatsId qr updateCS = do + senders' <- asks sendSignedClients + statsIds' <- asks statsClients + stats' <- asks clientStats + now <- liftIO getCurrentTime + atomically (withClientStatsId_ statsIds' $ getMergeClientStats senders' statsIds' stats' now qr) >>= mapM_ updateCS + + getMergeClientStats senders' statsIds' stats' now qr currentStatsId = do + senders <- readTVar senders' + statsId <- case M.lookup (recipientId qr) senders of + Nothing -> do + newOwner <- newTVar currentStatsId + writeTVar senders' $ M.insert (recipientId qr) newOwner senders + pure currentStatsId + Just sender -> do + prevStatsId <- readTVar sender + unless (prevStatsId == currentStatsId) $ do + modifyTVar' statsIds' $ IM.insert clientId prevStatsId + qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId + unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer + pure prevStatsId + getClientStats stats' now statsId + + mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) + mergeClientStats stats' prevId curId = do + stats <- readTVar stats' + case (IM.lookup prevId stats, IM.lookup curId stats) of + (_, Nothing) -> pure mempty + (Nothing, Just cur@CS.ClientStats {qCreated}) -> do + writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) + readTVar qCreated + (Just prev, Just cur) -> do + curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur + prevData <- CS.readClientStatsData readTVar prev + CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData + writeTVar stats' $ IM.delete curId stats + pure _qCreated updateDeletedStats :: QueueRec -> M () updateDeletedStats q = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 1f0a045f5..2f02d78c5 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -15,11 +15,9 @@ import qualified Data.IntMap.Strict as IM import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.IntPSQ (IntPSQ) -import qualified Data.IntPSQ as IP import Data.Time.Clock (getCurrentTime) -import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) -import Data.Time.Clock.System (SystemTime, systemToUTCTime) +import Data.Time.Clock.POSIX (getPOSIXTime) +import Data.Time.Clock.System (SystemTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) import qualified Network.TLS as T @@ -34,7 +32,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId, newClientStats) +import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId) import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 285e2b255..03722256e 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -28,8 +28,10 @@ data ClientStats = ClientStats qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs msgSentSigned :: TVar Int, msgSentUnsigned :: TVar Int, - msgSentViaProxy :: TVar Int, -- TODO - msgDeliveredSigned :: TVar Int + msgDeliveredSigned :: TVar Int, + proxyRelaysRequested :: TVar Int, + proxyRelaysConnected :: TVar Int, + msgSentViaProxy :: TVar Int } -- may be combined with session duration to produce average rates (q/s, msg/s) @@ -42,8 +44,10 @@ data ClientStatsData = ClientStatsData _qSentSigned :: Set RecipientId, _msgSentSigned :: Int, _msgSentUnsigned :: Int, - _msgSentViaProxy :: Int, - _msgDeliveredSigned :: Int + _msgDeliveredSigned :: Int, + _proxyRelaysRequested :: Int, + _proxyRelaysConnected :: Int, + _msgSentViaProxy :: Int } newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats @@ -56,8 +60,10 @@ newClientStats newF peerId ts = do qSentSigned <- newF mempty msgSentSigned <- newF 0 msgSentUnsigned <- newF 0 - msgSentViaProxy <- newF 0 msgDeliveredSigned <- newF 0 + proxyRelaysRequested <- newF 0 + proxyRelaysConnected <- newF 0 + msgSentViaProxy <- newF 0 pure ClientStats { peerAddresses, @@ -68,8 +74,10 @@ newClientStats newF peerId ts = do qSentSigned, msgSentSigned, msgSentUnsigned, - msgSentViaProxy, - msgDeliveredSigned + msgDeliveredSigned, + proxyRelaysRequested, + proxyRelaysConnected, + msgSentViaProxy } {-# INLINE newClientStats #-} @@ -83,8 +91,10 @@ readClientStatsData readF cs = do _qSentSigned <- readF $ qSentSigned cs _msgSentSigned <- readF $ msgSentSigned cs _msgSentUnsigned <- readF $ msgSentUnsigned cs - _msgSentViaProxy <- readF $ msgSentViaProxy cs _msgDeliveredSigned <- readF $ msgDeliveredSigned cs + _proxyRelaysRequested <- readF $ proxyRelaysRequested cs + _proxyRelaysConnected <- readF $ proxyRelaysConnected cs + _msgSentViaProxy <- readF $ msgSentViaProxy cs pure ClientStatsData { _peerAddresses, @@ -95,8 +105,10 @@ readClientStatsData readF cs = do _qSentSigned, _msgSentSigned, _msgSentUnsigned, - _msgSentViaProxy, - _msgDeliveredSigned + _msgDeliveredSigned, + _proxyRelaysRequested, + _proxyRelaysConnected, + _msgSentViaProxy } {-# INLINE readClientStatsData #-} @@ -110,8 +122,10 @@ writeClientStatsData cs csd = do writeTVar (qSentSigned cs) (_qSentSigned csd) writeTVar (msgSentSigned cs) (_msgSentSigned csd) writeTVar (msgSentUnsigned cs) (_msgSentUnsigned csd) - writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd) writeTVar (msgDeliveredSigned cs) (_msgDeliveredSigned csd) + writeTVar (proxyRelaysRequested cs) (_proxyRelaysRequested csd) + writeTVar (proxyRelaysConnected cs) (_proxyRelaysConnected csd) + writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd) mergeClientStatsData :: ClientStatsData -> ClientStatsData -> ClientStatsData mergeClientStatsData a b = @@ -124,6 +138,8 @@ mergeClientStatsData a b = _qSentSigned = _qSentSigned a <> _qSentSigned b, _msgSentSigned = _msgSentSigned a + _msgSentSigned b, _msgSentUnsigned = _msgSentUnsigned a + _msgSentUnsigned b, - _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b, - _msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b + _msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b, + _proxyRelaysRequested = _proxyRelaysRequested a + _proxyRelaysRequested b, + _proxyRelaysConnected = _proxyRelaysConnected a + _proxyRelaysConnected b, + _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b }