This commit is contained in:
Evgeny Poberezkin
2024-07-09 10:51:45 +01:00
parent 492457303f
commit ba1e551111
4 changed files with 16 additions and 77 deletions
+8 -31
View File
@@ -212,7 +212,7 @@ import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction)
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues), activeToPendingQueues)
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues))
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
@@ -242,7 +242,6 @@ import Simplex.Messaging.Protocol
QueueIdsKeys (..),
RcvMessage (..),
RcvNtfPublicDhKey,
RecipientId,
SMPMsgMeta (..),
SProtocolType (..),
SenderCanSecure,
@@ -926,36 +925,14 @@ reconnectServerClients c clientsSel =
reconnectSMPServerClients :: AgentClient -> AM' ()
reconnectSMPServerClients c = do
-- 1. swap smpClients to empty map, move active subscriptions to pending
(clients, prevActive) <- atomically $ do
clients <- smpClients c `swapTVar` M.empty
prevActive <- activeToPendingQueues (activeSubs c) (pendingSubs c)
pure (clients, prevActive)
-- 2. notify DOWN for connections that had active subscriptions
let downConns = groupConnsByServer prevActive
forM_ (M.toList downConns) $ \(server, connIds) ->
liftIO $ notifyDOWN server connIds
-- 3. close clients
(clients, qs) <- atomically $ do
clients <- swapTVar (smpClients c) M.empty
qs <- RQ.getDelAllQueues (activeSubs c)
qs' <- RQ.getDelAllQueues (pendingSubs c)
pure (clients, qs <> qs')
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone DOWN_ALL)
mapM_ (liftIO . forkIO . closeClient_ c) clients
-- 4. resubscribe pending subscriptions
mode <- liftIO $ getSessionMode c
pending <- readTVarIO (getRcvQueues $ pendingSubs c)
-- Group transport sessions to avoid multiple UP events in case session mode is TSMUser
let tSessions = queuesToSessions pending mode
forM_ tSessions $ \tSess -> resubscribeSMPSession c tSess
where
groupConnsByServer :: Map (UserId, SMPServer, RecipientId) RcvQueue -> Map SMPServer [ConnId]
groupConnsByServer = foldl' insertConnId M.empty
where
insertConnId :: Map SMPServer [ConnId] -> RcvQueue -> Map SMPServer [ConnId]
insertConnId acc RcvQueue {server, connId} =
M.insertWith (<>) server [connId] acc
notifyDOWN :: SMPServer -> [ConnId] -> IO ()
notifyDOWN server connIds = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone (DOWN server connIds))
queuesToSessions :: Map (UserId, SMPServer, RecipientId) RcvQueue -> TransportSessionMode -> Set SMPTransportSession
queuesToSessions qs mode = case mode of
TSMEntity -> M.foldrWithKey (\(userId, srv, rId) _ acc -> S.insert (userId, srv, Just rId) acc) S.empty qs
TSMUser -> M.foldrWithKey (\(userId, srv, _) _ acc -> S.insert (userId, srv, Nothing) acc) S.empty qs
void $ subscribeQueues c qs
reconnectSMPServer :: AgentClient -> UserId -> SMPServer -> IO ()
reconnectSMPServer c userId srv = do
+3
View File
@@ -338,6 +338,7 @@ data AEvent (e :: AEntity) where
CONNECT :: AProtocolType -> TransportHost -> AEvent AENone
DISCONNECT :: AProtocolType -> TransportHost -> AEvent AENone
DOWN :: SMPServer -> [ConnId] -> AEvent AENone
DOWN_ALL :: AEvent AENone
UP :: SMPServer -> [ConnId] -> AEvent AENone
SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> AEvent AEConn
RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> AEvent AEConn
@@ -406,6 +407,7 @@ data AEventTag (e :: AEntity) where
CONNECT_ :: AEventTag AENone
DISCONNECT_ :: AEventTag AENone
DOWN_ :: AEventTag AENone
DOWN_ALL_ :: AEventTag AENone
UP_ :: AEventTag AENone
SWITCH_ :: AEventTag AEConn
RSYNC_ :: AEventTag AEConn
@@ -458,6 +460,7 @@ aEventTag = \case
CONNECT {} -> CONNECT_
DISCONNECT {} -> DISCONNECT_
DOWN {} -> DOWN_
DOWN_ALL {} -> DOWN_ALL_
UP {} -> UP_
SWITCH {} -> SWITCH_
RSYNC {} -> RSYNC_
+5 -18
View File
@@ -11,7 +11,7 @@ module Simplex.Messaging.Agent.TRcvQueues
deleteQueue,
getSessQueues,
getDelSessQueues,
activeToPendingQueues,
getDelAllQueues,
qKey,
)
where
@@ -20,7 +20,6 @@ import Control.Concurrent.STM
import Data.Foldable (foldl')
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Simplex.Messaging.Agent.Protocol (ConnId, UserId)
import Simplex.Messaging.Agent.Store (RcvQueue, StoredRcvQueue (..))
@@ -98,22 +97,10 @@ getDelSessQueues tSess (TRcvQueues qs cs) = do
Nothing -> (cId : removed, Nothing)
Nothing -> (removed, Nothing) -- "impossible" in invariant holds, because we get keys from the known queues
-- moves active queues to pending queues and returns queues that were active
activeToPendingQueues :: TRcvQueues -> TRcvQueues -> STM (Map (UserId, SMPServer, RecipientId) RcvQueue)
activeToPendingQueues (TRcvQueues aqs acs) (TRcvQueues pqs pcs) = do
aqs' <- mergeQueues
mergeConns
pure aqs'
where
mergeQueues :: STM (Map (UserId, SMPServer, RecipientId) RcvQueue)
mergeQueues = do
aqs' <- aqs `swapTVar` M.empty
modifyTVar pqs $ \pqs' -> M.union aqs' pqs'
pure aqs'
mergeConns :: STM ()
mergeConns = do
acs' <- acs `swapTVar` M.empty
modifyTVar pcs $ \pcs' -> M.unionWith (<>) acs' pcs'
getDelAllQueues :: TRcvQueues -> STM [RcvQueue]
getDelAllQueues (TRcvQueues qs cs) = do
writeTVar cs M.empty
M.elems <$> swapTVar qs M.empty
isSession :: RcvQueue -> (UserId, SMPServer, Maybe ConnId) -> Bool
isSession rq (uId, srv, connId_) =
-28
View File
@@ -29,7 +29,6 @@ tRcvQueuesTests = do
it "getDelSessQueues" getDelSessQueuesTest
describe "queue transfer" $ do
it "getDelSessQueues-batchAddQueues preserves total length" removeSubsTest
it "activeToPendingQueues" activeToPendingTest
checkDataInvariant :: RQ.TRcvQueues -> IO Bool
checkDataInvariant trq = atomically $ do
@@ -164,33 +163,6 @@ removeSubsTest = do
atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@beta", Just "c3") aq >>= RQ.batchAddQueues pq . fst
atomically (totalSize aq pq) `shouldReturn` (4, 4)
activeToPendingTest :: IO ()
activeToPendingTest = do
aq <- atomically RQ.empty
let qs1 =
[ dummyRQ 0 "smp://1234-w==@alpha" "c1",
dummyRQ 0 "smp://1234-w==@alpha" "c2"
]
atomically $ RQ.batchAddQueues aq qs1
pq <- atomically RQ.empty
let qs2 =
[ dummyRQ 0 "smp://1234-w==@beta" "c3",
dummyRQ 1 "smp://1234-w==@beta" "c4"
]
atomically $ RQ.batchAddQueues pq qs2
atomically (totalSize aq pq) `shouldReturn` (4, 4)
prevActive <- atomically $ RQ.activeToPendingQueues aq pq
atomically (totalSize aq pq) `shouldReturn` (4, 4)
M.keys <$> readTVarIO (RQ.getConnections aq) `shouldReturn` []
M.keys <$> readTVarIO (RQ.getConnections pq) `shouldReturn` ["c1", "c2", "c3", "c4"]
-- M.keys prevActive `shouldMatchList` [(0, "smp://1234-w==@alpha", ""), (0, "smp://1234-w==@alpha", "")]
M.keys prevActive `shouldMatchList` [(0, "smp://1234-w==@alpha", "c1"), (0, "smp://1234-w==@alpha", "c2")]
checkDataInvariant aq `shouldReturn` True
checkDataInvariant pq `shouldReturn` True
totalSize :: RQ.TRcvQueues -> RQ.TRcvQueues -> STM (Int, Int)
totalSize a b = do
qsizeA <- M.size <$> readTVar (RQ.getRcvQueues a)