smp servers: separately account for subscribed clients, to disconnect those without subscriptions more effectively (and to be able to deliver notifications to subscribed ntf servers) (#1339)

This commit is contained in:
Evgeny
2024-09-28 19:59:53 +01:00
committed by GitHub
parent 03168b9fbf
commit 21eee2b548
2 changed files with 90 additions and 68 deletions
+79 -53
View File
@@ -136,10 +136,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
expired <- restoreServerMessages
restoreServerStats expired
raceAny_
( serverThread s "server subscribedQ" True subscribedQ subscribers pendingENDs subscriptions cancelSub
: serverThread s "server deletedQ" False deletedQ subscribers pendingDELDs subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" True ntfSubscribedQ Env.notifiers pendingNtfENDs ntfSubscriptions (\_ -> pure ())
: serverThread s "server ntfDeletedQ" False ntfDeletedQ Env.notifiers pendingNtfDELDs ntfSubscriptions (\_ -> pure ())
( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ())
: sendPendingEvtsThread s
: receiveFromProxyAgent pa
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
@@ -165,14 +163,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
forall s.
Server ->
String ->
Subscribed ->
(Server -> TQueue (QueueId, ClientId)) ->
(Server -> TQueue (QueueId, ClientId, Subscribed)) ->
(Server -> TMap QueueId (TVar Client)) ->
(Server -> TVar (IM.IntMap (NonEmpty RecipientId))) ->
(Server -> TVar (IM.IntMap Client)) ->
(Server -> TVar (IM.IntMap (NonEmpty (QueueId, Subscribed)))) ->
(Client -> TMap QueueId s) ->
(s -> IO ()) ->
M ()
serverThread s label subscribed subQ subs pendingEvts clientSubs unsub = do
serverThread s label subQ subs subClnts pendingEvts clientSubs unsub = do
labelMyThread label
cls <- asks clients
liftIO . forever $
@@ -180,32 +178,40 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
$>>= endPreviousSubscriptions
>>= mapM_ unsub
where
updateSubscribers :: TVar (IM.IntMap (Maybe Client)) -> (QueueId, ClientId) -> STM (Maybe (QueueId, Client))
updateSubscribers cls (qId, clntId) =
updateSubscribers :: TVar (IM.IntMap (Maybe Client)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), Client))
updateSubscribers cls (qId, clntId, subscribed) =
-- Client lookup by ID is in the same STM transaction.
-- In case client disconnects during the transaction,
-- it will be re-evaluated, and the client won't be stored as subscribed.
(readTVar cls >>= updateSub (subs s) . IM.lookup clntId)
(readTVar cls >>= updateSub . IM.lookup clntId)
$>>= clientToBeNotified
where
updateSub ss = \case
ss = subs s
updateSub = \case
Just (Just clnt)
| subscribed ->
| subscribed -> do
modifyTVar' (subClnts s) $ IM.insert clntId clnt -- add client to server's subscribed cients
TM.lookup qId ss >>= -- insert subscribed and current client
maybe
(newTVar clnt >>= \cv -> TM.insert qId cv ss $> Nothing)
(\cv -> Just <$> swapTVar cv clnt)
| otherwise -> TM.lookupDelete qId ss >>= mapM readTVar
| otherwise -> do
removeWhenNoSubs clnt
TM.lookupDelete qId ss >>= mapM readTVar
-- This case catches Just Nothing - it cannot happen here.
-- Nothing is there only before client thread is started.
_ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client
clientToBeNotified c'
| clntId == clientId c' = pure Nothing
| otherwise = (\yes -> if yes then Just (qId, c') else Nothing) <$> readTVar (connected c')
endPreviousSubscriptions :: (QueueId, Client) -> IO (Maybe s)
endPreviousSubscriptions (qId, c) = do
atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qId] (qId <|)) (clientId c)
atomically $ TM.lookupDelete qId (clientSubs c)
| otherwise = (\yes -> if yes then Just ((qId, subscribed), c') else Nothing) <$> readTVar (connected c')
endPreviousSubscriptions :: ((QueueId, Subscribed), Client) -> IO (Maybe s)
endPreviousSubscriptions (qEvt@(qId, _), c) = do
atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qEvt] (qEvt <|)) (clientId c)
atomically $ do
sub <- TM.lookupDelete qId (clientSubs c)
removeWhenNoSubs c $> sub
-- remove client from server's subscribed cients
removeWhenNoSubs c = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c)
sendPendingEvtsThread :: Server -> M ()
sendPendingEvtsThread s = do
@@ -213,16 +219,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
cls <- asks clients
forever $ do
threadDelay endInt
sendPending cls END $ pendingENDs s
sendPending cls DELD $ pendingDELDs s
sendPending cls END $ pendingNtfENDs s
sendPending cls DELD $ pendingNtfDELDs s
sendPending cls $ pendingSubEvents s
sendPending cls $ pendingNtfSubEvents s
where
sendPending cls evt ref = do
sendPending cls ref = do
ends <- atomically $ swapTVar ref IM.empty
unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qIds) ->
mapM_ (queueEvts qIds evt) . join . IM.lookup cId =<< readTVarIO cls
queueEvts qIds evt c@Client {connected, sndQ = q} =
unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qEvts) ->
mapM_ (queueEvts qEvts) . join . IM.lookup cId =<< readTVarIO cls
queueEvts qEvts c@Client {connected, sndQ = q} =
whenM (readTVarIO connected) $ do
sent <- atomically $ ifM (isFullTBQueue q) (pure False) (writeTBQueue q ts $> True)
if sent
@@ -231,14 +235,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
forkClient c ("sendPendingEvtsThread.queueEvts") $
atomically (writeTBQueue q ts) >> updateEndStats
where
ts = L.map (CorrId "",,evt) qIds
updateEndStats = case evt of
END -> do
stats <- asks serverStats
let len = L.length qIds
liftIO $ atomicModifyIORef'_ (qSubEnd stats) (+ len)
liftIO $ atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs in the batch
_ -> pure ()
ts = L.map (\(qId, subscribed) -> (CorrId "", qId, evt subscribed)) qEvts
evt True = END
evt False = DELD
-- this accounts for both END and DELD events
updateEndStats = do
stats <- asks serverStats
let len = L.length qEvts
liftIO $ atomicModifyIORef'_ (qSubEnd stats) (+ len)
liftIO $ atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch
receiveFromProxyAgent :: ProxyAgent -> M ()
receiveFromProxyAgent ProxyAgent {smpAgent = SMPClientAgent {agentQ}} =
@@ -253,16 +258,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
showServer' = decodeLatin1 . strEncode . host
expireMessagesThread_ :: ServerConfig -> [M ()]
expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessages msgExp]
expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessagesThread msgExp]
expireMessagesThread_ _ = []
expireMessages :: ExpirationConfig -> M ()
expireMessages expCfg = do
expireMessagesThread :: ExpirationConfig -> M ()
expireMessagesThread expCfg = do
ms <- asks msgStore
quota <- asks $ msgQueueQuota . config
let interval = checkInterval expCfg * 1000000
stats <- asks serverStats
labelMyThread "expireMessages"
labelMyThread "expireMessagesThread"
forever $ do
liftIO $ threadDelay' interval
old <- liftIO $ expireBeforeEpoch expCfg
@@ -564,7 +569,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
#else
hPutStrLn h "Threads: not available on GHC 8.10"
#endif
Env {clients, server = Server {subscribers, notifiers}} <- unliftIO u ask
Env {clients, server = Server {subscribers, notifiers, subClients, ntfSubClients}} <- unliftIO u ask
activeClients <- readTVarIO clients
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
when (r == CPRAdmin) $ do
@@ -581,6 +586,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
putActiveClientsInfo "SMP" subscribers
putActiveClientsInfo "Ntf" notifiers
putSubscribedClients "SMP" subClients
putSubscribedClients "Ntf" ntfSubClients
where
putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> IO ()
putActiveClientsInfo protoName clients = do
@@ -591,6 +598,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
where
countSubClients :: M.Map QueueId (TVar Client) -> IO IS.IntSet
countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId <$> readTVarIO c) IS.empty
putSubscribedClients :: String -> TVar (IM.IntMap Client) -> IO ()
putSubscribedClients protoName subClnts = do
clnts <- readTVarIO subClnts
hPutStrLn h $ protoName <> " subscribed clients count:" <> show (IM.size clnts)
countClientSubs :: (Client -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe Client) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
where
@@ -680,11 +691,14 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
disconnectThread_ _ _ = []
noSubscriptions c = atomically $ (&&) <$> TM.null (ntfSubscriptions c) <*> (not . hasSubs <$> readTVar (subscriptions c))
hasSubs = any $ (\case ServerSub _ -> True; ProhibitSub -> False) . subThread
raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c s expCfg
disconnectThread_ c s (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c s)]
disconnectThread_ _ _ _ = []
noSubscriptions Client {clientId} s = do
hasSubs <- IM.member clientId <$> readTVarIO (subClients s)
if hasSubs
then pure False
else not . IM.member clientId <$> readTVarIO (ntfSubClients s)
clientDisconnected :: Client -> M ()
clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do
@@ -695,10 +709,12 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte
subs <- atomically $ swapTVar subscriptions M.empty
ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty
liftIO $ mapM_ cancelSub subs
Server {subscribers, notifiers} <- asks server
Server {subscribers, notifiers, subClients, ntfSubClients} <- asks server
liftIO $ updateSubscribers subs subscribers
liftIO $ updateSubscribers ntfSubs notifiers
asks clients >>= atomically . (`modifyTVar'` IM.delete clientId)
atomically $ modifyTVar' subClients $ IM.delete clientId
atomically $ modifyTVar' ntfSubClients $ IM.delete clientId
tIds <- atomically $ swapTVar endThreads IM.empty
liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds
where
@@ -899,7 +915,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do
mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId
client :: THandleParams SMPVersion 'TServer -> Client -> Server -> M ()
client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, deletedQ, ntfSubscribedQ, ntfDeletedQ, subscribers, notifiers} = do
client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, ntfSubscribedQ, subscribers, notifiers} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
forever $
atomically (readTBQueue rcvQ)
@@ -1093,7 +1109,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
Right nId_ -> do
withLog $ \s -> logAddNotifier s entId ntfCreds
incStat . ntfCreated =<< asks serverStats
forM_ nId_ $ \nId -> atomically $ writeTQueue ntfDeletedQ (nId, clientId)
forM_ nId_ $ \nId -> atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
pure $ NID notifierId rcvPublicDhKey
deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg)
@@ -1102,7 +1118,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
liftIO (deleteQueueNotifier st entId) >>= \case
Right (Just nId) -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ writeTQueue ntfDeletedQ (nId, clientId)
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
incStat . ntfDeleted =<< asks serverStats
pure ok
Right Nothing -> pure ok
@@ -1130,7 +1146,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
where
newSub :: M Sub
newSub = time "SUB newSub" . atomically $ do
writeTQueue subscribedQ (rId, clientId)
writeTQueue subscribedQ (rId, clientId, True)
sub <- newSubscription NoSub
TM.insert rId sub subscriptions
pure sub
@@ -1163,6 +1179,10 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
newSub = do
s <- newProhibitedSub
TM.insert entId s subscriptions
-- Here we don't account for this client as subscribed in the server
-- and don't notify other subscribed clients.
-- This is tracked as "subscription" in the client to prevent these
-- clients from being able to subscribe.
pure s
getMessage_ :: Sub -> Maybe MsgId -> M (Transmission BrokerMsg)
getMessage_ s delivered_ = do
@@ -1205,7 +1225,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
pure ok
where
newSub = do
writeTQueue ntfSubscribedQ (entId, clientId)
writeTQueue ntfSubscribedQ (entId, clientId, True)
TM.insert entId () ntfSubscriptions
acknowledgeMsg :: QueueRec -> MsgId -> M (Transmission BrokerMsg)
@@ -1486,9 +1506,15 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
liftIO (deleteQueue st entId $>>= \q -> delMsgQueue ms entId $> Right q) >>= \case
Right q -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ writeTQueue deletedQ (entId, clientId)
atomically $ do
writeTQueue subscribedQ (entId, clientId, False)
-- queue is usually deleted by the same client that is currently subscribed,
-- we delete subscription here, so the client with no subscriptions can be disconnected.
TM.delete entId subscriptions
forM_ (notifierId <$> notifier q) $ \nId ->
atomically $ writeTQueue ntfDeletedQ (nId, clientId)
-- queue is deleted by a different client from the one subscribed to notifications,
-- so we don't need to remove subscription from the current client.
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
updateDeletedStats q
pure ok
Left e -> pure $ err e
+11 -15
View File
@@ -136,16 +136,14 @@ data Env = Env
type Subscribed = Bool
data Server = Server
{ subscribedQ :: TQueue (RecipientId, ClientId),
deletedQ :: TQueue (RecipientId, ClientId),
{ subscribedQ :: TQueue (RecipientId, ClientId, Subscribed),
subscribers :: TMap RecipientId (TVar Client),
ntfSubscribedQ :: TQueue (NotifierId, ClientId),
ntfDeletedQ :: TQueue (NotifierId, ClientId),
ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed),
notifiers :: TMap NotifierId (TVar Client),
pendingENDs :: TVar (IntMap (NonEmpty RecipientId)),
pendingDELDs :: TVar (IntMap (NonEmpty RecipientId)),
pendingNtfENDs :: TVar (IntMap (NonEmpty NotifierId)),
pendingNtfDELDs :: TVar (IntMap (NonEmpty NotifierId)),
subClients :: TVar (IntMap Client), -- clients with SMP subscriptions
ntfSubClients :: TVar (IntMap Client), -- clients with Ntf subscriptions
pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))),
pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))),
savingLock :: Lock
}
@@ -185,17 +183,15 @@ data Sub = Sub
newServer :: IO Server
newServer = do
subscribedQ <- newTQueueIO
deletedQ <- newTQueueIO
subscribers <- TM.emptyIO
ntfSubscribedQ <- newTQueueIO
ntfDeletedQ <- newTQueueIO
notifiers <- TM.emptyIO
pendingENDs <- newTVarIO IM.empty
pendingDELDs <- newTVarIO IM.empty
pendingNtfENDs <- newTVarIO IM.empty
pendingNtfDELDs <- newTVarIO IM.empty
subClients <- newTVarIO IM.empty
ntfSubClients <- newTVarIO IM.empty
pendingSubEvents <- newTVarIO IM.empty
pendingNtfSubEvents <- newTVarIO IM.empty
savingLock <- atomically createLock
return Server {subscribedQ, deletedQ, subscribers, ntfSubscribedQ, ntfDeletedQ, notifiers, pendingENDs, pendingDELDs, pendingNtfENDs, pendingNtfDELDs, savingLock}
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock}
newClient :: ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO Client
newClient clientId qSize thVersion sessionId createdAt = do