From f871f20172f07f563d8f4eb13a3ac49663f5930d Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 7 Oct 2024 09:01:28 +0100 Subject: [PATCH] smp server: fix notification delivery (#1350) * .401 * stats for undelivered notifications * logs, stats * control port show ntf client IDs * check that Ntf client is still current and that queue is not full, drop notifications otherwise * prevent losing notifications when client is not current or queue full * add log when no notifications, remove some logs * reduce STM transaction * revert version change --- src/Simplex/Messaging/Server.hs | 107 ++++++++++++++++------- src/Simplex/Messaging/Server/NtfStore.hs | 14 +-- src/Simplex/Messaging/Server/Stats.hs | 18 ++-- tests/CoreTests/StoreLogTests.hs | 2 +- 4 files changed, 91 insertions(+), 50 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 014e78e8e..8cdf55e8b 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -38,6 +38,7 @@ module Simplex.Messaging.Server ) where +import Control.Concurrent.STM (throwSTM) import Control.Concurrent.STM.TQueue (flushTQueue) import Control.Logger.Simple import Control.Monad @@ -237,27 +238,46 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT deliverNtfsThread :: Server -> M () deliverNtfsThread Server {ntfSubClients} = do ntfInt <- asks $ ntfDeliveryInterval . config - ns <- asks ntfStore + NtfStore ns <- asks ntfStore stats <- asks serverStats liftIO $ forever $ do threadDelay ntfInt readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats) where - deliverNtfs ns stats Client {clientId, ntfSubscriptions, sndQ, connected} = whenM currentClient $ - readTVarIO ntfSubscriptions >>= \subs -> do - ts_ <- foldM addNtfs [] (M.keys subs) - mapM_ (atomically . writeTBQueue sndQ) $ L.nonEmpty ts_ - updateNtfStats $ length ts_ + deliverNtfs ns stats Client {clientId, ntfSubscriptions, sndQ, connected} = + whenM (currentClient readTVarIO) $ do + subs <- readTVarIO ntfSubscriptions + logDebug $ "NOTIFICATIONS: client #" <> tshow clientId <> " is current with " <> tshow (M.size subs) <> " subs" + ntfQs <- M.assocs . M.filterWithKey (\nId _ -> M.member nId subs) <$> readTVarIO ns + tryAny (atomically $ flushSubscribedNtfs ntfQs) >>= \case + Right len -> updateNtfStats len + Left e -> logDebug $ "NOTIFICATIONS: cancelled for client #" <> tshow clientId <> ", reason: " <> tshow e where - currentClient = (&&) <$> readTVarIO connected <*> (IM.member clientId <$> readTVarIO ntfSubClients) - addNtfs :: [Transmission BrokerMsg] -> NotifierId -> IO [Transmission BrokerMsg] - addNtfs acc nId = - (foldl' (\acc' ntf -> nmsg nId ntf : acc') acc) -- reverses, to order by time - <$> flushNtfs ns nId + flushSubscribedNtfs :: [(NotifierId, TVar [MsgNtf])] -> STM Int + flushSubscribedNtfs ntfQs = do + ts_ <- foldM addNtfs [] ntfQs + forM_ (L.nonEmpty ts_) $ \ts -> do + let cancelNtfs s = throwSTM $ userError $ s <> ", " <> show (length ts_) <> " ntfs kept" + unlessM (currentClient readTVar) $ cancelNtfs "not current client" + whenM (isFullTBQueue sndQ) $ cancelNtfs "sending queue full" + writeTBQueue sndQ ts + pure $ length ts_ + currentClient :: Monad m => (forall a. TVar a -> m a) -> m Bool + currentClient rd = (&&) <$> rd connected <*> (IM.member clientId <$> rd ntfSubClients) + addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg] + addNtfs acc (nId, v) = + readTVar v >>= \case + [] -> pure acc + ntfs -> do + writeTVar v [] + pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta) - updateNtfStats len = when (len > 0) $ liftIO $ do + updateNtfStats 0 = logDebug $ "NOTIFICATIONS: no ntfs for client #" <> tshow clientId + updateNtfStats len = liftIO $ do + atomicModifyIORef'_ (ntfCount stats) (subtract len) atomicModifyIORef'_ (msgNtfs stats) (+ len) atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch + logDebug $ "NOTIFICATIONS: delivered to client #" <> tshow clientId <> " " <> tshow len <> " ntfs" sendPendingEvtsThread :: Server -> M () sendPendingEvtsThread s = do @@ -334,7 +354,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT threadDelay' interval old <- expireBeforeEpoch expCfg expired <- deleteExpiredNtfs ns old - when (expired > 0) $ atomicModifyIORef'_ (msgNtfExpired stats) (+ expired) + when (expired > 0) $ do + atomicModifyIORef'_ (msgNtfExpired stats) (+ expired) + atomicModifyIORef'_ (ntfCount stats) (subtract expired) serverStatsThread_ :: ServerConfig -> [M ()] serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = @@ -347,7 +369,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} + ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats QueueStore {queues, notifiers} <- asks queueStore let interval = 1000000 * logInterval @@ -404,8 +426,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0 qCount' <- readIORef qCount qCount'' <- M.size <$> readTVarIO queues - ntfCount' <- M.size <$> readTVarIO notifiers + notifierCount' <- M.size <$> readTVarIO notifiers msgCount' <- readIORef msgCount + ntfCount' <- readIORef ntfCount hPutStrLn h $ intercalate "," @@ -462,7 +485,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT show ntfSub', show ntfSubAuth', show ntfSubDuplicate', - show ntfCount', + show notifierCount', show qDeletedAllB', show qSubAllB', show qSubEnd', @@ -470,7 +493,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT show ntfDeletedB', show ntfSubB', show msgNtfsB', - show msgNtfExpired' + show msgNtfExpired', + show ntfCount' ] ) liftIO $ threadDelay' interval @@ -547,6 +571,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions'] CPStats -> withUserRole $ do ss <- unliftIO u $ asks serverStats + QueueStore {queues, notifiers} <- unliftIO u $ asks queueStore let getStat :: (ServerStats -> IORef a) -> IO a getStat var = readIORef (var ss) putStat :: Show a => String -> (ServerStats -> IORef a) -> IO () @@ -584,7 +609,18 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT putStat "msgNtfsB" msgNtfsB putStat "msgNtfExpired" msgNtfExpired putStat "qCount" qCount + qCount2 <- M.size <$> readTVarIO queues + hPutStrLn h $ "qCount 2: " <> show qCount2 + notifierCount <- M.size <$> readTVarIO notifiers + hPutStrLn h $ "notifiers: " <> show notifierCount putStat "msgCount" msgCount + putStat "ntfCount" ntfCount + readTVarIO role >>= \case + CPRAdmin -> do + NtfStore ns <- unliftIO u $ asks ntfStore + ntfCount2 <- liftIO . foldM (\(!n) q -> (n +) . length <$> readTVarIO q) 0 =<< readTVarIO ns + hPutStrLn h $ "ntfCount 2: " <> show ntfCount2 + _ -> pure () putProxyStat "pRelays" pRelays putProxyStat "pRelaysOwn" pRelaysOwn putProxyStat "pMsgFwds" pMsgFwds @@ -650,24 +686,24 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs - putActiveClientsInfo "SMP" subscribers - putActiveClientsInfo "Ntf" notifiers - putSubscribedClients "SMP" subClients - putSubscribedClients "Ntf" ntfSubClients + putActiveClientsInfo "SMP" subscribers False + putActiveClientsInfo "Ntf" notifiers True + putSubscribedClients "SMP" subClients False + putSubscribedClients "Ntf" ntfSubClients True where - putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> IO () - putActiveClientsInfo protoName clients = do + putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> Bool -> IO () + putActiveClientsInfo protoName clients showIds = do activeSubs <- readTVarIO clients hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs) - clCnt <- IS.size <$> countSubClients activeSubs - hPutStrLn h $ protoName <> " subscribed clients: " <> show clCnt + clnts <- countSubClients activeSubs + hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "") where countSubClients :: M.Map QueueId (TVar Client) -> IO IS.IntSet countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId <$> readTVarIO c) IS.empty - putSubscribedClients :: String -> TVar (IM.IntMap Client) -> IO () - putSubscribedClients protoName subClnts = do + putSubscribedClients :: String -> TVar (IM.IntMap Client) -> Bool -> IO () + putSubscribedClients protoName subClnts showIds = do clnts <- readTVarIO subClnts - hPutStrLn h $ protoName <> " subscribed clients count:" <> show (IM.size clnts) + hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "") countClientSubs :: (Client -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe Client) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0)) where @@ -1184,9 +1220,11 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s liftIO (deleteQueueNotifier st entId) >>= \case Right (Just nId) -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it - asks ntfStore >>= liftIO . (`deleteNtfs` nId) + stats <- asks serverStats + deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId) + when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted) atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) - incStat . ntfDeleted =<< asks serverStats + incStat $ ntfDeleted stats pure ok Right Nothing -> pure ok Left e -> pure $ err e @@ -1459,6 +1497,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s ns <- asks ntfStore ntf <- mkMessageNotification msgId msgTs rcvNtfDhSecret liftIO $ storeNtf ns nId ntf + incStat . ntfCount =<< asks serverStats mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> M MsgNtf mkMessageNotification msgId msgTs rcvNtfDhSecret = do @@ -1569,7 +1608,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s forM_ (notifierId <$> notifier q) $ \nId -> do -- queue is deleted by a different client from the one subscribed to notifications, -- so we don't need to remove subscription from the current client. - asks ntfStore >>= liftIO . (`deleteNtfs` nId) + stats <- asks serverStats + deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId) + when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted) atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) updateDeletedStats q pure ok @@ -1760,7 +1801,9 @@ restoreServerStats expiredMsgs expiredNtfs = asks (serverStatsBackupFile . confi s <- asks serverStats _qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore _msgCount <- liftIO . foldM (\(!n) q -> (n +) <$> getQueueSize q) 0 =<< readTVarIO =<< asks msgStore - liftIO $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredMsgs, _msgNtfExpired = _msgNtfExpired d + expiredNtfs} + NtfStore ns <- asks ntfStore + _ntfCount <- liftIO . foldM (\(!n) q -> (n +) . length <$> readTVarIO q) 0 =<< readTVarIO ns + liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgs, _msgNtfExpired = _msgNtfExpired d + expiredNtfs} renameFile f $ f <> ".bak" logInfo "server stats restored" when (_qCount /= statsQCount) $ logWarn $ "Queue count differs: stats: " <> tshow statsQCount <> ", store: " <> tshow _qCount diff --git a/src/Simplex/Messaging/Server/NtfStore.hs b/src/Simplex/Messaging/Server/NtfStore.hs index ded2bd85d..7895f64e9 100644 --- a/src/Simplex/Messaging/Server/NtfStore.hs +++ b/src/Simplex/Messaging/Server/NtfStore.hs @@ -34,18 +34,8 @@ storeNtf (NtfStore ns) nId ntf = do where newNtfs = TM.lookup nId ns >>= maybe (TM.insertM nId (newTVar [ntf]) ns) (`modifyTVar'` (ntf :)) -deleteNtfs :: NtfStore -> NotifierId -> IO () -deleteNtfs (NtfStore ns) nId = atomically $ TM.delete nId ns - -flushNtfs :: NtfStore -> NotifierId -> IO [MsgNtf] -flushNtfs (NtfStore ns) nId = do - TM.lookupIO nId ns >>= maybe (pure []) swapNtfs - where - swapNtfs v = - readTVarIO v >>= \case - [] -> pure [] - -- if notifications available, atomically swap with empty array - _ -> atomically (swapTVar v []) +deleteNtfs :: NtfStore -> NotifierId -> IO Int +deleteNtfs (NtfStore ns) nId = atomically (TM.lookupDelete nId ns) >>= maybe (pure 0) (fmap length . readTVarIO) deleteExpiredNtfs :: NtfStore -> Int64 -> IO Int deleteExpiredNtfs (NtfStore ns) old = diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 49078d2d9..385ba119b 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -77,7 +77,8 @@ data ServerStats = ServerStats pMsgFwdsOwn :: ProxyStats, pMsgFwdsRecv :: IORef Int, qCount :: IORef Int, - msgCount :: IORef Int + msgCount :: IORef Int, + ntfCount :: IORef Int } data ServerStatsData = ServerStatsData @@ -129,7 +130,8 @@ data ServerStatsData = ServerStatsData _pMsgFwdsOwn :: ProxyStatsData, _pMsgFwdsRecv :: Int, _qCount :: Int, - _msgCount :: Int + _msgCount :: Int, + _ntfCount :: Int } deriving (Show) @@ -184,6 +186,7 @@ newServerStats ts = do pMsgFwdsRecv <- newIORef 0 qCount <- newIORef 0 msgCount <- newIORef 0 + ntfCount <- newIORef 0 pure ServerStats { fromTime, @@ -234,7 +237,8 @@ newServerStats ts = do pMsgFwdsOwn, pMsgFwdsRecv, qCount, - msgCount + msgCount, + ntfCount } getServerStatsData :: ServerStats -> IO ServerStatsData @@ -288,6 +292,7 @@ getServerStatsData s = do _pMsgFwdsRecv <- readIORef $ pMsgFwdsRecv s _qCount <- readIORef $ qCount s _msgCount <- readIORef $ msgCount s + _ntfCount <- readIORef $ ntfCount s pure ServerStatsData { _fromTime, @@ -338,7 +343,8 @@ getServerStatsData s = do _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, - _msgCount + _msgCount, + _ntfCount } -- this function is not thread safe, it is used on server start only @@ -393,6 +399,7 @@ setServerStats s d = do writeIORef (pMsgFwdsRecv s) $! _pMsgFwdsRecv d writeIORef (qCount s) $! _qCount d writeIORef (msgCount s) $! _msgCount d + writeIORef (ntfCount s) $! _ntfCount d instance StrEncoding ServerStatsData where strEncode d = @@ -566,7 +573,8 @@ instance StrEncoding ServerStatsData where _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, - _msgCount = 0 + _msgCount = 0, + _ntfCount = 0 } where opt s = A.string s *> strP <* A.endOfLine <|> pure 0 diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 33d2c5a4a..d2462a09e 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -109,7 +109,7 @@ storeLogTests = testSMPStoreLog :: String -> [SMPStoreLogTestCase] -> Spec testSMPStoreLog testSuite tests = - fdescribe testSuite $ forM_ tests $ \t@SLTC {name, saved} -> it name $ do + describe testSuite $ forM_ tests $ \t@SLTC {name, saved} -> it name $ do l <- openWriteStoreLog testStoreLogFile mapM_ (writeStoreLogRecord l) saved closeStoreLog l