mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 10:57:27 +00:00
smp server: fix race when client is marked as subscribed after it is disconnected, preventing its GC (#1250)
* smp server: fix race when client is marked as subscribed after it is disconnected, preventing its GC * refactor
This commit is contained in:
@@ -169,23 +169,26 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
M ()
|
||||
serverThread s label subQ subs clientSubs unsub = do
|
||||
labelMyThread label
|
||||
cls <- asks clients
|
||||
forever $
|
||||
atomically updateSubscribers
|
||||
atomically (updateSubscribers cls)
|
||||
$>>= endPreviousSubscriptions
|
||||
>>= liftIO . mapM_ unsub
|
||||
where
|
||||
updateSubscribers :: STM (Maybe (QueueId, Client))
|
||||
updateSubscribers = do
|
||||
updateSubscribers :: TVar (IM.IntMap Client) -> STM (Maybe (QueueId, Client))
|
||||
updateSubscribers cls = do
|
||||
(qId, clnt, subscribed) <- readTQueue $ subQ s
|
||||
current <- IM.member (clientId clnt) <$> readTVar cls
|
||||
let updateSub
|
||||
| subscribed = TM.lookupInsert qId clnt (subs s)
|
||||
| otherwise = TM.lookupDelete qId (subs s)
|
||||
| not subscribed = TM.lookupDelete
|
||||
| not current = TM.lookup -- do not insert client if it is already disconnected, but send END to any other client
|
||||
| otherwise = (`TM.lookupInsert` clnt) -- insert subscribed and current client
|
||||
clientToBeNotified c'
|
||||
| sameClientId clnt c' = pure Nothing
|
||||
| otherwise = do
|
||||
yes <- readTVar $ connected c'
|
||||
pure $ if yes then Just (qId, c') else Nothing
|
||||
updateSub $>>= clientToBeNotified
|
||||
updateSub qId (subs s) $>>= clientToBeNotified
|
||||
endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s)
|
||||
endPreviousSubscriptions (qId, c) = do
|
||||
forkClient c (label <> ".endPreviousSubscriptions") $
|
||||
|
||||
Reference in New Issue
Block a user