smp server: simplify client subscriptions (#1223)

This commit is contained in:
Evgeny Poberezkin
2024-07-09 08:36:03 +01:00
committed by GitHub
parent 21abc5cabe
commit 26979ff6b5
2 changed files with 43 additions and 45 deletions

View File

@@ -514,11 +514,11 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT
sameClientId :: Client -> Client -> Bool
sameClientId Client {clientId} Client {clientId = cId'} = clientId == cId'
cancelSub :: TVar Sub -> IO ()
cancelSub sub =
readTVarIO sub >>= \case
Sub {subThread = SubThread t} -> liftIO $ deRefWeak t >>= mapM_ killThread
_ -> return ()
cancelSub :: Sub -> IO ()
cancelSub s =
readTVarIO (subThread s) >>= \case
SubThread t -> liftIO $ deRefWeak t >>= mapM_ killThread
_ -> pure ()
receive :: Transport c => THandleSMP c 'TServer -> Client -> M ()
receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
@@ -901,23 +901,23 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
Nothing -> do
atomically $ modifyTVar' (qSub stats) (+ 1)
newSub >>= deliver
Just sub ->
readTVarIO sub >>= \case
Sub {subThread = ProhibitSub} -> do
Just s@Sub {subThread} ->
readTVarIO subThread >>= \case
ProhibitSub -> do
-- cannot use SUB in the same connection where GET was used
atomically $ modifyTVar' (qSubProhibited stats) (+ 1)
pure (corrId, rId, ERR $ CMD PROHIBITED)
s -> do
_ -> do
atomically $ modifyTVar' (qSubDuplicate stats) (+ 1)
atomically (tryTakeTMVar $ delivered s) >> deliver sub
atomically (tryTakeTMVar $ delivered s) >> deliver s
where
newSub :: M (TVar Sub)
newSub :: M Sub
newSub = time "SUB newSub" . atomically $ do
writeTQueue subscribedQ (rId, clnt)
sub <- newTVar =<< newSubscription NoSub
sub <- newSubscription NoSub
TM.insert rId sub subscriptions
pure sub
deliver :: TVar Sub -> M (Transmission BrokerMsg)
deliver :: Sub -> M (Transmission BrokerMsg)
deliver sub = do
q <- getStoreMsgQueue "SUB" rId
msg_ <- atomically $ tryPeekMsg q
@@ -928,9 +928,9 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
atomically (TM.lookup queueId subscriptions) >>= \case
Nothing ->
atomically newSub >>= getMessage_
Just sub ->
readTVarIO sub >>= \case
s@Sub {subThread = ProhibitSub} ->
Just s@Sub {subThread} ->
readTVarIO subThread >>= \case
ProhibitSub ->
atomically (tryTakeTMVar $ delivered s)
>> getMessage_ s
-- cannot use GET in the same connection where there is an active subscription
@@ -939,8 +939,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
newSub :: STM Sub
newSub = do
s <- newSubscription ProhibitSub
sub <- newTVar s
TM.insert queueId sub subscriptions
TM.insert queueId s subscriptions
pure s
getMessage_ :: Sub -> M (Transmission BrokerMsg)
getMessage_ s = do
@@ -968,10 +967,10 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
Nothing -> pure $ err NO_MSG
Just sub ->
atomically (getDelivered sub) >>= \case
Just s -> do
Just st -> do
q <- getStoreMsgQueue "ACK" queueId
case s of
Sub {subThread = ProhibitSub} -> do
case st of
ProhibitSub -> do
deletedMsg_ <- atomically $ tryDelMsg q msgId
mapM_ updateStats deletedMsg_
pure ok
@@ -981,12 +980,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
deliverMessage "ACK" qr queueId sub msg_
_ -> pure $ err NO_MSG
where
getDelivered :: TVar Sub -> STM (Maybe Sub)
getDelivered sub = do
s@Sub {delivered} <- readTVar sub
getDelivered :: Sub -> STM (Maybe SubscriptionThread)
getDelivered Sub {delivered, subThread} = do
tryTakeTMVar delivered $>>= \msgId' ->
if msgId == msgId' || B.null msgId
then pure $ Just s
then Just <$> readTVar subThread
else putTMVar delivered msgId' $> Nothing
updateStats :: Message -> M ()
updateStats = \case
@@ -1074,37 +1072,36 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
deliverToSub =
TM.lookup rId subscribers
$>>= \rc@Client {subscriptions = subs, sndQ = q} -> TM.lookup rId subs
$>>= \sub -> readTVar sub >>= \case
s@Sub {subThread = NoSub, delivered} ->
$>>= \s@Sub {subThread, delivered} -> readTVar subThread >>= \case
NoSub ->
tryTakeTMVar delivered >>= \case
Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
Nothing ->
ifM
(isFullTBQueue q)
(modifyTVar' sub (\s' -> s' {subThread = SubPending}) $> Just (rc, sub))
(writeTVar subThread SubPending $> Just (rc, s))
(deliver q s $> Nothing)
_ -> pure Nothing
deliver q s = do
let encMsg = encryptMsg qr msg
writeTBQueue q [(CorrId "", rId, MSG encMsg)]
void $ setDelivered s msg
forkDeliver (rc@Client {sndQ = q}, sub) = do
forkDeliver (rc@Client {sndQ = q}, s@Sub {subThread, delivered}) = do
t <- mkWeakThreadId =<< forkIO deliverThread
atomically . modifyTVar' sub $ \case
atomically . modifyTVar' subThread $ \case
-- this case is needed because deliverThread can exit before it
s@Sub {subThread = SubPending} -> s {subThread = SubThread t}
s -> s
SubPending -> SubThread t
st -> st
where
deliverThread = do
labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND"
time "deliver" . atomically $
whenM (maybe False (sameClientId rc) <$> TM.lookup rId subscribers) $ do
s@Sub {delivered} <- readTVar sub
tryTakeTMVar delivered >>= \case
Just _ -> pure () -- if a message was already delivered, should not deliver more
Nothing -> do
deliver q s
writeTVar sub $! s {subThread = NoSub}
writeTVar subThread NoSub
trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool)
trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg =
@@ -1180,11 +1177,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
verified = \case
VRVerified qr -> Right (qr, (corrId', entId', cmd'))
VRFailed -> Left (corrId', entId', ERR AUTH)
deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> Maybe Message -> M (Transmission BrokerMsg)
deliverMessage name qr rId sub msg_ = time (name <> " deliver") . atomically $
readTVar sub >>= \case
Sub {subThread = ProhibitSub} -> pure resp
s -> case msg_ of
deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> M (Transmission BrokerMsg)
deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $
readTVar subThread >>= \case
ProhibitSub -> pure resp
_ -> case msg_ of
Just msg ->
let encMsg = encryptMsg qr msg
in setDelivered s msg $> (corrId, rId, MSG encMsg)
@@ -1232,9 +1229,9 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
pure QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
pure (corrId, queueId, INFO info)
where
mkQSub sub = do
Sub {subThread, delivered} <- readTVar sub
let qSubThread = case subThread of
mkQSub Sub {subThread, delivered} = do
st <- readTVar subThread
let qSubThread = case st of
NoSub -> QNoSub
SubPending -> QSubPending
SubThread _ -> QSubThread

View File

@@ -144,7 +144,7 @@ type ClientId = Int
data Client = Client
{ clientId :: ClientId,
subscriptions :: TMap RecipientId (TVar Sub),
subscriptions :: TMap RecipientId Sub,
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
@@ -163,7 +163,7 @@ data Client = Client
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) | ProhibitSub
data Sub = Sub
{ subThread :: SubscriptionThread,
{ subThread :: TVar SubscriptionThread,
delivered :: TMVar MsgId
}
@@ -193,8 +193,9 @@ newClient nextClientId qSize thVersion sessionId createdAt = do
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do
newSubscription st = do
delivered <- newEmptyTMVar
subThread <- newTVar st
return Sub {subThread, delivered}
newEnv :: ServerConfig -> IO Env