choose different server for reply queue and during rotation

This commit is contained in:
Evgeny Poberezkin
2022-08-27 14:52:57 +01:00
parent 1ddf56f0e1
commit bfd9dafe1e
+22 -13
View File
@@ -304,7 +304,7 @@ processCommand c (connId, cmd) = case cmd of
newConn :: AgentMonad m => AgentClient -> ConnId -> Bool -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c)
newConn c connId enableNtfs cMode = do
srv <- getSMPServer c
srv <- getAnySMPServer c
clientVRange <- asks $ smpClientVRange . config
(rq, qUri) <- newRcvQueue c srv clientVRange True
g <- asks idsDrg
@@ -367,8 +367,8 @@ joinConn c connId enableNtfs (CRContactUri (ConnReqUriData _ agentVRange (qUri :
_ -> throwError $ AGENT A_VERSION
createReplyQueue :: AgentMonad m => AgentClient -> ConnData -> SndQueue -> m SMPQueueInfo
createReplyQueue c ConnData {connId, enableNtfs} SndQueue {smpClientVersion} = do
srv <- getSMPServer c
createReplyQueue c ConnData {connId, enableNtfs} SndQueue {server, smpClientVersion} = do
srv <- getSMPServer c server
(rq, qUri) <- newRcvQueue c srv (versionToRange smpClientVersion) True
let qInfo = toVersionT qUri smpClientVersion
addSubscription c rq connId
@@ -467,7 +467,7 @@ createNextRcvQueue c cData rq@RcvQueue {server, sndId} sq = do
let queueAddress = SMPQueueAddress {smpServer, senderId, dhPublicKey = C.publicKey e2ePrivKey}
pure SMPQueueUri {clientVRange, queueAddress}
_ -> do
srv <- getSMPServer c
srv <- getSMPServer c server
(rq', qUri) <- newRcvQueue c srv clientVRange False
withStore' c $ \db -> dbCreateNextRcvQueue db rq rq'
pure qUri
@@ -1085,15 +1085,24 @@ suspendAgent' c@AgentClient {agentState = as} maxDelay = do
-- unsafeIOToSTM $ putStrLn $ "in timeout: suspendSendingAndDatabase"
suspendSendingAndDatabase c
getSMPServer :: AgentMonad m => AgentClient -> m SMPServer
getSMPServer c = do
smpServers <- readTVarIO $ smpServers c
case smpServers of
srv :| [] -> pure srv
servers -> do
gen <- asks randomServer
atomically . stateTVar gen $
first (servers L.!!) . randomR (0, L.length servers - 1)
getAnySMPServer :: AgentMonad m => AgentClient -> m SMPServer
getAnySMPServer c = readTVarIO (smpServers c) >>= pickServer
pickServer :: AgentMonad m => NonEmpty SMPServer -> m SMPServer
pickServer = \case
srv :| [] -> pure srv
servers -> do
gen <- asks randomServer
atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1))
getSMPServer :: AgentMonad m => AgentClient -> SMPServer -> m SMPServer
getSMPServer c (SMPServer host port _) = do
srvs <- readTVarIO $ smpServers c
case L.nonEmpty $ L.filter different srvs of
Just srvs' -> pickServer srvs'
_ -> pure $ L.head srvs
where
different (SMPServer host' port' _) = host /= host' || port /= port'
subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
subscriber c@AgentClient {msgQ} = forever $ do