diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 42632a7a7..b513a3399 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -94,11 +94,11 @@ data NtfEnv = NtfEnv } newNtfServerEnv :: NtfServerConfig -> IO NtfEnv -newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do +newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do when (compactLog startOptions) $ compactDbStoreLog $ dbStoreLogPath dbStoreConfig random <- C.newRandom store <- newNtfDbStore dbStoreConfig - subscriber <- newNtfSubscriber subQSize smpAgentCfg random + subscriber <- newNtfSubscriber smpAgentCfg random pushServer <- newNtfPushServer pushQSize apnsConfig tlsServerCreds <- loadServerCredential ntfCredentials Fingerprint fp <- loadFingerprint ntfCredentials @@ -121,8 +121,8 @@ data NtfSubscriber = NtfSubscriber type SMPSubscriberVar = SessionVar SMPSubscriber -newNtfSubscriber :: Natural -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber -newNtfSubscriber qSize smpAgentCfg random = do +newNtfSubscriber :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber +newNtfSubscriber smpAgentCfg random = do smpSubscribers <- TM.emptyIO subscriberSeq <- newTVarIO 0 smpAgent <- newSMPClientAgent smpAgentCfg random diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index db041c4e7..61e7d1981 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -162,8 +162,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt stopServer s liftIO $ exitSuccess raceAny_ - ( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub - : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ()) + ( serverThread "server subscribers" (subscribers s) subscriptions cancelSub + : serverThread "server notifiers" (ntfSubscribers s) ntfSubscriptions (\_ -> pure ()) : deliverNtfsThread s : sendPendingEvtsThread s : receiveFromProxyAgent pa @@ -229,66 +229,65 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt serverThread :: forall s. - Server -> String -> - (Server -> TQueue (QueueId, ClientId, Subscribed)) -> - (Server -> TMap QueueId (TVar AClient)) -> - (Server -> TVar (IM.IntMap AClient)) -> - (Server -> TVar (IM.IntMap (NonEmpty (QueueId, Subscribed)))) -> + ServerSubscribers -> (forall st. Client st -> TMap QueueId s) -> (s -> IO ()) -> M () - serverThread s label subQ subs subClnts pendingEvts clientSubs unsub = do + serverThread label ServerSubscribers {subQ, queueSubscribers = ss, subClients, pendingEvents} clientSubs unsub = do labelMyThread label cls <- asks clients - liftIO . forever $ - (atomically (readTQueue $ subQ s) >>= atomically . updateSubscribers cls) + liftIO . forever $ do + -- Reading clients outside of `updateSubscribers` transaction to avoid transaction re-evaluation on each new connected client. + -- In case client disconnects during the transaction (its `connected` property is read), + -- the transaction will still be re-evaluated, and the client won't be stored as subscribed. + sub@(_, clntId, _) <- atomically $ readTQueue subQ + c_ <- IM.lookup clntId <$> readTVarIO cls + atomically (updateSubscribers c_ sub) $>>= endPreviousSubscriptions >>= mapM_ unsub where - updateSubscribers :: TVar (IM.IntMap (Maybe AClient)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), AClient)) - updateSubscribers cls (qId, clntId, subscribed) = - -- Client lookup by ID is in the same STM transaction. - -- In case client disconnects during the transaction, - -- it will be re-evaluated, and the client won't be stored as subscribed. - (readTVar cls >>= updateSub . IM.lookup clntId) - $>>= clientToBeNotified + updateSubscribers :: Maybe AClient -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, BrokerMsg), AClient)) + updateSubscribers c_ (qId, clntId, subscribed) = updateSub $>>= clientToBeNotified where - ss = subs s - updateSub = \case - Just (Just clnt) - | subscribed -> do - modifyTVar' (subClnts s) $ IM.insert clntId clnt -- add client to server's subscribed cients - TM.lookup qId ss >>= -- insert subscribed and current client - maybe - (newTVar clnt >>= \cv -> TM.insert qId cv ss $> Nothing) - (\cv -> Just <$> swapTVar cv clnt) - | otherwise -> do - removeWhenNoSubs clnt - TM.lookupDelete qId ss >>= mapM readTVar - -- This case catches Just Nothing - it cannot happen here. - -- Nothing is there only before client thread is started. - _ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client + updateSub = case c_ of + Just c@(AClient _ _ c') -> ifM (readTVar $ connected c') (updateSubConnected c) updateSubDisconnected + Nothing -> updateSubDisconnected + updateSubConnected c + | subscribed = do + modifyTVar' subClients $ IS.insert clntId -- add client to server's subscribed cients + TM.lookup qId ss >>= -- insert subscribed and current client + maybe + (newTVar c >>= \cv -> TM.insert qId cv ss $> Nothing) + (\cv -> Just <$> swapTVar cv c) + | otherwise = do + removeWhenNoSubs c + TM.lookupDelete qId ss >>= mapM readTVar + updateSubDisconnected = TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client clientToBeNotified ac@(AClient _ _ c') | clntId == clientId c' = pure Nothing - | otherwise = (\yes -> if yes then Just ((qId, subscribed), ac) else Nothing) <$> readTVar (connected c') - endPreviousSubscriptions :: ((QueueId, Subscribed), AClient) -> IO (Maybe s) - endPreviousSubscriptions (qEvt@(qId, _), ac@(AClient _ _ c)) = do - atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qEvt] (qEvt <|)) (clientId c) + | otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar (connected c') + where + subEvt = if subscribed then END else DELD + endPreviousSubscriptions :: ((QueueId, BrokerMsg), AClient) -> IO (Maybe s) + endPreviousSubscriptions (evt@(qId, _), ac@(AClient _ _ c)) = do + atomically $ modifyTVar' pendingEvents $ IM.alter (Just . maybe [evt] (evt <|)) (clientId c) atomically $ do sub <- TM.lookupDelete qId (clientSubs c) removeWhenNoSubs ac $> sub -- remove client from server's subscribed cients - removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c) + removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' subClients $ IS.delete (clientId c) deliverNtfsThread :: Server -> M () - deliverNtfsThread Server {ntfSubClients} = do + deliverNtfsThread Server {ntfSubscribers} = do ntfInt <- asks $ ntfDeliveryInterval . config NtfStore ns <- asks ntfStore stats <- asks serverStats + cls <- asks clients liftIO $ forever $ do threadDelay ntfInt - readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats) + cIds <- IS.toList <$> readTVarIO (subClients ntfSubscribers) + forM_ cIds $ \cId -> mapM_ (deliverNtfs ns stats) . IM.lookup cId =<< readTVarIO cls where deliverNtfs ns stats (AClient _ _ Client {clientId, ntfSubscriptions, sndQ, connected}) = whenM (currentClient readTVarIO) $ do @@ -308,7 +307,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt writeTBQueue sndQ ts pure $ length ts_ currentClient :: Monad m => (forall a. TVar a -> m a) -> m Bool - currentClient rd = (&&) <$> rd connected <*> (IM.member clientId <$> rd ntfSubClients) + currentClient rd = (&&) <$> rd connected <*> (IS.member clientId <$> rd (subClients ntfSubscribers)) addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg] addNtfs acc (nId, v) = readTVar v >>= \case @@ -324,34 +323,32 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch sendPendingEvtsThread :: Server -> M () - sendPendingEvtsThread s = do + sendPendingEvtsThread Server {subscribers, ntfSubscribers} = do endInt <- asks $ pendingENDInterval . config cls <- asks clients forever $ do threadDelay endInt - sendPending cls $ pendingSubEvents s - sendPending cls $ pendingNtfSubEvents s + sendPending cls subscribers + sendPending cls ntfSubscribers where - sendPending cls ref = do - ends <- atomically $ swapTVar ref IM.empty - unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qEvts) -> - mapM_ (queueEvts qEvts) . join . IM.lookup cId =<< readTVarIO cls - queueEvts qEvts (AClient _ _ c@Client {connected, sndQ = q}) = + sendPending cls ServerSubscribers {pendingEvents} = do + ends <- atomically $ swapTVar pendingEvents IM.empty + unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, evts) -> + mapM_ (enqueueEvts evts) . IM.lookup cId =<< readTVarIO cls + enqueueEvts evts (AClient _ _ c@Client {connected, sndQ = q}) = whenM (readTVarIO connected) $ do sent <- atomically $ tryWriteTBQueue q ts if sent then updateEndStats else -- if queue is full it can block - forkClient c ("sendPendingEvtsThread.queueEvts") $ + forkClient c ("sendPendingEvtsThread.enqueueEvts") $ atomically (writeTBQueue q ts) >> updateEndStats where - ts = L.map (\(qId, subscribed) -> (CorrId "", qId, evt subscribed)) qEvts - evt True = END - evt False = DELD + ts = L.map (\(qId, evt) -> (CorrId "", qId, evt)) evts -- this accounts for both END and DELD events updateEndStats = do stats <- asks serverStats - let len = L.length qEvts + let len = L.length evts when (len > 0) $ liftIO $ do atomicModifyIORef'_ (qSubEnd stats) (+ len) atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch @@ -581,7 +578,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions} getRealTimeMetrics :: Env -> IO RealTimeMetrics - getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do + getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, ntfSubscribers}} = do socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets #if MIN_VERSION_base(4,18,0) threadsCount <- length <$> listThreads @@ -589,10 +586,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt let threadsCount = 0 #endif clientsCount <- IM.size <$> readTVarIO clients - smpSubsCount <- M.size <$> readTVarIO subscribers - smpSubClientsCount <- IM.size <$> readTVarIO subClients - ntfSubsCount <- M.size <$> readTVarIO notifiers - ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients + smpSubsCount <- M.size <$> readTVarIO (queueSubscribers subscribers) + smpSubClientsCount <- IS.size <$> readTVarIO (subClients subscribers) + ntfSubsCount <- M.size <$> readTVarIO (queueSubscribers ntfSubscribers) + ntfSubClientsCount <- IS.size <$> readTVarIO (subClients ntfSubscribers) loadedCounts <- loadedQueueCounts ms pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts} @@ -655,7 +652,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt CPClients -> withAdminRole $ do active <- unliftIO u (asks clients) >>= readTVarIO hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions" - forM_ (IM.toList active) $ \(cid, cl) -> forM_ cl $ \(AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do + forM_ (IM.toList active) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do connected' <- bshow <$> readTVarIO connected rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt @@ -767,7 +764,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt #else hPutStrLn h "Threads: not available on GHC 8.10" #endif - Env {clients, server = Server {subscribers, notifiers, subClients, ntfSubClients}} <- unliftIO u ask + Env {clients, server = Server {subscribers, ntfSubscribers}} <- unliftIO u ask activeClients <- readTVarIO clients hPutStrLn h $ "Clients: " <> show (IM.size activeClients) when (r == CPRAdmin) $ do @@ -782,10 +779,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs - putActiveClientsInfo "SMP" subscribers False - putActiveClientsInfo "Ntf" notifiers True - putSubscribedClients "SMP" subClients False - putSubscribedClients "Ntf" ntfSubClients True + putActiveClientsInfo "SMP" (queueSubscribers subscribers) False + putActiveClientsInfo "Ntf" (queueSubscribers ntfSubscribers) True + putSubscribedClients "SMP" (subClients subscribers) False + putSubscribedClients "Ntf" (subClients ntfSubscribers) True where putActiveClientsInfo :: String -> TMap QueueId (TVar AClient) -> Bool -> IO () putActiveClientsInfo protoName clients showIds = do @@ -796,16 +793,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt where countSubClients :: M.Map QueueId (TVar AClient) -> IO IS.IntSet countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId' <$> readTVarIO c) IS.empty - putSubscribedClients :: String -> TVar (IM.IntMap AClient) -> Bool -> IO () + putSubscribedClients :: String -> TVar IS.IntSet -> Bool -> IO () putSubscribedClients protoName subClnts showIds = do clnts <- readTVarIO subClnts - hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "") - countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe AClient) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) + hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IS.size clnts) <> (if showIds then " " <> show clnts else "") + countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0)) where - addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Maybe AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) - addSubs acc Nothing = pure acc - addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) (Just acl@(AClient _ _ cl)) = do + addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) + addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) acl@(AClient _ _ cl) = do subs <- readTVarIO $ subSel cl cnts' <- case countSubs_ of Nothing -> pure cnts @@ -816,8 +812,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt clCnt' = if cnt == 0 then clCnt else clCnt + 1 qs' <- if cnt == 0 then pure qs else addQueueLengths qs acl pure (subCnt + cnt, cnts', clCnt', qs') - clientTBQueueLengths' :: Foldable t => t (Maybe AClient) -> IO (Natural, Natural, Natural) - clientTBQueueLengths' = foldM (\acc -> maybe (pure acc) (addQueueLengths acc)) (0, 0, 0) + clientTBQueueLengths' :: Foldable t => t AClient -> IO (Natural, Natural, Natural) + clientTBQueueLengths' = foldM addQueueLengths (0, 0, 0) addQueueLengths (!rl, !sl, !ml) (AClient _ _ cl) = do (rl', sl', ml') <- queueLengths cl pure (rl + rl', sl + sl', ml + ml') @@ -896,30 +892,35 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M () runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime - active <- asks clients nextClientId <- asks clientSeq clientId <- atomically $ stateTVar nextClientId $ \next -> (next, next + 1) - atomically $ modifyTVar' active $ IM.insert clientId Nothing AMS qt mt ms <- asks msgStore c <- liftIO $ newClient qt mt clientId q thVersion sessionId ts - runClientThreads qt mt ms active c clientId `finally` clientDisconnected c + runClientThreads qt mt ms c clientId `finally` clientDisconnected c where - runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore qs ms) -> IS.Key -> M () - runClientThreads qt mt ms active c clientId = do - atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient qt mt c) - s <- asks server - expCfg <- asks $ inactiveClientExpiration . config - th <- newMVar h -- put TH under a fair lock to interleave messages and command responses - labelMyThread . B.unpack $ "client $" <> encode sessionId - raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg + runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> IS.Key -> M () + runClientThreads qt mt ms c clientId = do + cls <- asks clients + ok <- + atomically $ do + ifM + (readTVar $ connected c) + (True <$ modifyTVar' cls (IM.insert clientId $ AClient qt mt c)) + (pure False) + when ok $ do + s <- asks server + expCfg <- asks $ inactiveClientExpiration . config + th <- newMVar h -- put TH under a fair lock to interleave messages and command responses + labelMyThread . B.unpack $ "client $" <> encode sessionId + raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg disconnectThread_ :: Client s -> Server -> Maybe ExpirationConfig -> [M ()] disconnectThread_ c s (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c s)] disconnectThread_ _ _ _ = [] - noSubscriptions Client {clientId} s = do - hasSubs <- IM.member clientId <$> readTVarIO (subClients s) + noSubscriptions Client {clientId} Server {subscribers, ntfSubscribers} = do + hasSubs <- IS.member clientId <$> readTVarIO (subClients subscribers) if hasSubs then pure False - else not . IM.member clientId <$> readTVarIO (ntfSubClients s) + else not . IS.member clientId <$> readTVarIO (subClients ntfSubscribers) clientDisconnected :: Client s -> M () clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do @@ -931,12 +932,12 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty liftIO $ mapM_ cancelSub subs whenM (asks serverActive >>= readTVarIO) $ do - Server {subscribers, notifiers, subClients, ntfSubClients} <- asks server - liftIO $ updateSubscribers subs subscribers - liftIO $ updateSubscribers ntfSubs notifiers + Server {subscribers, ntfSubscribers} <- asks server + liftIO $ updateSubscribers subs $ queueSubscribers subscribers + liftIO $ updateSubscribers ntfSubs $ queueSubscribers ntfSubscribers asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) - atomically $ modifyTVar' subClients $ IM.delete clientId - atomically $ modifyTVar' ntfSubClients $ IM.delete clientId + atomically $ modifyTVar' (subClients subscribers) $ IS.delete clientId + atomically $ modifyTVar' (subClients ntfSubscribers) $ IS.delete clientId tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where @@ -1151,7 +1152,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do client :: forall s. MsgStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M () client thParams' - Server {subscribedQ, ntfSubscribedQ, subscribers} + Server {subscribers, ntfSubscribers} ms clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" @@ -1372,7 +1373,7 @@ client Left e -> pure $ ERR e Right nId_ -> do incStat . ntfCreated =<< asks serverStats - forM_ nId_ $ \nId -> atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) + forM_ nId_ $ \nId -> atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False) pure $ NID notifierId rcvPublicDhKey deleteQueueNotifier_ :: StoreQueue s -> M (Transmission BrokerMsg) @@ -1383,7 +1384,7 @@ client stats <- asks serverStats deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId) when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted) - atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) + atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False) incStat $ ntfDeleted stats pure ok Right Nothing -> pure ok @@ -1410,7 +1411,7 @@ client rId = recipientId q newSub :: M Sub newSub = time "SUB newSub" . atomically $ do - writeTQueue subscribedQ (rId, clientId, True) + writeTQueue (subQ subscribers) (rId, clientId, True) sub <- newSubscription NoSub TM.insert rId sub subscriptions pure sub @@ -1486,7 +1487,7 @@ client pure ok where newSub = do - writeTQueue ntfSubscribedQ (entId, clientId, True) + writeTQueue (subQ ntfSubscribers) (entId, clientId, True) TM.insert entId () ntfSubscriptions acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M (Transmission BrokerMsg) @@ -1522,7 +1523,7 @@ client incStat $ msgRecv stats if isGet then incStat $ msgRecvGet stats - else pure () -- TODO skip notification delivery for delivered message + else pure () -- TODO skip notification delivery for delivered message -- skipping delivery fails tests, it should be counted in msgNtfSkipped -- forM_ (notifierId <$> notifier qr) $ \nId -> do -- ns <- asks ntfStore @@ -1597,7 +1598,7 @@ client tryDeliverMessage msg = -- the subscription is checked outside of STM to avoid transaction cost -- in case no client is subscribed. - whenM (TM.memberIO rId subscribers) $ + whenM (TM.memberIO rId $ queueSubscribers subscribers) $ atomically deliverToSub >>= mapM_ forkDeliver where rId = recipientId q @@ -1606,7 +1607,7 @@ client -- so that if subscription ends, it re-evalutates -- and delivery is cancelled - -- the new client will receive message in response to SUB. - (TM.lookup rId subscribers >>= mapM readTVar) + (TM.lookup rId (queueSubscribers subscribers) >>= mapM readTVar) $>>= \rc@(AClient _ _ Client {subscriptions = subs, sndQ = sndQ'}) -> TM.lookup rId subs $>>= \s@Sub {subThread, delivered} -> case subThread of ProhibitSub -> pure Nothing @@ -1635,7 +1636,7 @@ client labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND" -- lookup can be outside of STM transaction, -- as long as the check that it is the same client is inside. - TM.lookupIO rId subscribers >>= mapM_ deliverIfSame + TM.lookupIO rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame deliverIfSame rc' = time "deliver" . atomically $ whenM (sameClientId rc <$> readTVar rc') $ tryTakeTMVar delivered >>= \case @@ -1750,7 +1751,7 @@ client Right qr -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it atomically $ do - writeTQueue subscribedQ (entId, clientId, False) + writeTQueue (subQ subscribers) (entId, clientId, False) -- queue is usually deleted by the same client that is currently subscribed, -- we delete subscription here, so the client with no subscriptions can be disconnected. TM.delete entId subscriptions @@ -1760,7 +1761,7 @@ client stats <- asks serverStats deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId) when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted) - atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) + atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False) updateDeletedStats qr pure ok Left e -> pure $ err e @@ -1985,7 +1986,7 @@ restoreServerNtfs = renameFile f $ f <> ".bak" let NtfStore ns' = ns storedQueues <- M.size <$> readTVarIO ns' - logNote $ "notifications restored, " <> tshow lineCount <> " lines processed" + logNote $ "notifications restored, " <> tshow lineCount <> " lines processed" pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} where restoreNtf :: NtfStore -> Int64 -> (Int, Int, Int) -> LB.ByteString -> ExceptT String IO (Int, Int, Int) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 8895ba8ed..e1e241dd1 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -18,7 +18,47 @@ #endif {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} -module Simplex.Messaging.Server.Env.STM where +module Simplex.Messaging.Server.Env.STM + ( ServerConfig (..), + ServerStoreCfg (..), + AServerStoreCfg (..), + StorePaths (..), + StartOptions (..), + Env (..), + Server (..), + ServerSubscribers (..), + ProxyAgent (..), + Client (..), + AClient (..), + ClientId, + Subscribed, + Sub (..), + ServerSub (..), + SubscriptionThread (..), + MsgStore, + AMsgStore (..), + AStoreType (..), + newEnv, + mkJournalStoreConfig, + newClient, + clientId', + newSubscription, + newProhibitedSub, + defaultMsgQueueQuota, + defMsgExpirationDays, + defNtfExpirationHours, + defaultMessageExpiration, + defaultNtfExpiration, + defaultInactiveClientExpiration, + defaultProxyClientConcurrency, + defaultMaxJournalMsgCount, + defaultMaxJournalStateLines, + defaultIdleQueueInterval, + journalMsgStoreDepth, + readWriteQueueStore, + noPostgresExit, + ) +where import Control.Concurrent (ThreadId) import Control.Logger.Simple @@ -29,6 +69,8 @@ import Data.ByteString.Char8 (ByteString) import Data.Int (Int64) import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IM +import Data.IntSet (IntSet) +import qualified Data.IntSet as IS import Data.Kind (Constraint) import Data.List (intercalate) import Data.List.NonEmpty (NonEmpty) @@ -203,7 +245,7 @@ data Env = Env serverStats :: ServerStats, sockets :: TVar [(ServiceName, SocketState)], clientSeq :: TVar ClientId, - clients :: TVar (IntMap (Maybe AClient)), + clients :: TVar (IntMap AClient), proxyAgent :: ProxyAgent -- senders served on this proxy } @@ -236,17 +278,18 @@ data AMsgStore = type Subscribed = Bool data Server = Server - { subscribedQ :: TQueue (RecipientId, ClientId, Subscribed), - subscribers :: TMap RecipientId (TVar AClient), - ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed), - notifiers :: TMap NotifierId (TVar AClient), - subClients :: TVar (IntMap AClient), -- clients with SMP subscriptions - ntfSubClients :: TVar (IntMap AClient), -- clients with Ntf subscriptions - pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))), - pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))), + { subscribers :: ServerSubscribers, + ntfSubscribers :: ServerSubscribers, savingLock :: Lock } +data ServerSubscribers = ServerSubscribers + { subQ :: TQueue (QueueId, ClientId, Subscribed), + queueSubscribers :: TMap QueueId (TVar AClient), + subClients :: TVar IntSet, + pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg))) + } + newtype ProxyAgent = ProxyAgent { smpAgent :: SMPClientAgent } @@ -288,16 +331,18 @@ data Sub = Sub newServer :: IO Server newServer = do - subscribedQ <- newTQueueIO - subscribers <- TM.emptyIO - ntfSubscribedQ <- newTQueueIO - notifiers <- TM.emptyIO - subClients <- newTVarIO IM.empty - ntfSubClients <- newTVarIO IM.empty - pendingSubEvents <- newTVarIO IM.empty - pendingNtfSubEvents <- newTVarIO IM.empty + subscribers <- newServerSubscribers + ntfSubscribers <- newServerSubscribers savingLock <- createLockIO - return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock} + return Server {subscribers, ntfSubscribers, savingLock} + +newServerSubscribers :: IO ServerSubscribers +newServerSubscribers = do + subQ <- newTQueueIO + queueSubscribers <- TM.emptyIO + subClients <- newTVarIO IS.empty + pendingEvents <- newTVarIO IM.empty + pure ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents} newClient :: SQSType qs -> SMSType ms -> ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO (Client (MsgStore qs ms)) newClient _ _ clientId qSize thVersion sessionId createdAt = do diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 2c3ba40d4..054692d3a 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -61,7 +61,7 @@ import Data.Time.Clock.System (systemToUTCTime) import qualified Database.PostgreSQL.Simple as PSQL import NtfClient import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer, testNtfServer2) -import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'') +import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn) import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage) import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), withStore') import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers)