agent: getAgentQueuesInfo (#1180)

This commit is contained in:
spaced4ndy
2024-05-30 14:21:29 +04:00
committed by GitHub
parent 39b3b5a25e
commit 97a953550f
4 changed files with 106 additions and 32 deletions
+66 -21
View File
@@ -137,6 +137,8 @@ module Simplex.Messaging.Agent.Client
getAgentWorkersDetails,
AgentWorkersSummary (..),
getAgentWorkersSummary,
AgentQueuesInfo (..),
getAgentQueuesInfo,
SMPTransportSession,
NtfTransportSession,
XFTPTransportSession,
@@ -204,7 +206,7 @@ import Simplex.Messaging.Notifications.Client
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Transport (NTFVersion)
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON, parse)
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON, parse, sumTypeJSON)
import Simplex.Messaging.Protocol
( AProtocolType (..),
BrokerMsg,
@@ -582,7 +584,8 @@ instance ProtocolServerClient XFTPVersion XFTPErrorType FileResponse where
getSMPServerClient :: AgentClient -> SMPTransportSession -> AM SMPConnectedClient
getSMPServerClient c@AgentClient {active, smpClients, workerSeq} tSess = do
unlessM (readTVarIO active) . throwError $ INACTIVE
atomically (getSessVar workerSeq tSess smpClients)
ts <- liftIO getCurrentTime
atomically (getSessVar workerSeq tSess smpClients ts)
>>= either newClient (waitForProtocolClient c tSess smpClients)
where
newClient v = do
@@ -593,28 +596,30 @@ getSMPProxyClient :: AgentClient -> SMPTransportSession -> AM (SMPConnectedClien
getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq} destSess@(userId, destSrv, qId) = do
unlessM (readTVarIO active) . throwError $ INACTIVE
proxySrv <- getNextServer c userId [destSrv]
atomically (getClientVar proxySrv) >>= \(tSess, auth, v) ->
either (newProxyClient tSess auth) (waitForProxyClient tSess auth) v
ts <- liftIO getCurrentTime
atomically (getClientVar proxySrv ts) >>= \(tSess, auth, v) ->
either (newProxyClient tSess auth ts) (waitForProxyClient tSess auth) v
where
getClientVar :: SMPServerWithAuth -> STM (SMPTransportSession, Maybe SMP.BasicAuth, Either SMPClientVar SMPClientVar)
getClientVar proxySrv = do
getClientVar :: SMPServerWithAuth -> UTCTime -> STM (SMPTransportSession, Maybe SMP.BasicAuth, Either SMPClientVar SMPClientVar)
getClientVar proxySrv ts = do
ProtoServerWithAuth srv auth <- TM.lookup destSess smpProxiedRelays >>= maybe (TM.insert destSess proxySrv smpProxiedRelays $> proxySrv) pure
let tSess = (userId, srv, qId)
(tSess,auth,) <$> getSessVar workerSeq tSess smpClients
newProxyClient :: SMPTransportSession -> Maybe SMP.BasicAuth -> SMPClientVar -> AM (SMPConnectedClient, Either AgentErrorType ProxiedRelay)
newProxyClient tSess auth v = do
(tSess,auth,) <$> getSessVar workerSeq tSess smpClients ts
newProxyClient :: SMPTransportSession -> Maybe SMP.BasicAuth -> UTCTime -> SMPClientVar -> AM (SMPConnectedClient, Either AgentErrorType ProxiedRelay)
newProxyClient tSess auth ts v = do
(prs, rv) <- atomically $ do
prs <- TM.empty
-- we do not need to check if it is a new proxied relay session,
-- as the client is just created and there are no sessions yet
(prs,) . either id id <$> getSessVar workerSeq destSrv prs
(prs,) . either id id <$> getSessVar workerSeq destSrv prs ts
clnt <- smpConnectClient c tSess prs v
(clnt,) <$> newProxiedRelay clnt auth rv
waitForProxyClient :: SMPTransportSession -> Maybe SMP.BasicAuth -> SMPClientVar -> AM (SMPConnectedClient, Either AgentErrorType ProxiedRelay)
waitForProxyClient tSess auth v = do
clnt@(SMPConnectedClient _ prs) <- waitForProtocolClient c tSess smpClients v
ts <- liftIO getCurrentTime
sess <-
atomically (getSessVar workerSeq destSrv prs)
atomically (getSessVar workerSeq destSrv prs ts)
>>= either (newProxiedRelay clnt auth) (waitForProxiedRelay tSess)
pure (clnt, sess)
newProxiedRelay :: SMPConnectedClient -> Maybe SMP.BasicAuth -> ProxiedRelayVar -> AM (Either AgentErrorType ProxiedRelay)
@@ -688,14 +693,15 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess =
atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ()))
resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
ts <- liftIO getCurrentTime
atomically (getWorkerVar ts) >>= mapM_ (either newSubWorker (\_ -> pure ()))
where
getWorkerVar =
getWorkerVar ts =
ifM
(null <$> getPending)
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
(Just <$> getSessVar workerSeq tSess smpSubWorkers)
(Just <$> getSessVar workerSeq tSess smpSubWorkers ts)
newSubWorker v = do
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
atomically $ putTMVar (sessionVar v) a
@@ -740,7 +746,8 @@ reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do
getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient
getNtfServerClient c@AgentClient {active, ntfClients, workerSeq} tSess@(userId, srv, _) = do
unlessM (readTVarIO active) . throwError $ INACTIVE
atomically (getSessVar workerSeq tSess ntfClients)
ts <- liftIO getCurrentTime
atomically (getSessVar workerSeq tSess ntfClients ts)
>>= either
(newProtocolClient c tSess ntfClients connectClient)
(waitForProtocolClient c tSess ntfClients)
@@ -763,7 +770,8 @@ getNtfServerClient c@AgentClient {active, ntfClients, workerSeq} tSess@(userId,
getXFTPServerClient :: AgentClient -> XFTPTransportSession -> AM XFTPClient
getXFTPServerClient c@AgentClient {active, xftpClients, workerSeq} tSess@(userId, srv, _) = do
unlessM (readTVarIO active) . throwError $ INACTIVE
atomically (getSessVar workerSeq tSess xftpClients)
ts <- liftIO getCurrentTime
atomically (getSessVar workerSeq tSess xftpClients ts)
>>= either
(newProtocolClient c tSess xftpClients connectClient)
(waitForProtocolClient c tSess xftpClients)
@@ -1060,10 +1068,11 @@ sendOrProxySMPMessage c userId destSrv cmdStr spKey_ senderId msgFlags msg = do
$>>= \(ProtoServerWithAuth srv _) -> tryReadSessVar (userId, srv, qId) (smpClients c)
)
>>= \case
Just (Right (SMPConnectedClient smp' prs)) | sameClient smp' ->
tryReadSessVar destSrv prs >>= \case
Just (Right proxySess') | sameProxiedRelay proxySess' -> TM.delete destSrv prs
_ -> pure ()
Just (Right (SMPConnectedClient smp' prs))
| sameClient smp' ->
tryReadSessVar destSrv prs >>= \case
Just (Right proxySess') | sameProxiedRelay proxySess' -> TM.delete destSrv prs
_ -> pure ()
_ -> pure ()
sameClient smp' = sessionId (thParams smp) == sessionId (thParams smp')
sameProxiedRelay proxySess' = prSessionId proxySess == prSessionId proxySess'
@@ -2041,6 +2050,38 @@ getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeli
(pure WorkersSummary {numActive, numIdle = numIdle + 1, totalRestarts = totalRestarts + restartCount})
(pure WorkersSummary {numActive = numActive + 1, numIdle, totalRestarts = totalRestarts + restartCount})
data AgentQueuesInfo = AgentQueuesInfo
{ msgQInfo :: TBQueueInfo,
subQInfo :: TBQueueInfo,
smpClientsQueues :: Map Text (Int, UTCTime, ClientInfo)
}
deriving (Show)
data ClientInfo
= ClientInfoQueues {sndQInfo :: TBQueueInfo, rcvQInfo :: TBQueueInfo}
| ClientInfoError {clientError :: (AgentErrorType, Maybe UTCTime)}
| ClientInfoConnecting
deriving (Show)
getAgentQueuesInfo :: AgentClient -> IO AgentQueuesInfo
getAgentQueuesInfo AgentClient {msgQ, subQ, smpClients} = do
msgQInfo <- atomically $ getTBQueueInfo msgQ
subQInfo <- atomically $ getTBQueueInfo subQ
smpClientsMap <- readTVarIO smpClients
let smpClientsMap' = M.mapKeys (decodeLatin1 . strEncode) smpClientsMap
smpClientsQueues <- mapM getClientQueuesInfo smpClientsMap'
pure AgentQueuesInfo {msgQInfo, subQInfo, smpClientsQueues}
where
getClientQueuesInfo :: SMPClientVar -> IO (Int, UTCTime, ClientInfo)
getClientQueuesInfo SessionVar {sessionVar, sessionVarId, sessionVarTs} = do
clientInfo <- atomically (tryReadTMVar sessionVar) >>= \case
Just (Right c) -> do
(sndQInfo, rcvQInfo) <- getProtocolClientQueuesInfo $ protocolClient c
pure ClientInfoQueues {sndQInfo, rcvQInfo}
Just (Left e) -> pure $ ClientInfoError e
Nothing -> pure ClientInfoConnecting
pure (sessionVarId, sessionVarTs, clientInfo)
$(J.deriveJSON defaultJSON ''AgentLocks)
$(J.deriveJSON (enumJSON $ dropPrefix "TS") ''ProtocolTestStep)
@@ -2059,6 +2100,10 @@ $(J.deriveJSON defaultJSON {J.fieldLabelModifier = takeWhile (/= '_')} ''AgentWo
$(J.deriveJSON defaultJSON ''AgentWorkersSummary)
$(J.deriveJSON (sumTypeJSON $ dropPrefix "ClientInfo") ''ClientInfo)
$(J.deriveJSON defaultJSON ''AgentQueuesInfo)
$(J.deriveJSON (enumJSON $ dropPrefix "UN") ''UserNetworkType)
$(J.deriveJSON defaultJSON ''UserNetworkInfo)
+25
View File
@@ -90,6 +90,11 @@ module Simplex.Messaging.Client
mkTransmission,
authTransmission,
smpClientStub,
-- * For debugging
TBQueueInfo (..),
getTBQueueInfo,
getProtocolClientQueuesInfo,
)
where
@@ -1054,6 +1059,24 @@ authTransmission thAuth pKey_ nonce t = traverse authenticate pKey_
sign :: forall a. (C.AlgorithmI a, C.SignatureAlgorithm a) => C.PrivateKey a -> Either TransportError TransmissionAuth
sign pk = Right $ TASignature $ C.ASignature (C.sAlgorithm @a) (C.sign' pk t)
data TBQueueInfo = TBQueueInfo
{ qLength :: Int,
qFull :: Bool
}
deriving (Show)
getTBQueueInfo :: TBQueue a -> STM TBQueueInfo
getTBQueueInfo q = do
qLength <- fromIntegral <$> lengthTBQueue q
qFull <- isFullTBQueue q
pure TBQueueInfo {qLength, qFull}
getProtocolClientQueuesInfo :: ProtocolClient v err msg -> IO (TBQueueInfo, TBQueueInfo)
getProtocolClientQueuesInfo ProtocolClient {client_ = PClient {sndQ, rcvQ}} = do
sndQInfo <- atomically $ getTBQueueInfo sndQ
rcvQInfo <- atomically $ getTBQueueInfo rcvQ
pure (sndQInfo, rcvQInfo)
$(J.deriveJSON (enumJSON $ dropPrefix "HM") ''HostMode)
$(J.deriveJSON (enumJSON $ dropPrefix "SM") ''SocksMode)
@@ -1067,3 +1090,5 @@ $(J.deriveJSON (enumJSON $ dropPrefix "SPF") ''SMPProxyFallback)
$(J.deriveJSON defaultJSON ''NetworkConfig)
$(J.deriveJSON (enumJSON $ dropPrefix "Proxy") ''ProxyClientError)
$(J.deriveJSON defaultJSON ''TBQueueInfo)
+9 -7
View File
@@ -143,10 +143,11 @@ getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv
{-# INLINE getSMPServerClient' #-}
getSMPServerClient'' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient)
getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv =
atomically getClientVar >>= either (ExceptT . newSMPClient) waitForSMPClient
getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv = do
ts <- liftIO getCurrentTime
atomically (getClientVar ts) >>= either (ExceptT . newSMPClient) waitForSMPClient
where
getClientVar :: STM (Either SMPClientVar SMPClientVar)
getClientVar :: UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar = getSessVar workerSeq srv smpClients
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient)
@@ -227,14 +228,15 @@ connectClient ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, random
-- | Spawn reconnect worker if needed
reconnectClient :: SMPClientAgent -> SMPServer -> IO ()
reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} srv =
whenM (readTVarIO active) $ atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ()))
reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} srv = do
ts <- getCurrentTime
whenM (readTVarIO active) $ atomically (getWorkerVar ts) >>= mapM_ (either newSubWorker (\_ -> pure ()))
where
getWorkerVar =
getWorkerVar ts =
ifM
(null <$> getPending)
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
(Just <$> getSessVar workerSeq srv smpSubWorkers)
(Just <$> getSessVar workerSeq srv smpSubWorkers ts)
newSubWorker :: SessionVar (Async ()) -> IO ()
newSubWorker v = do
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
+6 -4
View File
@@ -8,23 +8,25 @@ import Control.Concurrent.STM
import Control.Monad
import Data.Composition ((.:.))
import Data.Functor (($>))
import Data.Time (UTCTime)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (($>>=))
data SessionVar a = SessionVar
{ sessionVar :: TMVar a,
sessionVarId :: Int
sessionVarId :: Int,
sessionVarTs :: UTCTime
}
getSessVar :: forall k a. Ord k => TVar Int -> k -> TMap k (SessionVar a) -> STM (Either (SessionVar a) (SessionVar a))
getSessVar sessSeq sessKey vs = maybe (Left <$> newSessionVar) (pure . Right) =<< TM.lookup sessKey vs
getSessVar :: forall k a. Ord k => TVar Int -> k -> TMap k (SessionVar a) -> UTCTime -> STM (Either (SessionVar a) (SessionVar a))
getSessVar sessSeq sessKey vs sessionVarTs = maybe (Left <$> newSessionVar) (pure . Right) =<< TM.lookup sessKey vs
where
newSessionVar :: STM (SessionVar a)
newSessionVar = do
sessionVar <- newEmptyTMVar
sessionVarId <- stateTVar sessSeq $ \next -> (next, next + 1)
let v = SessionVar {sessionVar, sessionVarId}
let v = SessionVar {sessionVar, sessionVarId, sessionVarTs}
TM.insert sessKey v vs
pure v