From e59a098e667d49f2c4981f856a7e50df00c7f4e2 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 18 Jul 2024 10:59:48 +0100 Subject: [PATCH] smp server: remove subscriptions from the clients when queues are deleted (#1234) * smp server: remove subscriptions from the clients when queues are deleted * remove ntf subscriptions, update stats * add GET stats --- src/Simplex/Messaging/Server.hs | 208 +++++++++++++++--------- src/Simplex/Messaging/Server/Env/STM.hs | 6 +- src/Simplex/Messaging/Server/Stats.hs | 70 ++++++++ tests/ServerTests.hs | 6 +- 4 files changed, 212 insertions(+), 78 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index d88b2349a..1d7398b43 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -158,7 +158,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do forall s. Server -> String -> - (Server -> TQueue (QueueId, Client)) -> + (Server -> TQueue (QueueId, Client, Subscribed)) -> (Server -> TMap QueueId Client) -> (Client -> TMap QueueId s) -> (s -> IO ()) -> @@ -172,14 +172,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do where updateSubscribers :: STM (Maybe (QueueId, Client)) updateSubscribers = do - (qId, clnt) <- readTQueue $ subQ s - let clientToBeNotified c' = - if sameClientId clnt c' - then pure Nothing - else do + (qId, clnt, subscribed) <- readTQueue $ subQ s + let updateSub + | subscribed = TM.lookupInsert qId clnt (subs s) + | otherwise = TM.lookupDelete qId (subs s) + clientToBeNotified c' + | sameClientId clnt c' = pure Nothing + | otherwise = do yes <- readTVar $ connected c' pure $ if yes then Just (qId, c') else Nothing - TM.lookupInsert qId clnt (subs s) $>>= clientToBeNotified + updateSub $>>= clientToBeNotified endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s) endPreviousSubscriptions (qId, c) = do forkClient c (label <> ".endPreviousSubscriptions") $ @@ -229,7 +231,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats + ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubNoMsg, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -242,6 +244,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do qDeletedNew' <- atomically $ swapTVar qDeletedNew 0 qDeletedSecured' <- atomically $ swapTVar qDeletedSecured 0 qSub' <- atomically $ swapTVar qSub 0 + qSubNoMsg' <- atomically $ swapTVar qSubNoMsg 0 qSubAuth' <- atomically $ swapTVar qSubAuth 0 qSubDuplicate' <- atomically $ swapTVar qSubDuplicate 0 qSubProhibited' <- atomically $ swapTVar qSubProhibited 0 @@ -250,6 +253,12 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do msgSentQuota' <- atomically $ swapTVar msgSentQuota 0 msgSentLarge' <- atomically $ swapTVar msgSentLarge 0 msgRecv' <- atomically $ swapTVar msgRecv 0 + msgRecvGet' <- atomically $ swapTVar msgRecvGet 0 + msgGet' <- atomically $ swapTVar msgGet 0 + msgGetNoMsg' <- atomically $ swapTVar msgGetNoMsg 0 + msgGetAuth' <- atomically $ swapTVar msgGetAuth 0 + msgGetDuplicate' <- atomically $ swapTVar msgGetDuplicate 0 + msgGetProhibited' <- atomically $ swapTVar msgGetProhibited 0 msgExpired' <- atomically $ swapTVar msgExpired 0 ps <- atomically $ periodStatCounts activeQueues ts msgSentNtf' <- atomically $ swapTVar msgSentNtf 0 @@ -302,7 +311,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do show msgSentLarge', show msgNtfs', show msgNtfNoSub', - show msgNtfLost' + show msgNtfLost', + show qSubNoMsg', + show msgRecvGet', + show msgGet', + show msgGetNoMsg', + show msgGetAuth', + show msgGetDuplicate', + show msgGetProhibited' ] ) liftIO $ threadDelay' interval @@ -394,6 +410,12 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do readTVarIO (day $ activeQueues ss) >>= \v -> B.hPutStr h $ "dayMsgQueues" <> ": " <> bshow (S.size v) <> "\n" putStat "msgSent" msgSent putStat "msgRecv" msgRecv + putStat "msgRecvGet" msgRecvGet + putStat "msgGet" msgGet + putStat "msgGetNoMsg" msgGetNoMsg + putStat "msgGetAuth" msgGetAuth + putStat "msgGetDuplicate" msgGetDuplicate + putStat "msgGetProhibited" msgGetProhibited putStat "msgSentNtf" msgSentNtf putStat "msgRecvNtf" msgRecvNtf putStat "qCount" qCount @@ -452,9 +474,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do activeClients <- readTVarIO clients hPutStrLn h $ "Clients: " <> show (IM.size activeClients) when (r == CPRAdmin) $ do - (smpSubCnt, smpClCnt) <- countClientSubs subscriptions activeClients - (ntfSubCnt, ntfClCnt) <- countClientSubs ntfSubscriptions activeClients + (smpSubCnt, smpSubCntByGroup, smpClCnt) <- countClientSubs subscriptions countSMPSubs activeClients + (ntfSubCnt, _, ntfClCnt) <- countClientSubs ntfSubscriptions (\_ -> pure (0, 0, 0, 0)) activeClients hPutStrLn h $ "SMP subscriptions (via clients, slow): " <> show smpSubCnt + hPutStrLn h $ "SMP subscriptions (by group: NoSub, SubPending, SubThread, ProhibitSub): " <> show smpSubCntByGroup hPutStrLn h $ "SMP subscribed clients (via clients, slow): " <> show smpClCnt hPutStrLn h $ "Ntf subscriptions (via clients, slow): " <> show ntfSubCnt hPutStrLn h $ "Ntf subscribed clients (via clients, slow): " <> show ntfClCnt @@ -465,14 +488,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h $ "Ntf subscriptions: " <> show (M.size activeNtfSubs) hPutStrLn h $ "Ntf subscribed clients: " <> show (countSubClients activeNtfSubs) where - countClientSubs :: (Client -> TMap QueueId a) -> IM.IntMap Client -> IO (Int, Int) - countClientSubs subSel = foldM addSubs (0, 0) + countClientSubs :: (Client -> TMap QueueId a) -> (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap Client -> IO (Int, (Int, Int, Int, Int), Int) + countClientSubs subSel countSubs = foldM addSubs (0, (0, 0, 0, 0), 0) where - addSubs :: (Int, Int) -> Client -> IO (Int, Int) - addSubs (subCnt, clCnt) cl = do + addSubs :: (Int, (Int, Int, Int, Int), Int) -> Client -> IO (Int, (Int, Int, Int, Int), Int) + addSubs (subCnt, (c1, c2, c3, c4), clCnt) cl = do subs <- readTVarIO $ subSel cl + (c1', c2', c3', c4') <- countSubs subs let cnt = M.size subs - pure (subCnt + cnt, clCnt + if cnt == 0 then 0 else 1) + cnts' = (c1 + c1', c2 + c2', c3 + c3', c4 + c4') + pure (subCnt + cnt, cnts', clCnt + if cnt == 0 then 0 else 1) + countSMPSubs :: M.Map QueueId Sub -> IO (Int, Int, Int, Int) + countSMPSubs = foldM countSubs (0, 0, 0, 0) + where + countSubs (c1, c2, c3, c4) Sub {subThread} = + readTVarIO subThread >>= \st -> pure $ case st of + NoSub -> (c1 + 1, c2, c3, c4) + SubPending -> (c1, c2 + 1, c3, c4) + SubThread _ -> (c1, c2, c3 + 1, c4) + ProhibitSub -> (c1, c2, c3, c4 + 1) countSubClients = S.size . M.foldr' (S.insert . clientId) S.empty CPDelete queueId' -> withUserRole $ unliftIO u $ do st <- asks queueStore @@ -531,19 +565,22 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio noSubscriptions c = atomically $ (&&) <$> TM.null (subscriptions c) <*> TM.null (ntfSubscriptions c) clientDisconnected :: Client -> M () -clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endThreads} = do +clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disc" - subs <- atomically $ do + (subs, ntfSubs) <- atomically $ do writeTVar connected False - swapTVar subscriptions M.empty + (,) <$> swapTVar subscriptions M.empty <*> swapTVar ntfSubscriptions M.empty liftIO $ mapM_ cancelSub subs - srvSubs <- asks $ subscribers . server - atomically $ modifyTVar' srvSubs $ \cs -> - M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs + Server {subscribers, notifiers} <- asks server + updateSubscribers subs subscribers + updateSubscribers ntfSubs notifiers asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where + updateSubscribers subs srvSubs = do + atomically $ modifyTVar' srvSubs $ \cs -> + M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs deleteCurrentClient :: Client -> Maybe Client deleteCurrentClient c' | sameClientId c c' = Nothing @@ -579,8 +616,9 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv VRVerified qr -> pure $ Right (qr, (corrId, entId, cmd)) VRFailed -> do case cmd of - Cmd _ SEND {} -> atomically $ modifyTVar' (msgSentAuth stats) (+ 1) - Cmd _ SUB -> atomically $ modifyTVar' (qSubAuth stats) (+ 1) + Cmd _ SEND {} -> incStat $ msgSentAuth stats + Cmd _ SUB -> incStat $ qSubAuth stats + Cmd _ GET -> incStat $ msgGetAuth stats _ -> pure () pure $ Left (corrId, entId, ERR AUTH) write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty @@ -808,8 +846,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi transportErr = PROXY . BROKER . TRANSPORT mkIncProxyStats :: MonadIO m => ProxyStats -> ProxyStats -> OwnServer -> (ProxyStats -> TVar Int) -> m () mkIncProxyStats ps psOwn own sel = do - atomically $ modifyTVar' (sel ps) (+ 1) - when own $ atomically $ modifyTVar' (sel psOwn) (+ 1) + incStat $ sel ps + when own $ incStat $ sel psOwn processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Maybe (Transmission BrokerMsg)) processCommand (qr_, (corrId, queueId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, queueId, command) @@ -878,8 +916,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi Right _ -> do withLog (`logCreateById` rId) stats <- asks serverStats - atomically $ modifyTVar' (qCreated stats) (+ 1) - atomically $ modifyTVar' (qCount stats) (+ 1) + incStat $ qCreated stats + incStat $ qCount stats case subMode of SMOnlyCreate -> pure () SMSubscribe -> void $ subscribeQueue qr rId @@ -901,7 +939,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi withLog $ \s -> logSecureQueue s rId sKey st <- asks queueStore stats <- asks serverStats - atomically $ modifyTVar' (qSecured stats) (+ 1) + incStat $ qSecured stats atomically $ either ERR (const OK) <$> secureQueue st rId sKey addQueueNotifier_ :: QueueStore -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M (Transmission BrokerMsg) @@ -925,7 +963,12 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg) deleteQueueNotifier_ st = do withLog (`logDeleteNotifier` queueId) - okResp <$> atomically (deleteQueueNotifier st queueId) + atomically (deleteQueueNotifier st queueId) >>= \case + Right () -> do + -- Possibly, the same should be done if the queue is suspended, but currently we do not use it + atomically $ writeTQueue ntfSubscribedQ (queueId, clnt, False) + pure ok + Left e -> pure $ err e suspendQueue_ :: QueueStore -> M (Transmission BrokerMsg) suspendQueue_ st = do @@ -934,60 +977,69 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi subscribeQueue :: QueueRec -> RecipientId -> M (Transmission BrokerMsg) subscribeQueue qr rId = do - stats <- asks serverStats atomically (TM.lookup rId subscriptions) >>= \case - Nothing -> do - atomically $ modifyTVar' (qSub stats) (+ 1) - newSub >>= deliver - Just s@Sub {subThread} -> + Nothing -> newSub >>= deliver True + Just s@Sub {subThread} -> do + stats <- asks serverStats readTVarIO subThread >>= \case ProhibitSub -> do -- cannot use SUB in the same connection where GET was used - atomically $ modifyTVar' (qSubProhibited stats) (+ 1) + incStat $ qSubProhibited stats pure (corrId, rId, ERR $ CMD PROHIBITED) _ -> do - atomically $ modifyTVar' (qSubDuplicate stats) (+ 1) - atomically (tryTakeTMVar $ delivered s) >> deliver s + incStat $ qSubDuplicate stats + atomically (tryTakeTMVar $ delivered s) >> deliver False s where newSub :: M Sub newSub = time "SUB newSub" . atomically $ do - writeTQueue subscribedQ (rId, clnt) + writeTQueue subscribedQ (rId, clnt, True) sub <- newSubscription NoSub TM.insert rId sub subscriptions pure sub - deliver :: Sub -> M (Transmission BrokerMsg) - deliver sub = do + deliver :: Bool -> Sub -> M (Transmission BrokerMsg) + deliver inc sub = do q <- getStoreMsgQueue "SUB" rId msg_ <- atomically $ tryPeekMsg q + when inc $ do + stats <- asks serverStats + incStat $ (if isJust msg_ then qSub else qSubNoMsg) stats deliverMessage "SUB" qr rId sub msg_ getMessage :: QueueRec -> M (Transmission BrokerMsg) getMessage qr = time "GET" $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> - atomically newSub >>= getMessage_ + atomically newSub >>= (`getMessage_` Nothing) Just s@Sub {subThread} -> readTVarIO subThread >>= \case ProhibitSub -> atomically (tryTakeTMVar $ delivered s) - >> getMessage_ s + >>= getMessage_ s -- cannot use GET in the same connection where there is an active subscription - _ -> pure (corrId, queueId, ERR $ CMD PROHIBITED) + _ -> do + stats <- asks serverStats + incStat $ msgGetProhibited stats + pure (corrId, queueId, ERR $ CMD PROHIBITED) where newSub :: STM Sub newSub = do s <- newSubscription ProhibitSub TM.insert queueId s subscriptions pure s - getMessage_ :: Sub -> M (Transmission BrokerMsg) - getMessage_ s = do + getMessage_ :: Sub -> Maybe MsgId -> M (Transmission BrokerMsg) + getMessage_ s delivered_ = do q <- getStoreMsgQueue "GET" queueId - atomically $ - tryPeekMsg q >>= \case - Just msg -> - let encMsg = encryptMsg qr msg - in setDelivered s msg $> (corrId, queueId, MSG encMsg) - _ -> pure (corrId, queueId, OK) + stats <- asks serverStats + (statCnt, r) <- + atomically $ + tryPeekMsg q >>= \case + Just msg -> + let encMsg = encryptMsg qr msg + cnt = if isJust delivered_ then msgGetDuplicate else msgGet + in setDelivered s msg $> (cnt, (corrId, queueId, MSG encMsg)) + _ -> pure (msgGetNoMsg, (corrId, queueId, OK)) + incStat $ statCnt stats + pure r withQueue :: (QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) withQueue action = maybe (pure $ err AUTH) action qr_ @@ -995,7 +1047,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi subscribeNotifications :: M (Transmission BrokerMsg) subscribeNotifications = time "NSUB" . atomically $ do unlessM (TM.member queueId ntfSubscriptions) $ do - writeTQueue ntfSubscribedQ (queueId, clnt) + writeTQueue ntfSubscribedQ (queueId, clnt, True) TM.insert queueId () ntfSubscriptions pure ok @@ -1010,11 +1062,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi case st of ProhibitSub -> do deletedMsg_ <- atomically $ tryDelMsg q msgId - mapM_ updateStats deletedMsg_ + mapM_ (updateStats True) deletedMsg_ pure ok _ -> do (deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId - mapM_ updateStats deletedMsg_ + mapM_ (updateStats False) deletedMsg_ deliverMessage "ACK" qr queueId sub msg_ _ -> pure $ err NO_MSG where @@ -1024,29 +1076,30 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi if msgId == msgId' || B.null msgId then Just <$> readTVar subThread else putTMVar delivered msgId' $> Nothing - updateStats :: Message -> M () - updateStats = \case + updateStats :: Bool -> Message -> M () + updateStats isGet = \case MessageQuota {} -> pure () Message {msgFlags} -> do stats <- asks serverStats - atomically $ modifyTVar' (msgRecv stats) (+ 1) + incStat $ msgRecv stats + when isGet $ incStat $ msgRecvGet stats atomically $ modifyTVar' (msgCount stats) (subtract 1) atomically $ updatePeriodStats (activeQueues stats) queueId when (notification msgFlags) $ do - atomically $ modifyTVar' (msgRecvNtf stats) (+ 1) + incStat $ msgRecvNtf stats atomically $ updatePeriodStats (activeQueuesNtf stats) queueId sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg) sendMessage qr msgFlags msgBody | B.length msgBody > maxMessageLength thVersion = do stats <- asks serverStats - atomically $ modifyTVar' (msgSentLarge stats) (+ 1) + incStat $ msgSentLarge stats pure $ err LARGE_MSG | otherwise = do stats <- asks serverStats case status qr of QueueOff -> do - atomically $ modifyTVar' (msgSentAuth stats) (+ 1) + incStat $ msgSentAuth stats pure $ err AUTH QueueActive -> case C.maxLenBS msgBody of @@ -1058,7 +1111,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi atomically . writeMsg q =<< mkMessage body case msg_ of Nothing -> do - atomically $ modifyTVar' (msgSentQuota stats) (+ 1) + incStat $ msgSentQuota stats pure $ err QUOTA Just (msg, wasEmpty) -> time "SEND ok" $ do when wasEmpty $ tryDeliverMessage msg @@ -1066,16 +1119,16 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi forM_ (notifier qr) $ \ntf -> do asks random >>= atomically . trySendNotification ntf msg >>= \case Nothing -> do - atomically $ modifyTVar' (msgNtfNoSub stats) (+ 1) + incStat $ msgNtfNoSub stats logWarn "No notification subscription" Just False -> do - atomically $ modifyTVar' (msgNtfLost stats) (+ 1) + incStat $ msgNtfLost stats logWarn "Dropped message notification" - Just True -> atomically $ modifyTVar' (msgNtfs stats) (+ 1) - atomically $ modifyTVar' (msgSentNtf stats) (+ 1) + Just True -> incStat $ msgNtfs stats + incStat $ msgSentNtf stats atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) - atomically $ modifyTVar' (msgSent stats) (+ 1) - atomically $ modifyTVar' (msgCount stats) (+ 1) + incStat $ msgSent stats + incStat $ msgCount stats atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) pure ok where @@ -1197,7 +1250,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi let fr = FwdResponse {fwdCorrId, fwdResponse = r2} r3 = EncFwdResponse $ C.cbEncryptNoPad sessSecret (C.reverseNonce proxyNonce) (smpEncode fr) stats <- asks serverStats - atomically $ modifyTVar' (pMsgFwdsRecv stats) (+ 1) + incStat $ pMsgFwdsRecv stats pure $ RRES r3 where rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) @@ -1254,7 +1307,12 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi withLog (`logDeleteQueue` queueId) ms <- asks msgStore atomically (deleteQueue st queueId $>>= \q -> delMsgQueue ms queueId $> Right q) >>= \case - Right q -> updateDeletedStats q $> ok + Right q -> do + -- Possibly, the same should be done if the queue is suspended, but currently we do not use it + atomically $ writeTQueue subscribedQ (queueId, clnt, False) + atomically $ writeTQueue ntfSubscribedQ (queueId, clnt, False) + updateDeletedStats q + pure ok Left e -> pure $ err e getQueueInfo :: QueueRec -> M (Transmission BrokerMsg) @@ -1290,9 +1348,13 @@ updateDeletedStats :: QueueRec -> M () updateDeletedStats q = do stats <- asks serverStats let delSel = if isNothing (senderKey q) then qDeletedNew else qDeletedSecured - atomically $ modifyTVar' (delSel stats) (+ 1) - atomically $ modifyTVar' (qDeletedAll stats) (+ 1) - atomically $ modifyTVar' (qCount stats) (subtract 1) + incStat $ delSel stats + incStat $ qDeletedAll stats + incStat $ qCount stats + +incStat :: MonadIO m => TVar Int -> m () +incStat v = atomically $ modifyTVar' v (+ 1) +{-# INLINE incStat #-} withLog :: (StoreLog 'WriteMode -> IO a) -> M () withLog action = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index b40e9fc16..dc77b7481 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -128,10 +128,12 @@ data Env = Env proxyAgent :: ProxyAgent -- senders served on this proxy } +type Subscribed = Bool + data Server = Server - { subscribedQ :: TQueue (RecipientId, Client), + { subscribedQ :: TQueue (RecipientId, Client, Subscribed), subscribers :: TMap RecipientId Client, - ntfSubscribedQ :: TQueue (NotifierId, Client), + ntfSubscribedQ :: TQueue (NotifierId, Client, Subscribed), notifiers :: TMap NotifierId Client, savingLock :: Lock } diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index f2716c9c3..2f4ed0613 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -27,6 +27,7 @@ data ServerStats = ServerStats qDeletedNew :: TVar Int, qDeletedSecured :: TVar Int, qSub :: TVar Int, + qSubNoMsg :: TVar Int, qSubAuth :: TVar Int, qSubDuplicate :: TVar Int, qSubProhibited :: TVar Int, @@ -35,6 +36,12 @@ data ServerStats = ServerStats msgSentQuota :: TVar Int, msgSentLarge :: TVar Int, msgRecv :: TVar Int, + msgRecvGet :: TVar Int, + msgGet :: TVar Int, + msgGetNoMsg :: TVar Int, + msgGetAuth :: TVar Int, + msgGetDuplicate :: TVar Int, + msgGetProhibited :: TVar Int, msgExpired :: TVar Int, activeQueues :: PeriodStats RecipientId, msgSentNtf :: TVar Int, -- sent messages with NTF flag @@ -60,6 +67,7 @@ data ServerStatsData = ServerStatsData _qDeletedNew :: Int, _qDeletedSecured :: Int, _qSub :: Int, + _qSubNoMsg :: Int, _qSubAuth :: Int, _qSubDuplicate :: Int, _qSubProhibited :: Int, @@ -68,6 +76,12 @@ data ServerStatsData = ServerStatsData _msgSentQuota :: Int, _msgSentLarge :: Int, _msgRecv :: Int, + _msgRecvGet :: Int, + _msgGet :: Int, + _msgGetNoMsg :: Int, + _msgGetAuth :: Int, + _msgGetDuplicate :: Int, + _msgGetProhibited :: Int, _msgExpired :: Int, _activeQueues :: PeriodStatsData RecipientId, _msgSentNtf :: Int, @@ -95,6 +109,7 @@ newServerStats ts = do qDeletedNew <- newTVar 0 qDeletedSecured <- newTVar 0 qSub <- newTVar 0 + qSubNoMsg <- newTVar 0 qSubAuth <- newTVar 0 qSubDuplicate <- newTVar 0 qSubProhibited <- newTVar 0 @@ -103,6 +118,12 @@ newServerStats ts = do msgSentQuota <- newTVar 0 msgSentLarge <- newTVar 0 msgRecv <- newTVar 0 + msgRecvGet <- newTVar 0 + msgGet <- newTVar 0 + msgGetNoMsg <- newTVar 0 + msgGetAuth <- newTVar 0 + msgGetDuplicate <- newTVar 0 + msgGetProhibited <- newTVar 0 msgExpired <- newTVar 0 activeQueues <- newPeriodStats msgSentNtf <- newTVar 0 @@ -127,6 +148,7 @@ newServerStats ts = do qDeletedNew, qDeletedSecured, qSub, + qSubNoMsg, qSubAuth, qSubDuplicate, qSubProhibited, @@ -135,6 +157,12 @@ newServerStats ts = do msgSentQuota, msgSentLarge, msgRecv, + msgRecvGet, + msgGet, + msgGetNoMsg, + msgGetAuth, + msgGetDuplicate, + msgGetProhibited, msgExpired, activeQueues, msgSentNtf, @@ -161,6 +189,7 @@ getServerStatsData s = do _qDeletedNew <- readTVar $ qDeletedNew s _qDeletedSecured <- readTVar $ qDeletedSecured s _qSub <- readTVar $ qSub s + _qSubNoMsg <- readTVar $ qSubNoMsg s _qSubAuth <- readTVar $ qSubAuth s _qSubDuplicate <- readTVar $ qSubDuplicate s _qSubProhibited <- readTVar $ qSubProhibited s @@ -169,6 +198,12 @@ getServerStatsData s = do _msgSentQuota <- readTVar $ msgSentQuota s _msgSentLarge <- readTVar $ msgSentLarge s _msgRecv <- readTVar $ msgRecv s + _msgRecvGet <- readTVar $ msgRecvGet s + _msgGet <- readTVar $ msgGet s + _msgGetNoMsg <- readTVar $ msgGetNoMsg s + _msgGetAuth <- readTVar $ msgGetAuth s + _msgGetDuplicate <- readTVar $ msgGetDuplicate s + _msgGetProhibited <- readTVar $ msgGetProhibited s _msgExpired <- readTVar $ msgExpired s _activeQueues <- getPeriodStatsData $ activeQueues s _msgSentNtf <- readTVar $ msgSentNtf s @@ -193,6 +228,7 @@ getServerStatsData s = do _qDeletedNew, _qDeletedSecured, _qSub, + _qSubNoMsg, _qSubAuth, _qSubDuplicate, _qSubProhibited, @@ -201,6 +237,12 @@ getServerStatsData s = do _msgSentQuota, _msgSentLarge, _msgRecv, + _msgRecvGet, + _msgGet, + _msgGetNoMsg, + _msgGetAuth, + _msgGetDuplicate, + _msgGetProhibited, _msgExpired, _activeQueues, _msgSentNtf, @@ -227,6 +269,7 @@ setServerStats s d = do writeTVar (qDeletedNew s) $! _qDeletedNew d writeTVar (qDeletedSecured s) $! _qDeletedSecured d writeTVar (qSub s) $! _qSub d + writeTVar (qSubNoMsg s) $! _qSubNoMsg d writeTVar (qSubAuth s) $! _qSubAuth d writeTVar (qSubDuplicate s) $! _qSubDuplicate d writeTVar (qSubProhibited s) $! _qSubProhibited d @@ -235,6 +278,12 @@ setServerStats s d = do writeTVar (msgSentQuota s) $! _msgSentQuota d writeTVar (msgSentLarge s) $! _msgSentLarge d writeTVar (msgRecv s) $! _msgRecv d + writeTVar (msgRecvGet s) $! _msgRecvGet d + writeTVar (msgGet s) $! _msgGet d + writeTVar (msgGetNoMsg s) $! _msgGetNoMsg d + writeTVar (msgGetAuth s) $! _msgGetAuth d + writeTVar (msgGetDuplicate s) $! _msgGetDuplicate d + writeTVar (msgGetProhibited s) $! _msgGetProhibited d writeTVar (msgExpired s) $! _msgExpired d setPeriodStats (activeQueues s) (_activeQueues d) writeTVar (msgSentNtf s) $! _msgSentNtf d @@ -262,6 +311,7 @@ instance StrEncoding ServerStatsData where "qDeletedSecured=" <> strEncode (_qDeletedSecured d), "qCount=" <> strEncode (_qCount d), "qSub=" <> strEncode (_qSub d), + "qSubNoMsg=" <> strEncode (_qSubNoMsg d), "qSubAuth=" <> strEncode (_qSubAuth d), "qSubDuplicate=" <> strEncode (_qSubDuplicate d), "qSubProhibited=" <> strEncode (_qSubProhibited d), @@ -270,6 +320,12 @@ instance StrEncoding ServerStatsData where "msgSentQuota=" <> strEncode (_msgSentQuota d), "msgSentLarge=" <> strEncode (_msgSentLarge d), "msgRecv=" <> strEncode (_msgRecv d), + "msgRecvGet=" <> strEncode (_msgRecvGet d), + "msgGet=" <> strEncode (_msgGet d), + "msgGetNoMsg=" <> strEncode (_msgGetNoMsg d), + "msgGetAuth=" <> strEncode (_msgGetAuth d), + "msgGetDuplicate=" <> strEncode (_msgGetDuplicate d), + "msgGetProhibited=" <> strEncode (_msgGetProhibited d), "msgExpired=" <> strEncode (_msgExpired d), "msgSentNtf=" <> strEncode (_msgSentNtf d), "msgRecvNtf=" <> strEncode (_msgRecvNtf d), @@ -299,6 +355,7 @@ instance StrEncoding ServerStatsData where <|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine)) _qCount <- opt "qCount=" _qSub <- opt "qSub=" + _qSubNoMsg <- opt "qSubNoMsg=" _qSubAuth <- opt "qSubAuth=" _qSubDuplicate <- opt "qSubDuplicate=" _qSubProhibited <- opt "qSubProhibited=" @@ -307,6 +364,12 @@ instance StrEncoding ServerStatsData where _msgSentQuota <- opt "msgSentQuota=" _msgSentLarge <- opt "msgSentLarge=" _msgRecv <- "msgRecv=" *> strP <* A.endOfLine + _msgRecvGet <- opt "msgRecvGet=" + _msgGet <- opt "msgGet=" + _msgGetNoMsg <- opt "msgGetNoMsg=" + _msgGetAuth <- opt "msgGetAuth=" + _msgGetDuplicate <- opt "msgGetDuplicate=" + _msgGetProhibited <- opt "msgGetProhibited=" _msgExpired <- opt "msgExpired=" _msgSentNtf <- opt "msgSentNtf=" _msgRecvNtf <- opt "msgRecvNtf=" @@ -339,6 +402,7 @@ instance StrEncoding ServerStatsData where _qDeletedNew, _qDeletedSecured, _qSub, + _qSubNoMsg, _qSubAuth, _qSubDuplicate, _qSubProhibited, @@ -347,6 +411,12 @@ instance StrEncoding ServerStatsData where _msgSentQuota, _msgSentLarge, _msgRecv, + _msgRecvGet, + _msgGet, + _msgGetNoMsg, + _msgGetAuth, + _msgGetDuplicate, + _msgGetProhibited, _msgExpired, _msgSentNtf, _msgRecvNtf, diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 10516b9f2..9534b7902 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -610,7 +610,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 55 + logSize testServerStatsBackupFile `shouldReturn` 62 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -628,7 +628,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 55 + logSize testServerStatsBackupFile `shouldReturn` 62 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -647,7 +647,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 55 + logSize testServerStatsBackupFile `shouldReturn` 62 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5