mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-18 02:35:16 +00:00
log weekly and monthly active queues (#404)
* log weekly and monthly active queues * update
This commit is contained in:
committed by
GitHub
parent
5ccebaeb21
commit
50ddcd3a13
@@ -60,6 +60,7 @@ dependencies:
|
||||
- template-haskell == 2.16.*
|
||||
- text == 1.2.*
|
||||
- time == 1.9.*
|
||||
- time-compat == 1.9.*
|
||||
- time-manager == 0.0.*
|
||||
- tls >= 1.6.0 && < 1.7
|
||||
- transformers == 0.5.*
|
||||
|
||||
@@ -127,6 +127,7 @@ library
|
||||
, template-haskell ==2.16.*
|
||||
, text ==1.2.*
|
||||
, time ==1.9.*
|
||||
, time-compat ==1.9.*
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
@@ -187,6 +188,7 @@ executable ntf-server
|
||||
, template-haskell ==2.16.*
|
||||
, text ==1.2.*
|
||||
, time ==1.9.*
|
||||
, time-compat ==1.9.*
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
@@ -247,6 +249,7 @@ executable smp-agent
|
||||
, template-haskell ==2.16.*
|
||||
, text ==1.2.*
|
||||
, time ==1.9.*
|
||||
, time-compat ==1.9.*
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
@@ -307,6 +310,7 @@ executable smp-server
|
||||
, template-haskell ==2.16.*
|
||||
, text ==1.2.*
|
||||
, time ==1.9.*
|
||||
, time-compat ==1.9.*
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
@@ -386,6 +390,7 @@ test-suite smp-server-test
|
||||
, template-haskell ==2.16.*
|
||||
, text ==1.2.*
|
||||
, time ==1.9.*
|
||||
, time-compat ==1.9.*
|
||||
, time-manager ==0.0.*
|
||||
, timeit ==2.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
@@ -45,8 +46,11 @@ import Data.Functor (($>))
|
||||
import Data.List (intercalate)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (isNothing)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Calendar.Month.Compat (pattern MonthDay)
|
||||
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
|
||||
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
|
||||
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
import Data.Type.Equality
|
||||
@@ -159,9 +163,9 @@ smpServer started = do
|
||||
logServerStats :: Int -> Int -> m ()
|
||||
logServerStats startAt logInterval = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
logInfo $ "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,msgQueues"
|
||||
logInfo $ "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,dayMsgQueues,weekMsgQueues,monthMsgQueues"
|
||||
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues} <- asks serverStats
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
forever $ do
|
||||
ts <- liftIO getCurrentTime
|
||||
@@ -171,9 +175,17 @@ smpServer started = do
|
||||
qDeleted' <- atomically $ swapTVar qDeleted 0
|
||||
msgSent' <- atomically $ swapTVar msgSent 0
|
||||
msgRecv' <- atomically $ swapTVar msgRecv 0
|
||||
msgQueues' <- atomically $ S.size <$> swapTVar msgQueues S.empty
|
||||
logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show msgQueues']
|
||||
let day = utctDay ts
|
||||
(_, wDay) = mondayStartWeek day
|
||||
MonthDay _ mDay = day
|
||||
(dayMsgQueues', weekMsgQueues', monthMsgQueues') <-
|
||||
atomically $ (,,) <$> periodCount 1 dayMsgQueues <*> periodCount wDay weekMsgQueues <*> periodCount mDay monthMsgQueues
|
||||
logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show dayMsgQueues', weekMsgQueues', monthMsgQueues']
|
||||
threadDelay interval
|
||||
where
|
||||
periodCount :: Int -> TVar (Set RecipientId) -> STM String
|
||||
periodCount 1 pVar = show . S.size <$> swapTVar pVar S.empty
|
||||
periodCount _ _ = pure ""
|
||||
|
||||
runClient :: Transport c => TProxy c -> c -> m ()
|
||||
runClient _ h = do
|
||||
@@ -417,10 +429,18 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
|
||||
Just (Just s) -> do
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (msgRecv stats) (+ 1)
|
||||
atomically $ modifyTVar (msgQueues stats) (S.insert queueId)
|
||||
atomically $ updateActiveQueues stats queueId
|
||||
deliverMessage tryDelPeekMsg queueId s
|
||||
_ -> return $ err NO_MSG
|
||||
|
||||
updateActiveQueues :: ServerStats -> RecipientId -> STM ()
|
||||
updateActiveQueues stats qId = do
|
||||
updatePeriod dayMsgQueues
|
||||
updatePeriod weekMsgQueues
|
||||
updatePeriod monthMsgQueues
|
||||
where
|
||||
updatePeriod pSel = modifyTVar (pSel stats) (S.insert qId)
|
||||
|
||||
withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
|
||||
withSub rId f = mapM f =<< TM.lookup rId subscriptions
|
||||
|
||||
@@ -451,7 +471,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
|
||||
when (sent == OK) $ do
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (msgSent stats) (+ 1)
|
||||
atomically $ modifyTVar (msgQueues stats) (S.insert $ recipientId qr)
|
||||
atomically $ updateActiveQueues stats $ recipientId qr
|
||||
pure resp
|
||||
where
|
||||
mkMessage :: m (Either C.CryptoError Message)
|
||||
|
||||
@@ -111,7 +111,9 @@ data ServerStats = ServerStats
|
||||
qDeleted :: TVar Int,
|
||||
msgSent :: TVar Int,
|
||||
msgRecv :: TVar Int,
|
||||
msgQueues :: TVar (Set RecipientId),
|
||||
dayMsgQueues :: TVar (Set RecipientId),
|
||||
weekMsgQueues :: TVar (Set RecipientId),
|
||||
monthMsgQueues :: TVar (Set RecipientId),
|
||||
fromTime :: TVar UTCTime
|
||||
}
|
||||
|
||||
@@ -147,9 +149,11 @@ newServerStats ts = do
|
||||
qDeleted <- newTVar 0
|
||||
msgSent <- newTVar 0
|
||||
msgRecv <- newTVar 0
|
||||
msgQueues <- newTVar S.empty
|
||||
dayMsgQueues <- newTVar S.empty
|
||||
weekMsgQueues <- newTVar S.empty
|
||||
monthMsgQueues <- newTVar S.empty
|
||||
fromTime <- newTVar ts
|
||||
pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues, fromTime}
|
||||
pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues, fromTime}
|
||||
|
||||
newSubscription :: STM Sub
|
||||
newSubscription = do
|
||||
|
||||
Reference in New Issue
Block a user