diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 969c5399b..599938ab6 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -136,10 +136,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do expired <- restoreServerMessages restoreServerStats expired raceAny_ - ( serverThread s "server subscribedQ" True subscribedQ subscribers pendingENDs subscriptions cancelSub - : serverThread s "server deletedQ" False deletedQ subscribers pendingDELDs subscriptions cancelSub - : serverThread s "server ntfSubscribedQ" True ntfSubscribedQ Env.notifiers pendingNtfENDs ntfSubscriptions (\_ -> pure ()) - : serverThread s "server ntfDeletedQ" False ntfDeletedQ Env.notifiers pendingNtfDELDs ntfSubscriptions (\_ -> pure ()) + ( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub + : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ()) : sendPendingEvtsThread s : receiveFromProxyAgent pa : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg @@ -165,14 +163,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do forall s. Server -> String -> - Subscribed -> - (Server -> TQueue (QueueId, ClientId)) -> + (Server -> TQueue (QueueId, ClientId, Subscribed)) -> (Server -> TMap QueueId (TVar Client)) -> - (Server -> TVar (IM.IntMap (NonEmpty RecipientId))) -> + (Server -> TVar (IM.IntMap Client)) -> + (Server -> TVar (IM.IntMap (NonEmpty (QueueId, Subscribed)))) -> (Client -> TMap QueueId s) -> (s -> IO ()) -> M () - serverThread s label subscribed subQ subs pendingEvts clientSubs unsub = do + serverThread s label subQ subs subClnts pendingEvts clientSubs unsub = do labelMyThread label cls <- asks clients liftIO . forever $ @@ -180,32 +178,40 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do $>>= endPreviousSubscriptions >>= mapM_ unsub where - updateSubscribers :: TVar (IM.IntMap (Maybe Client)) -> (QueueId, ClientId) -> STM (Maybe (QueueId, Client)) - updateSubscribers cls (qId, clntId) = + updateSubscribers :: TVar (IM.IntMap (Maybe Client)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), Client)) + updateSubscribers cls (qId, clntId, subscribed) = -- Client lookup by ID is in the same STM transaction. -- In case client disconnects during the transaction, -- it will be re-evaluated, and the client won't be stored as subscribed. - (readTVar cls >>= updateSub (subs s) . IM.lookup clntId) + (readTVar cls >>= updateSub . IM.lookup clntId) $>>= clientToBeNotified where - updateSub ss = \case + ss = subs s + updateSub = \case Just (Just clnt) - | subscribed -> + | subscribed -> do + modifyTVar' (subClnts s) $ IM.insert clntId clnt -- add client to server's subscribed cients TM.lookup qId ss >>= -- insert subscribed and current client maybe (newTVar clnt >>= \cv -> TM.insert qId cv ss $> Nothing) (\cv -> Just <$> swapTVar cv clnt) - | otherwise -> TM.lookupDelete qId ss >>= mapM readTVar + | otherwise -> do + removeWhenNoSubs clnt + TM.lookupDelete qId ss >>= mapM readTVar -- This case catches Just Nothing - it cannot happen here. -- Nothing is there only before client thread is started. _ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client clientToBeNotified c' | clntId == clientId c' = pure Nothing - | otherwise = (\yes -> if yes then Just (qId, c') else Nothing) <$> readTVar (connected c') - endPreviousSubscriptions :: (QueueId, Client) -> IO (Maybe s) - endPreviousSubscriptions (qId, c) = do - atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qId] (qId <|)) (clientId c) - atomically $ TM.lookupDelete qId (clientSubs c) + | otherwise = (\yes -> if yes then Just ((qId, subscribed), c') else Nothing) <$> readTVar (connected c') + endPreviousSubscriptions :: ((QueueId, Subscribed), Client) -> IO (Maybe s) + endPreviousSubscriptions (qEvt@(qId, _), c) = do + atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qEvt] (qEvt <|)) (clientId c) + atomically $ do + sub <- TM.lookupDelete qId (clientSubs c) + removeWhenNoSubs c $> sub + -- remove client from server's subscribed cients + removeWhenNoSubs c = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c) sendPendingEvtsThread :: Server -> M () sendPendingEvtsThread s = do @@ -213,16 +219,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do cls <- asks clients forever $ do threadDelay endInt - sendPending cls END $ pendingENDs s - sendPending cls DELD $ pendingDELDs s - sendPending cls END $ pendingNtfENDs s - sendPending cls DELD $ pendingNtfDELDs s + sendPending cls $ pendingSubEvents s + sendPending cls $ pendingNtfSubEvents s where - sendPending cls evt ref = do + sendPending cls ref = do ends <- atomically $ swapTVar ref IM.empty - unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qIds) -> - mapM_ (queueEvts qIds evt) . join . IM.lookup cId =<< readTVarIO cls - queueEvts qIds evt c@Client {connected, sndQ = q} = + unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qEvts) -> + mapM_ (queueEvts qEvts) . join . IM.lookup cId =<< readTVarIO cls + queueEvts qEvts c@Client {connected, sndQ = q} = whenM (readTVarIO connected) $ do sent <- atomically $ ifM (isFullTBQueue q) (pure False) (writeTBQueue q ts $> True) if sent @@ -231,14 +235,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do forkClient c ("sendPendingEvtsThread.queueEvts") $ atomically (writeTBQueue q ts) >> updateEndStats where - ts = L.map (CorrId "",,evt) qIds - updateEndStats = case evt of - END -> do - stats <- asks serverStats - let len = L.length qIds - liftIO $ atomicModifyIORef'_ (qSubEnd stats) (+ len) - liftIO $ atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs in the batch - _ -> pure () + ts = L.map (\(qId, subscribed) -> (CorrId "", qId, evt subscribed)) qEvts + evt True = END + evt False = DELD + -- this accounts for both END and DELD events + updateEndStats = do + stats <- asks serverStats + let len = L.length qEvts + liftIO $ atomicModifyIORef'_ (qSubEnd stats) (+ len) + liftIO $ atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch receiveFromProxyAgent :: ProxyAgent -> M () receiveFromProxyAgent ProxyAgent {smpAgent = SMPClientAgent {agentQ}} = @@ -253,16 +258,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do showServer' = decodeLatin1 . strEncode . host expireMessagesThread_ :: ServerConfig -> [M ()] - expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessages msgExp] + expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessagesThread msgExp] expireMessagesThread_ _ = [] - expireMessages :: ExpirationConfig -> M () - expireMessages expCfg = do + expireMessagesThread :: ExpirationConfig -> M () + expireMessagesThread expCfg = do ms <- asks msgStore quota <- asks $ msgQueueQuota . config let interval = checkInterval expCfg * 1000000 stats <- asks serverStats - labelMyThread "expireMessages" + labelMyThread "expireMessagesThread" forever $ do liftIO $ threadDelay' interval old <- liftIO $ expireBeforeEpoch expCfg @@ -564,7 +569,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do #else hPutStrLn h "Threads: not available on GHC 8.10" #endif - Env {clients, server = Server {subscribers, notifiers}} <- unliftIO u ask + Env {clients, server = Server {subscribers, notifiers, subClients, ntfSubClients}} <- unliftIO u ask activeClients <- readTVarIO clients hPutStrLn h $ "Clients: " <> show (IM.size activeClients) when (r == CPRAdmin) $ do @@ -581,6 +586,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs putActiveClientsInfo "SMP" subscribers putActiveClientsInfo "Ntf" notifiers + putSubscribedClients "SMP" subClients + putSubscribedClients "Ntf" ntfSubClients where putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> IO () putActiveClientsInfo protoName clients = do @@ -591,6 +598,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do where countSubClients :: M.Map QueueId (TVar Client) -> IO IS.IntSet countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId <$> readTVarIO c) IS.empty + putSubscribedClients :: String -> TVar (IM.IntMap Client) -> IO () + putSubscribedClients protoName subClnts = do + clnts <- readTVarIO subClnts + hPutStrLn h $ protoName <> " subscribed clients count:" <> show (IM.size clnts) countClientSubs :: (Client -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe Client) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0)) where @@ -680,11 +691,14 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio expCfg <- asks $ inactiveClientExpiration . config th <- newMVar h -- put TH under a fair lock to interleave messages and command responses labelMyThread . B.unpack $ "client $" <> encode sessionId - raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg - disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)] - disconnectThread_ _ _ = [] - noSubscriptions c = atomically $ (&&) <$> TM.null (ntfSubscriptions c) <*> (not . hasSubs <$> readTVar (subscriptions c)) - hasSubs = any $ (\case ServerSub _ -> True; ProhibitSub -> False) . subThread + raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c s expCfg + disconnectThread_ c s (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c s)] + disconnectThread_ _ _ _ = [] + noSubscriptions Client {clientId} s = do + hasSubs <- IM.member clientId <$> readTVarIO (subClients s) + if hasSubs + then pure False + else not . IM.member clientId <$> readTVarIO (ntfSubClients s) clientDisconnected :: Client -> M () clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do @@ -695,10 +709,12 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte subs <- atomically $ swapTVar subscriptions M.empty ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty liftIO $ mapM_ cancelSub subs - Server {subscribers, notifiers} <- asks server + Server {subscribers, notifiers, subClients, ntfSubClients} <- asks server liftIO $ updateSubscribers subs subscribers liftIO $ updateSubscribers ntfSubs notifiers asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) + atomically $ modifyTVar' subClients $ IM.delete clientId + atomically $ modifyTVar' ntfSubClients $ IM.delete clientId tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where @@ -899,7 +915,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId client :: THandleParams SMPVersion 'TServer -> Client -> Server -> M () -client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, deletedQ, ntfSubscribedQ, ntfDeletedQ, subscribers, notifiers} = do +client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, ntfSubscribedQ, subscribers, notifiers} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" forever $ atomically (readTBQueue rcvQ) @@ -1093,7 +1109,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Right nId_ -> do withLog $ \s -> logAddNotifier s entId ntfCreds incStat . ntfCreated =<< asks serverStats - forM_ nId_ $ \nId -> atomically $ writeTQueue ntfDeletedQ (nId, clientId) + forM_ nId_ $ \nId -> atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) pure $ NID notifierId rcvPublicDhKey deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg) @@ -1102,7 +1118,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s liftIO (deleteQueueNotifier st entId) >>= \case Right (Just nId) -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it - atomically $ writeTQueue ntfDeletedQ (nId, clientId) + atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) incStat . ntfDeleted =<< asks serverStats pure ok Right Nothing -> pure ok @@ -1130,7 +1146,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s where newSub :: M Sub newSub = time "SUB newSub" . atomically $ do - writeTQueue subscribedQ (rId, clientId) + writeTQueue subscribedQ (rId, clientId, True) sub <- newSubscription NoSub TM.insert rId sub subscriptions pure sub @@ -1163,6 +1179,10 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s newSub = do s <- newProhibitedSub TM.insert entId s subscriptions + -- Here we don't account for this client as subscribed in the server + -- and don't notify other subscribed clients. + -- This is tracked as "subscription" in the client to prevent these + -- clients from being able to subscribe. pure s getMessage_ :: Sub -> Maybe MsgId -> M (Transmission BrokerMsg) getMessage_ s delivered_ = do @@ -1205,7 +1225,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s pure ok where newSub = do - writeTQueue ntfSubscribedQ (entId, clientId) + writeTQueue ntfSubscribedQ (entId, clientId, True) TM.insert entId () ntfSubscriptions acknowledgeMsg :: QueueRec -> MsgId -> M (Transmission BrokerMsg) @@ -1486,9 +1506,15 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s liftIO (deleteQueue st entId $>>= \q -> delMsgQueue ms entId $> Right q) >>= \case Right q -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it - atomically $ writeTQueue deletedQ (entId, clientId) + atomically $ do + writeTQueue subscribedQ (entId, clientId, False) + -- queue is usually deleted by the same client that is currently subscribed, + -- we delete subscription here, so the client with no subscriptions can be disconnected. + TM.delete entId subscriptions forM_ (notifierId <$> notifier q) $ \nId -> - atomically $ writeTQueue ntfDeletedQ (nId, clientId) + -- queue is deleted by a different client from the one subscribed to notifications, + -- so we don't need to remove subscription from the current client. + atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) updateDeletedStats q pure ok Left e -> pure $ err e diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 8012f5154..9413ef24e 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -136,16 +136,14 @@ data Env = Env type Subscribed = Bool data Server = Server - { subscribedQ :: TQueue (RecipientId, ClientId), - deletedQ :: TQueue (RecipientId, ClientId), + { subscribedQ :: TQueue (RecipientId, ClientId, Subscribed), subscribers :: TMap RecipientId (TVar Client), - ntfSubscribedQ :: TQueue (NotifierId, ClientId), - ntfDeletedQ :: TQueue (NotifierId, ClientId), + ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed), notifiers :: TMap NotifierId (TVar Client), - pendingENDs :: TVar (IntMap (NonEmpty RecipientId)), - pendingDELDs :: TVar (IntMap (NonEmpty RecipientId)), - pendingNtfENDs :: TVar (IntMap (NonEmpty NotifierId)), - pendingNtfDELDs :: TVar (IntMap (NonEmpty NotifierId)), + subClients :: TVar (IntMap Client), -- clients with SMP subscriptions + ntfSubClients :: TVar (IntMap Client), -- clients with Ntf subscriptions + pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))), + pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))), savingLock :: Lock } @@ -185,17 +183,15 @@ data Sub = Sub newServer :: IO Server newServer = do subscribedQ <- newTQueueIO - deletedQ <- newTQueueIO subscribers <- TM.emptyIO ntfSubscribedQ <- newTQueueIO - ntfDeletedQ <- newTQueueIO notifiers <- TM.emptyIO - pendingENDs <- newTVarIO IM.empty - pendingDELDs <- newTVarIO IM.empty - pendingNtfENDs <- newTVarIO IM.empty - pendingNtfDELDs <- newTVarIO IM.empty + subClients <- newTVarIO IM.empty + ntfSubClients <- newTVarIO IM.empty + pendingSubEvents <- newTVarIO IM.empty + pendingNtfSubEvents <- newTVarIO IM.empty savingLock <- atomically createLock - return Server {subscribedQ, deletedQ, subscribers, ntfSubscribedQ, ntfDeletedQ, notifiers, pendingENDs, pendingDELDs, pendingNtfENDs, pendingNtfDELDs, savingLock} + return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock} newClient :: ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO Client newClient clientId qSize thVersion sessionId createdAt = do