mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
agent: optimize subscriptions memory usage (#1651)
* agent: optimize subscriptions memory usage more (do not store subscribed queues in memory) WIP * use new session subscriptions data * version * remove old data structure * remove version * batch deletions * test TSessionSubs * comment
This commit is contained in:
@@ -555,13 +555,14 @@ testProtocolServer c nm userId srv = withAgentEnv' c $ case protocolTypeI @p of
|
||||
-- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network
|
||||
setNetworkConfig :: AgentClient -> NetworkConfig -> IO ()
|
||||
setNetworkConfig c@AgentClient {useNetworkConfig, proxySessTs} cfg' = do
|
||||
(spChanged, changed) <- atomically $ do
|
||||
ts <- getCurrentTime
|
||||
changed <- atomically $ do
|
||||
(_, cfg) <- readTVar useNetworkConfig
|
||||
let changed = cfg /= cfg'
|
||||
!cfgSlow = slowNetworkConfig cfg'
|
||||
when changed $ writeTVar useNetworkConfig (cfgSlow, cfg')
|
||||
pure (socksProxy cfg /= socksProxy cfg', changed)
|
||||
when spChanged $ getCurrentTime >>= atomically . writeTVar proxySessTs
|
||||
when (socksProxy cfg /= socksProxy cfg') $ writeTVar proxySessTs ts
|
||||
pure changed
|
||||
when changed $ reconnectAllServers c
|
||||
|
||||
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()
|
||||
@@ -1270,7 +1271,7 @@ subscribeConnections_ c conns = do
|
||||
let (subRs, cs) = foldr partitionResultsConns ([], []) conns
|
||||
resumeDelivery cs
|
||||
resumeConnCmds c $ map fst cs
|
||||
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs)
|
||||
rcvRs <- lift $ connResults <$> subscribeQueues c (concatMap rcvQueues cs) False
|
||||
rcvRs' <- storeClientServiceAssocs rcvRs
|
||||
ns <- asks ntfSupervisor
|
||||
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs
|
||||
|
||||
@@ -201,6 +201,8 @@ import Data.Bifunctor (bimap, first, second)
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Composition ((.:), (.:.))
|
||||
import Data.Containers.ListUtils (nubOrd)
|
||||
import Data.Either (isRight, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
@@ -234,8 +236,8 @@ import Simplex.Messaging.Agent.Stats
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
|
||||
import qualified Simplex.Messaging.Agent.Store.DB as DB
|
||||
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues))
|
||||
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
|
||||
import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs)
|
||||
import qualified Simplex.Messaging.Agent.TSessionSubs as SS
|
||||
import Simplex.Messaging.Client
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
@@ -311,8 +313,6 @@ type NtfClientVar = ClientVar NtfResponse
|
||||
|
||||
type XFTPClientVar = ClientVar FileResponse
|
||||
|
||||
type SMPTransportSession = TransportSession SMP.BrokerMsg
|
||||
|
||||
type NtfTransportSession = TransportSession NtfResponse
|
||||
|
||||
type XFTPTransportSession = TransportSession FileResponse
|
||||
@@ -337,8 +337,7 @@ data AgentClient = AgentClient
|
||||
userNetworkInfo :: TVar UserNetworkInfo,
|
||||
userNetworkUpdated :: TVar (Maybe UTCTime),
|
||||
subscrConns :: TVar (Set ConnId),
|
||||
activeSubs :: TRcvQueues (SessionId, RcvQueueSub),
|
||||
pendingSubs :: TRcvQueues RcvQueueSub,
|
||||
currentSubs :: TSessionSubs,
|
||||
removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError,
|
||||
workerSeq :: TVar Int,
|
||||
smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()),
|
||||
@@ -505,8 +504,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
|
||||
userNetworkInfo <- newTVarIO $ UserNetworkInfo UNOther True
|
||||
userNetworkUpdated <- newTVarIO Nothing
|
||||
subscrConns <- newTVarIO S.empty
|
||||
activeSubs <- RQ.empty
|
||||
pendingSubs <- RQ.empty
|
||||
currentSubs <- SS.emptyIO
|
||||
removedSubs <- TM.emptyIO
|
||||
workerSeq <- newTVarIO 0
|
||||
smpDeliveryWorkers <- TM.emptyIO
|
||||
@@ -544,8 +542,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
|
||||
userNetworkInfo,
|
||||
userNetworkUpdated,
|
||||
subscrConns,
|
||||
activeSubs,
|
||||
pendingSubs,
|
||||
currentSubs,
|
||||
removedSubs,
|
||||
workerSeq,
|
||||
smpDeliveryWorkers,
|
||||
@@ -701,10 +698,11 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
|
||||
liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
|
||||
ts <- readTVarIO proxySessTs
|
||||
smp <- ExceptT $ getProtocolClient g nm tSess cfg presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
|
||||
atomically $ SS.setSessionId (sessionId $ thParams smp) tSess $ currentSubs c
|
||||
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
|
||||
|
||||
smpClientDisconnected :: AgentClient -> SMPTransportSession -> Env -> SMPClientVar -> TMap SMPServer ProxiedRelayVar -> SMPClient -> IO ()
|
||||
smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, qId) env v prs client = do
|
||||
smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, cId) env v prs client = do
|
||||
removeClientAndSubs >>= serverDown
|
||||
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
|
||||
where
|
||||
@@ -718,23 +716,26 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
|
||||
where
|
||||
sessId = sessionId $ thParams client
|
||||
removeSubs = do
|
||||
(qs, cs) <- RQ.getDelSessQueues tSess sessId $ activeSubs c
|
||||
RQ.batchAddQueues qs $ pendingSubs c
|
||||
mode <- getSessionMode c
|
||||
subs <- SS.setSubsPending mode tSess sessId $ currentSubs c
|
||||
let qs = M.elems subs
|
||||
cs = nubOrd $ map qConnId qs
|
||||
-- this removes proxied relays that this client created sessions to
|
||||
destSrvs <- M.keys <$> readTVar prs
|
||||
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, qId) smpProxiedRelays
|
||||
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays
|
||||
pure (qs, cs)
|
||||
|
||||
serverDown :: ([RcvQueueSub], [ConnId]) -> IO ()
|
||||
serverDown (qs, conns) = whenM (readTVarIO active) $ do
|
||||
notifySub "" $ hostEvent' DISCONNECT client
|
||||
unless (null conns) $ notifySub "" $ DOWN srv conns
|
||||
notifySub c "" $ hostEvent' DISCONNECT client
|
||||
unless (null conns) $ notifySub c "" $ DOWN srv conns
|
||||
unless (null qs) $ do
|
||||
atomically $ mapM_ (releaseGetLock c) qs
|
||||
runReaderT (resubscribeSMPSession c tSess) env
|
||||
|
||||
notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> IO ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd)
|
||||
releaseGetLocksIO c qs
|
||||
mode <- getSessionModeIO c
|
||||
let resubscribe
|
||||
| (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess
|
||||
| otherwise = void $ subscribeQueues c qs True
|
||||
runReaderT resubscribe env
|
||||
|
||||
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
|
||||
resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
|
||||
@@ -743,7 +744,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
|
||||
where
|
||||
getWorkerVar ts =
|
||||
ifM
|
||||
(not <$> RQ.hasSessQueues tSess (pendingSubs c))
|
||||
(not <$> SS.hasPendingSubs tSess (currentSubs c))
|
||||
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
|
||||
(Just <$> getSessVar workerSeq tSess smpSubWorkers ts)
|
||||
newSubWorker v = do
|
||||
@@ -752,11 +753,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
|
||||
runSubWorker = do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do
|
||||
pending <- liftIO $ RQ.getSessQueues tSess $ pendingSubs c
|
||||
forM_ (L.nonEmpty pending) $ \qs -> do
|
||||
pending <- atomically $ SS.getPendingSubs tSess $ currentSubs c
|
||||
unless (M.null pending) $ do
|
||||
liftIO $ waitUntilForeground c
|
||||
liftIO $ waitForUserNetwork c
|
||||
reconnectSMPClient c tSess qs
|
||||
handleNotify $ resubscribeSessQueues c tSess $ M.elems pending
|
||||
loop
|
||||
isForeground = (ASForeground ==) <$> readTVar (agentState c)
|
||||
cleanup :: SessionVar (Async ()) -> STM ()
|
||||
@@ -765,28 +766,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
|
||||
-- Not waiting may result in terminated worker remaining in the map.
|
||||
whenM (isEmptyTMVar $ sessionVar v) retry
|
||||
removeSessVar v tSess smpSubWorkers
|
||||
|
||||
reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueueSub -> AM' ()
|
||||
reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do
|
||||
cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c
|
||||
(rs, sessId_) <- subscribeQueues c $ L.toList qs
|
||||
let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) rs
|
||||
conns = filter (`S.notMember` cs) okConns
|
||||
unless (null conns) $ notifySub "" $ UP srv conns
|
||||
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
|
||||
mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
|
||||
forM_ (listToMaybe tempErrs) $ \(connId, e) -> do
|
||||
when (null okConns && S.null cs && null finalErrs) . liftIO $
|
||||
forM_ sessId_ $ \sessId -> do
|
||||
-- We only close the client session that was used to subscribe.
|
||||
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing)
|
||||
mapM_ (closeClient_ c) v_
|
||||
notifySub connId $ ERR e
|
||||
where
|
||||
handleNotify :: AM' () -> AM' ()
|
||||
handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show
|
||||
notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> AM' ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd)
|
||||
handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show
|
||||
|
||||
notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> AEvent e -> m ()
|
||||
notifySub c connId cmd = liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd)
|
||||
|
||||
getNtfServerClient :: AgentClient -> NetworkRequestMode -> NtfTransportSession -> AM NtfClient
|
||||
getNtfServerClient c@AgentClient {active, ntfClients, workerSeq, proxySessTs, presetDomains} nm tSess@(_, srv, _) = do
|
||||
@@ -929,8 +913,7 @@ closeAgentClient c = do
|
||||
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
|
||||
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
|
||||
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
|
||||
atomically . RQ.clear $ activeSubs c
|
||||
atomically . RQ.clear $ pendingSubs c
|
||||
atomically $ SS.clear $ currentSubs c
|
||||
clear subscrConns
|
||||
clear getMsgLocks
|
||||
where
|
||||
@@ -1071,7 +1054,7 @@ withLogClient c nm tSess entId cmdStr action = withLogClient_ c nm tSess entId c
|
||||
|
||||
withSMPClient :: SMPQueueRec q => AgentClient -> NetworkRequestMode -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a
|
||||
withSMPClient c nm q cmdStr action = do
|
||||
tSess <- mkSMPTransportSession c q
|
||||
tSess <- mkSMPTransportSessionIO c q
|
||||
withLogClient c nm tSess (unEntityId $ queueId q) cmdStr $ action . connectedClient
|
||||
|
||||
sendOrProxySMPMessage :: AgentClient -> NetworkRequestMode -> UserId -> SMPServer -> ConnId -> ByteString -> Maybe SMP.SndPrivateAuthKey -> SMP.SenderId -> MsgFlags -> SMP.MsgBody -> AM (Maybe SMPServer)
|
||||
@@ -1336,14 +1319,18 @@ getXFTPWorkPath = do
|
||||
maybe getTemporaryDirectory pure workDir
|
||||
|
||||
mkTransportSession :: MonadIO m => AgentClient -> UserId -> ProtoServer msg -> ByteString -> m (TransportSession msg)
|
||||
mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionMode c
|
||||
mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionModeIO c
|
||||
{-# INLINE mkTransportSession #-}
|
||||
|
||||
mkTSession :: UserId -> ProtoServer msg -> ByteString -> TransportSessionMode -> TransportSession msg
|
||||
mkTSession userId srv sessEntId mode = (userId, srv, if mode == TSMEntity then Just sessEntId else Nothing)
|
||||
{-# INLINE mkTSession #-}
|
||||
|
||||
mkSMPTransportSession :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession
|
||||
mkSMPTransportSessionIO :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession
|
||||
mkSMPTransportSessionIO c q = mkSMPTSession q <$> getSessionModeIO c
|
||||
{-# INLINE mkSMPTransportSessionIO #-}
|
||||
|
||||
mkSMPTransportSession :: SMPQueueRec q => AgentClient -> q -> STM SMPTransportSession
|
||||
mkSMPTransportSession c q = mkSMPTSession q <$> getSessionMode c
|
||||
{-# INLINE mkSMPTransportSession #-}
|
||||
|
||||
@@ -1351,8 +1338,12 @@ mkSMPTSession :: SMPQueueRec q => q -> TransportSessionMode -> SMPTransportSessi
|
||||
mkSMPTSession q = mkTSession (qUserId q) (qServer q) (qConnId q)
|
||||
{-# INLINE mkSMPTSession #-}
|
||||
|
||||
getSessionMode :: MonadIO m => AgentClient -> m TransportSessionMode
|
||||
getSessionMode = fmap sessionMode . getNetworkConfig
|
||||
getSessionModeIO :: MonadIO m => AgentClient -> m TransportSessionMode
|
||||
getSessionModeIO = fmap (sessionMode . snd) . readTVarIO . useNetworkConfig
|
||||
{-# INLINE getSessionModeIO #-}
|
||||
|
||||
getSessionMode :: AgentClient -> STM TransportSessionMode
|
||||
getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig
|
||||
{-# INLINE getSessionMode #-}
|
||||
|
||||
newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
|
||||
@@ -1500,46 +1491,84 @@ serverHostError = \case
|
||||
SMP.TRANSPORT TEVersion -> True
|
||||
_ -> False
|
||||
|
||||
-- | Subscribe to queues. The list of results can have a different order.
|
||||
subscribeQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))], Maybe SessionId)
|
||||
subscribeQueues c qs = do
|
||||
(errs, qs') <- partitionEithers <$> mapM checkQueue qs
|
||||
atomically $ do
|
||||
modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
|
||||
RQ.batchAddQueues qs' $ pendingSubs c
|
||||
env <- ask
|
||||
-- only "checked" queues are subscribed
|
||||
session <- newTVarIO Nothing
|
||||
rs <- sendTSessionBatches "SUB" mkSMPTSession (subscribeQueues_ env session) c NRMBackground qs'
|
||||
(errs <> rs,) <$> readTVarIO session
|
||||
-- | Batch by transport session and subscribe queues. The list of results can have a different order.
|
||||
subscribeQueues :: AgentClient -> [RcvQueueSub] -> Bool -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
|
||||
subscribeQueues c qs withEvents = do
|
||||
(errs, qs') <- checkQueues c qs
|
||||
atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
|
||||
qss <- batchQueues mkSMPTSession c qs' <$> getSessionModeIO c
|
||||
mapM_ addPendingSubs qss
|
||||
rs <- mapConcurrently subscribeQueues_ qss
|
||||
when (withEvents && not (null errs)) $ notifySub c "" $ ERRS $ map (first qConnId) errs
|
||||
pure $ map (second Left) errs <> concatMap L.toList rs
|
||||
where
|
||||
checkQueue rq = do
|
||||
prohibited <- liftIO $ hasGetLock c rq
|
||||
pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED "subscribeQueues") else Right rq
|
||||
subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId))
|
||||
subscribeQueues_ env session smp qs' = do
|
||||
let (userId, srv, _) = transportSession' smp
|
||||
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
|
||||
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
|
||||
active <-
|
||||
atomically $
|
||||
ifM
|
||||
(activeClientSession c tSess sessId)
|
||||
(writeTVar session (Just sessId) >> processSubResults rs $> True)
|
||||
(incSMPServerStat' c userId srv connSubIgnored (length rs) $> False)
|
||||
addPendingSubs (tSess, qs') = atomically $ SS.batchAddPendingSubs (L.toList qs') tSess $ currentSubs c
|
||||
subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do
|
||||
(rs, active) <- subscribeSessQueues_ c qs' withEvents
|
||||
if active
|
||||
then when (hasTempErrors rs) resubscribe $> rs
|
||||
else do
|
||||
logWarn "subcription batch result for replaced SMP client, resubscribing"
|
||||
-- TODO we probably use PCENetworkError here instead of the original error, so it becomes temporary.
|
||||
resubscribe $> L.map (second $ Left . PCENetworkError . NESubscribeError . show) rs
|
||||
-- we use BROKER NETWORK error here instead of the original error, so it becomes temporary.
|
||||
resubscribe $> L.map (second $ Left . toNESubscribeError) rs
|
||||
where
|
||||
-- treating host errors as temporary here as well
|
||||
hasTempErrors = any (either temporaryOrHostError (const False) . snd)
|
||||
toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show
|
||||
resubscribe = resubscribeSMPSession c tSess
|
||||
|
||||
-- only "checked" queues are subscribed
|
||||
checkQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, AgentErrorType)], [RcvQueueSub])
|
||||
checkQueues c = fmap partitionEithers . mapM checkQueue
|
||||
where
|
||||
checkQueue rq = do
|
||||
prohibited <- liftIO $ hasGetLock c rq
|
||||
pure $ if prohibited then Left (rq, CMD PROHIBITED "checkQueues") else Right rq
|
||||
|
||||
-- This function expects that all queues belong to one transport session,
|
||||
-- and that they are already added to pending subscriptions.
|
||||
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
|
||||
resubscribeSessQueues c tSess qs = do
|
||||
(errs, qs_) <- checkQueues c qs
|
||||
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True
|
||||
unless (null errs) $ notifySub c "" $ ERRS $ map (first qConnId) errs
|
||||
|
||||
subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
|
||||
subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs
|
||||
where
|
||||
subscribeQueues_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool)
|
||||
subscribeQueues_ smp qs' = do
|
||||
let (userId, srv, _) = tSess
|
||||
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
|
||||
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
|
||||
cs_ <-
|
||||
if withEvents
|
||||
then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c)
|
||||
else pure Nothing
|
||||
active <-
|
||||
atomically $
|
||||
ifM
|
||||
(activeClientSession c tSess sessId)
|
||||
(processSubResults rs $> True)
|
||||
(incSMPServerStat' c userId srv connSubIgnored (length rs) $> False)
|
||||
forM_ cs_ $ \cs -> do
|
||||
let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) $ L.toList rs
|
||||
conns = filter (`S.notMember` cs) okConns
|
||||
unless (null conns) $ notifySub c "" $ UP srv conns
|
||||
let (tempErrs, finalErrs) = partition (temporaryClientError . snd) errs
|
||||
mapM_ (\(connId, e) -> notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e) finalErrs
|
||||
forM_ (listToMaybe tempErrs) $ \(connId, e) -> do
|
||||
when (null okConns && S.null cs && null finalErrs && active) $ liftIO $ do
|
||||
-- We only close the client session that was used to subscribe.
|
||||
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing)
|
||||
mapM_ (closeClient_ c) v_
|
||||
notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e
|
||||
pure (rs, active)
|
||||
where
|
||||
tSess = transportSession' smp
|
||||
sessId = sessionId $ thParams smp
|
||||
hasTempErrors = any (either temporaryClientError (const False) . snd)
|
||||
processSubResults :: NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ()
|
||||
processSubResults = mapM_ $ uncurry $ processSubResult c sessId
|
||||
resubscribe = resubscribeSMPSession c tSess `runReaderT` env
|
||||
|
||||
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
|
||||
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)
|
||||
@@ -1553,26 +1582,30 @@ type BatchResponses q e r = NonEmpty (q, Either e r)
|
||||
-- Please note: this function does not preserve order of results to be the same as the order of arguments,
|
||||
-- it includes arguments in the results instead.
|
||||
sendTSessionBatches :: forall q r. ByteString -> (q -> TransportSessionMode -> SMPTransportSession) -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> [q] -> AM' [(q, Either AgentErrorType r)]
|
||||
sendTSessionBatches statCmd mkSession action c nm qs =
|
||||
concatMap L.toList <$> (mapConcurrently sendClientBatch =<< batchQueues)
|
||||
sendTSessionBatches statCmd mkSession action c nm qs = do
|
||||
qs' <- batchQueues mkSession c qs <$> getSessionModeIO c
|
||||
concatMap L.toList <$> mapConcurrently (sendClientBatch statCmd action c nm) qs'
|
||||
|
||||
batchQueues :: (q -> TransportSessionMode -> SMPTransportSession) -> AgentClient -> [q] -> TransportSessionMode -> [(SMPTransportSession, NonEmpty q)]
|
||||
batchQueues mkSession c qs mode = M.assocs $ foldr batch M.empty qs
|
||||
where
|
||||
batchQueues :: AM' [(SMPTransportSession, NonEmpty q)]
|
||||
batchQueues = do
|
||||
mode <- getSessionMode c
|
||||
pure . M.assocs $ foldr (batch mode) M.empty qs
|
||||
batch q m =
|
||||
let tSess = mkSession q mode
|
||||
in M.alter (Just . maybe [q] (q <|)) tSess m
|
||||
|
||||
sendClientBatch :: ByteString -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r)
|
||||
sendClientBatch statCmd action = fmap fst .:. sendClientBatch_ statCmd () (fmap (,()) .: action)
|
||||
{-# INLINE sendClientBatch #-}
|
||||
|
||||
sendClientBatch_ :: ByteString -> res -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r, res)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r, res)
|
||||
sendClientBatch_ statCmd errRes action c nm (tSess@(_, srv, _), qs') =
|
||||
tryAllErrors' (getSMPServerClient c nm tSess) >>= \case
|
||||
Left e -> pure (L.map (,Left e) qs', errRes)
|
||||
Right (SMPConnectedClient smp _) -> liftIO $ do
|
||||
logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd
|
||||
first (L.map agentError) <$> action smp qs'
|
||||
where
|
||||
batch mode q m =
|
||||
let tSess = mkSession q mode
|
||||
in M.alter (Just . maybe [q] (q <|)) tSess m
|
||||
sendClientBatch :: (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r)
|
||||
sendClientBatch (tSess@(_, srv, _), qs') =
|
||||
tryAllErrors' (getSMPServerClient c nm tSess) >>= \case
|
||||
Left e -> pure $ L.map (,Left e) qs'
|
||||
Right (SMPConnectedClient smp _) -> liftIO $ do
|
||||
logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd
|
||||
L.map agentError <$> action smp qs'
|
||||
where
|
||||
agentError = second . first $ protocolClientError SMP $ clientServer smp
|
||||
agentError = second . first $ protocolClientError SMP $ clientServer smp
|
||||
|
||||
sendBatch :: SomeRcvQueue q => (SMPClient -> NetworkRequestMode -> NonEmpty (SMP.RecipientId, SMP.RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError a))) -> SMPClient -> NetworkRequestMode -> NonEmpty q -> IO (BatchResponses q SMPClientError a)
|
||||
sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCreds qs)
|
||||
@@ -1580,20 +1613,22 @@ sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCred
|
||||
queueCreds q = (queueId q, rcvAuthKey q)
|
||||
|
||||
addSubscription :: AgentClient -> SessionId -> RcvQueueSub -> STM ()
|
||||
addSubscription c sessId rq@RcvQueueSub {connId} = do
|
||||
modifyTVar' (subscrConns c) $ S.insert connId
|
||||
RQ.addSessQueue (sessId, rq) $ activeSubs c
|
||||
RQ.deleteQueue rq $ pendingSubs c
|
||||
addSubscription c sessId rq = do
|
||||
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
|
||||
tSess <- mkSMPTransportSession c rq
|
||||
SS.addActiveSub sessId rq tSess $ currentSubs c
|
||||
|
||||
failSubscription :: SomeRcvQueue q => AgentClient -> q -> SMPClientError -> STM ()
|
||||
failSubscription c rq e = do
|
||||
RQ.deleteQueue rq (pendingSubs c)
|
||||
TM.insert (RQ.qKey rq) e (removedSubs c)
|
||||
TM.insert (qUserId rq, qServer rq, queueId rq) e (removedSubs c)
|
||||
tSess <- mkSMPTransportSession c rq
|
||||
SS.deletePendingSub (queueId rq) tSess $ currentSubs c
|
||||
|
||||
addPendingSubscription :: AgentClient -> RcvQueueSub -> STM ()
|
||||
addPendingSubscription c rq@RcvQueueSub {connId} = do
|
||||
modifyTVar' (subscrConns c) $ S.insert connId
|
||||
RQ.addQueue rq $ pendingSubs c
|
||||
addPendingSubscription c rq = do
|
||||
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
|
||||
tSess <- mkSMPTransportSession c rq
|
||||
SS.addPendingSub rq tSess $ currentSubs c
|
||||
|
||||
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' ()
|
||||
addNewQueueSubscription c rq' tSess sessId = do
|
||||
@@ -1607,24 +1642,28 @@ addNewQueueSubscription c rq' tSess sessId = do
|
||||
unless same $ resubscribeSMPSession c tSess
|
||||
|
||||
hasActiveSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
|
||||
hasActiveSubscription c rq = RQ.hasQueue rq $ activeSubs c
|
||||
hasActiveSubscription c rq = do
|
||||
tSess <- mkSMPTransportSession c rq
|
||||
SS.hasActiveSub (queueId rq) tSess $ currentSubs c
|
||||
{-# INLINE hasActiveSubscription #-}
|
||||
|
||||
hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
|
||||
hasPendingSubscription c rq = RQ.hasQueue rq $ pendingSubs c
|
||||
hasPendingSubscription c rq = do
|
||||
tSess <- mkSMPTransportSession c rq
|
||||
SS.hasPendingSub (queueId rq) tSess $ currentSubs c
|
||||
{-# INLINE hasPendingSubscription #-}
|
||||
|
||||
removeSubscription :: SomeRcvQueue q => AgentClient -> ConnId -> q -> STM ()
|
||||
removeSubscription c connId rq = do
|
||||
modifyTVar' (subscrConns c) $ S.delete connId
|
||||
RQ.deleteQueue rq $ activeSubs c
|
||||
RQ.deleteQueue rq $ pendingSubs c
|
||||
tSess <- mkSMPTransportSession c rq
|
||||
SS.deleteSub (queueId rq) tSess $ currentSubs c
|
||||
|
||||
removeSubscriptions :: SomeRcvQueue q => AgentClient -> [ConnId] -> [q] -> STM ()
|
||||
removeSubscriptions c connIds rqs = do
|
||||
removeSubscriptions c connIds qs = do
|
||||
unless (null connIds) $ modifyTVar' (subscrConns c) (`S.difference` (S.fromList connIds))
|
||||
RQ.batchDeleteQueues rqs $ activeSubs c
|
||||
RQ.batchDeleteQueues rqs $ pendingSubs c
|
||||
qss <- batchQueues mkSMPTSession c qs <$> getSessionMode c
|
||||
forM_ qss $ \(tSess, qs') -> SS.batchDeleteSubs (L.toList qs') tSess $ currentSubs c
|
||||
|
||||
getSubscriptions :: AgentClient -> IO (Set ConnId)
|
||||
getSubscriptions = readTVarIO . subscrConns
|
||||
@@ -1782,6 +1821,13 @@ releaseGetLock c rq =
|
||||
TM.lookup (qServer rq, queueId rq) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ())
|
||||
{-# INLINE releaseGetLock #-}
|
||||
|
||||
releaseGetLocksIO :: SomeRcvQueue q => AgentClient -> [q] -> IO ()
|
||||
releaseGetLocksIO c rqs = do
|
||||
locks <- readTVarIO $ getMsgLocks c
|
||||
forM_ rqs $ \rq ->
|
||||
forM_ (M.lookup ((qServer rq, queueId rq)) locks) $ \lock ->
|
||||
atomically $ tryPutTMVar lock ()
|
||||
|
||||
suspendQueue :: AgentClient -> NetworkRequestMode -> RcvQueue -> AM ()
|
||||
suspendQueue c nm rq@RcvQueue {rcvId, rcvPrivateKey} =
|
||||
withSMPClient c nm rq "OFF" $ \smp ->
|
||||
@@ -2321,15 +2367,16 @@ data ServerSessions = ServerSessions
|
||||
|
||||
getAgentSubsTotal :: AgentClient -> [UserId] -> IO (SMPServerSubs, Bool)
|
||||
getAgentSubsTotal c userIds = do
|
||||
ssActive <- getSubsCount activeSubs
|
||||
ssPending <- getSubsCount pendingSubs
|
||||
(ssActive, ssPending) <- SS.foldSessionSubs addSub (0, 0) $ currentSubs c
|
||||
sess <- hasSession . M.toList =<< readTVarIO (smpClients c)
|
||||
pure (SMPServerSubs {ssActive, ssPending}, sess)
|
||||
where
|
||||
getSubsCount :: (AgentClient -> TRcvQueues q) -> IO Int
|
||||
getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (getRcvQueues $ subs c)
|
||||
addSub :: (UserId, SMPServer, SMP.RecipientId) -> q -> Int -> Int
|
||||
addSub (userId, _, _) _ cnt = if userId `elem` userIds then cnt + 1 else cnt
|
||||
addSub :: (Int, Int) -> (SMPTransportSession, SS.SessSubs) -> IO (Int, Int)
|
||||
addSub acc@(!ssActive, !ssPending) ((userId, _, _), s)
|
||||
| userId `elem` userIds = do
|
||||
(active, pending) <- SS.mapSubs M.size s
|
||||
pure (ssActive + active, ssPending + pending)
|
||||
| otherwise = pure acc
|
||||
hasSession :: [(SMPTransportSession, SMPClientVar)] -> IO Bool
|
||||
hasSession = \case
|
||||
[] -> pure False
|
||||
@@ -2366,13 +2413,12 @@ getAgentServersSummary c@AgentClient {smpServersStats, xftpServersStats, ntfServ
|
||||
ntfServersSessions
|
||||
}
|
||||
where
|
||||
getServerSubs = do
|
||||
subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (getRcvQueues $ activeSubs c)
|
||||
M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (getRcvQueues $ pendingSubs c)
|
||||
getServerSubs = SS.foldSessionSubs addSub M.empty $ currentSubs c
|
||||
where
|
||||
addSub f (userId, srv, _) _ = M.alter (Just . f . fromMaybe SMPServerSubs {ssActive = 0, ssPending = 0}) (userId, srv)
|
||||
incActive ss = ss {ssActive = ssActive ss + 1}
|
||||
incPending ss = ss {ssPending = ssPending ss + 1}
|
||||
addSub subs ((userId, srv, _), s) = do
|
||||
(active, pending) <- SS.mapSubs M.size s
|
||||
let add ss = ss {ssActive = ssActive ss + active, ssPending = ssPending ss + pending}
|
||||
pure $ M.alter (Just . add . fromMaybe (SMPServerSubs 0 0)) (userId, srv) subs
|
||||
Env {xftpAgent = XFTPAgent {xftpRcvWorkers, xftpSndWorkers, xftpDelWorkers}} = agentEnv
|
||||
getXFTPWorkerSrvs workers = foldM addSrv [] . M.toList =<< readTVarIO workers
|
||||
where
|
||||
@@ -2404,13 +2450,14 @@ data SubscriptionsInfo = SubscriptionsInfo
|
||||
|
||||
getAgentSubscriptions :: AgentClient -> IO SubscriptionsInfo
|
||||
getAgentSubscriptions c = do
|
||||
activeSubscriptions <- getSubs activeSubs
|
||||
pendingSubscriptions <- getSubs pendingSubs
|
||||
(activeSubscriptions, pendingSubscriptions) <- SS.foldSessionSubs addSubs ([], []) $ currentSubs c
|
||||
removedSubscriptions <- getRemovedSubs
|
||||
pure $ SubscriptionsInfo {activeSubscriptions, pendingSubscriptions, removedSubscriptions}
|
||||
where
|
||||
getSubs :: (AgentClient -> TRcvQueues q) -> IO [SubInfo]
|
||||
getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (getRcvQueues $ sel c)
|
||||
addSubs :: ([SubInfo], [SubInfo]) -> (SMPTransportSession, SS.SessSubs) -> IO ([SubInfo], [SubInfo])
|
||||
addSubs (active, pending) ((userId, srv, _), s) = do
|
||||
(active', pending') <- SS.mapSubs (map (\rId -> subInfo (userId, srv, rId) Nothing) . M.keys) s
|
||||
pure (active' ++ active, pending' ++ pending)
|
||||
getRemovedSubs = map (uncurry subInfo . second Just) . M.assocs <$> readTVarIO (removedSubs c)
|
||||
subInfo :: (UserId, SMPServer, SMP.RecipientId) -> Maybe SMPClientError -> SubInfo
|
||||
subInfo (uId, srv, rId) err = SubInfo {userId = uId, server = enc srv, rcvId = enc rId, subError = show <$> err}
|
||||
|
||||
@@ -1,104 +0,0 @@
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
|
||||
module Simplex.Messaging.Agent.TRcvQueues
|
||||
( TRcvQueues (getRcvQueues),
|
||||
empty,
|
||||
clear,
|
||||
hasQueue,
|
||||
addQueue,
|
||||
addSessQueue,
|
||||
batchAddQueues,
|
||||
deleteQueue,
|
||||
batchDeleteQueues,
|
||||
hasSessQueues,
|
||||
getSessQueues,
|
||||
getSessConns,
|
||||
getDelSessQueues,
|
||||
qKey,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Data.Foldable (foldl')
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Data.Set as S
|
||||
import Simplex.Messaging.Agent.Protocol (ConnId, SMPQueue (..), UserId)
|
||||
import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SMPQueueRec (..), SomeRcvQueue)
|
||||
import Simplex.Messaging.Protocol (QueueId, RecipientId, SMPServer)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport
|
||||
|
||||
-- the fields in this record have the same data with swapped keys for lookup efficiency,
|
||||
-- and all methods must maintain this invariant.
|
||||
data TRcvQueues q = TRcvQueues
|
||||
{ getRcvQueues :: TMap (UserId, SMPServer, RecipientId) q
|
||||
}
|
||||
|
||||
empty :: IO (TRcvQueues q)
|
||||
empty = TRcvQueues <$> TM.emptyIO
|
||||
|
||||
clear :: TRcvQueues q -> STM ()
|
||||
clear (TRcvQueues qs) = TM.clear qs
|
||||
|
||||
hasQueue :: SomeRcvQueue q => q -> TRcvQueues q' -> STM Bool
|
||||
hasQueue rq (TRcvQueues qs) = TM.member (qKey rq) qs
|
||||
|
||||
addQueue :: RcvQueueSub -> TRcvQueues RcvQueueSub -> STM ()
|
||||
addQueue rq = addQueue_ rq rq
|
||||
{-# INLINE addQueue #-}
|
||||
|
||||
addSessQueue :: (SessionId, RcvQueueSub) -> TRcvQueues (SessionId, RcvQueueSub) -> STM ()
|
||||
addSessQueue q@(_, rq) = addQueue_ rq q
|
||||
{-# INLINE addSessQueue #-}
|
||||
|
||||
addQueue_ :: RcvQueueSub -> q -> TRcvQueues q -> STM ()
|
||||
addQueue_ rq q (TRcvQueues qs) = TM.insert (qKey rq) q qs
|
||||
{-# INLINE addQueue_ #-}
|
||||
|
||||
-- Save time by aggregating modifyTVar'
|
||||
batchAddQueues :: [RcvQueueSub] -> TRcvQueues RcvQueueSub -> STM ()
|
||||
batchAddQueues rqs (TRcvQueues qs) =
|
||||
modifyTVar' qs $ \m -> foldl' (\rqs' rq -> M.insert (qKey rq) rq rqs') m rqs
|
||||
|
||||
deleteQueue :: SomeRcvQueue q => q -> TRcvQueues q' -> STM ()
|
||||
deleteQueue rq (TRcvQueues qs) = TM.delete (qKey rq) qs
|
||||
{-# INLINE deleteQueue #-}
|
||||
|
||||
batchDeleteQueues :: SomeRcvQueue q => [q] -> TRcvQueues q' -> STM ()
|
||||
batchDeleteQueues rqs (TRcvQueues qs) =
|
||||
modifyTVar' qs $ \m -> foldl' (\rqs' rq -> M.delete (qKey rq) rqs') m rqs
|
||||
|
||||
hasSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues RcvQueueSub -> STM Bool
|
||||
hasSessQueues tSess (TRcvQueues qs) = any (`isSession` tSess) <$> readTVar qs
|
||||
|
||||
getSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues RcvQueueSub -> IO [RcvQueueSub]
|
||||
getSessQueues tSess (TRcvQueues qs) = M.foldl' addQ [] <$> readTVarIO qs
|
||||
where
|
||||
addQ qs' rq = if rq `isSession` tSess then rq : qs' else qs'
|
||||
|
||||
getSessConns :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues (SessionId, RcvQueueSub) -> IO (S.Set ConnId)
|
||||
getSessConns tSess (TRcvQueues qs) = M.foldl' addConn S.empty <$> readTVarIO qs
|
||||
where
|
||||
addConn cIds (_, rq) = if rq `isSession` tSess then S.insert (connId rq) cIds else cIds
|
||||
|
||||
getDelSessQueues :: (UserId, SMPServer, Maybe ConnId) -> SessionId -> TRcvQueues (SessionId, RcvQueueSub) -> STM ([RcvQueueSub], [ConnId])
|
||||
getDelSessQueues tSess sessId' (TRcvQueues qs) = do
|
||||
(removedQs, removedConns, qs'') <- (\qs' -> M.foldl' delQ ([], S.empty, qs') qs') <$> readTVar qs
|
||||
writeTVar qs $! qs''
|
||||
let removedConns' = S.toList $ removedConns `S.difference` queueConns qs''
|
||||
pure (removedQs, removedConns')
|
||||
where
|
||||
delQ acc@(removed, cIds, qs') (sessId, rq)
|
||||
| rq `isSession` tSess && sessId == sessId' = (rq : removed, S.insert (connId rq) cIds, M.delete (qKey rq) qs')
|
||||
| otherwise = acc
|
||||
queueConns = M.foldl' (\cIds (_, rq) -> S.insert (connId rq) cIds) S.empty
|
||||
|
||||
isSession :: RcvQueueSub -> (UserId, SMPServer, Maybe ConnId) -> Bool
|
||||
isSession rq (uId, srv, connId_) =
|
||||
userId rq == uId && server rq == srv && maybe True (connId rq ==) connId_
|
||||
|
||||
qKey :: SomeRcvQueue q => q -> (UserId, SMPServer, QueueId)
|
||||
qKey rq = (qUserId rq, qServer rq, queueId rq)
|
||||
{-# INLINE qKey #-}
|
||||
174
src/Simplex/Messaging/Agent/TSessionSubs.hs
Normal file
174
src/Simplex/Messaging/Agent/TSessionSubs.hs
Normal file
@@ -0,0 +1,174 @@
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Agent.TSessionSubs
|
||||
( TSessionSubs (sessionSubs),
|
||||
SessSubs (..),
|
||||
emptyIO,
|
||||
clear,
|
||||
hasActiveSub,
|
||||
hasPendingSub,
|
||||
addPendingSub,
|
||||
setSessionId,
|
||||
addActiveSub,
|
||||
batchAddPendingSubs,
|
||||
deletePendingSub,
|
||||
deleteSub,
|
||||
batchDeleteSubs,
|
||||
hasPendingSubs,
|
||||
getPendingSubs,
|
||||
getActiveSubs,
|
||||
setSubsPending,
|
||||
foldSessionSubs,
|
||||
mapSubs,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (isJust)
|
||||
import qualified Data.Set as S
|
||||
import Simplex.Messaging.Agent.Protocol (SMPQueue (..))
|
||||
import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SomeRcvQueue)
|
||||
import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..))
|
||||
import Simplex.Messaging.Protocol (RecipientId)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Util (($>>=))
|
||||
|
||||
data TSessionSubs = TSessionSubs
|
||||
{ sessionSubs :: TMap SMPTransportSession SessSubs
|
||||
}
|
||||
|
||||
data SessSubs = SessSubs
|
||||
{ subsSessId :: TVar (Maybe SessionId),
|
||||
activeSubs :: TMap RecipientId RcvQueueSub,
|
||||
pendingSubs :: TMap RecipientId RcvQueueSub
|
||||
}
|
||||
|
||||
emptyIO :: IO TSessionSubs
|
||||
emptyIO = TSessionSubs <$> TM.emptyIO
|
||||
{-# INLINE emptyIO #-}
|
||||
|
||||
clear :: TSessionSubs -> STM ()
|
||||
clear = TM.clear . sessionSubs
|
||||
{-# INLINE clear #-}
|
||||
|
||||
lookupSubs :: SMPTransportSession -> TSessionSubs -> STM (Maybe SessSubs)
|
||||
lookupSubs tSess = TM.lookup tSess . sessionSubs
|
||||
{-# INLINE lookupSubs #-}
|
||||
|
||||
getSessSubs :: SMPTransportSession -> TSessionSubs -> STM SessSubs
|
||||
getSessSubs tSess ss = lookupSubs tSess ss >>= maybe new pure
|
||||
where
|
||||
new = do
|
||||
s <- SessSubs <$> newTVar Nothing <*> newTVar M.empty <*> newTVar M.empty
|
||||
TM.insert tSess s $ sessionSubs ss
|
||||
pure s
|
||||
|
||||
hasActiveSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
|
||||
hasActiveSub = hasQueue_ activeSubs
|
||||
{-# INLINE hasActiveSub #-}
|
||||
|
||||
hasPendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
|
||||
hasPendingSub = hasQueue_ pendingSubs
|
||||
{-# INLINE hasPendingSub #-}
|
||||
|
||||
hasQueue_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
|
||||
hasQueue_ subs rId tSess ss = isJust <$> (lookupSubs tSess ss $>>= TM.lookup rId . subs)
|
||||
{-# INLINE hasQueue_ #-}
|
||||
|
||||
addPendingSub :: RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
|
||||
addPendingSub rq tSess ss = getSessSubs tSess ss >>= TM.insert (rcvId rq) rq . pendingSubs
|
||||
|
||||
setSessionId :: SessionId -> SMPTransportSession -> TSessionSubs -> STM ()
|
||||
setSessionId sessId tSess ss = do
|
||||
s <- getSessSubs tSess ss
|
||||
readTVar (subsSessId s) >>= \case
|
||||
Nothing -> writeTVar (subsSessId s) (Just sessId)
|
||||
Just sessId' -> unless (sessId == sessId') $ void $ setSubsPending_ s $ Just sessId
|
||||
|
||||
addActiveSub :: SessionId -> RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
|
||||
addActiveSub sessId rq tSess ss = do
|
||||
s <- getSessSubs tSess ss
|
||||
sessId' <- readTVar $ subsSessId s
|
||||
let rId = rcvId rq
|
||||
if Just sessId == sessId'
|
||||
then do
|
||||
TM.insert rId rq $ activeSubs s
|
||||
TM.delete rId $ pendingSubs s
|
||||
else TM.insert rId rq $ pendingSubs s
|
||||
|
||||
batchAddPendingSubs :: [RcvQueueSub] -> SMPTransportSession -> TSessionSubs -> STM ()
|
||||
batchAddPendingSubs rqs tSess ss = do
|
||||
s <- getSessSubs tSess ss
|
||||
modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, rq)) rqs
|
||||
|
||||
deletePendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM ()
|
||||
deletePendingSub rId tSess = lookupSubs tSess >=> mapM_ (TM.delete rId . pendingSubs)
|
||||
|
||||
deleteSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM ()
|
||||
deleteSub rId tSess = lookupSubs tSess >=> mapM_ (\s -> TM.delete rId (activeSubs s) >> TM.delete rId (pendingSubs s))
|
||||
|
||||
batchDeleteSubs :: SomeRcvQueue q => [q] -> SMPTransportSession -> TSessionSubs -> STM ()
|
||||
batchDeleteSubs rqs tSess = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs s) >> delete (pendingSubs s))
|
||||
where
|
||||
rIds = S.fromList $ map queueId rqs
|
||||
delete = (`modifyTVar'` (`M.withoutKeys` rIds))
|
||||
|
||||
hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool
|
||||
hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (fmap (not . null) . readTVar . pendingSubs)
|
||||
|
||||
getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
|
||||
getPendingSubs = getSubs_ pendingSubs
|
||||
{-# INLINE getPendingSubs #-}
|
||||
|
||||
getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
|
||||
getActiveSubs = getSubs_ activeSubs
|
||||
{-# INLINE getActiveSubs #-}
|
||||
|
||||
getSubs_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
|
||||
getSubs_ subs tSess = lookupSubs tSess >=> maybe (pure M.empty) (readTVar . subs)
|
||||
|
||||
setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
|
||||
setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss)
|
||||
| entitySession == isJust connId_ =
|
||||
TM.lookup tSess ss >>= withSessSubs (`setSubsPending_` Nothing)
|
||||
| otherwise =
|
||||
TM.lookupDelete tSess ss >>= withSessSubs setPendingChangeMode
|
||||
where
|
||||
entitySession = mode == TSMEntity
|
||||
sessEntId = if entitySession then Just else const Nothing
|
||||
withSessSubs run = \case
|
||||
Nothing -> pure M.empty
|
||||
Just s -> do
|
||||
sessId' <- readTVar $ subsSessId s
|
||||
if Just sessId == sessId' then run s else pure M.empty
|
||||
setPendingChangeMode s = do
|
||||
subs <- M.union <$> readTVar (activeSubs s) <*> readTVar (pendingSubs s)
|
||||
unless (null subs) $
|
||||
forM_ subs $ \rq -> addPendingSub rq (uId, srv, sessEntId (connId rq)) tss
|
||||
pure subs
|
||||
|
||||
setSubsPending_ :: SessSubs -> Maybe SessionId -> STM (Map RecipientId RcvQueueSub)
|
||||
setSubsPending_ s sessId_ = do
|
||||
writeTVar (subsSessId s) sessId_
|
||||
let as = activeSubs s
|
||||
subs <- readTVar as
|
||||
unless (null subs) $ do
|
||||
writeTVar as M.empty
|
||||
modifyTVar' (pendingSubs s) $ M.union subs
|
||||
pure subs
|
||||
|
||||
foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a
|
||||
foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs
|
||||
|
||||
mapSubs :: (Map RecipientId RcvQueueSub -> a) -> SessSubs -> IO (a, a)
|
||||
mapSubs f s = do
|
||||
active <- readTVarIO $ activeSubs s
|
||||
pending <- readTVarIO $ pendingSubs s
|
||||
pure (f active, f pending)
|
||||
@@ -29,6 +29,7 @@
|
||||
module Simplex.Messaging.Client
|
||||
( -- * Connect (disconnect) client to (from) SMP server
|
||||
TransportSession,
|
||||
SMPTransportSession,
|
||||
ProtocolClient (thParams, sessionTs),
|
||||
SMPClient,
|
||||
ProxiedRelay (..),
|
||||
@@ -549,6 +550,8 @@ type UserId = Int64
|
||||
-- Please note that for SMP connection ID is used as entity ID, not queue ID.
|
||||
type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString)
|
||||
|
||||
type SMPTransportSession = TransportSession BrokerMsg
|
||||
|
||||
-- | Connects to 'ProtocolServer' using passed client configuration
|
||||
-- and queue for messages and notifications.
|
||||
--
|
||||
|
||||
Reference in New Issue
Block a user