mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 20:44:49 +00:00
agent: send UP from initial subscriptions
This commit is contained in:
@@ -170,7 +170,7 @@ import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (isJust, isNothing, listToMaybe)
|
||||
import Data.Maybe (catMaybes, isJust, isNothing, listToMaybe)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
@@ -678,15 +678,12 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
|
||||
serverDown :: ([RcvQueue], [ConnId]) -> IO ()
|
||||
serverDown (qs, conns) = whenM (readTVarIO active) $ do
|
||||
incClientStat' c userId client "DISCONNECT" ""
|
||||
notifySub "" $ hostEvent' DISCONNECT client
|
||||
unless (null conns) $ notifySub "" $ DOWN srv conns
|
||||
notifySub c "" $ hostEvent' DISCONNECT client
|
||||
unless (null conns) $ notifySub c "" $ DOWN srv conns
|
||||
unless (null qs) $ do
|
||||
atomically $ mapM_ (releaseGetLock c) qs
|
||||
runReaderT (resubscribeSMPSession c tSess) env
|
||||
|
||||
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
|
||||
|
||||
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
|
||||
resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess =
|
||||
atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ()))
|
||||
@@ -721,21 +718,22 @@ reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do
|
||||
(rs, sessId_) <- subscribeQueues c $ L.toList qs
|
||||
let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
|
||||
conns = filter (`M.notMember` cs) okConns
|
||||
unless (null conns) $ notifySub "" $ UP srv conns
|
||||
unless (null conns) $ notifySub c "" $ UP srv conns
|
||||
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
|
||||
mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
|
||||
mapM_ (\(connId, e) -> notifySub c connId $ ERR e) finalErrs
|
||||
forM_ (listToMaybe tempErrs) $ \(connId, e) -> do
|
||||
when (null okConns && M.null cs && null finalErrs) . liftIO $
|
||||
forM_ sessId_ $ \sessId -> do
|
||||
-- We only close the client session that was used to subscribe.
|
||||
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing)
|
||||
mapM_ (closeClient_ c) v_
|
||||
notifySub connId $ ERR e
|
||||
notifySub c connId $ ERR e
|
||||
where
|
||||
handleNotify :: AM' () -> AM' ()
|
||||
handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show
|
||||
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> AM' ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
|
||||
handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show
|
||||
|
||||
notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> ACommand 'Agent e -> m ()
|
||||
notifySub c connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
|
||||
|
||||
getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient
|
||||
getNtfServerClient c@AgentClient {active, ntfClients, workerSeq} tSess@(userId, srv, _) = do
|
||||
@@ -1291,14 +1289,10 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do
|
||||
qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey
|
||||
pure (rq, qUri, tSess, sessId)
|
||||
|
||||
processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM ()
|
||||
processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM (Maybe ConnId)
|
||||
processSubResult c rq@RcvQueue {connId} = \case
|
||||
Left e ->
|
||||
unless (temporaryClientError e) $
|
||||
failSubscription c rq e
|
||||
Right () ->
|
||||
whenM (hasPendingSubscription c connId) $
|
||||
addSubscription c rq
|
||||
Left e -> Nothing <$ unless (temporaryClientError e) (failSubscription c rq e)
|
||||
Right () -> ifM (hasPendingSubscription c connId) (Just connId <$ addSubscription c rq) (pure Nothing)
|
||||
|
||||
temporaryAgentError :: AgentErrorType -> Bool
|
||||
temporaryAgentError = \case
|
||||
@@ -1349,23 +1343,26 @@ subscribeQueues c qs = do
|
||||
subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ())
|
||||
subscribeQueues_ env session smp qs' = do
|
||||
rs <- sendBatch subscribeSMPQueues smp qs'
|
||||
active <-
|
||||
(active, subResults) <-
|
||||
atomically $
|
||||
ifM
|
||||
(activeClientSession c tSess sessId)
|
||||
(writeTVar session (Just sessId) >> processSubResults rs $> True)
|
||||
(pure False)
|
||||
(writeTVar session (Just sessId) >> ((True,) <$> processSubResults rs))
|
||||
(pure (False, []))
|
||||
if active
|
||||
then when (hasTempErrors rs) resubscribe $> rs
|
||||
then do
|
||||
when (any isNothing subResults) resubscribe
|
||||
let up = catMaybes $ L.toList subResults
|
||||
unless (null up) $ notifySub c "" $ UP srv up
|
||||
pure rs
|
||||
else do
|
||||
logWarn "subcription batch result for replaced SMP client, resubscribing"
|
||||
resubscribe $> L.map (second $ \_ -> Left PCENetworkError) rs
|
||||
where
|
||||
tSess = transportSession' smp
|
||||
tSess@(_, srv, _) = transportSession' smp
|
||||
sessId = sessionId $ thParams smp
|
||||
hasTempErrors = any (either temporaryClientError (const False) . snd)
|
||||
processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM ()
|
||||
processSubResults = mapM_ $ uncurry $ processSubResult c
|
||||
processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM (NonEmpty (Maybe ConnId))
|
||||
processSubResults = mapM (uncurry $ processSubResult c)
|
||||
resubscribe = resubscribeSMPSession c tSess `runReaderT` env
|
||||
|
||||
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
|
||||
|
||||
Reference in New Issue
Block a user