draft SEND stats update

This commit is contained in:
Alexander Bondarenko
2024-05-13 22:34:33 +03:00
parent 2fcc8d0107
commit 3d9e5a501e
4 changed files with 79 additions and 23 deletions

View File

@@ -80,6 +80,7 @@ import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM as QS
import Simplex.Messaging.Server.Stats
import qualified Simplex.Messaging.Server.Stats.Client as CS
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
@@ -440,10 +441,14 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
statsIds <- asks statsClients
active <- asks clients
nextClientId <- asks clientSeq
let peerId = getPeerId connection
skipStats = False -- TODO: check peerId
c <- atomically $ do
new@Client {clientId} <- newClient (getPeerId connection) nextClientId q thVersion sessionId ts
new@Client {clientId} <- newClient peerId nextClientId q thVersion sessionId ts
unless skipStats $ modifyTVar' statsIds $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id
modifyTVar' active $ IM.insert clientId new
pure new
s <- asks server
@@ -631,7 +636,7 @@ dummyKeyX25519 :: C.PublicKey 'C.X25519
dummyKeyX25519 = "MCowBQYDK2VuAyEA4JGSMYht18H4mas/jHeBwfcM7jLwNYJNOAhi2/g4RXg="
client :: Client -> Server -> M ()
client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do
client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
forever $
atomically (readTBQueue rcvQ)
@@ -887,11 +892,49 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
atomically $ modifyTVar' (msgSent stats) (+ 1)
atomically $ modifyTVar' (msgCount stats) (+ 1)
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
-- TODO: increment client S counter
onwers' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId)
statIds' <- asks statsClients -- TVar (IntMap ClientStatsId)
stats' <- asks clientStats -- TVar (IntMap ClientStats)
now <- liftIO getCurrentTime
atomically $ case senderKey qr of
Nothing -> do
-- unsecured queue, no merging
currentStatsId_ <- IM.lookup clientId <$> readTVar statIds'
forM_ currentStatsId_ $ \statsId -> do
cs <- getClientStats stats' statsId now
-- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions
modifyTVar' (CS.msgSentUnsigned cs) (+ 1)
Just _ -> do
-- secured queue, merging is possible
currentStatsId_ <- IM.lookup clientId <$> readTVar statIds'
forM_ currentStatsId_ $ \currentStatsId -> do
owners <- readTVar onwers'
statsId <- forM (M.lookup (recipientId qr) owners) readTVar >>= \case
Just ownerId | ownerId == currentStatsId -> pure ownerId -- keep going
Just olderSessionId -> do
-- TODO: merge client stats
pure olderSessionId
-- olderSessionId <$ mergeClientStats owners olderSessionId currentStatsId
Nothing -> do -- claim queue ownership (should've happened on NEW instead)
newOwner <- newTVar currentStatsId
writeTVar onwers' $ M.insert (recipientId qr) newOwner owners
pure currentStatsId
cs <- getClientStats stats' statsId now
modifyTVar' (CS.msgSentSigned cs) (+ 1)
-- TODO: increment current S counter in IP timeline
-- TODO: increment current S counter in server timeline
pure ok
where
getClientStats stats' statsId now = do
stats <- readTVar stats'
case IM.lookup statsId stats of
Nothing -> do
new <- CS.newClientStats newTVar peerId now
writeTVar stats' $ IM.insert statsId new stats
pure new
Just cs -> cs <$ writeTVar (CS.updatedAt cs) now
mkMessage :: C.MaxLenBS MaxMessageLen -> M Message
mkMessage body = do
msgId <- randomId =<< asks (msgIdBytes . config)

View File

@@ -15,8 +15,10 @@ import qualified Data.IntMap.Strict as IM
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.IntPSQ (IntPSQ)
import qualified Data.IntPSQ as IP
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)
import Data.Time.Clock.System (SystemTime, systemToUTCTime)
import Data.X509.Validation (Fingerprint (..))
import Network.Socket (ServiceName)
@@ -31,7 +33,7 @@ import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..))
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.Stats.Client (ClientStats, newClientStats)
import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId, newClientStats)
import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
@@ -116,8 +118,11 @@ data Env = Env
storeLog :: Maybe (StoreLog 'WriteMode),
tlsServerParams :: T.ServerParams,
serverStats :: ServerStats,
qCreatedByIp :: Timeline,
msgSentByIp :: Timeline,
qCreatedByIp :: Timeline Int,
msgSentByIp :: Timeline Int,
clientStats :: TVar (IntMap ClientStats), -- transitive session stats
statsClients :: TVar (IntMap ClientStatsId), -- reverse index from active clients
sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their owners
sockets :: SocketState,
clientSeq :: TVar Int,
clients :: TVar (IntMap Client)
@@ -134,7 +139,7 @@ data Server = Server
data Client = Client
{ clientId :: Int,
peerId :: PeerId, -- send updates for this Id to time series
clientStats :: ClientStats, -- capture final values on disconnect
-- socketStats :: ClientStats, -- TODO: measure and export histogram on disconnect
subscriptions :: TMap RecipientId (TVar Sub),
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
@@ -179,8 +184,7 @@ newClient peerId nextClientId qSize thVersion sessionId createdAt = do
connected <- newTVar True
rcvActiveAt <- newTVar createdAt
sndActiveAt <- newTVar createdAt
clientStats <- newClientStats newTVar (systemToUTCTime createdAt)
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats}
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId}
newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do
@@ -204,7 +208,10 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
now <- getPOSIXTime
qCreatedByIp <- atomically $ newTimeline perMinute now
msgSentByIp <- atomically $ newTimeline perMinute now
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, sockets, clientSeq, clients}
clientStats <- newTVarIO mempty
statsClients <- newTVarIO mempty
sendSignedClients <- newTVarIO mempty
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, sockets, clientSeq, clients}
where
restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode)
restoreQueues QueueStore {queues, senders, notifiers} f = do

