agent: optimize subscriptions memory usage more (do not store subscribed queues in memory) WIP

This commit is contained in:
Evgeny Poberezkin
2025-10-05 19:42:19 +01:00
parent 779222d1a7
commit 78c340ecaa
9 changed files with 392 additions and 116 deletions
+1
View File
@@ -109,6 +109,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Options
Simplex.Messaging.Agent.Store.Shared
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Agent.TSessionSubs
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent
Simplex.Messaging.Compression
+5 -4
View File
@@ -555,13 +555,14 @@ testProtocolServer c nm userId srv = withAgentEnv' c $ case protocolTypeI @p of
-- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network
setNetworkConfig :: AgentClient -> NetworkConfig -> IO ()
setNetworkConfig c@AgentClient {useNetworkConfig, proxySessTs} cfg' = do
(spChanged, changed) <- atomically $ do
ts <- getCurrentTime
changed <- atomically $ do
(_, cfg) <- readTVar useNetworkConfig
let changed = cfg /= cfg'
!cfgSlow = slowNetworkConfig cfg'
when changed $ writeTVar useNetworkConfig (cfgSlow, cfg')
pure (socksProxy cfg /= socksProxy cfg', changed)
when spChanged $ getCurrentTime >>= atomically . writeTVar proxySessTs
when (socksProxy cfg /= socksProxy cfg') $ writeTVar proxySessTs ts
pure changed
when changed $ reconnectAllServers c
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()
@@ -1270,7 +1271,7 @@ subscribeConnections_ c conns = do
let (subRs, cs) = foldr partitionResultsConns ([], []) conns
resumeDelivery cs
resumeConnCmds c $ map fst cs
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs)
rcvRs <- lift $ connResults <$> subscribeQueues c (concatMap rcvQueues cs) False
rcvRs' <- storeClientServiceAssocs rcvRs
ns <- asks ntfSupervisor
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs
+207 -104
View File
@@ -201,6 +201,7 @@ import Data.Bifunctor (bimap, first, second)
import qualified Data.ByteString.Base64 as B64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Composition ((.:))
import Data.Either (isRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
@@ -217,6 +218,7 @@ import Data.Text.Encoding
import Data.Time (UTCTime, addUTCTime, defaultTimeLocale, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
import GHC.Conc (unsafeIOToSTM)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
@@ -234,7 +236,9 @@ import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues))
import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs)
import qualified Simplex.Messaging.Agent.TSessionSubs as SS
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues)
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
@@ -311,8 +315,6 @@ type NtfClientVar = ClientVar NtfResponse
type XFTPClientVar = ClientVar FileResponse
type SMPTransportSession = TransportSession SMP.BrokerMsg
type NtfTransportSession = TransportSession NtfResponse
type XFTPTransportSession = TransportSession FileResponse
@@ -339,6 +341,7 @@ data AgentClient = AgentClient
subscrConns :: TVar (Set ConnId),
activeSubs :: TRcvQueues (SessionId, RcvQueueSub),
pendingSubs :: TRcvQueues RcvQueueSub,
currentSubs :: TSessionSubs,
removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError,
workerSeq :: TVar Int,
smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()),
@@ -507,6 +510,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
subscrConns <- newTVarIO S.empty
activeSubs <- RQ.empty
pendingSubs <- RQ.empty
currentSubs <- SS.emptyIO
removedSubs <- TM.emptyIO
workerSeq <- newTVarIO 0
smpDeliveryWorkers <- TM.emptyIO
@@ -546,6 +550,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
subscrConns,
activeSubs,
pendingSubs,
currentSubs,
removedSubs,
workerSeq,
smpDeliveryWorkers,
@@ -701,10 +706,11 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
ts <- readTVarIO proxySessTs
smp <- ExceptT $ getProtocolClient g nm tSess cfg presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
atomically $ SS.setSessionId (sessionId $ thParams smp) tSess $ currentSubs c
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
smpClientDisconnected :: AgentClient -> SMPTransportSession -> Env -> SMPClientVar -> TMap SMPServer ProxiedRelayVar -> SMPClient -> IO ()
smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, qId) env v prs client = do
smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, cId) env v prs client = do
removeClientAndSubs >>= serverDown
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
where
@@ -712,29 +718,38 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
-- because we can have a race condition when a new current client could have already
-- made subscriptions active, and the old client would be processing diconnection later.
removeClientAndSubs :: IO ([RcvQueueSub], [ConnId])
removeClientAndSubs = atomically $ do
removeSessVar v tSess smpClients
ifM (readTVar active) removeSubs (pure ([], []))
removeClientAndSubs = do
(qs, cs, subs) <- atomically $ do
removeSessVar v tSess smpClients
ifM (readTVar active) removeSubs (pure ([], [], M.empty))
-- TODO [subs] remove logs
when (M.keysSet subs /= S.fromList (map queueId qs)) $ error $ "setSubsPending different queues: " <> show (S.size $ M.keysSet subs) <> " " <> show (S.size $ S.fromList $ map queueId qs)
when (S.fromList (map qConnId $ M.elems subs) /= S.fromList (map qConnId qs)) $ error $ "setSubsPending different connections: " <> show (S.size (S.fromList $ map qConnId $ M.elems subs)) <> " " <> show (S.size $ S.fromList $ map qConnId qs)
pure (qs, cs)
where
sessId = sessionId $ thParams client
removeSubs = do
(qs, cs) <- RQ.getDelSessQueues tSess sessId $ activeSubs c
RQ.batchAddQueues qs $ pendingSubs c
-- TODO [subs]
mode <- getSessionMode c
subs <- SS.setSubsPending mode tSess sessId $ currentSubs c
-- this removes proxied relays that this client created sessions to
destSrvs <- M.keys <$> readTVar prs
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, qId) smpProxiedRelays
pure (qs, cs)
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays
pure (qs, cs, subs)
serverDown :: ([RcvQueueSub], [ConnId]) -> IO ()
serverDown (qs, conns) = whenM (readTVarIO active) $ do
notifySub "" $ hostEvent' DISCONNECT client
unless (null conns) $ notifySub "" $ DOWN srv conns
notifySub c "" $ hostEvent' DISCONNECT client
unless (null conns) $ notifySub c "" $ DOWN srv conns
unless (null qs) $ do
atomically $ mapM_ (releaseGetLock c) qs
runReaderT (resubscribeSMPSession c tSess) env
notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> IO ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd)
releaseGetLocksIO c qs
mode <- getSessionModeIO c
let resubscribe
| (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess
| otherwise = void $ subscribeQueues c qs True
runReaderT resubscribe env
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
@@ -743,9 +758,16 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
where
getWorkerVar ts =
ifM
(not <$> RQ.hasSessQueues tSess (pendingSubs c))
(not <$> hasQueues)
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
(Just <$> getSessVar workerSeq tSess smpSubWorkers ts)
where
hasQueues = do
-- TODO [subs]
yes <- RQ.hasSessQueues tSess (pendingSubs c)
yes' <- SS.hasPendingSubs tSess $ currentSubs c
when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSubs different result"
pure yes
newSubWorker v = do
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
atomically $ putTMVar (sessionVar v) a
@@ -753,10 +775,13 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
ri <- asks $ reconnectInterval . config
withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do
pending <- liftIO $ RQ.getSessQueues tSess $ pendingSubs c
forM_ (L.nonEmpty pending) $ \qs -> do
-- TODO [subs]
subs <- atomically $ SS.getPendingSubs tSess $ currentSubs c
when (M.keysSet subs /= S.fromList (map queueId pending)) $ error "getPendingSubs different queues"
unless (null pending) $ do
liftIO $ waitUntilForeground c
liftIO $ waitForUserNetwork c
reconnectSMPClient c tSess qs
handleNotify $ resubscribeSessQueues c tSess pending
loop
isForeground = (ASForeground ==) <$> readTVar (agentState c)
cleanup :: SessionVar (Async ()) -> STM ()
@@ -765,28 +790,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
-- Not waiting may result in terminated worker remaining in the map.
whenM (isEmptyTMVar $ sessionVar v) retry
removeSessVar v tSess smpSubWorkers
reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueueSub -> AM' ()
reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do
cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c
(rs, sessId_) <- subscribeQueues c $ L.toList qs
let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) rs
conns = filter (`S.notMember` cs) okConns
unless (null conns) $ notifySub "" $ UP srv conns
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
forM_ (listToMaybe tempErrs) $ \(connId, e) -> do
when (null okConns && S.null cs && null finalErrs) . liftIO $
forM_ sessId_ $ \sessId -> do
-- We only close the client session that was used to subscribe.
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing)
mapM_ (closeClient_ c) v_
notifySub connId $ ERR e
where
handleNotify :: AM' () -> AM' ()
handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show
notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> AM' ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd)
handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show
notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> AEvent e -> m ()
notifySub c connId cmd = liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd)
getNtfServerClient :: AgentClient -> NetworkRequestMode -> NtfTransportSession -> AM NtfClient
getNtfServerClient c@AgentClient {active, ntfClients, workerSeq, proxySessTs, presetDomains} nm tSess@(_, srv, _) = do
@@ -929,8 +937,9 @@ closeAgentClient c = do
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
atomically . RQ.clear $ activeSubs c
atomically . RQ.clear $ pendingSubs c
atomically $ RQ.clear $ activeSubs c
atomically $ RQ.clear $ pendingSubs c
atomically $ SS.clear $ currentSubs c
clear subscrConns
clear getMsgLocks
where
@@ -1071,7 +1080,7 @@ withLogClient c nm tSess entId cmdStr action = withLogClient_ c nm tSess entId c
withSMPClient :: SMPQueueRec q => AgentClient -> NetworkRequestMode -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a
withSMPClient c nm q cmdStr action = do
tSess <- mkSMPTransportSession c q
tSess <- mkSMPTransportSessionIO c q
withLogClient c nm tSess (unEntityId $ queueId q) cmdStr $ action . connectedClient
sendOrProxySMPMessage :: AgentClient -> NetworkRequestMode -> UserId -> SMPServer -> ConnId -> ByteString -> Maybe SMP.SndPrivateAuthKey -> SMP.SenderId -> MsgFlags -> SMP.MsgBody -> AM (Maybe SMPServer)
@@ -1336,14 +1345,18 @@ getXFTPWorkPath = do
maybe getTemporaryDirectory pure workDir
mkTransportSession :: MonadIO m => AgentClient -> UserId -> ProtoServer msg -> ByteString -> m (TransportSession msg)
mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionMode c
mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionModeIO c
{-# INLINE mkTransportSession #-}
mkTSession :: UserId -> ProtoServer msg -> ByteString -> TransportSessionMode -> TransportSession msg
mkTSession userId srv sessEntId mode = (userId, srv, if mode == TSMEntity then Just sessEntId else Nothing)
{-# INLINE mkTSession #-}
mkSMPTransportSession :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession
mkSMPTransportSessionIO :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession
mkSMPTransportSessionIO c q = mkSMPTSession q <$> getSessionModeIO c
{-# INLINE mkSMPTransportSessionIO #-}
mkSMPTransportSession :: SMPQueueRec q => AgentClient -> q -> STM SMPTransportSession
mkSMPTransportSession c q = mkSMPTSession q <$> getSessionMode c
{-# INLINE mkSMPTransportSession #-}
@@ -1351,8 +1364,12 @@ mkSMPTSession :: SMPQueueRec q => q -> TransportSessionMode -> SMPTransportSessi
mkSMPTSession q = mkTSession (qUserId q) (qServer q) (qConnId q)
{-# INLINE mkSMPTSession #-}
getSessionMode :: MonadIO m => AgentClient -> m TransportSessionMode
getSessionMode = fmap sessionMode . getNetworkConfig
getSessionModeIO :: MonadIO m => AgentClient -> m TransportSessionMode
getSessionModeIO = fmap (sessionMode . snd) . readTVarIO . useNetworkConfig
{-# INLINE getSessionModeIO #-}
getSessionMode :: AgentClient -> STM TransportSessionMode
getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig
{-# INLINE getSessionMode #-}
newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
@@ -1500,46 +1517,92 @@ serverHostError = \case
SMP.TRANSPORT TEVersion -> True
_ -> False
-- | Subscribe to queues. The list of results can have a different order.
subscribeQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))], Maybe SessionId)
subscribeQueues c qs = do
(errs, qs') <- partitionEithers <$> mapM checkQueue qs
atomically $ do
modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
RQ.batchAddQueues qs' $ pendingSubs c
env <- ask
-- only "checked" queues are subscribed
session <- newTVarIO Nothing
rs <- sendTSessionBatches "SUB" mkSMPTSession (subscribeQueues_ env session) c NRMBackground qs'
(errs <> rs,) <$> readTVarIO session
-- | Batch by transport session and subscribe queues. The list of results can have a different order.
subscribeQueues :: AgentClient -> [RcvQueueSub] -> Bool -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
subscribeQueues c qs withEvents = do
(errs, qs') <- checkQueues c qs
atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
qss <- batchQueues mkSMPTSession c qs'
mapM_ addPendingSubs qss
rs <- mapConcurrently subscribeQueues_ qss
when (withEvents && not (null errs)) $ notifySub c "" $ ERRS $ map (first qConnId) errs
pure $ map (second Left) errs <> concatMap L.toList rs
where
checkQueue rq = do
prohibited <- liftIO $ hasGetLock c rq
pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED "subscribeQueues") else Right rq
subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId))
subscribeQueues_ env session smp qs' = do
let (userId, srv, _) = transportSession' smp
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
active <-
atomically $
ifM
(activeClientSession c tSess sessId)
(writeTVar session (Just sessId) >> processSubResults rs $> True)
(incSMPServerStat' c userId srv connSubIgnored (length rs) $> False)
addPendingSubs (tSess, qs') = atomically $ do
let qs'' = L.toList qs'
RQ.batchAddQueues qs'' $ pendingSubs c
SS.batchAddPendingSubs qs'' tSess $ currentSubs c
subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do
(rs, active) <- subscribeSessQueues_ c qs' withEvents
if active
then when (hasTempErrors rs) resubscribe $> rs
else do
logWarn "subcription batch result for replaced SMP client, resubscribing"
-- TODO we probably use PCENetworkError here instead of the original error, so it becomes temporary.
resubscribe $> L.map (second $ Left . PCENetworkError . NESubscribeError . show) rs
-- we use BROKER NETWORK error here instead of the original error, so it becomes temporary.
resubscribe $> L.map (second $ Left . toNESubscribeError) rs
where
-- treating host errors as temporary here as well
hasTempErrors = any (either temporaryOrHostError (const False) . snd)
toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show
resubscribe = resubscribeSMPSession c tSess
-- only "checked" queues are subscribed
checkQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, AgentErrorType)], [RcvQueueSub])
checkQueues c = fmap partitionEithers . mapM checkQueue
where
checkQueue rq = do
prohibited <- liftIO $ hasGetLock c rq
pure $ if prohibited then Left (rq, CMD PROHIBITED "checkQueues") else Right rq
-- This function expects that all queues belong to one transport session,
-- and that they are already added to pending subscriptions.
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
resubscribeSessQueues c tSess qs = do
(errs, qs_) <- checkQueues c qs
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True
unless (null errs) $ notifySub c "" $ ERRS $ map (first qConnId) errs
subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs
where
subscribeQueues_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool)
subscribeQueues_ smp qs' = do
let (userId, srv, _) = tSess
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
cs_ <-
if withEvents
then do
cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c
-- TODO [subs]
subs <- atomically $ SS.getActiveSubs tSess $ currentSubs c
when (S.fromList (map qConnId $ M.elems subs) /= cs) $ error "getActiveSubs different connections"
pure $ Just cs
else pure Nothing
active <-
atomically $
ifM
(activeClientSession c tSess sessId)
(processSubResults rs $> True)
(incSMPServerStat' c userId srv connSubIgnored (length rs) $> False)
forM_ cs_ $ \cs -> do
let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) $ L.toList rs
conns = filter (`S.notMember` cs) okConns
unless (null conns) $ notifySub c "" $ UP srv conns
let (tempErrs, finalErrs) = partition (temporaryClientError . snd) errs
mapM_ (\(connId, e) -> notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e) finalErrs
forM_ (listToMaybe tempErrs) $ \(connId, e) -> do
when (null okConns && S.null cs && null finalErrs && active) $ liftIO $ do
-- We only close the client session that was used to subscribe.
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing)
mapM_ (closeClient_ c) v_
notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e
pure (rs, active)
where
tSess = transportSession' smp
sessId = sessionId $ thParams smp
hasTempErrors = any (either temporaryClientError (const False) . snd)
processSubResults :: NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ()
processSubResults = mapM_ $ uncurry $ processSubResult c sessId
resubscribe = resubscribeSMPSession c tSess `runReaderT` env
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)
@@ -1554,25 +1617,30 @@ type BatchResponses q e r = NonEmpty (q, Either e r)
-- it includes arguments in the results instead.
sendTSessionBatches :: forall q r. ByteString -> (q -> TransportSessionMode -> SMPTransportSession) -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> [q] -> AM' [(q, Either AgentErrorType r)]
sendTSessionBatches statCmd mkSession action c nm qs =
concatMap L.toList <$> (mapConcurrently sendClientBatch =<< batchQueues)
concatMap L.toList <$> (mapConcurrently (sendClientBatch statCmd action c nm) =<< batchQueues mkSession c qs)
batchQueues :: (q -> TransportSessionMode -> SMPTransportSession) -> AgentClient -> [q] -> AM' [(SMPTransportSession, NonEmpty q)]
batchQueues mkSession c qs = do
mode <- getSessionModeIO c
pure . M.assocs $ foldr (batch mode) M.empty qs
where
batchQueues :: AM' [(SMPTransportSession, NonEmpty q)]
batchQueues = do
mode <- getSessionMode c
pure . M.assocs $ foldr (batch mode) M.empty qs
batch mode q m =
let tSess = mkSession q mode
in M.alter (Just . maybe [q] (q <|)) tSess m
sendClientBatch :: ByteString -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r)
sendClientBatch statCmd action c nm qs = fmap fst $ sendClientBatch_ statCmd () (fmap (,()) .: action) c nm qs
{-# INLINE sendClientBatch #-}
sendClientBatch_ :: ByteString -> res -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r, res)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r, res)
sendClientBatch_ statCmd errRes action c nm (tSess@(_, srv, _), qs') =
tryAllErrors' (getSMPServerClient c nm tSess) >>= \case
Left e -> pure (L.map (,Left e) qs', errRes)
Right (SMPConnectedClient smp _) -> liftIO $ do
logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd
first (L.map agentError) <$> action smp qs'
where
batch mode q m =
let tSess = mkSession q mode
in M.alter (Just . maybe [q] (q <|)) tSess m
sendClientBatch :: (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r)
sendClientBatch (tSess@(_, srv, _), qs') =
tryAllErrors' (getSMPServerClient c nm tSess) >>= \case
Left e -> pure $ L.map (,Left e) qs'
Right (SMPConnectedClient smp _) -> liftIO $ do
logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd
L.map agentError <$> action smp qs'
where
agentError = second . first $ protocolClientError SMP $ clientServer smp
agentError = second . first $ protocolClientError SMP $ clientServer smp
sendBatch :: SomeRcvQueue q => (SMPClient -> NetworkRequestMode -> NonEmpty (SMP.RecipientId, SMP.RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError a))) -> SMPClient -> NetworkRequestMode -> NonEmpty q -> IO (BatchResponses q SMPClientError a)
sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCreds qs)
@@ -1580,20 +1648,29 @@ sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCred
queueCreds q = (queueId q, rcvAuthKey q)
addSubscription :: AgentClient -> SessionId -> RcvQueueSub -> STM ()
addSubscription c sessId rq@RcvQueueSub {connId} = do
modifyTVar' (subscrConns c) $ S.insert connId
addSubscription c sessId rq = do
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
RQ.addSessQueue (sessId, rq) $ activeSubs c
RQ.deleteQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
SS.addActiveSub sessId rq tSess $ currentSubs c
failSubscription :: SomeRcvQueue q => AgentClient -> q -> SMPClientError -> STM ()
failSubscription c rq e = do
RQ.deleteQueue rq (pendingSubs c)
TM.insert (RQ.qKey rq) e (removedSubs c)
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
SS.deletePendingSub (queueId rq) tSess $ currentSubs c
addPendingSubscription :: AgentClient -> RcvQueueSub -> STM ()
addPendingSubscription c rq@RcvQueueSub {connId} = do
modifyTVar' (subscrConns c) $ S.insert connId
addPendingSubscription c rq = do
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
RQ.addQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
SS.addPendingSub rq tSess $ currentSubs c
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' ()
addNewQueueSubscription c rq' tSess sessId = do
@@ -1607,11 +1684,23 @@ addNewQueueSubscription c rq' tSess sessId = do
unless same $ resubscribeSMPSession c tSess
hasActiveSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
hasActiveSubscription c rq = RQ.hasQueue rq $ activeSubs c
hasActiveSubscription c rq = do
yes <- RQ.hasQueue rq $ activeSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
yes' <- SS.hasActiveSub (queueId rq) tSess $ currentSubs c
when (yes /= yes') $ unsafeIOToSTM $ error "hasActiveSub different result"
pure yes
{-# INLINE hasActiveSubscription #-}
hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
hasPendingSubscription c rq = RQ.hasQueue rq $ pendingSubs c
hasPendingSubscription c rq = do
yes <- RQ.hasQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
yes' <- SS.hasPendingSub (queueId rq) tSess $ currentSubs c
when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSub different result"
pure yes
{-# INLINE hasPendingSubscription #-}
removeSubscription :: SomeRcvQueue q => AgentClient -> ConnId -> q -> STM ()
@@ -1619,12 +1708,19 @@ removeSubscription c connId rq = do
modifyTVar' (subscrConns c) $ S.delete connId
RQ.deleteQueue rq $ activeSubs c
RQ.deleteQueue rq $ pendingSubs c
-- TODO [subs]
tSess <- mkSMPTransportSession c rq
SS.deleteSub (queueId rq) tSess $ currentSubs c
removeSubscriptions :: SomeRcvQueue q => AgentClient -> [ConnId] -> [q] -> STM ()
removeSubscriptions c connIds rqs = do
unless (null connIds) $ modifyTVar' (subscrConns c) (`S.difference` (S.fromList connIds))
RQ.batchDeleteQueues rqs $ activeSubs c
RQ.batchDeleteQueues rqs $ pendingSubs c
-- TODO [subs] batch
forM_ rqs $ \rq -> do
tSess <- mkSMPTransportSession c rq
SS.deleteSub (queueId rq) tSess $ currentSubs c
getSubscriptions :: AgentClient -> IO (Set ConnId)
getSubscriptions = readTVarIO . subscrConns
@@ -1782,6 +1878,13 @@ releaseGetLock c rq =
TM.lookup (qServer rq, queueId rq) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ())
{-# INLINE releaseGetLock #-}
releaseGetLocksIO :: SomeRcvQueue q => AgentClient -> [q] -> IO ()
releaseGetLocksIO c rqs = do
locks <- readTVarIO $ getMsgLocks c
forM_ rqs $ \rq ->
forM_ (M.lookup ((qServer rq, queueId rq)) locks) $ \lock ->
atomically $ tryPutTMVar lock ()
suspendQueue :: AgentClient -> NetworkRequestMode -> RcvQueue -> AM ()
suspendQueue c nm rq@RcvQueue {rcvId, rcvPrivateKey} =
withSMPClient c nm rq "OFF" $ \smp ->
@@ -2327,7 +2430,7 @@ getAgentSubsTotal c userIds = do
pure (SMPServerSubs {ssActive, ssPending}, sess)
where
getSubsCount :: (AgentClient -> TRcvQueues q) -> IO Int
getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (getRcvQueues $ subs c)
getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (RQ.getRcvQueues $ subs c)
addSub :: (UserId, SMPServer, SMP.RecipientId) -> q -> Int -> Int
addSub (userId, _, _) _ cnt = if userId `elem` userIds then cnt + 1 else cnt
hasSession :: [(SMPTransportSession, SMPClientVar)] -> IO Bool
@@ -2367,8 +2470,8 @@ getAgentServersSummary c@AgentClient {smpServersStats, xftpServersStats, ntfServ
}
where
getServerSubs = do
subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (getRcvQueues $ activeSubs c)
M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (getRcvQueues $ pendingSubs c)
subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (RQ.getRcvQueues $ activeSubs c)
M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (RQ.getRcvQueues $ pendingSubs c)
where
addSub f (userId, srv, _) _ = M.alter (Just . f . fromMaybe SMPServerSubs {ssActive = 0, ssPending = 0}) (userId, srv)
incActive ss = ss {ssActive = ssActive ss + 1}
@@ -2410,7 +2513,7 @@ getAgentSubscriptions c = do
pure $ SubscriptionsInfo {activeSubscriptions, pendingSubscriptions, removedSubscriptions}
where
getSubs :: (AgentClient -> TRcvQueues q) -> IO [SubInfo]
getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (getRcvQueues $ sel c)
getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (RQ.getRcvQueues $ sel c)
getRemovedSubs = map (uncurry subInfo . second Just) . M.assocs <$> readTVarIO (removedSubs c)
subInfo :: (UserId, SMPServer, SMP.RecipientId) -> Maybe SMPClientError -> SubInfo
subInfo (uId, srv, rId) err = SubInfo {userId = uId, server = enc srv, rcvId = enc rId, subError = show <$> err}
@@ -1,3 +1,4 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
+164
View File
@@ -0,0 +1,164 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Agent.TSessionSubs
( TSessionSubs (sessionSubs),
emptyIO,
clear,
hasActiveSub,
hasPendingSub,
addPendingSub,
setSessionId,
addActiveSub,
batchAddPendingSubs,
deletePendingSub,
deleteSub,
batchDeleteSubs,
hasPendingSubs,
getPendingSubs,
getActiveSubs,
setSubsPending,
)
where
import Control.Concurrent.STM
import Control.Monad
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust)
import qualified Data.Set as S
import Simplex.Messaging.Agent.Protocol (SMPQueue (..))
import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SomeRcvQueue)
import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..))
import Simplex.Messaging.Protocol (RecipientId)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (($>>=))
-- the fields in this record have the same data with swapped keys for lookup efficiency,
-- and all methods must maintain this invariant.
data TSessionSubs = TSessionSubs
{ sessionSubs :: TMap SMPTransportSession SessSubs
}
data SessSubs = SessSubs
{ subsSessId :: TVar (Maybe SessionId),
activeSubs :: TMap RecipientId RcvQueueSub,
pendingSubs :: TMap RecipientId RcvQueueSub
}
emptyIO :: IO TSessionSubs
emptyIO = TSessionSubs <$> TM.emptyIO
{-# INLINE emptyIO #-}
clear :: TSessionSubs -> STM ()
clear = TM.clear . sessionSubs
{-# INLINE clear #-}
lookupSubs :: SMPTransportSession -> TSessionSubs -> STM (Maybe SessSubs)
lookupSubs tSess = TM.lookup tSess . sessionSubs
{-# INLINE lookupSubs #-}
getSessSubs :: SMPTransportSession -> TSessionSubs -> STM SessSubs
getSessSubs tSess ss = lookupSubs tSess ss >>= maybe new pure
where
new = do
s <- SessSubs <$> newTVar Nothing <*> newTVar M.empty <*> newTVar M.empty
TM.insert tSess s $ sessionSubs ss
pure s
hasActiveSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
hasActiveSub = hasQueue_ activeSubs
{-# INLINE hasActiveSub #-}
hasPendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
hasPendingSub = hasQueue_ pendingSubs
{-# INLINE hasPendingSub #-}
hasQueue_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
hasQueue_ subs rId tSess ss = isJust <$> (lookupSubs tSess ss $>>= TM.lookup rId . subs)
{-# INLINE hasQueue_ #-}
addPendingSub :: RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
addPendingSub rq tSess ss = getSessSubs tSess ss >>= TM.insert (rcvId rq) rq . pendingSubs
setSessionId :: SessionId -> SMPTransportSession -> TSessionSubs -> STM ()
setSessionId sessId tSess ss = do
s <- getSessSubs tSess ss
readTVar (subsSessId s) >>= \case
Nothing -> writeTVar (subsSessId s) (Just sessId)
Just sessId' -> unless (sessId == sessId') $ void $ setSubsPending_ s $ Just sessId
addActiveSub :: SessionId -> RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
addActiveSub sessId rq tSess ss = do
s <- getSessSubs tSess ss
sessId' <- readTVar $ subsSessId s
let rId = rcvId rq
if Just sessId == sessId'
then do
TM.insert rId rq $ activeSubs s
TM.delete rId $ pendingSubs s
else TM.insert rId rq $ pendingSubs s
batchAddPendingSubs :: [RcvQueueSub] -> SMPTransportSession -> TSessionSubs -> STM ()
batchAddPendingSubs rqs tSess ss = do
s <- getSessSubs tSess ss
modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, rq)) rqs
deletePendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM ()
deletePendingSub rId tSess = lookupSubs tSess >=> mapM_ (TM.delete rId . pendingSubs)
deleteSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM ()
deleteSub rId tSess = lookupSubs tSess >=> mapM_ (\s -> TM.delete rId (activeSubs s) >> TM.delete rId (pendingSubs s))
batchDeleteSubs :: SomeRcvQueue q => [q] -> SMPTransportSession -> TSessionSubs -> STM ()
batchDeleteSubs rqs tSess = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs s) >> delete (pendingSubs s))
where
rIds = S.fromList $ map queueId rqs
delete = (`modifyTVar'` (`M.withoutKeys` rIds))
hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool
hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (fmap (not . null) . readTVar . pendingSubs)
getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getPendingSubs = getSubs_ pendingSubs
{-# INLINE getPendingSubs #-}
getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getActiveSubs = getSubs_ activeSubs
{-# INLINE getActiveSubs #-}
getSubs_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getSubs_ subs tSess = lookupSubs tSess >=> maybe (pure M.empty) (readTVar . subs)
setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss)
| entitySession == isJust connId_ =
TM.lookup tSess ss >>= withSessSubs (`setSubsPending_` Nothing)
| otherwise =
TM.lookupDelete tSess ss >>= withSessSubs setPendingChangeMode
where
entitySession = mode == TSMEntity
sessEntId = if entitySession then Just else const Nothing
withSessSubs run = \case
Nothing -> pure M.empty
Just s -> do
sessId' <- readTVar $ subsSessId s
if Just sessId == sessId' then run s else pure M.empty
setPendingChangeMode s = do
subs <- M.union <$> readTVar (activeSubs s) <*> readTVar (pendingSubs s)
unless (null subs) $
forM_ subs $ \rq -> addPendingSub rq (uId, srv, sessEntId (connId rq)) tss
pure subs
setSubsPending_ :: SessSubs -> Maybe SessionId -> STM (Map RecipientId RcvQueueSub)
setSubsPending_ s sessId_ = do
writeTVar (subsSessId s) sessId_
let as = activeSubs s
subs <- readTVar as
unless (null subs) $ do
writeTVar as M.empty
modifyTVar' (pendingSubs s) $ M.union subs
pure subs
+3
View File
@@ -29,6 +29,7 @@
module Simplex.Messaging.Client
( -- * Connect (disconnect) client to (from) SMP server
TransportSession,
SMPTransportSession,
ProtocolClient (thParams, sessionTs),
SMPClient,
ProxiedRelay (..),
@@ -549,6 +550,8 @@ type UserId = Int64
-- Please note that for SMP connection ID is used as entity ID, not queue ID.
type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString)
type SMPTransportSession = TransportSession BrokerMsg
-- | Connects to 'ProtocolServer' using passed client configuration
-- and queue for messages and notifications.
--
+3
View File
@@ -3569,6 +3569,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 2
exchangeGreetingsMsgId 4 a bId1 b aId1
@@ -3595,6 +3596,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 4
exchangeGreetingsMsgId 6 a bId1 b aId1
exchangeGreetingsMsgId 6 a bId1' b aId1'
+6 -6
View File
@@ -103,7 +103,7 @@ main = do
testStoreDBOpts
"src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql"
around_ (postgressBracket testServerDBConnectInfo) $ do
describe "SMP server via TLS, postgres+jornal message store" $
xdescribe "SMP server via TLS, postgres+jornal message store" $
before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests
describe "SMP server via TLS, postgres-only message store" $
before (pure (transport @TLS, ASType SQSPostgres SMSPostgres)) serverTests
@@ -128,19 +128,19 @@ main = do
describe "Notifications server (SMP server: jornal store)" $
ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal)
around_ (postgressBracket testServerDBConnectInfo) $ do
describe "Notifications server (SMP server: postgres+jornal store)" $
xdescribe "Notifications server (SMP server: postgres+jornal store)" $
ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal)
describe "Notifications server (SMP server: postgres-only store)" $
ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres)
around_ (postgressBracket testServerDBConnectInfo) $ do
describe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
describe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres)
describe "SMP proxy, postgres+jornal message store" $
xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
fdescribe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres)
xdescribe "SMP proxy, postgres+jornal message store" $
before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests
describe "SMP proxy, postgres-only message store" $
before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests
#endif
describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
xdescribe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
describe "SMP proxy, jornal message store" $
before (pure $ ASType SQSMemory SMSJournal) smpProxyTests
describe "XFTP" $ do
+2 -2
View File
@@ -48,7 +48,7 @@ newtype TestWrapper a = TestWrapper a
-- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs"
testLogLevel :: LogLevel
testLogLevel = LogError
testLogLevel = LogWarn
instance Example a => Example (TestWrapper a) where
type Arg (TestWrapper a) = Arg a
@@ -56,7 +56,7 @@ instance Example a => Example (TestWrapper a) where
ci <- envCI
runTest `E.catches` [E.Handler (onTestFailure ci), E.Handler (onTestException ci)]
where
tt = 120
tt = 30
runTest =
timeout (tt * 1000000) (evaluateExample action params hooks state) `finally` callCommand "sync" >>= \case
Just r -> pure r