use new session subscriptions data

This commit is contained in:
Evgeny Poberezkin
2025-10-05 22:03:07 +01:00
parent 78c340ecaa
commit 0b45f7c00f
4 changed files with 53 additions and 91 deletions

View File

@@ -202,6 +202,7 @@ import qualified Data.ByteString.Base64 as B64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Composition ((.:))
import Data.Containers.ListUtils (nubOrd)
import Data.Either (isRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
@@ -218,7 +219,6 @@ import Data.Text.Encoding
import Data.Time (UTCTime, addUTCTime, defaultTimeLocale, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
import GHC.Conc (unsafeIOToSTM)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
@@ -238,8 +238,6 @@ import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs)
import qualified Simplex.Messaging.Agent.TSessionSubs as SS
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues)
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
@@ -339,8 +337,8 @@ data AgentClient = AgentClient
userNetworkInfo :: TVar UserNetworkInfo,
userNetworkUpdated :: TVar (Maybe UTCTime),
subscrConns :: TVar (Set ConnId),
activeSubs :: TRcvQueues (SessionId, RcvQueueSub),
pendingSubs :: TRcvQueues RcvQueueSub,
-- activeSubs :: TRcvQueues (SessionId, RcvQueueSub),
-- pendingSubs :: TRcvQueues RcvQueueSub,
currentSubs :: TSessionSubs,
removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError,
workerSeq :: TVar Int,
@@ -508,8 +506,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
userNetworkInfo <- newTVarIO $ UserNetworkInfo UNOther True
userNetworkUpdated <- newTVarIO Nothing
subscrConns <- newTVarIO S.empty
activeSubs <- RQ.empty
pendingSubs <- RQ.empty
currentSubs <- SS.emptyIO
removedSubs <- TM.emptyIO
workerSeq <- newTVarIO 0
@@ -548,8 +544,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
userNetworkInfo,
userNetworkUpdated,
subscrConns,
activeSubs,
pendingSubs,
-- activeSubs,
-- pendingSubs,
currentSubs,
removedSubs,
workerSeq,
@@ -718,26 +714,20 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
-- because we can have a race condition when a new current client could have already
-- made subscriptions active, and the old client would be processing diconnection later.
removeClientAndSubs :: IO ([RcvQueueSub], [ConnId])
removeClientAndSubs = do
(qs, cs, subs) <- atomically $ do
removeSessVar v tSess smpClients
ifM (readTVar active) removeSubs (pure ([], [], M.empty))
-- TODO [subs] remove logs
when (M.keysSet subs /= S.fromList (map queueId qs)) $ error $ "setSubsPending different queues: " <> show (S.size $ M.keysSet subs) <> " " <> show (S.size $ S.fromList $ map queueId qs)
when (S.fromList (map qConnId $ M.elems subs) /= S.fromList (map qConnId qs)) $ error $ "setSubsPending different connections: " <> show (S.size (S.fromList $ map qConnId $ M.elems subs)) <> " " <> show (S.size $ S.fromList $ map qConnId qs)
pure (qs, cs)
removeClientAndSubs = atomically $ do
removeSessVar v tSess smpClients
ifM (readTVar active) removeSubs (pure ([], []))
where
sessId = sessionId $ thParams client
removeSubs = do
(qs, cs) <- RQ.getDelSessQueues tSess sessId $ activeSubs c
RQ.batchAddQueues qs $ pendingSubs c
-- TODO [subs]
mode <- getSessionMode c
subs <- SS.setSubsPending mode tSess sessId $ currentSubs c
let qs = M.elems subs
cs = nubOrd $ map qConnId qs
-- this removes proxied relays that this client created sessions to
destSrvs <- M.keys <$> readTVar prs
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays
pure (qs, cs, subs)
pure (qs, cs)
serverDown :: ([RcvQueueSub], [ConnId]) -> IO ()
serverDown (qs, conns) = whenM (readTVarIO active) $ do
@@ -758,30 +748,20 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
where
getWorkerVar ts =
ifM
(not <$> hasQueues)
(not <$> SS.hasPendingSubs tSess (currentSubs c))
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
(Just <$> getSessVar workerSeq tSess smpSubWorkers ts)
where
hasQueues = do
-- TODO [subs]
yes <- RQ.hasSessQueues tSess (pendingSubs c)
yes' <- SS.hasPendingSubs tSess $ currentSubs c
when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSubs different result"
pure yes
newSubWorker v = do
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
atomically $ putTMVar (sessionVar v) a
runSubWorker = do
ri <- asks $ reconnectInterval . config
withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do
pending <- liftIO $ RQ.getSessQueues tSess $ pendingSubs c
-- TODO [subs]
subs <- atomically $ SS.getPendingSubs tSess $ currentSubs c
when (M.keysSet subs /= S.fromList (map queueId pending)) $ error "getPendingSubs different queues"
unless (null pending) $ do
pending <- atomically $ SS.getPendingSubs tSess $ currentSubs c
unless (M.null pending) $ do
liftIO $ waitUntilForeground c
liftIO $ waitForUserNetwork c
handleNotify $ resubscribeSessQueues c tSess pending
handleNotify $ resubscribeSessQueues c tSess $ M.elems pending
loop
isForeground = (ASForeground ==) <$> readTVar (agentState c)
cleanup :: SessionVar (Async ()) -> STM ()
@@ -937,8 +917,6 @@ closeAgentClient c = do
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
atomically $ RQ.clear $ activeSubs c
atomically $ RQ.clear $ pendingSubs c
atomically $ SS.clear $ currentSubs c
clear subscrConns
clear getMsgLocks
@@ -1528,10 +1506,7 @@ subscribeQueues c qs withEvents = do
when (withEvents && not (null errs)) $ notifySub c "" $ ERRS $ map (first qConnId) errs
pure $ map (second Left) errs <> concatMap L.toList rs
where
addPendingSubs (tSess, qs') = atomically $ do
let qs'' = L.toList qs'
RQ.batchAddQueues qs'' $ pendingSubs c
SS.batchAddPendingSubs qs'' tSess $ currentSubs c
addPendingSubs (tSess, qs') = atomically $ SS.batchAddPendingSubs (L.toList qs') tSess $ currentSubs c
subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do
(rs, active) <- subscribeSessQueues_ c qs' withEvents
if active
@@ -1572,12 +1547,7 @@ subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQue
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
cs_ <-
if withEvents
then do
cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c
-- TODO [subs]
subs <- atomically $ SS.getActiveSubs tSess $ currentSubs c
when (S.fromList (map qConnId $ M.elems subs) /= cs) $ error "getActiveSubs different connections"
pure $ Just cs
then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c)
else pure Nothing
active <-
atomically $
@@ -1650,25 +1620,18 @@ sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCred
addSubscription :: AgentClient -> SessionId -> RcvQueueSub -> STM ()
addSubscription c sessId rq = do
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
RQ.addSessQueue (sessId, rq) $ activeSubs c
RQ.deleteQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
SS.addActiveSub sessId rq tSess $ currentSubs c
failSubscription :: SomeRcvQueue q => AgentClient -> q -> SMPClientError -> STM ()
failSubscription c rq e = do
RQ.deleteQueue rq (pendingSubs c)
TM.insert (RQ.qKey rq) e (removedSubs c)
-- TODO [subs]
TM.insert (qUserId rq, qServer rq, queueId rq) e (removedSubs c)
tSess <- mkSMPTransportSession c rq
SS.deletePendingSub (queueId rq) tSess $ currentSubs c
addPendingSubscription :: AgentClient -> RcvQueueSub -> STM ()
addPendingSubscription c rq = do
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
RQ.addQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
SS.addPendingSub rq tSess $ currentSubs c
@@ -1685,39 +1648,25 @@ addNewQueueSubscription c rq' tSess sessId = do
hasActiveSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
hasActiveSubscription c rq = do
yes <- RQ.hasQueue rq $ activeSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
yes' <- SS.hasActiveSub (queueId rq) tSess $ currentSubs c
when (yes /= yes') $ unsafeIOToSTM $ error "hasActiveSub different result"
pure yes
SS.hasActiveSub (queueId rq) tSess $ currentSubs c
{-# INLINE hasActiveSubscription #-}
hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
hasPendingSubscription c rq = do
yes <- RQ.hasQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
yes' <- SS.hasPendingSub (queueId rq) tSess $ currentSubs c
when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSub different result"
pure yes
SS.hasPendingSub (queueId rq) tSess $ currentSubs c
{-# INLINE hasPendingSubscription #-}
removeSubscription :: SomeRcvQueue q => AgentClient -> ConnId -> q -> STM ()
removeSubscription c connId rq = do
modifyTVar' (subscrConns c) $ S.delete connId
RQ.deleteQueue rq $ activeSubs c
RQ.deleteQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
SS.deleteSub (queueId rq) tSess $ currentSubs c
removeSubscriptions :: SomeRcvQueue q => AgentClient -> [ConnId] -> [q] -> STM ()
removeSubscriptions c connIds rqs = do
unless (null connIds) $ modifyTVar' (subscrConns c) (`S.difference` (S.fromList connIds))
RQ.batchDeleteQueues rqs $ activeSubs c
RQ.batchDeleteQueues rqs $ pendingSubs c
-- TODO [subs] batch
forM_ rqs $ \rq -> do
tSess <- mkSMPTransportSession c rq
SS.deleteSub (queueId rq) tSess $ currentSubs c
@@ -2424,15 +2373,16 @@ data ServerSessions = ServerSessions
getAgentSubsTotal :: AgentClient -> [UserId] -> IO (SMPServerSubs, Bool)
getAgentSubsTotal c userIds = do
ssActive <- getSubsCount activeSubs
ssPending <- getSubsCount pendingSubs
(ssActive, ssPending) <- SS.foldSessionSubs addSub (0, 0) $ currentSubs c
sess <- hasSession . M.toList =<< readTVarIO (smpClients c)
pure (SMPServerSubs {ssActive, ssPending}, sess)
where
getSubsCount :: (AgentClient -> TRcvQueues q) -> IO Int
getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (RQ.getRcvQueues $ subs c)
addSub :: (UserId, SMPServer, SMP.RecipientId) -> q -> Int -> Int
addSub (userId, _, _) _ cnt = if userId `elem` userIds then cnt + 1 else cnt
addSub :: (Int, Int) -> (SMPTransportSession, SS.SessSubs) -> IO (Int, Int)
addSub acc@(!ssActive, !ssPending) ((userId, _, _), s)
| userId `elem` userIds = do
(active, pending) <- SS.mapSubs M.size s
pure (ssActive + active, ssPending + pending)
| otherwise = pure acc
hasSession :: [(SMPTransportSession, SMPClientVar)] -> IO Bool
hasSession = \case
[] -> pure False
@@ -2469,13 +2419,12 @@ getAgentServersSummary c@AgentClient {smpServersStats, xftpServersStats, ntfServ
ntfServersSessions
}
where
getServerSubs = do
subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (RQ.getRcvQueues $ activeSubs c)
M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (RQ.getRcvQueues $ pendingSubs c)
getServerSubs = SS.foldSessionSubs addSub M.empty $ currentSubs c
where
addSub f (userId, srv, _) _ = M.alter (Just . f . fromMaybe SMPServerSubs {ssActive = 0, ssPending = 0}) (userId, srv)
incActive ss = ss {ssActive = ssActive ss + 1}
incPending ss = ss {ssPending = ssPending ss + 1}
addSub subs ((userId, srv, _), s) = do
(active, pending) <- SS.mapSubs M.size s
let add ss = ss {ssActive = ssActive ss + active, ssPending = ssPending ss + pending}
pure $ M.alter (Just . add . fromMaybe (SMPServerSubs 0 0)) (userId, srv) subs
Env {xftpAgent = XFTPAgent {xftpRcvWorkers, xftpSndWorkers, xftpDelWorkers}} = agentEnv
getXFTPWorkerSrvs workers = foldM addSrv [] . M.toList =<< readTVarIO workers
where
@@ -2507,13 +2456,14 @@ data SubscriptionsInfo = SubscriptionsInfo
getAgentSubscriptions :: AgentClient -> IO SubscriptionsInfo
getAgentSubscriptions c = do
activeSubscriptions <- getSubs activeSubs
pendingSubscriptions <- getSubs pendingSubs
(activeSubscriptions, pendingSubscriptions) <- SS.foldSessionSubs addSubs ([], []) $ currentSubs c
removedSubscriptions <- getRemovedSubs
pure $ SubscriptionsInfo {activeSubscriptions, pendingSubscriptions, removedSubscriptions}
where
getSubs :: (AgentClient -> TRcvQueues q) -> IO [SubInfo]
getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (RQ.getRcvQueues $ sel c)
addSubs :: ([SubInfo], [SubInfo]) -> (SMPTransportSession, SS.SessSubs) -> IO ([SubInfo], [SubInfo])
addSubs (active, pending) ((userId, srv, _), s) = do
(active', pending') <- SS.mapSubs (map (\rId -> subInfo (userId, srv, rId) Nothing) . M.keys) s
pure (active' ++ active, pending' ++ pending)
getRemovedSubs = map (uncurry subInfo . second Just) . M.assocs <$> readTVarIO (removedSubs c)
subInfo :: (UserId, SMPServer, SMP.RecipientId) -> Maybe SMPClientError -> SubInfo
subInfo (uId, srv, rId) err = SubInfo {userId = uId, server = enc srv, rcvId = enc rId, subError = show <$> err}

View File

@@ -4,6 +4,7 @@
module Simplex.Messaging.Agent.TSessionSubs
( TSessionSubs (sessionSubs),
SessSubs (..),
emptyIO,
clear,
hasActiveSub,
@@ -19,6 +20,8 @@ module Simplex.Messaging.Agent.TSessionSubs
getPendingSubs,
getActiveSubs,
setSubsPending,
foldSessionSubs,
mapSubs,
)
where
@@ -162,3 +165,12 @@ setSubsPending_ s sessId_ = do
writeTVar as M.empty
modifyTVar' (pendingSubs s) $ M.union subs
pure subs
foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a
foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs
mapSubs :: (Map RecipientId RcvQueueSub -> a) -> SessSubs -> IO (a, a)
mapSubs f s = do
active <- readTVarIO $ activeSubs s
pending <- readTVarIO $ pendingSubs s
pure (f active, f pending)

View File

@@ -134,13 +134,13 @@ main = do
ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres)
around_ (postgressBracket testServerDBConnectInfo) $ do
xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
fdescribe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres)
describe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres)
xdescribe "SMP proxy, postgres+jornal message store" $
before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests
describe "SMP proxy, postgres-only message store" $
before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests
#endif
xdescribe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
describe "SMP proxy, jornal message store" $
before (pure $ ASType SQSMemory SMSJournal) smpProxyTests
describe "XFTP" $ do

View File

@@ -48,7 +48,7 @@ newtype TestWrapper a = TestWrapper a
-- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs"
testLogLevel :: LogLevel
testLogLevel = LogWarn
testLogLevel = LogError
instance Example a => Example (TestWrapper a) where
type Arg (TestWrapper a) = Arg a
@@ -56,7 +56,7 @@ instance Example a => Example (TestWrapper a) where
ci <- envCI
runTest `E.catches` [E.Handler (onTestFailure ci), E.Handler (onTestException ci)]
where
tt = 30
tt = 120
runTest =
timeout (tt * 1000000) (evaluateExample action params hooks state) `finally` callCommand "sync" >>= \case
Just r -> pure r