View File

@@ -15,6 +15,8 @@ import Data.IntMap (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.IntPSQ (IntPSQ)
import qualified Data.IntPSQ as IP
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Monoid (getSum)
import Data.Set (Set)
import qualified Data.Set as S
@@ -28,13 +30,16 @@ import Simplex.Messaging.Protocol (RecipientId)
import Simplex.Messaging.Transport (PeerId)
import UnliftIO.STM
-- | Ephemeral client ID across reconnects
type ClientStatsId = Int
data ClientStats = ClientStats
{ peerAddresses :: TVar (Set PeerId),
{ peerAddresses :: TVar IntSet, -- cumulative set of used PeerIds
socketCount :: TVar Int,
createdAt :: TVar UTCTime,
updatedAt :: TVar UTCTime,
qCreated :: TVar (Set RecipientId),
qSentSigned :: TVar (Set RecipientId),
qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs
qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs
msgSentSigned :: TVar Int,
msgSentUnsigned :: TVar Int,
msgSentViaProxy :: TVar Int,
@@ -43,7 +48,7 @@ data ClientStats = ClientStats
-- may be combined with session duration to produce average rates (q/s, msg/s)
data ClientStatsData = ClientStatsData
{ _peerAddresses :: Set PeerId,
{ _peerAddresses :: IntSet,
_socketCount :: Int,
_createdAt :: UTCTime,
_updatedAt :: UTCTime,
@@ -55,9 +60,9 @@ data ClientStatsData = ClientStatsData
_msgDeliveredSigned :: Int
}
newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> UTCTime -> m ClientStats
newClientStats newF ts = do
peerAddresses <- newF mempty
newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats
newClientStats newF peerId ts = do
peerAddresses <- newF $ IS.singleton peerId
socketCount <- newF 0
createdAt <- newF ts
updatedAt <- newF ts

View File

@@ -26,12 +26,13 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (RecipientId)
import UnliftIO.STM
type Timeline = (TVar SparseSeries, Current)
-- A time series of counters with an active head
type Timeline a = (TVar SparseSeries, Current a)
newTimeline :: QuantFun -> POSIXTime -> STM Timeline
newTimeline :: forall a. QuantFun -> POSIXTime -> STM (Timeline a)
newTimeline quantF now = (,current) <$> newTVar IP.empty
where
current :: Current
current :: Current a
current = (quantF, quantF now, mempty)
-- Sparse timeseries with 1 second resolution (or more coarse):
@@ -47,7 +48,7 @@ type BucketId = Word32
type QuantFun = POSIXTime -> BucketId
-- Current bucket that gets filled
type Current = (QuantFun, BucketId, IntMap (TVar Int))
type Current a = (QuantFun, BucketId, IntMap (TVar a))
perSecond :: POSIXTime -> BucketId
perSecond = truncate
@@ -58,7 +59,7 @@ perMinute = (60 `secondsWidth`)
secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId
secondsWidth w t = truncate $ t / w
finishCurrent :: POSIXTime -> Timeline -> STM Timeline
finishCurrent :: POSIXTime -> Timeline a -> STM (Timeline a)
finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope"
type WindowData = IntMap Int -- PeerId -> counter