diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 6340e5c3f..3a74f5def 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -212,7 +212,7 @@ import Simplex.Messaging.Agent.Stats import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction) import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB -import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues), activeToPendingQueues) +import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues)) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import Simplex.Messaging.Client import qualified Simplex.Messaging.Crypto as C @@ -242,7 +242,6 @@ import Simplex.Messaging.Protocol QueueIdsKeys (..), RcvMessage (..), RcvNtfPublicDhKey, - RecipientId, SMPMsgMeta (..), SProtocolType (..), SenderCanSecure, @@ -926,36 +925,14 @@ reconnectServerClients c clientsSel = reconnectSMPServerClients :: AgentClient -> AM' () reconnectSMPServerClients c = do - -- 1. swap smpClients to empty map, move active subscriptions to pending - (clients, prevActive) <- atomically $ do - clients <- smpClients c `swapTVar` M.empty - prevActive <- activeToPendingQueues (activeSubs c) (pendingSubs c) - pure (clients, prevActive) - -- 2. notify DOWN for connections that had active subscriptions - let downConns = groupConnsByServer prevActive - forM_ (M.toList downConns) $ \(server, connIds) -> - liftIO $ notifyDOWN server connIds - -- 3. close clients + (clients, qs) <- atomically $ do + clients <- swapTVar (smpClients c) M.empty + qs <- RQ.getDelAllQueues (activeSubs c) + qs' <- RQ.getDelAllQueues (pendingSubs c) + pure (clients, qs <> qs') + atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone DOWN_ALL) mapM_ (liftIO . forkIO . closeClient_ c) clients - -- 4. resubscribe pending subscriptions - mode <- liftIO $ getSessionMode c - pending <- readTVarIO (getRcvQueues $ pendingSubs c) - -- Group transport sessions to avoid multiple UP events in case session mode is TSMUser - let tSessions = queuesToSessions pending mode - forM_ tSessions $ \tSess -> resubscribeSMPSession c tSess - where - groupConnsByServer :: Map (UserId, SMPServer, RecipientId) RcvQueue -> Map SMPServer [ConnId] - groupConnsByServer = foldl' insertConnId M.empty - where - insertConnId :: Map SMPServer [ConnId] -> RcvQueue -> Map SMPServer [ConnId] - insertConnId acc RcvQueue {server, connId} = - M.insertWith (<>) server [connId] acc - notifyDOWN :: SMPServer -> [ConnId] -> IO () - notifyDOWN server connIds = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone (DOWN server connIds)) - queuesToSessions :: Map (UserId, SMPServer, RecipientId) RcvQueue -> TransportSessionMode -> Set SMPTransportSession - queuesToSessions qs mode = case mode of - TSMEntity -> M.foldrWithKey (\(userId, srv, rId) _ acc -> S.insert (userId, srv, Just rId) acc) S.empty qs - TSMUser -> M.foldrWithKey (\(userId, srv, _) _ acc -> S.insert (userId, srv, Nothing) acc) S.empty qs + void $ subscribeQueues c qs reconnectSMPServer :: AgentClient -> UserId -> SMPServer -> IO () reconnectSMPServer c userId srv = do diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index b123fc1ec..5bcb7ded3 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -338,6 +338,7 @@ data AEvent (e :: AEntity) where CONNECT :: AProtocolType -> TransportHost -> AEvent AENone DISCONNECT :: AProtocolType -> TransportHost -> AEvent AENone DOWN :: SMPServer -> [ConnId] -> AEvent AENone + DOWN_ALL :: AEvent AENone UP :: SMPServer -> [ConnId] -> AEvent AENone SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> AEvent AEConn RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> AEvent AEConn @@ -406,6 +407,7 @@ data AEventTag (e :: AEntity) where CONNECT_ :: AEventTag AENone DISCONNECT_ :: AEventTag AENone DOWN_ :: AEventTag AENone + DOWN_ALL_ :: AEventTag AENone UP_ :: AEventTag AENone SWITCH_ :: AEventTag AEConn RSYNC_ :: AEventTag AEConn @@ -458,6 +460,7 @@ aEventTag = \case CONNECT {} -> CONNECT_ DISCONNECT {} -> DISCONNECT_ DOWN {} -> DOWN_ + DOWN_ALL {} -> DOWN_ALL_ UP {} -> UP_ SWITCH {} -> SWITCH_ RSYNC {} -> RSYNC_ diff --git a/src/Simplex/Messaging/Agent/TRcvQueues.hs b/src/Simplex/Messaging/Agent/TRcvQueues.hs index d93de646e..c326e56f5 100644 --- a/src/Simplex/Messaging/Agent/TRcvQueues.hs +++ b/src/Simplex/Messaging/Agent/TRcvQueues.hs @@ -11,7 +11,7 @@ module Simplex.Messaging.Agent.TRcvQueues deleteQueue, getSessQueues, getDelSessQueues, - activeToPendingQueues, + getDelAllQueues, qKey, ) where @@ -20,7 +20,6 @@ import Control.Concurrent.STM import Data.Foldable (foldl') import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L -import Data.Map (Map) import qualified Data.Map.Strict as M import Simplex.Messaging.Agent.Protocol (ConnId, UserId) import Simplex.Messaging.Agent.Store (RcvQueue, StoredRcvQueue (..)) @@ -98,22 +97,10 @@ getDelSessQueues tSess (TRcvQueues qs cs) = do Nothing -> (cId : removed, Nothing) Nothing -> (removed, Nothing) -- "impossible" in invariant holds, because we get keys from the known queues --- moves active queues to pending queues and returns queues that were active -activeToPendingQueues :: TRcvQueues -> TRcvQueues -> STM (Map (UserId, SMPServer, RecipientId) RcvQueue) -activeToPendingQueues (TRcvQueues aqs acs) (TRcvQueues pqs pcs) = do - aqs' <- mergeQueues - mergeConns - pure aqs' - where - mergeQueues :: STM (Map (UserId, SMPServer, RecipientId) RcvQueue) - mergeQueues = do - aqs' <- aqs `swapTVar` M.empty - modifyTVar pqs $ \pqs' -> M.union aqs' pqs' - pure aqs' - mergeConns :: STM () - mergeConns = do - acs' <- acs `swapTVar` M.empty - modifyTVar pcs $ \pcs' -> M.unionWith (<>) acs' pcs' +getDelAllQueues :: TRcvQueues -> STM [RcvQueue] +getDelAllQueues (TRcvQueues qs cs) = do + writeTVar cs M.empty + M.elems <$> swapTVar qs M.empty isSession :: RcvQueue -> (UserId, SMPServer, Maybe ConnId) -> Bool isSession rq (uId, srv, connId_) = diff --git a/tests/CoreTests/TRcvQueuesTests.hs b/tests/CoreTests/TRcvQueuesTests.hs index 9683ba09b..14b894774 100644 --- a/tests/CoreTests/TRcvQueuesTests.hs +++ b/tests/CoreTests/TRcvQueuesTests.hs @@ -29,7 +29,6 @@ tRcvQueuesTests = do it "getDelSessQueues" getDelSessQueuesTest describe "queue transfer" $ do it "getDelSessQueues-batchAddQueues preserves total length" removeSubsTest - it "activeToPendingQueues" activeToPendingTest checkDataInvariant :: RQ.TRcvQueues -> IO Bool checkDataInvariant trq = atomically $ do @@ -164,33 +163,6 @@ removeSubsTest = do atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@beta", Just "c3") aq >>= RQ.batchAddQueues pq . fst atomically (totalSize aq pq) `shouldReturn` (4, 4) -activeToPendingTest :: IO () -activeToPendingTest = do - aq <- atomically RQ.empty - let qs1 = - [ dummyRQ 0 "smp://1234-w==@alpha" "c1", - dummyRQ 0 "smp://1234-w==@alpha" "c2" - ] - atomically $ RQ.batchAddQueues aq qs1 - - pq <- atomically RQ.empty - let qs2 = - [ dummyRQ 0 "smp://1234-w==@beta" "c3", - dummyRQ 1 "smp://1234-w==@beta" "c4" - ] - atomically $ RQ.batchAddQueues pq qs2 - - atomically (totalSize aq pq) `shouldReturn` (4, 4) - - prevActive <- atomically $ RQ.activeToPendingQueues aq pq - atomically (totalSize aq pq) `shouldReturn` (4, 4) - M.keys <$> readTVarIO (RQ.getConnections aq) `shouldReturn` [] - M.keys <$> readTVarIO (RQ.getConnections pq) `shouldReturn` ["c1", "c2", "c3", "c4"] - -- M.keys prevActive `shouldMatchList` [(0, "smp://1234-w==@alpha", ""), (0, "smp://1234-w==@alpha", "")] - M.keys prevActive `shouldMatchList` [(0, "smp://1234-w==@alpha", "c1"), (0, "smp://1234-w==@alpha", "c2")] - checkDataInvariant aq `shouldReturn` True - checkDataInvariant pq `shouldReturn` True - totalSize :: RQ.TRcvQueues -> RQ.TRcvQueues -> STM (Int, Int) totalSize a b = do qsizeA <- M.size <$> readTVar (RQ.getRcvQueues a)