smp server: fix notification delivery (#1350)

* .401

* stats for undelivered notifications

* logs, stats

* control port show ntf client IDs

* check that Ntf client is still current and that queue is not full, drop notifications otherwise

* prevent losing notifications when client is not current or queue full

* add log when no notifications, remove some logs

* reduce STM transaction

* revert version change
This commit is contained in:
Evgeny
2024-10-07 09:01:28 +01:00
committed by GitHub
parent 80d3518d55
commit f871f20172
4 changed files with 91 additions and 50 deletions
+75 -32
View File
@@ -38,6 +38,7 @@ module Simplex.Messaging.Server
)
where
import Control.Concurrent.STM (throwSTM)
import Control.Concurrent.STM.TQueue (flushTQueue)
import Control.Logger.Simple
import Control.Monad
@@ -237,27 +238,46 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
deliverNtfsThread :: Server -> M ()
deliverNtfsThread Server {ntfSubClients} = do
ntfInt <- asks $ ntfDeliveryInterval . config
ns <- asks ntfStore
NtfStore ns <- asks ntfStore
stats <- asks serverStats
liftIO $ forever $ do
threadDelay ntfInt
readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats)
where
deliverNtfs ns stats Client {clientId, ntfSubscriptions, sndQ, connected} = whenM currentClient $
readTVarIO ntfSubscriptions >>= \subs -> do
ts_ <- foldM addNtfs [] (M.keys subs)
mapM_ (atomically . writeTBQueue sndQ) $ L.nonEmpty ts_
updateNtfStats $ length ts_
deliverNtfs ns stats Client {clientId, ntfSubscriptions, sndQ, connected} =
whenM (currentClient readTVarIO) $ do
subs <- readTVarIO ntfSubscriptions
logDebug $ "NOTIFICATIONS: client #" <> tshow clientId <> " is current with " <> tshow (M.size subs) <> " subs"
ntfQs <- M.assocs . M.filterWithKey (\nId _ -> M.member nId subs) <$> readTVarIO ns
tryAny (atomically $ flushSubscribedNtfs ntfQs) >>= \case
Right len -> updateNtfStats len
Left e -> logDebug $ "NOTIFICATIONS: cancelled for client #" <> tshow clientId <> ", reason: " <> tshow e
where
currentClient = (&&) <$> readTVarIO connected <*> (IM.member clientId <$> readTVarIO ntfSubClients)
addNtfs :: [Transmission BrokerMsg] -> NotifierId -> IO [Transmission BrokerMsg]
addNtfs acc nId =
(foldl' (\acc' ntf -> nmsg nId ntf : acc') acc) -- reverses, to order by time
<$> flushNtfs ns nId
flushSubscribedNtfs :: [(NotifierId, TVar [MsgNtf])] -> STM Int
flushSubscribedNtfs ntfQs = do
ts_ <- foldM addNtfs [] ntfQs
forM_ (L.nonEmpty ts_) $ \ts -> do
let cancelNtfs s = throwSTM $ userError $ s <> ", " <> show (length ts_) <> " ntfs kept"
unlessM (currentClient readTVar) $ cancelNtfs "not current client"
whenM (isFullTBQueue sndQ) $ cancelNtfs "sending queue full"
writeTBQueue sndQ ts
pure $ length ts_
currentClient :: Monad m => (forall a. TVar a -> m a) -> m Bool
currentClient rd = (&&) <$> rd connected <*> (IM.member clientId <$> rd ntfSubClients)
addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg]
addNtfs acc (nId, v) =
readTVar v >>= \case
[] -> pure acc
ntfs -> do
writeTVar v []
pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time
nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta)
updateNtfStats len = when (len > 0) $ liftIO $ do
updateNtfStats 0 = logDebug $ "NOTIFICATIONS: no ntfs for client #" <> tshow clientId
updateNtfStats len = liftIO $ do
atomicModifyIORef'_ (ntfCount stats) (subtract len)
atomicModifyIORef'_ (msgNtfs stats) (+ len)
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
logDebug $ "NOTIFICATIONS: delivered to client #" <> tshow clientId <> " " <> tshow len <> " ntfs"
sendPendingEvtsThread :: Server -> M ()
sendPendingEvtsThread s = do
@@ -334,7 +354,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
threadDelay' interval
old <- expireBeforeEpoch expCfg
expired <- deleteExpiredNtfs ns old
when (expired > 0) $ atomicModifyIORef'_ (msgNtfExpired stats) (+ expired)
when (expired > 0) $ do
atomicModifyIORef'_ (msgNtfExpired stats) (+ expired)
atomicModifyIORef'_ (ntfCount stats) (subtract expired)
serverStatsThread_ :: ServerConfig -> [M ()]
serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} =
@@ -347,7 +369,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv}
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv}
<- asks serverStats
QueueStore {queues, notifiers} <- asks queueStore
let interval = 1000000 * logInterval
@@ -404,8 +426,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0
qCount' <- readIORef qCount
qCount'' <- M.size <$> readTVarIO queues
ntfCount' <- M.size <$> readTVarIO notifiers
notifierCount' <- M.size <$> readTVarIO notifiers
msgCount' <- readIORef msgCount
ntfCount' <- readIORef ntfCount
hPutStrLn h $
intercalate
","
@@ -462,7 +485,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
show ntfSub',
show ntfSubAuth',
show ntfSubDuplicate',
show ntfCount',
show notifierCount',
show qDeletedAllB',
show qSubAllB',
show qSubEnd',
@@ -470,7 +493,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
show ntfDeletedB',
show ntfSubB',
show msgNtfsB',
show msgNtfExpired'
show msgNtfExpired',
show ntfCount'
]
)
liftIO $ threadDelay' interval
@@ -547,6 +571,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
CPStats -> withUserRole $ do
ss <- unliftIO u $ asks serverStats
QueueStore {queues, notifiers} <- unliftIO u $ asks queueStore
let getStat :: (ServerStats -> IORef a) -> IO a
getStat var = readIORef (var ss)
putStat :: Show a => String -> (ServerStats -> IORef a) -> IO ()
@@ -584,7 +609,18 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
putStat "msgNtfsB" msgNtfsB
putStat "msgNtfExpired" msgNtfExpired
putStat "qCount" qCount
qCount2 <- M.size <$> readTVarIO queues
hPutStrLn h $ "qCount 2: " <> show qCount2
notifierCount <- M.size <$> readTVarIO notifiers
hPutStrLn h $ "notifiers: " <> show notifierCount
putStat "msgCount" msgCount
putStat "ntfCount" ntfCount
readTVarIO role >>= \case
CPRAdmin -> do
NtfStore ns <- unliftIO u $ asks ntfStore
ntfCount2 <- liftIO . foldM (\(!n) q -> (n +) . length <$> readTVarIO q) 0 =<< readTVarIO ns
hPutStrLn h $ "ntfCount 2: " <> show ntfCount2
_ -> pure ()
putProxyStat "pRelays" pRelays
putProxyStat "pRelaysOwn" pRelaysOwn
putProxyStat "pMsgFwds" pMsgFwds
@@ -650,24 +686,24 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt
hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
putActiveClientsInfo "SMP" subscribers
putActiveClientsInfo "Ntf" notifiers
putSubscribedClients "SMP" subClients
putSubscribedClients "Ntf" ntfSubClients
putActiveClientsInfo "SMP" subscribers False
putActiveClientsInfo "Ntf" notifiers True
putSubscribedClients "SMP" subClients False
putSubscribedClients "Ntf" ntfSubClients True
where
putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> IO ()
putActiveClientsInfo protoName clients = do
putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> Bool -> IO ()
putActiveClientsInfo protoName clients showIds = do
activeSubs <- readTVarIO clients
hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs)
clCnt <- IS.size <$> countSubClients activeSubs
hPutStrLn h $ protoName <> " subscribed clients: " <> show clCnt
clnts <- countSubClients activeSubs
hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "")
where
countSubClients :: M.Map QueueId (TVar Client) -> IO IS.IntSet
countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId <$> readTVarIO c) IS.empty
putSubscribedClients :: String -> TVar (IM.IntMap Client) -> IO ()
putSubscribedClients protoName subClnts = do
putSubscribedClients :: String -> TVar (IM.IntMap Client) -> Bool -> IO ()
putSubscribedClients protoName subClnts showIds = do
clnts <- readTVarIO subClnts
hPutStrLn h $ protoName <> " subscribed clients count:" <> show (IM.size clnts)
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "")
countClientSubs :: (Client -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe Client) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
where
@@ -1184,9 +1220,11 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
liftIO (deleteQueueNotifier st entId) >>= \case
Right (Just nId) -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
asks ntfStore >>= liftIO . (`deleteNtfs` nId)
stats <- asks serverStats
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
incStat . ntfDeleted =<< asks serverStats
incStat $ ntfDeleted stats
pure ok
Right Nothing -> pure ok
Left e -> pure $ err e
@@ -1459,6 +1497,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
ns <- asks ntfStore
ntf <- mkMessageNotification msgId msgTs rcvNtfDhSecret
liftIO $ storeNtf ns nId ntf
incStat . ntfCount =<< asks serverStats
mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> M MsgNtf
mkMessageNotification msgId msgTs rcvNtfDhSecret = do
@@ -1569,7 +1608,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
forM_ (notifierId <$> notifier q) $ \nId -> do
-- queue is deleted by a different client from the one subscribed to notifications,
-- so we don't need to remove subscription from the current client.
asks ntfStore >>= liftIO . (`deleteNtfs` nId)
stats <- asks serverStats
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
updateDeletedStats q
pure ok
@@ -1760,7 +1801,9 @@ restoreServerStats expiredMsgs expiredNtfs = asks (serverStatsBackupFile . confi
s <- asks serverStats
_qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore
_msgCount <- liftIO . foldM (\(!n) q -> (n +) <$> getQueueSize q) 0 =<< readTVarIO =<< asks msgStore
liftIO $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredMsgs, _msgNtfExpired = _msgNtfExpired d + expiredNtfs}
NtfStore ns <- asks ntfStore
_ntfCount <- liftIO . foldM (\(!n) q -> (n +) . length <$> readTVarIO q) 0 =<< readTVarIO ns
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgs, _msgNtfExpired = _msgNtfExpired d + expiredNtfs}
renameFile f $ f <> ".bak"
logInfo "server stats restored"
when (_qCount /= statsQCount) $ logWarn $ "Queue count differs: stats: " <> tshow statsQCount <> ", store: " <> tshow _qCount
+2 -12
View File
@@ -34,18 +34,8 @@ storeNtf (NtfStore ns) nId ntf = do
where
newNtfs = TM.lookup nId ns >>= maybe (TM.insertM nId (newTVar [ntf]) ns) (`modifyTVar'` (ntf :))
deleteNtfs :: NtfStore -> NotifierId -> IO ()
deleteNtfs (NtfStore ns) nId = atomically $ TM.delete nId ns
flushNtfs :: NtfStore -> NotifierId -> IO [MsgNtf]
flushNtfs (NtfStore ns) nId = do
TM.lookupIO nId ns >>= maybe (pure []) swapNtfs
where
swapNtfs v =
readTVarIO v >>= \case
[] -> pure []
-- if notifications available, atomically swap with empty array
_ -> atomically (swapTVar v [])
deleteNtfs :: NtfStore -> NotifierId -> IO Int
deleteNtfs (NtfStore ns) nId = atomically (TM.lookupDelete nId ns) >>= maybe (pure 0) (fmap length . readTVarIO)
deleteExpiredNtfs :: NtfStore -> Int64 -> IO Int
deleteExpiredNtfs (NtfStore ns) old =
+13 -5
View File
@@ -77,7 +77,8 @@ data ServerStats = ServerStats
pMsgFwdsOwn :: ProxyStats,
pMsgFwdsRecv :: IORef Int,
qCount :: IORef Int,
msgCount :: IORef Int
msgCount :: IORef Int,
ntfCount :: IORef Int
}
data ServerStatsData = ServerStatsData
@@ -129,7 +130,8 @@ data ServerStatsData = ServerStatsData
_pMsgFwdsOwn :: ProxyStatsData,
_pMsgFwdsRecv :: Int,
_qCount :: Int,
_msgCount :: Int
_msgCount :: Int,
_ntfCount :: Int
}
deriving (Show)
@@ -184,6 +186,7 @@ newServerStats ts = do
pMsgFwdsRecv <- newIORef 0
qCount <- newIORef 0
msgCount <- newIORef 0
ntfCount <- newIORef 0
pure
ServerStats
{ fromTime,
@@ -234,7 +237,8 @@ newServerStats ts = do
pMsgFwdsOwn,
pMsgFwdsRecv,
qCount,
msgCount
msgCount,
ntfCount
}
getServerStatsData :: ServerStats -> IO ServerStatsData
@@ -288,6 +292,7 @@ getServerStatsData s = do
_pMsgFwdsRecv <- readIORef $ pMsgFwdsRecv s
_qCount <- readIORef $ qCount s
_msgCount <- readIORef $ msgCount s
_ntfCount <- readIORef $ ntfCount s
pure
ServerStatsData
{ _fromTime,
@@ -338,7 +343,8 @@ getServerStatsData s = do
_pMsgFwdsOwn,
_pMsgFwdsRecv,
_qCount,
_msgCount
_msgCount,
_ntfCount
}
-- this function is not thread safe, it is used on server start only
@@ -393,6 +399,7 @@ setServerStats s d = do
writeIORef (pMsgFwdsRecv s) $! _pMsgFwdsRecv d
writeIORef (qCount s) $! _qCount d
writeIORef (msgCount s) $! _msgCount d
writeIORef (ntfCount s) $! _ntfCount d
instance StrEncoding ServerStatsData where
strEncode d =
@@ -566,7 +573,8 @@ instance StrEncoding ServerStatsData where
_pMsgFwdsOwn,
_pMsgFwdsRecv,
_qCount,
_msgCount = 0
_msgCount = 0,
_ntfCount = 0
}
where
opt s = A.string s *> strP <* A.endOfLine <|> pure 0
+1 -1
View File
@@ -109,7 +109,7 @@ storeLogTests =
testSMPStoreLog :: String -> [SMPStoreLogTestCase] -> Spec
testSMPStoreLog testSuite tests =
fdescribe testSuite $ forM_ tests $ \t@SLTC {name, saved} -> it name $ do
describe testSuite $ forM_ tests $ \t@SLTC {name, saved} -> it name $ do
l <- openWriteStoreLog testStoreLogFile
mapM_ (writeStoreLogRecord l) saved
closeStoreLog l