smp server: optimize concurrency and memory usage, refactor

This commit is contained in:
Evgeny Poberezkin
2025-05-22 10:02:08 +01:00
parent d352d518c2
commit 43ef908309
4 changed files with 173 additions and 127 deletions
@@ -94,11 +94,11 @@ data NtfEnv = NtfEnv
}
newNtfServerEnv :: NtfServerConfig -> IO NtfEnv
newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do
newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do
when (compactLog startOptions) $ compactDbStoreLog $ dbStoreLogPath dbStoreConfig
random <- C.newRandom
store <- newNtfDbStore dbStoreConfig
subscriber <- newNtfSubscriber subQSize smpAgentCfg random
subscriber <- newNtfSubscriber smpAgentCfg random
pushServer <- newNtfPushServer pushQSize apnsConfig
tlsServerCreds <- loadServerCredential ntfCredentials
Fingerprint fp <- loadFingerprint ntfCredentials
@@ -121,8 +121,8 @@ data NtfSubscriber = NtfSubscriber
type SMPSubscriberVar = SessionVar SMPSubscriber
newNtfSubscriber :: Natural -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber
newNtfSubscriber qSize smpAgentCfg random = do
newNtfSubscriber :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber
newNtfSubscriber smpAgentCfg random = do
smpSubscribers <- TM.emptyIO
subscriberSeq <- newTVarIO 0
smpAgent <- newSMPClientAgent smpAgentCfg random
+104 -103
View File
@@ -162,8 +162,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
stopServer s
liftIO $ exitSuccess
raceAny_
( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ())
( serverThread "server subscribers" (subscribers s) subscriptions cancelSub
: serverThread "server notifiers" (ntfSubscribers s) ntfSubscriptions (\_ -> pure ())
: deliverNtfsThread s
: sendPendingEvtsThread s
: receiveFromProxyAgent pa
@@ -229,66 +229,65 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
serverThread ::
forall s.
Server ->
String ->
(Server -> TQueue (QueueId, ClientId, Subscribed)) ->
(Server -> TMap QueueId (TVar AClient)) ->
(Server -> TVar (IM.IntMap AClient)) ->
(Server -> TVar (IM.IntMap (NonEmpty (QueueId, Subscribed)))) ->
ServerSubscribers ->
(forall st. Client st -> TMap QueueId s) ->
(s -> IO ()) ->
M ()
serverThread s label subQ subs subClnts pendingEvts clientSubs unsub = do
serverThread label ServerSubscribers {subQ, queueSubscribers = ss, subClients, pendingEvents} clientSubs unsub = do
labelMyThread label
cls <- asks clients
liftIO . forever $
(atomically (readTQueue $ subQ s) >>= atomically . updateSubscribers cls)
liftIO . forever $ do
-- Reading clients outside of `updateSubscribers` transaction to avoid transaction re-evaluation on each new connected client.
-- In case client disconnects during the transaction (its `connected` property is read),
-- the transaction will still be re-evaluated, and the client won't be stored as subscribed.
sub@(_, clntId, _) <- atomically $ readTQueue subQ
c_ <- IM.lookup clntId <$> readTVarIO cls
atomically (updateSubscribers c_ sub)
$>>= endPreviousSubscriptions
>>= mapM_ unsub
where
updateSubscribers :: TVar (IM.IntMap (Maybe AClient)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), AClient))
updateSubscribers cls (qId, clntId, subscribed) =
-- Client lookup by ID is in the same STM transaction.
-- In case client disconnects during the transaction,
-- it will be re-evaluated, and the client won't be stored as subscribed.
(readTVar cls >>= updateSub . IM.lookup clntId)
$>>= clientToBeNotified
updateSubscribers :: Maybe AClient -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, BrokerMsg), AClient))
updateSubscribers c_ (qId, clntId, subscribed) = updateSub $>>= clientToBeNotified
where
ss = subs s
updateSub = \case
Just (Just clnt)
| subscribed -> do
modifyTVar' (subClnts s) $ IM.insert clntId clnt -- add client to server's subscribed cients
TM.lookup qId ss >>= -- insert subscribed and current client
maybe
(newTVar clnt >>= \cv -> TM.insert qId cv ss $> Nothing)
(\cv -> Just <$> swapTVar cv clnt)
| otherwise -> do
removeWhenNoSubs clnt
TM.lookupDelete qId ss >>= mapM readTVar
-- This case catches Just Nothing - it cannot happen here.
-- Nothing is there only before client thread is started.
_ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client
updateSub = case c_ of
Just c@(AClient _ _ c') -> ifM (readTVar $ connected c') (updateSubConnected c) updateSubDisconnected
Nothing -> updateSubDisconnected
updateSubConnected c
| subscribed = do
modifyTVar' subClients $ IS.insert clntId -- add client to server's subscribed cients
TM.lookup qId ss >>= -- insert subscribed and current client
maybe
(newTVar c >>= \cv -> TM.insert qId cv ss $> Nothing)
(\cv -> Just <$> swapTVar cv c)
| otherwise = do
removeWhenNoSubs c
TM.lookupDelete qId ss >>= mapM readTVar
updateSubDisconnected = TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client
clientToBeNotified ac@(AClient _ _ c')
| clntId == clientId c' = pure Nothing
| otherwise = (\yes -> if yes then Just ((qId, subscribed), ac) else Nothing) <$> readTVar (connected c')
endPreviousSubscriptions :: ((QueueId, Subscribed), AClient) -> IO (Maybe s)
endPreviousSubscriptions (qEvt@(qId, _), ac@(AClient _ _ c)) = do
atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qEvt] (qEvt <|)) (clientId c)
| otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar (connected c')
where
subEvt = if subscribed then END else DELD
endPreviousSubscriptions :: ((QueueId, BrokerMsg), AClient) -> IO (Maybe s)
endPreviousSubscriptions (evt@(qId, _), ac@(AClient _ _ c)) = do
atomically $ modifyTVar' pendingEvents $ IM.alter (Just . maybe [evt] (evt <|)) (clientId c)
atomically $ do
sub <- TM.lookupDelete qId (clientSubs c)
removeWhenNoSubs ac $> sub
-- remove client from server's subscribed cients
removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c)
removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' subClients $ IS.delete (clientId c)
deliverNtfsThread :: Server -> M ()
deliverNtfsThread Server {ntfSubClients} = do
deliverNtfsThread Server {ntfSubscribers} = do
ntfInt <- asks $ ntfDeliveryInterval . config
NtfStore ns <- asks ntfStore
stats <- asks serverStats
cls <- asks clients
liftIO $ forever $ do
threadDelay ntfInt
readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats)
cIds <- IS.toList <$> readTVarIO (subClients ntfSubscribers)
forM_ cIds $ \cId -> mapM_ (deliverNtfs ns stats) . IM.lookup cId =<< readTVarIO cls
where
deliverNtfs ns stats (AClient _ _ Client {clientId, ntfSubscriptions, sndQ, connected}) =
whenM (currentClient readTVarIO) $ do
@@ -308,7 +307,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
writeTBQueue sndQ ts
pure $ length ts_
currentClient :: Monad m => (forall a. TVar a -> m a) -> m Bool
currentClient rd = (&&) <$> rd connected <*> (IM.member clientId <$> rd ntfSubClients)
currentClient rd = (&&) <$> rd connected <*> (IS.member clientId <$> rd (subClients ntfSubscribers))
addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg]
addNtfs acc (nId, v) =
readTVar v >>= \case
@@ -324,34 +323,32 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
sendPendingEvtsThread :: Server -> M ()
sendPendingEvtsThread s = do
sendPendingEvtsThread Server {subscribers, ntfSubscribers} = do
endInt <- asks $ pendingENDInterval . config
cls <- asks clients
forever $ do
threadDelay endInt
sendPending cls $ pendingSubEvents s
sendPending cls $ pendingNtfSubEvents s
sendPending cls subscribers
sendPending cls ntfSubscribers
where
sendPending cls ref = do
ends <- atomically $ swapTVar ref IM.empty
unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qEvts) ->
mapM_ (queueEvts qEvts) . join . IM.lookup cId =<< readTVarIO cls
queueEvts qEvts (AClient _ _ c@Client {connected, sndQ = q}) =
sendPending cls ServerSubscribers {pendingEvents} = do
ends <- atomically $ swapTVar pendingEvents IM.empty
unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, evts) ->
mapM_ (enqueueEvts evts) . IM.lookup cId =<< readTVarIO cls
enqueueEvts evts (AClient _ _ c@Client {connected, sndQ = q}) =
whenM (readTVarIO connected) $ do
sent <- atomically $ tryWriteTBQueue q ts
if sent
then updateEndStats
else -- if queue is full it can block
forkClient c ("sendPendingEvtsThread.queueEvts") $
forkClient c ("sendPendingEvtsThread.enqueueEvts") $
atomically (writeTBQueue q ts) >> updateEndStats
where
ts = L.map (\(qId, subscribed) -> (CorrId "", qId, evt subscribed)) qEvts
evt True = END
evt False = DELD
ts = L.map (\(qId, evt) -> (CorrId "", qId, evt)) evts
-- this accounts for both END and DELD events
updateEndStats = do
stats <- asks serverStats
let len = L.length qEvts
let len = L.length evts
when (len > 0) $ liftIO $ do
atomicModifyIORef'_ (qSubEnd stats) (+ len)
atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch
@@ -581,7 +578,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions}
getRealTimeMetrics :: Env -> IO RealTimeMetrics
getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do
getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, ntfSubscribers}} = do
socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets
#if MIN_VERSION_base(4,18,0)
threadsCount <- length <$> listThreads
@@ -589,10 +586,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
let threadsCount = 0
#endif
clientsCount <- IM.size <$> readTVarIO clients
smpSubsCount <- M.size <$> readTVarIO subscribers
smpSubClientsCount <- IM.size <$> readTVarIO subClients
ntfSubsCount <- M.size <$> readTVarIO notifiers
ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients
smpSubsCount <- M.size <$> readTVarIO (queueSubscribers subscribers)
smpSubClientsCount <- IS.size <$> readTVarIO (subClients subscribers)
ntfSubsCount <- M.size <$> readTVarIO (queueSubscribers ntfSubscribers)
ntfSubClientsCount <- IS.size <$> readTVarIO (subClients ntfSubscribers)
loadedCounts <- loadedQueueCounts ms
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts}
@@ -655,7 +652,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
CPClients -> withAdminRole $ do
active <- unliftIO u (asks clients) >>= readTVarIO
hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions"
forM_ (IM.toList active) $ \(cid, cl) -> forM_ cl $ \(AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do
forM_ (IM.toList active) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do
connected' <- bshow <$> readTVarIO connected
rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt
sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt
@@ -767,7 +764,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
#else
hPutStrLn h "Threads: not available on GHC 8.10"
#endif
Env {clients, server = Server {subscribers, notifiers, subClients, ntfSubClients}} <- unliftIO u ask
Env {clients, server = Server {subscribers, ntfSubscribers}} <- unliftIO u ask
activeClients <- readTVarIO clients
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
when (r == CPRAdmin) $ do
@@ -782,10 +779,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt
hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
putActiveClientsInfo "SMP" subscribers False
putActiveClientsInfo "Ntf" notifiers True
putSubscribedClients "SMP" subClients False
putSubscribedClients "Ntf" ntfSubClients True
putActiveClientsInfo "SMP" (queueSubscribers subscribers) False
putActiveClientsInfo "Ntf" (queueSubscribers ntfSubscribers) True
putSubscribedClients "SMP" (subClients subscribers) False
putSubscribedClients "Ntf" (subClients ntfSubscribers) True
where
putActiveClientsInfo :: String -> TMap QueueId (TVar AClient) -> Bool -> IO ()
putActiveClientsInfo protoName clients showIds = do
@@ -796,16 +793,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
where
countSubClients :: M.Map QueueId (TVar AClient) -> IO IS.IntSet
countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId' <$> readTVarIO c) IS.empty
putSubscribedClients :: String -> TVar (IM.IntMap AClient) -> Bool -> IO ()
putSubscribedClients :: String -> TVar IS.IntSet -> Bool -> IO ()
putSubscribedClients protoName subClnts showIds = do
clnts <- readTVarIO subClnts
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "")
countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe AClient) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IS.size clnts) <> (if showIds then " " <> show clnts else "")
countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
where
addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Maybe AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
addSubs acc Nothing = pure acc
addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) (Just acl@(AClient _ _ cl)) = do
addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) acl@(AClient _ _ cl) = do
subs <- readTVarIO $ subSel cl
cnts' <- case countSubs_ of
Nothing -> pure cnts
@@ -816,8 +812,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
clCnt' = if cnt == 0 then clCnt else clCnt + 1
qs' <- if cnt == 0 then pure qs else addQueueLengths qs acl
pure (subCnt + cnt, cnts', clCnt', qs')
clientTBQueueLengths' :: Foldable t => t (Maybe AClient) -> IO (Natural, Natural, Natural)
clientTBQueueLengths' = foldM (\acc -> maybe (pure acc) (addQueueLengths acc)) (0, 0, 0)
clientTBQueueLengths' :: Foldable t => t AClient -> IO (Natural, Natural, Natural)
clientTBQueueLengths' = foldM addQueueLengths (0, 0, 0)
addQueueLengths (!rl, !sl, !ml) (AClient _ _ cl) = do
(rl', sl', ml') <- queueLengths cl
pure (rl + rl', sl + sl', ml + ml')
@@ -896,30 +892,35 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
nextClientId <- asks clientSeq
clientId <- atomically $ stateTVar nextClientId $ \next -> (next, next + 1)
atomically $ modifyTVar' active $ IM.insert clientId Nothing
AMS qt mt ms <- asks msgStore
c <- liftIO $ newClient qt mt clientId q thVersion sessionId ts
runClientThreads qt mt ms active c clientId `finally` clientDisconnected c
runClientThreads qt mt ms c clientId `finally` clientDisconnected c
where
runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore qs ms) -> IS.Key -> M ()
runClientThreads qt mt ms active c clientId = do
atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient qt mt c)
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg
runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> IS.Key -> M ()
runClientThreads qt mt ms c clientId = do
cls <- asks clients
ok <-
atomically $ do
ifM
(readTVar $ connected c)
(True <$ modifyTVar' cls (IM.insert clientId $ AClient qt mt c))
(pure False)
when ok $ do
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg
disconnectThread_ :: Client s -> Server -> Maybe ExpirationConfig -> [M ()]
disconnectThread_ c s (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c s)]
disconnectThread_ _ _ _ = []
noSubscriptions Client {clientId} s = do
hasSubs <- IM.member clientId <$> readTVarIO (subClients s)
noSubscriptions Client {clientId} Server {subscribers, ntfSubscribers} = do
hasSubs <- IS.member clientId <$> readTVarIO (subClients subscribers)
if hasSubs
then pure False
else not . IM.member clientId <$> readTVarIO (ntfSubClients s)
else not . IS.member clientId <$> readTVarIO (subClients ntfSubscribers)
clientDisconnected :: Client s -> M ()
clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do
@@ -931,12 +932,12 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte
ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty
liftIO $ mapM_ cancelSub subs
whenM (asks serverActive >>= readTVarIO) $ do
Server {subscribers, notifiers, subClients, ntfSubClients} <- asks server
liftIO $ updateSubscribers subs subscribers
liftIO $ updateSubscribers ntfSubs notifiers
Server {subscribers, ntfSubscribers} <- asks server
liftIO $ updateSubscribers subs $ queueSubscribers subscribers
liftIO $ updateSubscribers ntfSubs $ queueSubscribers ntfSubscribers
asks clients >>= atomically . (`modifyTVar'` IM.delete clientId)
atomically $ modifyTVar' subClients $ IM.delete clientId
atomically $ modifyTVar' ntfSubClients $ IM.delete clientId
atomically $ modifyTVar' (subClients subscribers) $ IS.delete clientId
atomically $ modifyTVar' (subClients ntfSubscribers) $ IS.delete clientId
tIds <- atomically $ swapTVar endThreads IM.empty
liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds
where
@@ -1151,7 +1152,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do
client :: forall s. MsgStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client
thParams'
Server {subscribedQ, ntfSubscribedQ, subscribers}
Server {subscribers, ntfSubscribers}
ms
clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
@@ -1372,7 +1373,7 @@ client
Left e -> pure $ ERR e
Right nId_ -> do
incStat . ntfCreated =<< asks serverStats
forM_ nId_ $ \nId -> atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
forM_ nId_ $ \nId -> atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False)
pure $ NID notifierId rcvPublicDhKey
deleteQueueNotifier_ :: StoreQueue s -> M (Transmission BrokerMsg)
@@ -1383,7 +1384,7 @@ client
stats <- asks serverStats
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False)
incStat $ ntfDeleted stats
pure ok
Right Nothing -> pure ok
@@ -1410,7 +1411,7 @@ client
rId = recipientId q
newSub :: M Sub
newSub = time "SUB newSub" . atomically $ do
writeTQueue subscribedQ (rId, clientId, True)
writeTQueue (subQ subscribers) (rId, clientId, True)
sub <- newSubscription NoSub
TM.insert rId sub subscriptions
pure sub
@@ -1486,7 +1487,7 @@ client
pure ok
where
newSub = do
writeTQueue ntfSubscribedQ (entId, clientId, True)
writeTQueue (subQ ntfSubscribers) (entId, clientId, True)
TM.insert entId () ntfSubscriptions
acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)
@@ -1522,7 +1523,7 @@ client
incStat $ msgRecv stats
if isGet
then incStat $ msgRecvGet stats
else pure () -- TODO skip notification delivery for delivered message
else pure () -- TODO skip notification delivery for delivered message
-- skipping delivery fails tests, it should be counted in msgNtfSkipped
-- forM_ (notifierId <$> notifier qr) $ \nId -> do
-- ns <- asks ntfStore
@@ -1597,7 +1598,7 @@ client
tryDeliverMessage msg =
-- the subscription is checked outside of STM to avoid transaction cost
-- in case no client is subscribed.
whenM (TM.memberIO rId subscribers) $
whenM (TM.memberIO rId $ queueSubscribers subscribers) $
atomically deliverToSub >>= mapM_ forkDeliver
where
rId = recipientId q
@@ -1606,7 +1607,7 @@ client
-- so that if subscription ends, it re-evalutates
-- and delivery is cancelled -
-- the new client will receive message in response to SUB.
(TM.lookup rId subscribers >>= mapM readTVar)
(TM.lookup rId (queueSubscribers subscribers) >>= mapM readTVar)
$>>= \rc@(AClient _ _ Client {subscriptions = subs, sndQ = sndQ'}) -> TM.lookup rId subs
$>>= \s@Sub {subThread, delivered} -> case subThread of
ProhibitSub -> pure Nothing
@@ -1635,7 +1636,7 @@ client
labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND"
-- lookup can be outside of STM transaction,
-- as long as the check that it is the same client is inside.
TM.lookupIO rId subscribers >>= mapM_ deliverIfSame
TM.lookupIO rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
deliverIfSame rc' = time "deliver" . atomically $
whenM (sameClientId rc <$> readTVar rc') $
tryTakeTMVar delivered >>= \case
@@ -1750,7 +1751,7 @@ client
Right qr -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ do
writeTQueue subscribedQ (entId, clientId, False)
writeTQueue (subQ subscribers) (entId, clientId, False)
-- queue is usually deleted by the same client that is currently subscribed,
-- we delete subscription here, so the client with no subscriptions can be disconnected.
TM.delete entId subscriptions
@@ -1760,7 +1761,7 @@ client
stats <- asks serverStats
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False)
updateDeletedStats qr
pure ok
Left e -> pure $ err e
@@ -1985,7 +1986,7 @@ restoreServerNtfs =
renameFile f $ f <> ".bak"
let NtfStore ns' = ns
storedQueues <- M.size <$> readTVarIO ns'
logNote $ "notifications restored, " <> tshow lineCount <> " lines processed"
logNote $ "notifications restored, " <> tshow lineCount <> " lines processed"
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues}
where
restoreNtf :: NtfStore -> Int64 -> (Int, Int, Int) -> LB.ByteString -> ExceptT String IO (Int, Int, Int)
+64 -19
View File
@@ -18,7 +18,47 @@
#endif
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module Simplex.Messaging.Server.Env.STM where
module Simplex.Messaging.Server.Env.STM
( ServerConfig (..),
ServerStoreCfg (..),
AServerStoreCfg (..),
StorePaths (..),
StartOptions (..),
Env (..),
Server (..),
ServerSubscribers (..),
ProxyAgent (..),
Client (..),
AClient (..),
ClientId,
Subscribed,
Sub (..),
ServerSub (..),
SubscriptionThread (..),
MsgStore,
AMsgStore (..),
AStoreType (..),
newEnv,
mkJournalStoreConfig,
newClient,
clientId',
newSubscription,
newProhibitedSub,
defaultMsgQueueQuota,
defMsgExpirationDays,
defNtfExpirationHours,
defaultMessageExpiration,
defaultNtfExpiration,
defaultInactiveClientExpiration,
defaultProxyClientConcurrency,
defaultMaxJournalMsgCount,
defaultMaxJournalStateLines,
defaultIdleQueueInterval,
journalMsgStoreDepth,
readWriteQueueStore,
noPostgresExit,
)
where
import Control.Concurrent (ThreadId)
import Control.Logger.Simple
@@ -29,6 +69,8 @@ import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Kind (Constraint)
import Data.List (intercalate)
import Data.List.NonEmpty (NonEmpty)
@@ -203,7 +245,7 @@ data Env = Env
serverStats :: ServerStats,
sockets :: TVar [(ServiceName, SocketState)],
clientSeq :: TVar ClientId,
clients :: TVar (IntMap (Maybe AClient)),
clients :: TVar (IntMap AClient),
proxyAgent :: ProxyAgent -- senders served on this proxy
}
@@ -236,17 +278,18 @@ data AMsgStore =
type Subscribed = Bool
data Server = Server
{ subscribedQ :: TQueue (RecipientId, ClientId, Subscribed),
subscribers :: TMap RecipientId (TVar AClient),
ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed),
notifiers :: TMap NotifierId (TVar AClient),
subClients :: TVar (IntMap AClient), -- clients with SMP subscriptions
ntfSubClients :: TVar (IntMap AClient), -- clients with Ntf subscriptions
pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))),
pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))),
{ subscribers :: ServerSubscribers,
ntfSubscribers :: ServerSubscribers,
savingLock :: Lock
}
data ServerSubscribers = ServerSubscribers
{ subQ :: TQueue (QueueId, ClientId, Subscribed),
queueSubscribers :: TMap QueueId (TVar AClient),
subClients :: TVar IntSet,
pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
}
newtype ProxyAgent = ProxyAgent
{ smpAgent :: SMPClientAgent
}
@@ -288,16 +331,18 @@ data Sub = Sub
newServer :: IO Server
newServer = do
subscribedQ <- newTQueueIO
subscribers <- TM.emptyIO
ntfSubscribedQ <- newTQueueIO
notifiers <- TM.emptyIO
subClients <- newTVarIO IM.empty
ntfSubClients <- newTVarIO IM.empty
pendingSubEvents <- newTVarIO IM.empty
pendingNtfSubEvents <- newTVarIO IM.empty
subscribers <- newServerSubscribers
ntfSubscribers <- newServerSubscribers
savingLock <- createLockIO
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock}
return Server {subscribers, ntfSubscribers, savingLock}
newServerSubscribers :: IO ServerSubscribers
newServerSubscribers = do
subQ <- newTQueueIO
queueSubscribers <- TM.emptyIO
subClients <- newTVarIO IS.empty
pendingEvents <- newTVarIO IM.empty
pure ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents}
newClient :: SQSType qs -> SMSType ms -> ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO (Client (MsgStore qs ms))
newClient _ _ clientId qSize thVersion sessionId createdAt = do
+1 -1
View File
@@ -61,7 +61,7 @@ import Data.Time.Clock.System (systemToUTCTime)
import qualified Database.PostgreSQL.Simple as PSQL
import NtfClient
import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer, testNtfServer2)
import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'')
import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn)
import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage)
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), withStore')
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers)