smp server: remove subscriptions from the clients when queues are deleted (#1234)

* smp server: remove subscriptions from the clients when queues are deleted

* remove ntf subscriptions, update stats

* add GET stats
This commit is contained in:
Evgeny Poberezkin
2024-07-18 10:59:48 +01:00
committed by GitHub
parent 8d56b0ba85
commit e59a098e66
4 changed files with 212 additions and 78 deletions

View File

@@ -158,7 +158,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
forall s.
Server ->
String ->
(Server -> TQueue (QueueId, Client)) ->
(Server -> TQueue (QueueId, Client, Subscribed)) ->
(Server -> TMap QueueId Client) ->
(Client -> TMap QueueId s) ->
(s -> IO ()) ->
@@ -172,14 +172,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
where
updateSubscribers :: STM (Maybe (QueueId, Client))
updateSubscribers = do
(qId, clnt) <- readTQueue $ subQ s
let clientToBeNotified c' =
if sameClientId clnt c'
then pure Nothing
else do
(qId, clnt, subscribed) <- readTQueue $ subQ s
let updateSub
| subscribed = TM.lookupInsert qId clnt (subs s)
| otherwise = TM.lookupDelete qId (subs s)
clientToBeNotified c'
| sameClientId clnt c' = pure Nothing
| otherwise = do
yes <- readTVar $ connected c'
pure $ if yes then Just (qId, c') else Nothing
TM.lookupInsert qId clnt (subs s) $>>= clientToBeNotified
updateSub $>>= clientToBeNotified
endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s)
endPreviousSubscriptions (qId, c) = do
forkClient c (label <> ".endPreviousSubscriptions") $
@@ -229,7 +231,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubNoMsg, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
let interval = 1000000 * logInterval
forever $ do
withFile statsFilePath AppendMode $ \h -> liftIO $ do
@@ -242,6 +244,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
qDeletedNew' <- atomically $ swapTVar qDeletedNew 0
qDeletedSecured' <- atomically $ swapTVar qDeletedSecured 0
qSub' <- atomically $ swapTVar qSub 0
qSubNoMsg' <- atomically $ swapTVar qSubNoMsg 0
qSubAuth' <- atomically $ swapTVar qSubAuth 0
qSubDuplicate' <- atomically $ swapTVar qSubDuplicate 0
qSubProhibited' <- atomically $ swapTVar qSubProhibited 0
@@ -250,6 +253,12 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
msgSentQuota' <- atomically $ swapTVar msgSentQuota 0
msgSentLarge' <- atomically $ swapTVar msgSentLarge 0
msgRecv' <- atomically $ swapTVar msgRecv 0
msgRecvGet' <- atomically $ swapTVar msgRecvGet 0
msgGet' <- atomically $ swapTVar msgGet 0
msgGetNoMsg' <- atomically $ swapTVar msgGetNoMsg 0
msgGetAuth' <- atomically $ swapTVar msgGetAuth 0
msgGetDuplicate' <- atomically $ swapTVar msgGetDuplicate 0
msgGetProhibited' <- atomically $ swapTVar msgGetProhibited 0
msgExpired' <- atomically $ swapTVar msgExpired 0
ps <- atomically $ periodStatCounts activeQueues ts
msgSentNtf' <- atomically $ swapTVar msgSentNtf 0
@@ -302,7 +311,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
show msgSentLarge',
show msgNtfs',
show msgNtfNoSub',
show msgNtfLost'
show msgNtfLost',
show qSubNoMsg',
show msgRecvGet',
show msgGet',
show msgGetNoMsg',
show msgGetAuth',
show msgGetDuplicate',
show msgGetProhibited'
]
)
liftIO $ threadDelay' interval
@@ -394,6 +410,12 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
readTVarIO (day $ activeQueues ss) >>= \v -> B.hPutStr h $ "dayMsgQueues" <> ": " <> bshow (S.size v) <> "\n"
putStat "msgSent" msgSent
putStat "msgRecv" msgRecv
putStat "msgRecvGet" msgRecvGet
putStat "msgGet" msgGet
putStat "msgGetNoMsg" msgGetNoMsg
putStat "msgGetAuth" msgGetAuth
putStat "msgGetDuplicate" msgGetDuplicate
putStat "msgGetProhibited" msgGetProhibited
putStat "msgSentNtf" msgSentNtf
putStat "msgRecvNtf" msgRecvNtf
putStat "qCount" qCount
@@ -452,9 +474,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
activeClients <- readTVarIO clients
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
when (r == CPRAdmin) $ do
(smpSubCnt, smpClCnt) <- countClientSubs subscriptions activeClients
(ntfSubCnt, ntfClCnt) <- countClientSubs ntfSubscriptions activeClients
(smpSubCnt, smpSubCntByGroup, smpClCnt) <- countClientSubs subscriptions countSMPSubs activeClients
(ntfSubCnt, _, ntfClCnt) <- countClientSubs ntfSubscriptions (\_ -> pure (0, 0, 0, 0)) activeClients
hPutStrLn h $ "SMP subscriptions (via clients, slow): " <> show smpSubCnt
hPutStrLn h $ "SMP subscriptions (by group: NoSub, SubPending, SubThread, ProhibitSub): " <> show smpSubCntByGroup
hPutStrLn h $ "SMP subscribed clients (via clients, slow): " <> show smpClCnt
hPutStrLn h $ "Ntf subscriptions (via clients, slow): " <> show ntfSubCnt
hPutStrLn h $ "Ntf subscribed clients (via clients, slow): " <> show ntfClCnt
@@ -465,14 +488,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h $ "Ntf subscriptions: " <> show (M.size activeNtfSubs)
hPutStrLn h $ "Ntf subscribed clients: " <> show (countSubClients activeNtfSubs)
where
countClientSubs :: (Client -> TMap QueueId a) -> IM.IntMap Client -> IO (Int, Int)
countClientSubs subSel = foldM addSubs (0, 0)
countClientSubs :: (Client -> TMap QueueId a) -> (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap Client -> IO (Int, (Int, Int, Int, Int), Int)
countClientSubs subSel countSubs = foldM addSubs (0, (0, 0, 0, 0), 0)
where
addSubs :: (Int, Int) -> Client -> IO (Int, Int)
addSubs (subCnt, clCnt) cl = do
addSubs :: (Int, (Int, Int, Int, Int), Int) -> Client -> IO (Int, (Int, Int, Int, Int), Int)
addSubs (subCnt, (c1, c2, c3, c4), clCnt) cl = do
subs <- readTVarIO $ subSel cl
(c1', c2', c3', c4') <- countSubs subs
let cnt = M.size subs
pure (subCnt + cnt, clCnt + if cnt == 0 then 0 else 1)
cnts' = (c1 + c1', c2 + c2', c3 + c3', c4 + c4')
pure (subCnt + cnt, cnts', clCnt + if cnt == 0 then 0 else 1)
countSMPSubs :: M.Map QueueId Sub -> IO (Int, Int, Int, Int)
countSMPSubs = foldM countSubs (0, 0, 0, 0)
where
countSubs (c1, c2, c3, c4) Sub {subThread} =
readTVarIO subThread >>= \st -> pure $ case st of
NoSub -> (c1 + 1, c2, c3, c4)
SubPending -> (c1, c2 + 1, c3, c4)
SubThread _ -> (c1, c2, c3 + 1, c4)
ProhibitSub -> (c1, c2, c3, c4 + 1)
countSubClients = S.size . M.foldr' (S.insert . clientId) S.empty
CPDelete queueId' -> withUserRole $ unliftIO u $ do
st <- asks queueStore
@@ -531,19 +565,22 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio
noSubscriptions c = atomically $ (&&) <$> TM.null (subscriptions c) <*> TM.null (ntfSubscriptions c)
clientDisconnected :: Client -> M ()
clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endThreads} = do
clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disc"
subs <- atomically $ do
(subs, ntfSubs) <- atomically $ do
writeTVar connected False
swapTVar subscriptions M.empty
(,) <$> swapTVar subscriptions M.empty <*> swapTVar ntfSubscriptions M.empty
liftIO $ mapM_ cancelSub subs
srvSubs <- asks $ subscribers . server
atomically $ modifyTVar' srvSubs $ \cs ->
M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs
Server {subscribers, notifiers} <- asks server
updateSubscribers subs subscribers
updateSubscribers ntfSubs notifiers
asks clients >>= atomically . (`modifyTVar'` IM.delete clientId)
tIds <- atomically $ swapTVar endThreads IM.empty
liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds
where
updateSubscribers subs srvSubs = do
atomically $ modifyTVar' srvSubs $ \cs ->
M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs
deleteCurrentClient :: Client -> Maybe Client
deleteCurrentClient c'
| sameClientId c c' = Nothing
@@ -579,8 +616,9 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv
VRVerified qr -> pure $ Right (qr, (corrId, entId, cmd))
VRFailed -> do
case cmd of
Cmd _ SEND {} -> atomically $ modifyTVar' (msgSentAuth stats) (+ 1)
Cmd _ SUB -> atomically $ modifyTVar' (qSubAuth stats) (+ 1)
Cmd _ SEND {} -> incStat $ msgSentAuth stats
Cmd _ SUB -> incStat $ qSubAuth stats
Cmd _ GET -> incStat $ msgGetAuth stats
_ -> pure ()
pure $ Left (corrId, entId, ERR AUTH)
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
@@ -808,8 +846,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
transportErr = PROXY . BROKER . TRANSPORT
mkIncProxyStats :: MonadIO m => ProxyStats -> ProxyStats -> OwnServer -> (ProxyStats -> TVar Int) -> m ()
mkIncProxyStats ps psOwn own sel = do
atomically $ modifyTVar' (sel ps) (+ 1)
when own $ atomically $ modifyTVar' (sel psOwn) (+ 1)
incStat $ sel ps
when own $ incStat $ sel psOwn
processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Maybe (Transmission BrokerMsg))
processCommand (qr_, (corrId, queueId, cmd)) = case cmd of
Cmd SProxiedClient command -> processProxiedCmd (corrId, queueId, command)
@@ -878,8 +916,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
Right _ -> do
withLog (`logCreateById` rId)
stats <- asks serverStats
atomically $ modifyTVar' (qCreated stats) (+ 1)
atomically $ modifyTVar' (qCount stats) (+ 1)
incStat $ qCreated stats
incStat $ qCount stats
case subMode of
SMOnlyCreate -> pure ()
SMSubscribe -> void $ subscribeQueue qr rId
@@ -901,7 +939,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
withLog $ \s -> logSecureQueue s rId sKey
st <- asks queueStore
stats <- asks serverStats
atomically $ modifyTVar' (qSecured stats) (+ 1)
incStat $ qSecured stats
atomically $ either ERR (const OK) <$> secureQueue st rId sKey
addQueueNotifier_ :: QueueStore -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M (Transmission BrokerMsg)
@@ -925,7 +963,12 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg)
deleteQueueNotifier_ st = do
withLog (`logDeleteNotifier` queueId)
okResp <$> atomically (deleteQueueNotifier st queueId)
atomically (deleteQueueNotifier st queueId) >>= \case
Right () -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ writeTQueue ntfSubscribedQ (queueId, clnt, False)
pure ok
Left e -> pure $ err e
suspendQueue_ :: QueueStore -> M (Transmission BrokerMsg)
suspendQueue_ st = do
@@ -934,60 +977,69 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
subscribeQueue :: QueueRec -> RecipientId -> M (Transmission BrokerMsg)
subscribeQueue qr rId = do
stats <- asks serverStats
atomically (TM.lookup rId subscriptions) >>= \case
Nothing -> do
atomically $ modifyTVar' (qSub stats) (+ 1)
newSub >>= deliver
Just s@Sub {subThread} ->
Nothing -> newSub >>= deliver True
Just s@Sub {subThread} -> do
stats <- asks serverStats
readTVarIO subThread >>= \case
ProhibitSub -> do
-- cannot use SUB in the same connection where GET was used
atomically $ modifyTVar' (qSubProhibited stats) (+ 1)
incStat $ qSubProhibited stats
pure (corrId, rId, ERR $ CMD PROHIBITED)
_ -> do
atomically $ modifyTVar' (qSubDuplicate stats) (+ 1)
atomically (tryTakeTMVar $ delivered s) >> deliver s
incStat $ qSubDuplicate stats
atomically (tryTakeTMVar $ delivered s) >> deliver False s
where
newSub :: M Sub
newSub = time "SUB newSub" . atomically $ do
writeTQueue subscribedQ (rId, clnt)
writeTQueue subscribedQ (rId, clnt, True)
sub <- newSubscription NoSub
TM.insert rId sub subscriptions
pure sub
deliver :: Sub -> M (Transmission BrokerMsg)
deliver sub = do
deliver :: Bool -> Sub -> M (Transmission BrokerMsg)
deliver inc sub = do
q <- getStoreMsgQueue "SUB" rId
msg_ <- atomically $ tryPeekMsg q
when inc $ do
stats <- asks serverStats
incStat $ (if isJust msg_ then qSub else qSubNoMsg) stats
deliverMessage "SUB" qr rId sub msg_
getMessage :: QueueRec -> M (Transmission BrokerMsg)
getMessage qr = time "GET" $ do
atomically (TM.lookup queueId subscriptions) >>= \case
Nothing ->
atomically newSub >>= getMessage_
atomically newSub >>= (`getMessage_` Nothing)
Just s@Sub {subThread} ->
readTVarIO subThread >>= \case
ProhibitSub ->
atomically (tryTakeTMVar $ delivered s)
>> getMessage_ s
>>= getMessage_ s
-- cannot use GET in the same connection where there is an active subscription
_ -> pure (corrId, queueId, ERR $ CMD PROHIBITED)
_ -> do
stats <- asks serverStats
incStat $ msgGetProhibited stats
pure (corrId, queueId, ERR $ CMD PROHIBITED)
where
newSub :: STM Sub
newSub = do
s <- newSubscription ProhibitSub
TM.insert queueId s subscriptions
pure s
getMessage_ :: Sub -> M (Transmission BrokerMsg)
getMessage_ s = do
getMessage_ :: Sub -> Maybe MsgId -> M (Transmission BrokerMsg)
getMessage_ s delivered_ = do
q <- getStoreMsgQueue "GET" queueId
atomically $
tryPeekMsg q >>= \case
Just msg ->
let encMsg = encryptMsg qr msg
in setDelivered s msg $> (corrId, queueId, MSG encMsg)
_ -> pure (corrId, queueId, OK)
stats <- asks serverStats
(statCnt, r) <-
atomically $
tryPeekMsg q >>= \case
Just msg ->
let encMsg = encryptMsg qr msg
cnt = if isJust delivered_ then msgGetDuplicate else msgGet
in setDelivered s msg $> (cnt, (corrId, queueId, MSG encMsg))
_ -> pure (msgGetNoMsg, (corrId, queueId, OK))
incStat $ statCnt stats
pure r
withQueue :: (QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg)
withQueue action = maybe (pure $ err AUTH) action qr_
@@ -995,7 +1047,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
subscribeNotifications :: M (Transmission BrokerMsg)
subscribeNotifications = time "NSUB" . atomically $ do
unlessM (TM.member queueId ntfSubscriptions) $ do
writeTQueue ntfSubscribedQ (queueId, clnt)
writeTQueue ntfSubscribedQ (queueId, clnt, True)
TM.insert queueId () ntfSubscriptions
pure ok
@@ -1010,11 +1062,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
case st of
ProhibitSub -> do
deletedMsg_ <- atomically $ tryDelMsg q msgId
mapM_ updateStats deletedMsg_
mapM_ (updateStats True) deletedMsg_
pure ok
_ -> do
(deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId
mapM_ updateStats deletedMsg_
mapM_ (updateStats False) deletedMsg_
deliverMessage "ACK" qr queueId sub msg_
_ -> pure $ err NO_MSG
where
@@ -1024,29 +1076,30 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
if msgId == msgId' || B.null msgId
then Just <$> readTVar subThread
else putTMVar delivered msgId' $> Nothing
updateStats :: Message -> M ()
updateStats = \case
updateStats :: Bool -> Message -> M ()
updateStats isGet = \case
MessageQuota {} -> pure ()
Message {msgFlags} -> do
stats <- asks serverStats
atomically $ modifyTVar' (msgRecv stats) (+ 1)
incStat $ msgRecv stats
when isGet $ incStat $ msgRecvGet stats
atomically $ modifyTVar' (msgCount stats) (subtract 1)
atomically $ updatePeriodStats (activeQueues stats) queueId
when (notification msgFlags) $ do
atomically $ modifyTVar' (msgRecvNtf stats) (+ 1)
incStat $ msgRecvNtf stats
atomically $ updatePeriodStats (activeQueuesNtf stats) queueId
sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg)
sendMessage qr msgFlags msgBody
| B.length msgBody > maxMessageLength thVersion = do
stats <- asks serverStats
atomically $ modifyTVar' (msgSentLarge stats) (+ 1)
incStat $ msgSentLarge stats
pure $ err LARGE_MSG
| otherwise = do
stats <- asks serverStats
case status qr of
QueueOff -> do
atomically $ modifyTVar' (msgSentAuth stats) (+ 1)
incStat $ msgSentAuth stats
pure $ err AUTH
QueueActive ->
case C.maxLenBS msgBody of
@@ -1058,7 +1111,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
atomically . writeMsg q =<< mkMessage body
case msg_ of
Nothing -> do
atomically $ modifyTVar' (msgSentQuota stats) (+ 1)
incStat $ msgSentQuota stats
pure $ err QUOTA
Just (msg, wasEmpty) -> time "SEND ok" $ do
when wasEmpty $ tryDeliverMessage msg
@@ -1066,16 +1119,16 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
forM_ (notifier qr) $ \ntf -> do
asks random >>= atomically . trySendNotification ntf msg >>= \case
Nothing -> do
atomically $ modifyTVar' (msgNtfNoSub stats) (+ 1)
incStat $ msgNtfNoSub stats
logWarn "No notification subscription"
Just False -> do
atomically $ modifyTVar' (msgNtfLost stats) (+ 1)
incStat $ msgNtfLost stats
logWarn "Dropped message notification"
Just True -> atomically $ modifyTVar' (msgNtfs stats) (+ 1)
atomically $ modifyTVar' (msgSentNtf stats) (+ 1)
Just True -> incStat $ msgNtfs stats
incStat $ msgSentNtf stats
atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
atomically $ modifyTVar' (msgSent stats) (+ 1)
atomically $ modifyTVar' (msgCount stats) (+ 1)
incStat $ msgSent stats
incStat $ msgCount stats
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
pure ok
where
@@ -1197,7 +1250,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
let fr = FwdResponse {fwdCorrId, fwdResponse = r2}
r3 = EncFwdResponse $ C.cbEncryptNoPad sessSecret (C.reverseNonce proxyNonce) (smpEncode fr)
stats <- asks serverStats
atomically $ modifyTVar' (pMsgFwdsRecv stats) (+ 1)
incStat $ pMsgFwdsRecv stats
pure $ RRES r3
where
rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
@@ -1254,7 +1307,12 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
withLog (`logDeleteQueue` queueId)
ms <- asks msgStore
atomically (deleteQueue st queueId $>>= \q -> delMsgQueue ms queueId $> Right q) >>= \case
Right q -> updateDeletedStats q $> ok
Right q -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ writeTQueue subscribedQ (queueId, clnt, False)
atomically $ writeTQueue ntfSubscribedQ (queueId, clnt, False)
updateDeletedStats q
pure ok
Left e -> pure $ err e
getQueueInfo :: QueueRec -> M (Transmission BrokerMsg)
@@ -1290,9 +1348,13 @@ updateDeletedStats :: QueueRec -> M ()
updateDeletedStats q = do
stats <- asks serverStats
let delSel = if isNothing (senderKey q) then qDeletedNew else qDeletedSecured
atomically $ modifyTVar' (delSel stats) (+ 1)
atomically $ modifyTVar' (qDeletedAll stats) (+ 1)
atomically $ modifyTVar' (qCount stats) (subtract 1)
incStat $ delSel stats
incStat $ qDeletedAll stats
incStat $ qCount stats
incStat :: MonadIO m => TVar Int -> m ()
incStat v = atomically $ modifyTVar' v (+ 1)
{-# INLINE incStat #-}
withLog :: (StoreLog 'WriteMode -> IO a) -> M ()
withLog action = do

View File

@@ -128,10 +128,12 @@ data Env = Env
proxyAgent :: ProxyAgent -- senders served on this proxy
}
type Subscribed = Bool
data Server = Server
{ subscribedQ :: TQueue (RecipientId, Client),
{ subscribedQ :: TQueue (RecipientId, Client, Subscribed),
subscribers :: TMap RecipientId Client,
ntfSubscribedQ :: TQueue (NotifierId, Client),
ntfSubscribedQ :: TQueue (NotifierId, Client, Subscribed),
notifiers :: TMap NotifierId Client,
savingLock :: Lock
}

View File

@@ -27,6 +27,7 @@ data ServerStats = ServerStats
qDeletedNew :: TVar Int,
qDeletedSecured :: TVar Int,
qSub :: TVar Int,
qSubNoMsg :: TVar Int,
qSubAuth :: TVar Int,
qSubDuplicate :: TVar Int,
qSubProhibited :: TVar Int,
@@ -35,6 +36,12 @@ data ServerStats = ServerStats
msgSentQuota :: TVar Int,
msgSentLarge :: TVar Int,
msgRecv :: TVar Int,
msgRecvGet :: TVar Int,
msgGet :: TVar Int,
msgGetNoMsg :: TVar Int,
msgGetAuth :: TVar Int,
msgGetDuplicate :: TVar Int,
msgGetProhibited :: TVar Int,
msgExpired :: TVar Int,
activeQueues :: PeriodStats RecipientId,
msgSentNtf :: TVar Int, -- sent messages with NTF flag
@@ -60,6 +67,7 @@ data ServerStatsData = ServerStatsData
_qDeletedNew :: Int,
_qDeletedSecured :: Int,
_qSub :: Int,
_qSubNoMsg :: Int,
_qSubAuth :: Int,
_qSubDuplicate :: Int,
_qSubProhibited :: Int,
@@ -68,6 +76,12 @@ data ServerStatsData = ServerStatsData
_msgSentQuota :: Int,
_msgSentLarge :: Int,
_msgRecv :: Int,
_msgRecvGet :: Int,
_msgGet :: Int,
_msgGetNoMsg :: Int,
_msgGetAuth :: Int,
_msgGetDuplicate :: Int,
_msgGetProhibited :: Int,
_msgExpired :: Int,
_activeQueues :: PeriodStatsData RecipientId,
_msgSentNtf :: Int,
@@ -95,6 +109,7 @@ newServerStats ts = do
qDeletedNew <- newTVar 0
qDeletedSecured <- newTVar 0
qSub <- newTVar 0
qSubNoMsg <- newTVar 0
qSubAuth <- newTVar 0
qSubDuplicate <- newTVar 0
qSubProhibited <- newTVar 0
@@ -103,6 +118,12 @@ newServerStats ts = do
msgSentQuota <- newTVar 0
msgSentLarge <- newTVar 0
msgRecv <- newTVar 0
msgRecvGet <- newTVar 0
msgGet <- newTVar 0
msgGetNoMsg <- newTVar 0
msgGetAuth <- newTVar 0
msgGetDuplicate <- newTVar 0
msgGetProhibited <- newTVar 0
msgExpired <- newTVar 0
activeQueues <- newPeriodStats
msgSentNtf <- newTVar 0
@@ -127,6 +148,7 @@ newServerStats ts = do
qDeletedNew,
qDeletedSecured,
qSub,
qSubNoMsg,
qSubAuth,
qSubDuplicate,
qSubProhibited,
@@ -135,6 +157,12 @@ newServerStats ts = do
msgSentQuota,
msgSentLarge,
msgRecv,
msgRecvGet,
msgGet,
msgGetNoMsg,
msgGetAuth,
msgGetDuplicate,
msgGetProhibited,
msgExpired,
activeQueues,
msgSentNtf,
@@ -161,6 +189,7 @@ getServerStatsData s = do
_qDeletedNew <- readTVar $ qDeletedNew s
_qDeletedSecured <- readTVar $ qDeletedSecured s
_qSub <- readTVar $ qSub s
_qSubNoMsg <- readTVar $ qSubNoMsg s
_qSubAuth <- readTVar $ qSubAuth s
_qSubDuplicate <- readTVar $ qSubDuplicate s
_qSubProhibited <- readTVar $ qSubProhibited s
@@ -169,6 +198,12 @@ getServerStatsData s = do
_msgSentQuota <- readTVar $ msgSentQuota s
_msgSentLarge <- readTVar $ msgSentLarge s
_msgRecv <- readTVar $ msgRecv s
_msgRecvGet <- readTVar $ msgRecvGet s
_msgGet <- readTVar $ msgGet s
_msgGetNoMsg <- readTVar $ msgGetNoMsg s
_msgGetAuth <- readTVar $ msgGetAuth s
_msgGetDuplicate <- readTVar $ msgGetDuplicate s
_msgGetProhibited <- readTVar $ msgGetProhibited s
_msgExpired <- readTVar $ msgExpired s
_activeQueues <- getPeriodStatsData $ activeQueues s
_msgSentNtf <- readTVar $ msgSentNtf s
@@ -193,6 +228,7 @@ getServerStatsData s = do
_qDeletedNew,
_qDeletedSecured,
_qSub,
_qSubNoMsg,
_qSubAuth,
_qSubDuplicate,
_qSubProhibited,
@@ -201,6 +237,12 @@ getServerStatsData s = do
_msgSentQuota,
_msgSentLarge,
_msgRecv,
_msgRecvGet,
_msgGet,
_msgGetNoMsg,
_msgGetAuth,
_msgGetDuplicate,
_msgGetProhibited,
_msgExpired,
_activeQueues,
_msgSentNtf,
@@ -227,6 +269,7 @@ setServerStats s d = do
writeTVar (qDeletedNew s) $! _qDeletedNew d
writeTVar (qDeletedSecured s) $! _qDeletedSecured d
writeTVar (qSub s) $! _qSub d
writeTVar (qSubNoMsg s) $! _qSubNoMsg d
writeTVar (qSubAuth s) $! _qSubAuth d
writeTVar (qSubDuplicate s) $! _qSubDuplicate d
writeTVar (qSubProhibited s) $! _qSubProhibited d
@@ -235,6 +278,12 @@ setServerStats s d = do
writeTVar (msgSentQuota s) $! _msgSentQuota d
writeTVar (msgSentLarge s) $! _msgSentLarge d
writeTVar (msgRecv s) $! _msgRecv d
writeTVar (msgRecvGet s) $! _msgRecvGet d
writeTVar (msgGet s) $! _msgGet d
writeTVar (msgGetNoMsg s) $! _msgGetNoMsg d
writeTVar (msgGetAuth s) $! _msgGetAuth d
writeTVar (msgGetDuplicate s) $! _msgGetDuplicate d
writeTVar (msgGetProhibited s) $! _msgGetProhibited d
writeTVar (msgExpired s) $! _msgExpired d
setPeriodStats (activeQueues s) (_activeQueues d)
writeTVar (msgSentNtf s) $! _msgSentNtf d
@@ -262,6 +311,7 @@ instance StrEncoding ServerStatsData where
"qDeletedSecured=" <> strEncode (_qDeletedSecured d),
"qCount=" <> strEncode (_qCount d),
"qSub=" <> strEncode (_qSub d),
"qSubNoMsg=" <> strEncode (_qSubNoMsg d),
"qSubAuth=" <> strEncode (_qSubAuth d),
"qSubDuplicate=" <> strEncode (_qSubDuplicate d),
"qSubProhibited=" <> strEncode (_qSubProhibited d),
@@ -270,6 +320,12 @@ instance StrEncoding ServerStatsData where
"msgSentQuota=" <> strEncode (_msgSentQuota d),
"msgSentLarge=" <> strEncode (_msgSentLarge d),
"msgRecv=" <> strEncode (_msgRecv d),
"msgRecvGet=" <> strEncode (_msgRecvGet d),
"msgGet=" <> strEncode (_msgGet d),
"msgGetNoMsg=" <> strEncode (_msgGetNoMsg d),
"msgGetAuth=" <> strEncode (_msgGetAuth d),
"msgGetDuplicate=" <> strEncode (_msgGetDuplicate d),
"msgGetProhibited=" <> strEncode (_msgGetProhibited d),
"msgExpired=" <> strEncode (_msgExpired d),
"msgSentNtf=" <> strEncode (_msgSentNtf d),
"msgRecvNtf=" <> strEncode (_msgRecvNtf d),
@@ -299,6 +355,7 @@ instance StrEncoding ServerStatsData where
<|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine))
_qCount <- opt "qCount="
_qSub <- opt "qSub="
_qSubNoMsg <- opt "qSubNoMsg="
_qSubAuth <- opt "qSubAuth="
_qSubDuplicate <- opt "qSubDuplicate="
_qSubProhibited <- opt "qSubProhibited="
@@ -307,6 +364,12 @@ instance StrEncoding ServerStatsData where
_msgSentQuota <- opt "msgSentQuota="
_msgSentLarge <- opt "msgSentLarge="
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
_msgRecvGet <- opt "msgRecvGet="
_msgGet <- opt "msgGet="
_msgGetNoMsg <- opt "msgGetNoMsg="
_msgGetAuth <- opt "msgGetAuth="
_msgGetDuplicate <- opt "msgGetDuplicate="
_msgGetProhibited <- opt "msgGetProhibited="
_msgExpired <- opt "msgExpired="
_msgSentNtf <- opt "msgSentNtf="
_msgRecvNtf <- opt "msgRecvNtf="
@@ -339,6 +402,7 @@ instance StrEncoding ServerStatsData where
_qDeletedNew,
_qDeletedSecured,
_qSub,
_qSubNoMsg,
_qSubAuth,
_qSubDuplicate,
_qSubProhibited,
@@ -347,6 +411,12 @@ instance StrEncoding ServerStatsData where
_msgSentQuota,
_msgSentLarge,
_msgRecv,
_msgRecvGet,
_msgGet,
_msgGetNoMsg,
_msgGetAuth,
_msgGetDuplicate,
_msgGetProhibited,
_msgExpired,
_msgSentNtf,
_msgRecvNtf,

View File

@@ -610,7 +610,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 5
logSize testServerStatsBackupFile `shouldReturn` 55
logSize testServerStatsBackupFile `shouldReturn` 62
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats1 [rId] 5 1
@@ -628,7 +628,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 3
logSize testServerStatsBackupFile `shouldReturn` 55
logSize testServerStatsBackupFile `shouldReturn` 62
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats2 [rId] 5 3
@@ -647,7 +647,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
logSize testServerStatsBackupFile `shouldReturn` 55
logSize testServerStatsBackupFile `shouldReturn` 62
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats3 [rId] 5 5