diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f10e15206..f21399e24 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -304,7 +304,7 @@ processCommand c (connId, cmd) = case cmd of newConn :: AgentMonad m => AgentClient -> ConnId -> Bool -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c) newConn c connId enableNtfs cMode = do - srv <- getSMPServer c + srv <- getAnySMPServer c clientVRange <- asks $ smpClientVRange . config (rq, qUri) <- newRcvQueue c srv clientVRange True g <- asks idsDrg @@ -367,8 +367,8 @@ joinConn c connId enableNtfs (CRContactUri (ConnReqUriData _ agentVRange (qUri : _ -> throwError $ AGENT A_VERSION createReplyQueue :: AgentMonad m => AgentClient -> ConnData -> SndQueue -> m SMPQueueInfo -createReplyQueue c ConnData {connId, enableNtfs} SndQueue {smpClientVersion} = do - srv <- getSMPServer c +createReplyQueue c ConnData {connId, enableNtfs} SndQueue {server, smpClientVersion} = do + srv <- getSMPServer c server (rq, qUri) <- newRcvQueue c srv (versionToRange smpClientVersion) True let qInfo = toVersionT qUri smpClientVersion addSubscription c rq connId @@ -467,7 +467,7 @@ createNextRcvQueue c cData rq@RcvQueue {server, sndId} sq = do let queueAddress = SMPQueueAddress {smpServer, senderId, dhPublicKey = C.publicKey e2ePrivKey} pure SMPQueueUri {clientVRange, queueAddress} _ -> do - srv <- getSMPServer c + srv <- getSMPServer c server (rq', qUri) <- newRcvQueue c srv clientVRange False withStore' c $ \db -> dbCreateNextRcvQueue db rq rq' pure qUri @@ -1085,15 +1085,24 @@ suspendAgent' c@AgentClient {agentState = as} maxDelay = do -- unsafeIOToSTM $ putStrLn $ "in timeout: suspendSendingAndDatabase" suspendSendingAndDatabase c -getSMPServer :: AgentMonad m => AgentClient -> m SMPServer -getSMPServer c = do - smpServers <- readTVarIO $ smpServers c - case smpServers of - srv :| [] -> pure srv - servers -> do - gen <- asks randomServer - atomically . stateTVar gen $ - first (servers L.!!) . randomR (0, L.length servers - 1) +getAnySMPServer :: AgentMonad m => AgentClient -> m SMPServer +getAnySMPServer c = readTVarIO (smpServers c) >>= pickServer + +pickServer :: AgentMonad m => NonEmpty SMPServer -> m SMPServer +pickServer = \case + srv :| [] -> pure srv + servers -> do + gen <- asks randomServer + atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1)) + +getSMPServer :: AgentMonad m => AgentClient -> SMPServer -> m SMPServer +getSMPServer c (SMPServer host port _) = do + srvs <- readTVarIO $ smpServers c + case L.nonEmpty $ L.filter different srvs of + Just srvs' -> pickServer srvs' + _ -> pure $ L.head srvs + where + different (SMPServer host' port' _) = host /= host' || port /= port' subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () subscriber c@AgentClient {msgQ} = forever $ do