mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-07-02 11:22:07 +00:00
f0b7a4be73
* smp server: messaging services (#1565)
* smp server: refactor message delivery to always respond SOK to subscriptions
* refactor ntf subscribe
* cancel subscription thread and reduce service subscription count when queue is deleted
* subscribe rcv service, deliver sent messages to subscribed service
* subscribe rcv service to messages (TODO delivery on subscription)
* WIP
* efficient initial delivery of messages to subscribed service
* test: delivery to client with service certificate
* test: upgrade/downgrade to/from service subscriptions
* remove service association from agent API, add per-user flag to use the service
* agent client (WIP)
* service certificates in the client
* rfc about drift detection, and SALL to mark end of message delivery
* fix test
* fix test
* add function for postgresql message storage
* update migration
* servers: maintain xor-hash of all associated queue IDs in PostgreSQL (#1668)
* servers: maintain xor-hash of all associated queue IDs in PostgreSQL (#1615)
* ntf server: maintain xor-hash of all associated queue IDs via PostgreSQL triggers
* smp server: xor hash with triggers
* fix sql and using pgcrypto extension in tests
* track counts and hashes in smp/ntf servers via triggers, smp server stats for service subscription, update SMP protocol to pass expected count and hash in SSUB/NSSUB commands
* agent migrations with functions/triggers
* remove agent triggers
* try tracking service subs in the agent (WIP, does not compile)
* Revert "try tracking service subs in the agent (WIP, does not compile)"
This reverts commit 59e908100d.
* comment
* agent database triggers
* service subscriptions in the client
* test / fix client services
* update schema
* fix postgres migration
* update schema
* move schema test to the end
* use static function with SQLite to avoid dynamic wrapper
* agent: fail when per-connection transport isolation is used with services (#1670)
* agent: service subscription events (#1671)
* agent: use server keyhash when loading service record
* agent: process queue/service associations with delayed subscription results
* agent: service subscription events
* agent: finalize initial service subscriptions, remove associations on service ID changes (#1672)
* agent: remove service/queue associations when service ID changes
* agent: check that service ID in NEW response matches session ID in transport session
* agent subscription WIP
* test
* comment
* enable tests
* update queries
* agent: option to add SQLite aggregates to DB connection (#1673)
* agent: add build_relations_vector function to sqlite
* update aggregate
* use static aggregate
* remove relations
---------
Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
* add test, treat BAD_SERVICE as temp error, only remove queue associations on service errors
* add packZipWith for backward compatibility with GHC 8.10.7
---------
Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
* servers: service stats and logging, allow services without option (removed), report errors during service message delivery, remove threads when service subscription ended (#1676)
* smp server: always allow services without option
* smp server: maintain IDs hash in session subscription states
* smp server: service message delivery error handling
* ntf server: log subscription count and hash differences
* smp server: remove delivery threads when service subscription ended/client disconnected
* agent: remove service queue association when service ID changed, process ENDS event, test migrating to/from service (#1677)
* agent: remove service queue association when service ID changed
* agent: process ENDS event
* agent: send service subscription error event
* agent: test migrating to/from service subscriptions, fixes
* agent: always remove service when disabled, fix service subscriptions
* ntf server: use different client certs for each SMP server, remove support for store log (#1681)
* ntf server: remove support for store log
* ntf server: use different client certificates for each SMP server
* smp protocol: fix encoding for SOKS/ENDS responses (#1683)
* agent: create user with option to enable client service (#1684)
* agent: create user with option to enable client service
* handle HTTP2 errors
* do not catch async exceptions
* agent: minor fixes
* docs: update protocol (#1705)
* docs: agent threat model
* update protocol docs
* update RFCs (#1730)
* update RFCs
* update
* update overview
* update terminology
* original language in threat model
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* docs: fix minor issues in protocols
* docs: add e2e encrypted message wire encoding to PQDR spec
* docs: add missing encodings and other protocol corrections
* docs: move implemented rfcs
* smp: service fixes (#1737)
* smp: deliver service subscription to correct client
* tests: more resilient to concurrency
* optimize PostgreSQL query
* fix service re-association after server "downgrade"
* correctly handle service removed from server (and ID changed)
* remove unused
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* prometheus: fix metrics names (#1747)
* test: rcv service re-association on restart (#1746)
* agent: correct log message
* docs: update whitepaper
* smp: fix messaging client service issues (#1751)
* services: fix minor issues
* fix accounting for subscribed service queues, add prometheus stats
* fix uncorrelated subquery
* fix potential race condition when inserting service defensively, as it is also prevented by how client is created
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* agent: refactor cleanup if no pending subs (#1757)
* smp server: batch processing of subscription messages (#1753)
* smp server: batch processing of subscription messages
* refactor
* empty line
* fix
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* smp: batch queue association updates on subscriptions (#1760)
* smp: batch queue association updates on subscriptions
* refactor to fused batching
* simpler
* batch assoc functions
* clean up
* fix
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* agent: use primary key index in setRcvServiceAssocs (#1783)
* agent: use primary key index in setRcvServiceAssocs
Previous WHERE rcv_id = ? did not match the (host, port, rcv_id)
primary key prefix and fell back to a table scan via
idx_rcv_queues_client_notice_id. With ~390k rows per queue, each
update in a 1350-row batch scanned the whole table, yielding ~290s
per batch and a multi-hour rcv-services migration.
* agent: pass SMPServer explicitly to setRcvServiceAssocs
Avoid extracting host/port from the first queue inside setRcvServiceAssocs.
The caller already has SMPServer in scope (from tSess) and the call chain
is short, so threading it through is simpler than inspecting the list.
Removes the empty-list guard from setRcvServiceAssocs (it remains in
processRcvServiceAssocs).
---------
Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
Co-authored-by: sh <37271604+shumvgolove@users.noreply.github.com>
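The commit notes above mention maintaining a count and an xor-hash of all queue IDs associated with a service, and passing the expected count and hash in SSUB/NSSUB so that drift can be detected. A minimal sketch of that idea follows. It is not the server's implementation: the names `IdsHash`, `addId`, `removeId` and `digestLen` are hypothetical, and the sketch assumes each queue ID has already been reduced to a fixed-length digest (how the digest is computed is not shown in this section).

```haskell
import Data.Bits (xor)
import qualified Data.ByteString as B
import Data.List (foldl')

-- Hypothetical summary of a set of IDs: how many there are,
-- and the XOR of their fixed-length digests.
data IdsHash = IdsHash {idCount :: Int, idsXor :: B.ByteString}
  deriving (Eq, Show)

-- Assumed digest length in bytes (an illustration, not a protocol constant).
digestLen :: Int
digestLen = 16

emptyIdsHash :: IdsHash
emptyIdsHash = IdsHash 0 (B.replicate digestLen 0)

-- Byte-wise XOR of two equal-length strings.
xorBytes :: B.ByteString -> B.ByteString -> B.ByteString
xorBytes a b = B.pack (B.zipWith xor a b)

-- XOR is commutative and self-inverse, so adding and removing an ID
-- are the same operation on the hash; only the count differs.
addId, removeId :: B.ByteString -> IdsHash -> IdsHash
addId d (IdsHash n h) = IdsHash (n + 1) (xorBytes h d)
removeId d (IdsHash n h) = IdsHash (n - 1) (xorBytes h d)

main :: IO ()
main = do
  let d1 = B.replicate digestLen 1
      d2 = B.replicate digestLen 2
      d3 = B.replicate digestLen 7
      a = foldl' (flip addId) emptyIdsHash [d1, d2, d3]
      b = foldl' (flip addId) emptyIdsHash [d3, d1, d2]
  -- Insertion order does not affect the summary.
  print (a == b)
  -- Removing an ID restores the summary of the remaining set.
  print (removeId d3 a == foldl' (flip addId) emptyIdsHash [d1, d2])
```

Because the summary can be updated incrementally, two parties can compare a count and one short hash instead of exchanging full ID lists. A mismatch only says that the sets differ, not which IDs are missing.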
1115 lines
36 KiB
Haskell
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

module Simplex.Messaging.Server.Stats
  ( ServerStats (..),
    ServerStatsData (..),
    PeriodStats (..),
    PeriodStatsData (..),
    PeriodStatCounts (..),
    ProxyStats (..),
    ProxyStatsData (..),
    ServiceStats (..),
    ServiceStatsData (..),
    TimeBuckets (..),
    newServerStats,
    getServerStatsData,
    setServerStats,
    newPeriodStats,
    newPeriodStatsData,
    getPeriodStatsData,
    setPeriodStats,
    periodStatDataCounts,
    periodStatCounts,
    updatePeriodStats,
    newProxyStats,
    newProxyStatsData,
    getProxyStatsData,
    getResetProxyStatsData,
    setProxyStats,
    newServiceStatsData,
    newServiceStats,
    getServiceStatsData,
    getResetServiceStatsData,
    setServiceStats,
    emptyTimeBuckets,
    updateTimeBuckets,
  ) where

import Control.Applicative (optional, (<|>))
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Hashable (hash)
import Data.IORef
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Calendar.Month (pattern MonthDay)
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
import Data.Time.Clock (UTCTime (..))
import GHC.IORef (atomicSwapIORef)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (EntityId (..))
import Simplex.Messaging.SystemTime
import Simplex.Messaging.Util (atomicModifyIORef'_, tshow, unlessM)

data ServerStats = ServerStats
  { fromTime :: IORef UTCTime,
    qCreated :: IORef Int,
    qSecured :: IORef Int,
    qDeletedAll :: IORef Int,
    qDeletedAllB :: IORef Int,
    qDeletedNew :: IORef Int,
    qDeletedSecured :: IORef Int,
    qBlocked :: IORef Int,
    qSub :: IORef Int, -- only includes subscriptions when there were pending messages
    -- qSubNoMsg :: IORef Int, -- this stat creates too many STM transactions
    qSubAllB :: IORef Int, -- count of all subscription batches (with and without pending messages)
    qSubAuth :: IORef Int,
    qSubDuplicate :: IORef Int,
    qSubProhibited :: IORef Int,
    qSubEnd :: IORef Int,
    qSubEndB :: IORef Int,
    ntfCreated :: IORef Int,
    ntfNewCreated :: IORef Int, -- credentials created at the time of queue creation
    ntfDeleted :: IORef Int,
    ntfDeletedB :: IORef Int,
    ntfSub :: IORef Int,
    ntfSubB :: IORef Int,
    ntfSubAuth :: IORef Int,
    ntfSubDuplicate :: IORef Int,
    msgSent :: IORef Int,
    msgSentAuth :: IORef Int,
    msgSentQuota :: IORef Int,
    msgSentLarge :: IORef Int,
    msgSentBlock :: IORef Int,
    msgRecv :: IORef Int,
    msgRecvAckTimes :: IORef TimeBuckets,
    msgRecvGet :: IORef Int,
    msgGet :: IORef Int,
    msgGetNoMsg :: IORef Int,
    msgGetAuth :: IORef Int,
    msgGetDuplicate :: IORef Int,
    msgGetProhibited :: IORef Int,
    msgExpired :: IORef Int,
    activeQueues :: PeriodStats,
    -- subscribedQueues :: PeriodStats, -- this stat uses too much memory
    msgSentNtf :: IORef Int, -- sent messages with NTF flag
    msgRecvNtf :: IORef Int, -- received messages with NTF flag
    activeQueuesNtf :: PeriodStats,
    msgNtfs :: IORef Int, -- message notifications delivered to NTF server (<= msgSentNtf)
    msgNtfsB :: IORef Int, -- message notification batches delivered to NTF server
    msgNtfNoSub :: IORef Int, -- no subscriber to notifications (e.g., NTF server not connected)
    msgNtfLost :: IORef Int, -- notification is lost because NTF delivery queue is full
    msgNtfExpired :: IORef Int, -- expired
    pRelays :: ProxyStats,
    pRelaysOwn :: ProxyStats,
    pMsgFwds :: ProxyStats,
    pMsgFwdsOwn :: ProxyStats,
    pMsgFwdsRecv :: IORef Int,
    rcvServices :: ServiceStats,
    ntfServices :: ServiceStats,
    rcvServicesSubMsg :: IORef Int,
    rcvServicesSubDuplicate :: IORef Int,
    qCount :: IORef Int,
    msgCount :: IORef Int,
    ntfCount :: IORef Int
  }

data ServerStatsData = ServerStatsData
  { _fromTime :: UTCTime,
    _qCreated :: Int,
    _qSecured :: Int,
    _qDeletedAll :: Int,
    _qDeletedAllB :: Int,
    _qDeletedNew :: Int,
    _qDeletedSecured :: Int,
    _qBlocked :: Int,
    _qSub :: Int,
    _qSubAllB :: Int,
    _qSubAuth :: Int,
    _qSubDuplicate :: Int,
    _qSubProhibited :: Int,
    _qSubEnd :: Int,
    _qSubEndB :: Int,
    _ntfCreated :: Int,
    _ntfNewCreated :: Int,
    _ntfDeleted :: Int,
    _ntfDeletedB :: Int,
    _ntfSub :: Int,
    _ntfSubB :: Int,
    _ntfSubAuth :: Int,
    _ntfSubDuplicate :: Int,
    _msgSent :: Int,
    _msgSentAuth :: Int,
    _msgSentQuota :: Int,
    _msgSentLarge :: Int,
    _msgSentBlock :: Int,
    _msgRecv :: Int,
    _msgRecvAckTimes :: TimeBuckets,
    _msgRecvGet :: Int,
    _msgGet :: Int,
    _msgGetNoMsg :: Int,
    _msgGetAuth :: Int,
    _msgGetDuplicate :: Int,
    _msgGetProhibited :: Int,
    _msgExpired :: Int,
    _activeQueues :: PeriodStatsData,
    _msgSentNtf :: Int,
    _msgRecvNtf :: Int,
    _activeQueuesNtf :: PeriodStatsData,
    _msgNtfs :: Int,
    _msgNtfsB :: Int,
    _msgNtfNoSub :: Int,
    _msgNtfLost :: Int,
    _msgNtfExpired :: Int,
    _pRelays :: ProxyStatsData,
    _pRelaysOwn :: ProxyStatsData,
    _pMsgFwds :: ProxyStatsData,
    _pMsgFwdsOwn :: ProxyStatsData,
    _pMsgFwdsRecv :: Int,
    _ntfServices :: ServiceStatsData,
    _rcvServices :: ServiceStatsData,
    _rcvServicesSubMsg :: Int,
    _rcvServicesSubDuplicate :: Int,
    _qCount :: Int,
    _msgCount :: Int,
    _ntfCount :: Int
  }
  deriving (Show)

newServerStats :: UTCTime -> IO ServerStats
newServerStats ts = do
  fromTime <- newIORef ts
  qCreated <- newIORef 0
  qSecured <- newIORef 0
  qDeletedAll <- newIORef 0
  qDeletedAllB <- newIORef 0
  qDeletedNew <- newIORef 0
  qDeletedSecured <- newIORef 0
  qBlocked <- newIORef 0
  qSub <- newIORef 0
  qSubAllB <- newIORef 0
  qSubAuth <- newIORef 0
  qSubDuplicate <- newIORef 0
  qSubProhibited <- newIORef 0
  qSubEnd <- newIORef 0
  qSubEndB <- newIORef 0
  ntfCreated <- newIORef 0
  ntfNewCreated <- newIORef 0
  ntfDeleted <- newIORef 0
  ntfDeletedB <- newIORef 0
  ntfSub <- newIORef 0
  ntfSubB <- newIORef 0
  ntfSubAuth <- newIORef 0
  ntfSubDuplicate <- newIORef 0
  msgSent <- newIORef 0
  msgSentAuth <- newIORef 0
  msgSentQuota <- newIORef 0
  msgSentLarge <- newIORef 0
  msgSentBlock <- newIORef 0
  msgRecv <- newIORef 0
  msgRecvAckTimes <- newIORef $ TimeBuckets 0 0 IM.empty
  msgRecvGet <- newIORef 0
  msgGet <- newIORef 0
  msgGetNoMsg <- newIORef 0
  msgGetAuth <- newIORef 0
  msgGetDuplicate <- newIORef 0
  msgGetProhibited <- newIORef 0
  msgExpired <- newIORef 0
  activeQueues <- newPeriodStats
  msgSentNtf <- newIORef 0
  msgRecvNtf <- newIORef 0
  activeQueuesNtf <- newPeriodStats
  msgNtfs <- newIORef 0
  msgNtfsB <- newIORef 0
  msgNtfNoSub <- newIORef 0
  msgNtfLost <- newIORef 0
  msgNtfExpired <- newIORef 0
  pRelays <- newProxyStats
  pRelaysOwn <- newProxyStats
  pMsgFwds <- newProxyStats
  pMsgFwdsOwn <- newProxyStats
  pMsgFwdsRecv <- newIORef 0
  rcvServices <- newServiceStats
  ntfServices <- newServiceStats
  rcvServicesSubMsg <- newIORef 0
  rcvServicesSubDuplicate <- newIORef 0
  qCount <- newIORef 0
  msgCount <- newIORef 0
  ntfCount <- newIORef 0
  pure
    ServerStats
      { fromTime,
        qCreated,
        qSecured,
        qDeletedAll,
        qDeletedAllB,
        qDeletedNew,
        qDeletedSecured,
        qBlocked,
        qSub,
        qSubAllB,
        qSubAuth,
        qSubDuplicate,
        qSubProhibited,
        qSubEnd,
        qSubEndB,
        ntfCreated,
        ntfNewCreated,
        ntfDeleted,
        ntfDeletedB,
        ntfSub,
        ntfSubB,
        ntfSubAuth,
        ntfSubDuplicate,
        msgSent,
        msgSentAuth,
        msgSentQuota,
        msgSentLarge,
        msgSentBlock,
        msgRecv,
        msgRecvAckTimes,
        msgRecvGet,
        msgGet,
        msgGetNoMsg,
        msgGetAuth,
        msgGetDuplicate,
        msgGetProhibited,
        msgExpired,
        activeQueues,
        msgSentNtf,
        msgRecvNtf,
        activeQueuesNtf,
        msgNtfs,
        msgNtfsB,
        msgNtfNoSub,
        msgNtfLost,
        msgNtfExpired,
        pRelays,
        pRelaysOwn,
        pMsgFwds,
        pMsgFwdsOwn,
        pMsgFwdsRecv,
        rcvServices,
        ntfServices,
        rcvServicesSubMsg,
        rcvServicesSubDuplicate,
        qCount,
        msgCount,
        ntfCount
      }

getServerStatsData :: ServerStats -> IO ServerStatsData
getServerStatsData s = do
  _fromTime <- readIORef $ fromTime s
  _qCreated <- readIORef $ qCreated s
  _qSecured <- readIORef $ qSecured s
  _qDeletedAll <- readIORef $ qDeletedAll s
  _qDeletedAllB <- readIORef $ qDeletedAllB s
  _qDeletedNew <- readIORef $ qDeletedNew s
  _qDeletedSecured <- readIORef $ qDeletedSecured s
  _qBlocked <- readIORef $ qBlocked s
  _qSub <- readIORef $ qSub s
  _qSubAllB <- readIORef $ qSubAllB s
  _qSubAuth <- readIORef $ qSubAuth s
  _qSubDuplicate <- readIORef $ qSubDuplicate s
  _qSubProhibited <- readIORef $ qSubProhibited s
  _qSubEnd <- readIORef $ qSubEnd s
  _qSubEndB <- readIORef $ qSubEndB s
  _ntfCreated <- readIORef $ ntfCreated s
  _ntfNewCreated <- readIORef $ ntfNewCreated s
  _ntfDeleted <- readIORef $ ntfDeleted s
  _ntfDeletedB <- readIORef $ ntfDeletedB s
  _ntfSub <- readIORef $ ntfSub s
  _ntfSubB <- readIORef $ ntfSubB s
  _ntfSubAuth <- readIORef $ ntfSubAuth s
  _ntfSubDuplicate <- readIORef $ ntfSubDuplicate s
  _msgSent <- readIORef $ msgSent s
  _msgSentAuth <- readIORef $ msgSentAuth s
  _msgSentQuota <- readIORef $ msgSentQuota s
  _msgSentLarge <- readIORef $ msgSentLarge s
  _msgSentBlock <- readIORef $ msgSentBlock s
  _msgRecv <- readIORef $ msgRecv s
  _msgRecvAckTimes <- readIORef $ msgRecvAckTimes s
  _msgRecvGet <- readIORef $ msgRecvGet s
  _msgGet <- readIORef $ msgGet s
  _msgGetNoMsg <- readIORef $ msgGetNoMsg s
  _msgGetAuth <- readIORef $ msgGetAuth s
  _msgGetDuplicate <- readIORef $ msgGetDuplicate s
  _msgGetProhibited <- readIORef $ msgGetProhibited s
  _msgExpired <- readIORef $ msgExpired s
  _activeQueues <- getPeriodStatsData $ activeQueues s
  _msgSentNtf <- readIORef $ msgSentNtf s
  _msgRecvNtf <- readIORef $ msgRecvNtf s
  _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
  _msgNtfs <- readIORef $ msgNtfs s
  _msgNtfsB <- readIORef $ msgNtfsB s
  _msgNtfNoSub <- readIORef $ msgNtfNoSub s
  _msgNtfLost <- readIORef $ msgNtfLost s
  _msgNtfExpired <- readIORef $ msgNtfExpired s
  _pRelays <- getProxyStatsData $ pRelays s
  _pRelaysOwn <- getProxyStatsData $ pRelaysOwn s
  _pMsgFwds <- getProxyStatsData $ pMsgFwds s
  _pMsgFwdsOwn <- getProxyStatsData $ pMsgFwdsOwn s
  _pMsgFwdsRecv <- readIORef $ pMsgFwdsRecv s
  _rcvServices <- getServiceStatsData $ rcvServices s
  _ntfServices <- getServiceStatsData $ ntfServices s
  _rcvServicesSubMsg <- readIORef $ rcvServicesSubMsg s
  _rcvServicesSubDuplicate <- readIORef $ rcvServicesSubDuplicate s
  _qCount <- readIORef $ qCount s
  _msgCount <- readIORef $ msgCount s
  _ntfCount <- readIORef $ ntfCount s
  pure
    ServerStatsData
      { _fromTime,
        _qCreated,
        _qSecured,
        _qDeletedAll,
        _qDeletedAllB,
        _qDeletedNew,
        _qDeletedSecured,
        _qBlocked,
        _qSub,
        _qSubAllB,
        _qSubAuth,
        _qSubDuplicate,
        _qSubProhibited,
        _qSubEnd,
        _qSubEndB,
        _ntfCreated,
        _ntfNewCreated,
        _ntfDeleted,
        _ntfDeletedB,
        _ntfSub,
        _ntfSubB,
        _ntfSubAuth,
        _ntfSubDuplicate,
        _msgSent,
        _msgSentAuth,
        _msgSentQuota,
        _msgSentLarge,
        _msgSentBlock,
        _msgRecv,
        _msgRecvAckTimes,
        _msgRecvGet,
        _msgGet,
        _msgGetNoMsg,
        _msgGetAuth,
        _msgGetDuplicate,
        _msgGetProhibited,
        _msgExpired,
        _activeQueues,
        _msgSentNtf,
        _msgRecvNtf,
        _activeQueuesNtf,
        _msgNtfs,
        _msgNtfsB,
        _msgNtfNoSub,
        _msgNtfLost,
        _msgNtfExpired,
        _pRelays,
        _pRelaysOwn,
        _pMsgFwds,
        _pMsgFwdsOwn,
        _pMsgFwdsRecv,
        _rcvServices,
        _ntfServices,
        _rcvServicesSubMsg,
        _rcvServicesSubDuplicate,
        _qCount,
        _msgCount,
        _ntfCount
      }

-- this function is not thread safe, it is used on server start only
setServerStats :: ServerStats -> ServerStatsData -> IO ()
setServerStats s d = do
  writeIORef (fromTime s) $! _fromTime d
  writeIORef (qCreated s) $! _qCreated d
  writeIORef (qSecured s) $! _qSecured d
  writeIORef (qDeletedAll s) $! _qDeletedAll d
  writeIORef (qDeletedAllB s) $! _qDeletedAllB d
  writeIORef (qDeletedNew s) $! _qDeletedNew d
  writeIORef (qDeletedSecured s) $! _qDeletedSecured d
  writeIORef (qBlocked s) $! _qBlocked d
  writeIORef (qSub s) $! _qSub d
  writeIORef (qSubAllB s) $! _qSubAllB d
  writeIORef (qSubAuth s) $! _qSubAuth d
  writeIORef (qSubDuplicate s) $! _qSubDuplicate d
  writeIORef (qSubProhibited s) $! _qSubProhibited d
  writeIORef (qSubEnd s) $! _qSubEnd d
  writeIORef (qSubEndB s) $! _qSubEndB d
  writeIORef (ntfCreated s) $! _ntfCreated d
  writeIORef (ntfNewCreated s) $! _ntfNewCreated d
  writeIORef (ntfDeleted s) $! _ntfDeleted d
  writeIORef (ntfDeletedB s) $! _ntfDeletedB d
  writeIORef (ntfSub s) $! _ntfSub d
  writeIORef (ntfSubB s) $! _ntfSubB d
  writeIORef (ntfSubAuth s) $! _ntfSubAuth d
  writeIORef (ntfSubDuplicate s) $! _ntfSubDuplicate d
  writeIORef (msgSent s) $! _msgSent d
  writeIORef (msgSentAuth s) $! _msgSentAuth d
  writeIORef (msgSentQuota s) $! _msgSentQuota d
  writeIORef (msgSentLarge s) $! _msgSentLarge d
  writeIORef (msgSentBlock s) $! _msgSentBlock d
  writeIORef (msgRecv s) $! _msgRecv d
  writeIORef (msgRecvAckTimes s) $! _msgRecvAckTimes d
  writeIORef (msgRecvGet s) $! _msgRecvGet d
  writeIORef (msgGet s) $! _msgGet d
  writeIORef (msgGetNoMsg s) $! _msgGetNoMsg d
  writeIORef (msgGetAuth s) $! _msgGetAuth d
  writeIORef (msgGetDuplicate s) $! _msgGetDuplicate d
  writeIORef (msgGetProhibited s) $! _msgGetProhibited d
  writeIORef (msgExpired s) $! _msgExpired d
  setPeriodStats (activeQueues s) (_activeQueues d)
  writeIORef (msgSentNtf s) $! _msgSentNtf d
  writeIORef (msgRecvNtf s) $! _msgRecvNtf d
  setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
  writeIORef (msgNtfs s) $! _msgNtfs d
  writeIORef (msgNtfsB s) $! _msgNtfsB d
  writeIORef (msgNtfNoSub s) $! _msgNtfNoSub d
  writeIORef (msgNtfLost s) $! _msgNtfLost d
  writeIORef (msgNtfExpired s) $! _msgNtfExpired d
  setProxyStats (pRelays s) $! _pRelays d
  setProxyStats (pRelaysOwn s) $! _pRelaysOwn d
  setProxyStats (pMsgFwds s) $! _pMsgFwds d
  setProxyStats (pMsgFwdsOwn s) $! _pMsgFwdsOwn d
  writeIORef (pMsgFwdsRecv s) $! _pMsgFwdsRecv d
  setServiceStats (rcvServices s) $! _rcvServices d
  setServiceStats (ntfServices s) $! _ntfServices d
  writeIORef (rcvServicesSubMsg s) $! _rcvServicesSubMsg d
  writeIORef (rcvServicesSubDuplicate s) $! _rcvServicesSubDuplicate d
  writeIORef (qCount s) $! _qCount d
  writeIORef (msgCount s) $! _msgCount d
  writeIORef (ntfCount s) $! _ntfCount d

instance StrEncoding ServerStatsData where
  strEncode d =
    B.unlines
      [ "fromTime=" <> strEncode (_fromTime d),
        "qCreated=" <> strEncode (_qCreated d),
        "qSecured=" <> strEncode (_qSecured d),
        "qDeletedAll=" <> strEncode (_qDeletedAll d),
        "qDeletedNew=" <> strEncode (_qDeletedNew d),
        "qDeletedSecured=" <> strEncode (_qDeletedSecured d),
        "qDeletedAllB=" <> strEncode (_qDeletedAllB d),
        "qBlocked=" <> strEncode (_qBlocked d),
        "qCount=" <> strEncode (_qCount d),
        "qSub=" <> strEncode (_qSub d),
        "qSubAllB=" <> strEncode (_qSubAllB d),
        "qSubAuth=" <> strEncode (_qSubAuth d),
        "qSubDuplicate=" <> strEncode (_qSubDuplicate d),
        "qSubProhibited=" <> strEncode (_qSubProhibited d),
        "qSubEnd=" <> strEncode (_qSubEnd d),
        "qSubEndB=" <> strEncode (_qSubEndB d),
        "ntfCreated=" <> strEncode (_ntfCreated d),
        "ntfNewCreated=" <> strEncode (_ntfNewCreated d),
        "ntfDeleted=" <> strEncode (_ntfDeleted d),
        "ntfDeletedB=" <> strEncode (_ntfDeletedB d),
        "ntfSub=" <> strEncode (_ntfSub d),
        "ntfSubB=" <> strEncode (_ntfSubB d),
        "ntfSubAuth=" <> strEncode (_ntfSubAuth d),
        "ntfSubDuplicate=" <> strEncode (_ntfSubDuplicate d),
        "msgSent=" <> strEncode (_msgSent d),
        "msgSentAuth=" <> strEncode (_msgSentAuth d),
        "msgSentQuota=" <> strEncode (_msgSentQuota d),
        "msgSentLarge=" <> strEncode (_msgSentLarge d),
        "msgSentBlock=" <> strEncode (_msgSentBlock d),
        "msgRecv=" <> strEncode (_msgRecv d),
        "msgRecvGet=" <> strEncode (_msgRecvGet d),
        "msgGet=" <> strEncode (_msgGet d),
        "msgGetNoMsg=" <> strEncode (_msgGetNoMsg d),
        "msgGetAuth=" <> strEncode (_msgGetAuth d),
        "msgGetDuplicate=" <> strEncode (_msgGetDuplicate d),
        "msgGetProhibited=" <> strEncode (_msgGetProhibited d),
        "msgExpired=" <> strEncode (_msgExpired d),
        "msgSentNtf=" <> strEncode (_msgSentNtf d),
        "msgRecvNtf=" <> strEncode (_msgRecvNtf d),
        "msgNtfs=" <> strEncode (_msgNtfs d),
        "msgNtfsB=" <> strEncode (_msgNtfsB d),
        "msgNtfNoSub=" <> strEncode (_msgNtfNoSub d),
        "msgNtfLost=" <> strEncode (_msgNtfLost d),
        "msgNtfExpired=" <> strEncode (_msgNtfExpired d),
        "activeQueues:",
        strEncode (_activeQueues d),
        "activeQueuesNtf:",
        strEncode (_activeQueuesNtf d),
        "pRelays:",
        strEncode (_pRelays d),
        "pRelaysOwn:",
        strEncode (_pRelaysOwn d),
        "pMsgFwds:",
        strEncode (_pMsgFwds d),
        "pMsgFwdsOwn:",
        strEncode (_pMsgFwdsOwn d),
        "pMsgFwdsRecv=" <> strEncode (_pMsgFwdsRecv d),
        "rcvServices:",
        strEncode (_rcvServices d),
        "ntfServices:",
        strEncode (_ntfServices d)
      ]
  strP = do
    _fromTime <- "fromTime=" *> strP <* A.endOfLine
    _qCreated <- "qCreated=" *> strP <* A.endOfLine
    _qSecured <- "qSecured=" *> strP <* A.endOfLine
    (_qDeletedAll, _qDeletedNew, _qDeletedSecured) <-
      (,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine)
        <|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine))
    _qDeletedAllB <- opt "qDeletedAllB="
    _qBlocked <- opt "qBlocked="
    _qCount <- opt "qCount="
    _qSub <- opt "qSub="
    _qSubNoMsg <- skipInt "qSubNoMsg=" -- skipping it for backward compatibility
    _qSubAllB <- opt "qSubAllB="
    _qSubAuth <- opt "qSubAuth="
    _qSubDuplicate <- opt "qSubDuplicate="
    _qSubProhibited <- opt "qSubProhibited="
    _qSubEnd <- opt "qSubEnd="
    _qSubEndB <- opt "qSubEndB="
    _ntfCreated <- opt "ntfCreated="
    _ntfNewCreated <- opt "ntfNewCreated="
    _ntfDeleted <- opt "ntfDeleted="
    _ntfDeletedB <- opt "ntfDeletedB="
    _ntfSub <- opt "ntfSub="
    _ntfSubB <- opt "ntfSubB="
    _ntfSubAuth <- opt "ntfSubAuth="
    _ntfSubDuplicate <- opt "ntfSubDuplicate="
    _msgSent <- "msgSent=" *> strP <* A.endOfLine
    _msgSentAuth <- opt "msgSentAuth="
    _msgSentQuota <- opt "msgSentQuota="
    _msgSentLarge <- opt "msgSentLarge="
    _msgSentBlock <- opt "msgSentBlock="
    _msgRecv <- "msgRecv=" *> strP <* A.endOfLine
    _msgRecvGet <- opt "msgRecvGet="
    _msgGet <- opt "msgGet="
    _msgGetNoMsg <- opt "msgGetNoMsg="
    _msgGetAuth <- opt "msgGetAuth="
    _msgGetDuplicate <- opt "msgGetDuplicate="
    _msgGetProhibited <- opt "msgGetProhibited="
    _msgExpired <- opt "msgExpired="
    _msgSentNtf <- opt "msgSentNtf="
    _msgRecvNtf <- opt "msgRecvNtf="
    _msgNtfs <- opt "msgNtfs="
    _msgNtfsB <- opt "msgNtfsB="
    _msgNtfNoSub <- opt "msgNtfNoSub="
    _msgNtfLost <- opt "msgNtfLost="
    _msgNtfExpired <- opt "msgNtfExpired="
    _activeQueues <-
      optional ("activeQueues:" <* A.endOfLine) >>= \case
        Just _ -> strP <* optional A.endOfLine
        _ -> do
          _day <- "dayMsgQueues=" *> strP <* A.endOfLine
          _week <- "weekMsgQueues=" *> strP <* A.endOfLine
          _month <- "monthMsgQueues=" *> strP <* optional A.endOfLine
          pure PeriodStatsData {_day, _week, _month}
    _subscribedQueues <-
      optional ("subscribedQueues:" <* A.endOfLine) >>= \case
        Just _ -> newPeriodStatsData <$ (strP @PeriodStatsData <* optional A.endOfLine)
        _ -> pure newPeriodStatsData
    _activeQueuesNtf <-
      optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
        Just _ -> strP <* optional A.endOfLine
        _ -> pure newPeriodStatsData
    _pRelays <- proxyStatsP "pRelays:"
    _pRelaysOwn <- proxyStatsP "pRelaysOwn:"
    _pMsgFwds <- proxyStatsP "pMsgFwds:"
    _pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:"
    _pMsgFwdsRecv <- opt "pMsgFwdsRecv="
    _rcvServices <- serviceStatsP "rcvServices:"
    _ntfServices <- serviceStatsP "ntfServices:"
    pure
      ServerStatsData
        { _fromTime,
          _qCreated,
          _qSecured,
          _qDeletedAll,
          _qDeletedAllB,
          _qDeletedNew,
          _qDeletedSecured,
          _qBlocked,
          _qSub,
          _qSubAllB,
          _qSubAuth,
          _qSubDuplicate,
          _qSubProhibited,
          _qSubEnd,
          _qSubEndB,
          _ntfCreated,
          _ntfNewCreated,
          _ntfDeleted,
          _ntfDeletedB,
          _ntfSub,
          _ntfSubB,
          _ntfSubAuth,
          _ntfSubDuplicate,
          _msgSent,
          _msgSentAuth,
          _msgSentQuota,
          _msgSentLarge,
          _msgSentBlock,
          _msgRecv,
          _msgRecvAckTimes = emptyTimeBuckets,
          _msgRecvGet,
          _msgGet,
          _msgGetNoMsg,
          _msgGetAuth,
          _msgGetDuplicate,
          _msgGetProhibited,
          _msgExpired,
          _msgSentNtf,
          _msgRecvNtf,
          _msgNtfs,
          _msgNtfsB,
          _msgNtfNoSub,
          _msgNtfLost,
          _msgNtfExpired,
          _activeQueues,
          _activeQueuesNtf,
          _pRelays,
          _pRelaysOwn,
          _pMsgFwds,
          _pMsgFwdsOwn,
          _pMsgFwdsRecv,
          _rcvServices,
          _ntfServices,
          _rcvServicesSubMsg = 0,
          _rcvServicesSubDuplicate = 0,
          _qCount,
          _msgCount = 0,
          _ntfCount = 0
        }
    where
      opt s = A.string s *> strP <* A.endOfLine <|> pure 0
      skipInt s = (0 :: Int) <$ optional (A.string s *> strP @Int *> A.endOfLine)
      proxyStatsP key =
        optional (A.string key >> A.endOfLine) >>= \case
          Just _ -> strP <* optional A.endOfLine
          _ -> pure newProxyStatsData
      serviceStatsP key =
        optional (A.string key >> A.endOfLine) >>= \case
          Just _ -> strP <* optional A.endOfLine
          _ -> pure newServiceStatsData

data PeriodStats = PeriodStats
  { day :: IORef IntSet,
    week :: IORef IntSet,
    month :: IORef IntSet
  }

newPeriodStats :: IO PeriodStats
newPeriodStats = do
  day <- newIORef IS.empty
  week <- newIORef IS.empty
  month <- newIORef IS.empty
  pure PeriodStats {day, week, month}

data PeriodStatsData = PeriodStatsData
  { _day :: IntSet,
    _week :: IntSet,
    _month :: IntSet
  }
  deriving (Show)

newPeriodStatsData :: PeriodStatsData
newPeriodStatsData = PeriodStatsData {_day = IS.empty, _week = IS.empty, _month = IS.empty}

getPeriodStatsData :: PeriodStats -> IO PeriodStatsData
getPeriodStatsData s = do
  _day <- readIORef $ day s
  _week <- readIORef $ week s
  _month <- readIORef $ month s
  pure PeriodStatsData {_day, _week, _month}

-- this function is not thread safe, it is used on server start only
setPeriodStats :: PeriodStats -> PeriodStatsData -> IO ()
setPeriodStats s d = do
  writeIORef (day s) $! _day d
  writeIORef (week s) $! _week d
  writeIORef (month s) $! _month d

instance StrEncoding PeriodStatsData where
  strEncode PeriodStatsData {_day, _week, _month} =
    "dayHashes=" <> strEncode _day <> "\nweekHashes=" <> strEncode _week <> "\nmonthHashes=" <> strEncode _month
  strP = do
    _day <- ("day=" *> bsSetP <|> "dayHashes=" *> strP) <* A.endOfLine
    _week <- ("week=" *> bsSetP <|> "weekHashes=" *> strP) <* A.endOfLine
    _month <- "month=" *> bsSetP <|> "monthHashes=" *> strP
    pure PeriodStatsData {_day, _week, _month}
    where
      bsSetP = S.foldl' (\s -> (`IS.insert` s) . hash) IS.empty <$> strP @(Set ByteString)

data PeriodStatCounts = PeriodStatCounts
  { dayCount :: Text,
    weekCount :: Text,
    monthCount :: Text
  }

periodStatDataCounts :: PeriodStatsData -> PeriodStatCounts
periodStatDataCounts PeriodStatsData {_day, _week, _month} =
  PeriodStatCounts
    { dayCount = tshow $ IS.size _day,
      weekCount = tshow $ IS.size _week,
      monthCount = tshow $ IS.size _month
    }

periodStatCounts :: PeriodStats -> UTCTime -> IO PeriodStatCounts
periodStatCounts ps ts = do
  let d = utctDay ts
      (_, wDay) = mondayStartWeek d
      MonthDay _ mDay = d
  dayCount <- periodCount 1 $ day ps
  weekCount <- periodCount wDay $ week ps
  monthCount <- periodCount mDay $ month ps
  pure PeriodStatCounts {dayCount, weekCount, monthCount}
  where
    -- the set is counted and reset only on the first day of its period
    -- (every day, Monday, the 1st of the month); otherwise the count is an empty string
    periodCount :: Int -> IORef IntSet -> IO Text
    periodCount 1 ref = tshow . IS.size <$> atomicSwapIORef ref IS.empty
    periodCount _ _ = pure ""

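-- Worked example for periodStatCounts above (a sketch, the dates are chosen for illustration only).
-- mondayStartWeek numbers weekdays from Monday = 1 to Sunday = 7, so:
--
--   Monday    2024-04-01: wDay = 1, mDay = 1 -> day, week and month counts are reported and reset
--   Tuesday   2024-04-02: wDay = 2, mDay = 2 -> only the day count is reported, week and month are ""
--   Monday    2024-04-08: wDay = 1, mDay = 8 -> day and week counts are reported, month is ""
--   Wednesday 2024-05-01: wDay = 3, mDay = 1 -> day and month counts are reported, week is ""
--
-- This assumes that the function is called once per day; calling it again on the same day
-- would report the counts accumulated since the previous call.
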
updatePeriodStats :: PeriodStats -> EntityId -> IO ()
updatePeriodStats ps (EntityId pId) = do
  updatePeriod $ day ps
  updatePeriod $ week ps
  updatePeriod $ month ps
  where
    ph = hash pId
    updatePeriod ref = unlessM (IS.member ph <$> readIORef ref) $ atomicModifyIORef'_ ref $ IS.insert ph

data ProxyStats = ProxyStats
  { pRequests :: IORef Int,
    pSuccesses :: IORef Int, -- includes destination server error responses that will be forwarded to the client
    pErrorsConnect :: IORef Int,
    pErrorsCompat :: IORef Int,
    pErrorsOther :: IORef Int
  }

newProxyStats :: IO ProxyStats
newProxyStats = do
  pRequests <- newIORef 0
  pSuccesses <- newIORef 0
  pErrorsConnect <- newIORef 0
  pErrorsCompat <- newIORef 0
  pErrorsOther <- newIORef 0
  pure ProxyStats {pRequests, pSuccesses, pErrorsConnect, pErrorsCompat, pErrorsOther}

data ProxyStatsData = ProxyStatsData
  { _pRequests :: Int,
    _pSuccesses :: Int,
    _pErrorsConnect :: Int,
    _pErrorsCompat :: Int,
    _pErrorsOther :: Int
  }
  deriving (Show)

newProxyStatsData :: ProxyStatsData
newProxyStatsData = ProxyStatsData {_pRequests = 0, _pSuccesses = 0, _pErrorsConnect = 0, _pErrorsCompat = 0, _pErrorsOther = 0}

getProxyStatsData :: ProxyStats -> IO ProxyStatsData
getProxyStatsData s = do
  _pRequests <- readIORef $ pRequests s
  _pSuccesses <- readIORef $ pSuccesses s
  _pErrorsConnect <- readIORef $ pErrorsConnect s
  _pErrorsCompat <- readIORef $ pErrorsCompat s
  _pErrorsOther <- readIORef $ pErrorsOther s
  pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}

getResetProxyStatsData :: ProxyStats -> IO ProxyStatsData
getResetProxyStatsData s = do
  _pRequests <- atomicSwapIORef (pRequests s) 0
  _pSuccesses <- atomicSwapIORef (pSuccesses s) 0
  _pErrorsConnect <- atomicSwapIORef (pErrorsConnect s) 0
  _pErrorsCompat <- atomicSwapIORef (pErrorsCompat s) 0
  _pErrorsOther <- atomicSwapIORef (pErrorsOther s) 0
  pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}

-- this function is not thread safe, it is used on server start only
setProxyStats :: ProxyStats -> ProxyStatsData -> IO ()
setProxyStats s d = do
  writeIORef (pRequests s) $! _pRequests d
  writeIORef (pSuccesses s) $! _pSuccesses d
  writeIORef (pErrorsConnect s) $! _pErrorsConnect d
  writeIORef (pErrorsCompat s) $! _pErrorsCompat d
  writeIORef (pErrorsOther s) $! _pErrorsOther d

instance StrEncoding ProxyStatsData where
  strEncode ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
    "requests="
      <> strEncode _pRequests
      <> "\nsuccesses="
      <> strEncode _pSuccesses
      <> "\nerrorsConnect="
      <> strEncode _pErrorsConnect
      <> "\nerrorsCompat="
      <> strEncode _pErrorsCompat
      <> "\nerrorsOther="
      <> strEncode _pErrorsOther
  strP = do
    _pRequests <- "requests=" *> strP <* A.endOfLine
    _pSuccesses <- "successes=" *> strP <* A.endOfLine
    _pErrorsConnect <- "errorsConnect=" *> strP <* A.endOfLine
    _pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine
    _pErrorsOther <- "errorsOther=" *> strP
    pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}

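-- A sketch of the text that the ProxyStatsData encoding above would produce, assuming that
-- the StrEncoding instance for Int (defined outside this section) renders plain decimal digits.
-- The numbers are made up for illustration:
--
--   requests=120
--   successes=110
--   errorsConnect=4
--   errorsCompat=1
--   errorsOther=5
--
-- strP expects the same keys in the same order and does not consume a line break
-- after the errorsOther value.
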
data ServiceStats = ServiceStats
  { srvAssocNew :: IORef Int,
    srvAssocDuplicate :: IORef Int,
    srvAssocUpdated :: IORef Int,
    srvAssocRemoved :: IORef Int,
    srvSubCount :: IORef Int,
    srvSubDuplicate :: IORef Int,
    srvSubQueues :: IORef Int,
    srvSubEnd :: IORef Int,
    -- counts of subscriptions
    srvSubOk :: IORef Int, -- server has the same queues as expected
    srvSubMore :: IORef Int, -- server has more queues than expected
    srvSubFewer :: IORef Int, -- server has fewer queues than expected
    srvSubDiff :: IORef Int, -- server has the same count, but different queues than expected (based on xor-hash)
    -- adds actual deviations
    srvSubMoreTotal :: IORef Int, -- server has more queues than expected, adds diff
    srvSubFewerTotal :: IORef Int -- server has fewer queues than expected, adds diff
  }

data ServiceStatsData = ServiceStatsData
  { _srvAssocNew :: Int,
    _srvAssocDuplicate :: Int,
    _srvAssocUpdated :: Int,
    _srvAssocRemoved :: Int,
    _srvSubCount :: Int,
    _srvSubDuplicate :: Int,
    _srvSubQueues :: Int,
    _srvSubEnd :: Int,
    _srvSubOk :: Int,
    _srvSubMore :: Int,
    _srvSubFewer :: Int,
    _srvSubDiff :: Int,
    _srvSubMoreTotal :: Int,
    _srvSubFewerTotal :: Int
  }
  deriving (Show)

newServiceStatsData :: ServiceStatsData
newServiceStatsData =
  ServiceStatsData
    { _srvAssocNew = 0,
      _srvAssocDuplicate = 0,
      _srvAssocUpdated = 0,
      _srvAssocRemoved = 0,
      _srvSubCount = 0,
      _srvSubDuplicate = 0,
      _srvSubQueues = 0,
      _srvSubEnd = 0,
      _srvSubOk = 0,
      _srvSubMore = 0,
      _srvSubFewer = 0,
      _srvSubDiff = 0,
      _srvSubMoreTotal = 0,
      _srvSubFewerTotal = 0
    }

newServiceStats :: IO ServiceStats
newServiceStats = do
  srvAssocNew <- newIORef 0
  srvAssocDuplicate <- newIORef 0
  srvAssocUpdated <- newIORef 0
  srvAssocRemoved <- newIORef 0
  srvSubCount <- newIORef 0
  srvSubDuplicate <- newIORef 0
  srvSubQueues <- newIORef 0
  srvSubEnd <- newIORef 0
  srvSubOk <- newIORef 0
  srvSubMore <- newIORef 0
  srvSubFewer <- newIORef 0
  srvSubDiff <- newIORef 0
  srvSubMoreTotal <- newIORef 0
  srvSubFewerTotal <- newIORef 0
  pure
    ServiceStats
      { srvAssocNew,
        srvAssocDuplicate,
        srvAssocUpdated,
        srvAssocRemoved,
        srvSubCount,
        srvSubDuplicate,
        srvSubQueues,
        srvSubEnd,
        srvSubOk,
        srvSubMore,
        srvSubFewer,
        srvSubDiff,
        srvSubMoreTotal,
        srvSubFewerTotal
      }

getServiceStatsData :: ServiceStats -> IO ServiceStatsData
getServiceStatsData s = do
  _srvAssocNew <- readIORef $ srvAssocNew s
  _srvAssocDuplicate <- readIORef $ srvAssocDuplicate s
  _srvAssocUpdated <- readIORef $ srvAssocUpdated s
  _srvAssocRemoved <- readIORef $ srvAssocRemoved s
  _srvSubCount <- readIORef $ srvSubCount s
  _srvSubDuplicate <- readIORef $ srvSubDuplicate s
  _srvSubQueues <- readIORef $ srvSubQueues s
  _srvSubEnd <- readIORef $ srvSubEnd s
  _srvSubOk <- readIORef $ srvSubOk s
  _srvSubMore <- readIORef $ srvSubMore s
  _srvSubFewer <- readIORef $ srvSubFewer s
  _srvSubDiff <- readIORef $ srvSubDiff s
  _srvSubMoreTotal <- readIORef $ srvSubMoreTotal s
  _srvSubFewerTotal <- readIORef $ srvSubFewerTotal s
  pure
    ServiceStatsData
      { _srvAssocNew,
        _srvAssocDuplicate,
        _srvAssocUpdated,
        _srvAssocRemoved,
        _srvSubCount,
        _srvSubDuplicate,
        _srvSubQueues,
        _srvSubEnd,
        _srvSubOk,
        _srvSubMore,
        _srvSubFewer,
        _srvSubDiff,
        _srvSubMoreTotal,
        _srvSubFewerTotal
      }

getResetServiceStatsData :: ServiceStats -> IO ServiceStatsData
getResetServiceStatsData s = do
  _srvAssocNew <- atomicSwapIORef (srvAssocNew s) 0
  _srvAssocDuplicate <- atomicSwapIORef (srvAssocDuplicate s) 0
  _srvAssocUpdated <- atomicSwapIORef (srvAssocUpdated s) 0
  _srvAssocRemoved <- atomicSwapIORef (srvAssocRemoved s) 0
  _srvSubCount <- atomicSwapIORef (srvSubCount s) 0
  _srvSubDuplicate <- atomicSwapIORef (srvSubDuplicate s) 0
  _srvSubQueues <- atomicSwapIORef (srvSubQueues s) 0
  _srvSubEnd <- atomicSwapIORef (srvSubEnd s) 0
  _srvSubOk <- atomicSwapIORef (srvSubOk s) 0
  _srvSubMore <- atomicSwapIORef (srvSubMore s) 0
  _srvSubFewer <- atomicSwapIORef (srvSubFewer s) 0
  _srvSubDiff <- atomicSwapIORef (srvSubDiff s) 0
  _srvSubMoreTotal <- atomicSwapIORef (srvSubMoreTotal s) 0
  _srvSubFewerTotal <- atomicSwapIORef (srvSubFewerTotal s) 0
  pure
    ServiceStatsData
      { _srvAssocNew,
        _srvAssocDuplicate,
        _srvAssocUpdated,
        _srvAssocRemoved,
        _srvSubCount,
        _srvSubDuplicate,
        _srvSubQueues,
        _srvSubEnd,
        _srvSubOk,
        _srvSubMore,
        _srvSubFewer,
        _srvSubDiff,
        _srvSubMoreTotal,
        _srvSubFewerTotal
      }

-- this function is not thread safe, it is used on server start only
setServiceStats :: ServiceStats -> ServiceStatsData -> IO ()
setServiceStats s d = do
  writeIORef (srvAssocNew s) $! _srvAssocNew d
  writeIORef (srvAssocDuplicate s) $! _srvAssocDuplicate d
  writeIORef (srvAssocUpdated s) $! _srvAssocUpdated d
  writeIORef (srvAssocRemoved s) $! _srvAssocRemoved d
  writeIORef (srvSubCount s) $! _srvSubCount d
  writeIORef (srvSubDuplicate s) $! _srvSubDuplicate d
  writeIORef (srvSubQueues s) $! _srvSubQueues d
  writeIORef (srvSubEnd s) $! _srvSubEnd d
  writeIORef (srvSubOk s) $! _srvSubOk d
  writeIORef (srvSubMore s) $! _srvSubMore d
  writeIORef (srvSubFewer s) $! _srvSubFewer d
  writeIORef (srvSubDiff s) $! _srvSubDiff d
  writeIORef (srvSubMoreTotal s) $! _srvSubMoreTotal d
  writeIORef (srvSubFewerTotal s) $! _srvSubFewerTotal d

instance StrEncoding ServiceStatsData where
  -- only the first eight counters are serialized;
  -- srvSubOk .. srvSubFewerTotal are not written and are parsed back as 0
  strEncode ServiceStatsData {_srvAssocNew, _srvAssocDuplicate, _srvAssocUpdated, _srvAssocRemoved, _srvSubCount, _srvSubDuplicate, _srvSubQueues, _srvSubEnd} =
    "assocNew="
      <> strEncode _srvAssocNew
      <> "\nassocDuplicate="
      <> strEncode _srvAssocDuplicate
      -- the key below is spelled "assocUpdatedt", it has to match the key expected by strP
      <> "\nassocUpdatedt="
      <> strEncode _srvAssocUpdated
      <> "\nassocRemoved="
      <> strEncode _srvAssocRemoved
      <> "\nsubCount="
      <> strEncode _srvSubCount
      <> "\nsubDuplicate="
      <> strEncode _srvSubDuplicate
      <> "\nsubQueues="
      <> strEncode _srvSubQueues
      <> "\nsubEnd="
      <> strEncode _srvSubEnd
  strP = do
    _srvAssocNew <- "assocNew=" *> strP <* A.endOfLine
    _srvAssocDuplicate <- "assocDuplicate=" *> strP <* A.endOfLine
    _srvAssocUpdated <- "assocUpdatedt=" *> strP <* A.endOfLine
    _srvAssocRemoved <- "assocRemoved=" *> strP <* A.endOfLine
    _srvSubCount <- "subCount=" *> strP <* A.endOfLine
    _srvSubDuplicate <- "subDuplicate=" *> strP <* A.endOfLine
    _srvSubQueues <- "subQueues=" *> strP <* A.endOfLine
    _srvSubEnd <- "subEnd=" *> strP
    pure
      ServiceStatsData
        { _srvAssocNew,
          _srvAssocDuplicate,
          _srvAssocUpdated,
          _srvAssocRemoved,
          _srvSubCount,
          _srvSubDuplicate,
          _srvSubQueues,
          _srvSubEnd,
          _srvSubOk = 0,
          _srvSubMore = 0,
          _srvSubFewer = 0,
          _srvSubDiff = 0,
          _srvSubMoreTotal = 0,
          _srvSubFewerTotal = 0
        }

data TimeBuckets = TimeBuckets
  { sumTime :: Int64,
    maxTime :: Int64,
    timeBuckets :: IM.IntMap Int
  }
  deriving (Show)

emptyTimeBuckets :: TimeBuckets
emptyTimeBuckets = TimeBuckets 0 0 IM.empty

updateTimeBuckets :: SystemSeconds -> SystemSeconds -> TimeBuckets -> TimeBuckets
updateTimeBuckets
  (RoundedSystemTime deliveryTime)
  (RoundedSystemTime currTime)
  TimeBuckets {sumTime, maxTime, timeBuckets} =
    TimeBuckets
      { sumTime = sumTime + t,
        maxTime = max maxTime t,
        timeBuckets = IM.alter (Just . maybe 1 (+ 1)) seconds timeBuckets
      }
    where
      t = currTime - deliveryTime
      -- bucket key in seconds: the bucket width grows with the delivery time
      seconds
        | t <= 5 = fromIntegral t
        | t <= 30 = t `toBucket` 5
        | t <= 60 = t `toBucket` 10
        | t <= 180 = t `toBucket` 30
        | otherwise = t `toBucket` 60
      toBucket n m = - fromIntegral (((- n) `div` m) * m) -- round n up to a multiple of m
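      -- Worked examples of the bucket key computed by `seconds` above (a sketch for illustration).
      -- `div` rounds towards negative infinity, so negating before and after it rounds up:
      --
      --   t = 3   -> 3    (times up to 5 seconds are their own bucket)
      --   t = 7   -> 10   ((-7) `div` 5 = -2, -2 * 5 = -10, negated: 10)
      --   t = 30  -> 30   (exact multiples stay in their own bucket)
      --   t = 31  -> 40   (rounded up to a multiple of 10)
      --   t = 61  -> 90   (rounded up to a multiple of 30)
      --   t = 181 -> 240  (rounded up to a multiple of 60)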