diff --git a/package.yaml b/package.yaml index fd983e476..7cd522404 100644 --- a/package.yaml +++ b/package.yaml @@ -60,6 +60,7 @@ dependencies: - template-haskell == 2.16.* - text == 1.2.* - time == 1.9.* + - time-compat == 1.9.* - time-manager == 0.0.* - tls >= 1.6.0 && < 1.7 - transformers == 0.5.* diff --git a/simplexmq.cabal b/simplexmq.cabal index 8ef8fe65e..3db372d22 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -127,6 +127,7 @@ library , template-haskell ==2.16.* , text ==1.2.* , time ==1.9.* + , time-compat ==1.9.* , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* @@ -187,6 +188,7 @@ executable ntf-server , template-haskell ==2.16.* , text ==1.2.* , time ==1.9.* + , time-compat ==1.9.* , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* @@ -247,6 +249,7 @@ executable smp-agent , template-haskell ==2.16.* , text ==1.2.* , time ==1.9.* + , time-compat ==1.9.* , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* @@ -307,6 +310,7 @@ executable smp-server , template-haskell ==2.16.* , text ==1.2.* , time ==1.9.* + , time-compat ==1.9.* , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* @@ -386,6 +390,7 @@ test-suite smp-server-test , template-haskell ==2.16.* , text ==1.2.* , time ==1.9.* + , time-compat ==1.9.* , time-manager ==0.0.* , timeit ==2.0.* , tls >=1.6.0 && <1.7 diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 9f0730799..46a65e2e0 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -7,6 +7,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} @@ -45,8 +46,11 @@ import Data.Functor (($>)) import Data.List (intercalate) import qualified Data.Map.Strict as M import Data.Maybe (isNothing) +import Data.Set (Set) import qualified Data.Set as S import qualified Data.Text as T +import Data.Time.Calendar.Month.Compat (pattern MonthDay) +import Data.Time.Calendar.OrdinalDate (mondayStartWeek) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Type.Equality @@ -159,9 +163,9 @@ smpServer started = do logServerStats :: Int -> Int -> m () logServerStats startAt logInterval = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime - logInfo $ "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,msgQueues" + logInfo $ "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,dayMsgQueues,weekMsgQueues,monthMsgQueues" threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues} <- asks serverStats let interval = 1000000 * logInterval forever $ do ts <- liftIO getCurrentTime @@ -171,9 +175,17 @@ smpServer started = do qDeleted' <- atomically $ swapTVar qDeleted 0 msgSent' <- atomically $ swapTVar msgSent 0 msgRecv' <- atomically $ swapTVar msgRecv 0 - msgQueues' <- atomically $ S.size <$> swapTVar msgQueues S.empty - logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show msgQueues'] + let day = utctDay ts + (_, wDay) = mondayStartWeek day + MonthDay _ mDay = day + (dayMsgQueues', weekMsgQueues', monthMsgQueues') <- + atomically $ (,,) <$> periodCount 1 dayMsgQueues <*> periodCount wDay weekMsgQueues <*> periodCount mDay monthMsgQueues + logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show dayMsgQueues', weekMsgQueues', monthMsgQueues'] threadDelay interval + where + periodCount :: Int -> TVar (Set RecipientId) -> STM String + periodCount 1 pVar = show . S.size <$> swapTVar pVar S.empty + periodCount _ _ = pure "" runClient :: Transport c => TProxy c -> c -> m () runClient _ h = do @@ -417,10 +429,18 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri Just (Just s) -> do stats <- asks serverStats atomically $ modifyTVar (msgRecv stats) (+ 1) - atomically $ modifyTVar (msgQueues stats) (S.insert queueId) + atomically $ updateActiveQueues stats queueId deliverMessage tryDelPeekMsg queueId s _ -> return $ err NO_MSG + updateActiveQueues :: ServerStats -> RecipientId -> STM () + updateActiveQueues stats qId = do + updatePeriod dayMsgQueues + updatePeriod weekMsgQueues + updatePeriod monthMsgQueues + where + updatePeriod pSel = modifyTVar (pSel stats) (S.insert qId) + withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a) withSub rId f = mapM f =<< TM.lookup rId subscriptions @@ -451,7 +471,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri when (sent == OK) $ do stats <- asks serverStats atomically $ modifyTVar (msgSent stats) (+ 1) - atomically $ modifyTVar (msgQueues stats) (S.insert $ recipientId qr) + atomically $ updateActiveQueues stats $ recipientId qr pure resp where mkMessage :: m (Either C.CryptoError Message) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index badf6a983..f995eceb1 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -111,7 +111,9 @@ data ServerStats = ServerStats qDeleted :: TVar Int, msgSent :: TVar Int, msgRecv :: TVar Int, - msgQueues :: TVar (Set RecipientId), + dayMsgQueues :: TVar (Set RecipientId), + weekMsgQueues :: TVar (Set RecipientId), + monthMsgQueues :: TVar (Set RecipientId), fromTime :: TVar UTCTime } @@ -147,9 +149,11 @@ newServerStats ts = do qDeleted <- newTVar 0 msgSent <- newTVar 0 msgRecv <- newTVar 0 - msgQueues <- newTVar S.empty + dayMsgQueues <- newTVar S.empty + weekMsgQueues <- newTVar S.empty + monthMsgQueues <- newTVar S.empty fromTime <- newTVar ts - pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues, fromTime} + pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues, fromTime} newSubscription :: STM Sub newSubscription = do