From 5c6ec96d6477371d8e617bcc71e6ecbcdd5c78cc Mon Sep 17 00:00:00 2001 From: John Roberts <8711996+johnroberts755@users.noreply.github.com> Date: Thu, 10 Mar 2022 10:49:22 +0400 Subject: [PATCH] make smp servers configurable for running agent (#326) --- apps/smp-agent/Main.hs | 2 +- src/Simplex/Messaging/Agent.hs | 21 ++++++++++++++++----- src/Simplex/Messaging/Agent/Client.hs | 5 ++++- src/Simplex/Messaging/Agent/Env/SQLite.hs | 4 ++-- src/Simplex/Messaging/Protocol.hs | 4 ++++ tests/SMPAgentClient.hs | 4 ++-- 6 files changed, 29 insertions(+), 11 deletions(-) diff --git a/apps/smp-agent/Main.hs b/apps/smp-agent/Main.hs index 1087530f9..32e038f56 100644 --- a/apps/smp-agent/Main.hs +++ b/apps/smp-agent/Main.hs @@ -11,7 +11,7 @@ import Simplex.Messaging.Agent.Server (runSMPAgent) import Simplex.Messaging.Transport (TLS, Transport (..)) cfg :: AgentConfig -cfg = defaultAgentConfig {smpServers = L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"]} +cfg = defaultAgentConfig {initialSMPServers = L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"]} logCfg :: LogConfig logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7b4e202bd..f96e8a723 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -47,6 +47,7 @@ module Simplex.Messaging.Agent ackMessage, suspendConnection, deleteConnection, + setSMPServers, logConnection, ) where @@ -143,6 +144,10 @@ suspendConnection c = withAgentEnv c . suspendConnection' c deleteConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () deleteConnection c = withAgentEnv c . deleteConnection' c +-- | Change servers to be used for creating new queues +setSMPServers :: AgentErrorMonad m => AgentClient -> NonEmpty SMPServer -> m () +setSMPServers c = withAgentEnv c . setSMPServers' c + withAgentEnv :: AgentClient -> ReaderT Env m a -> m a withAgentEnv c = (`runReaderT` agentEnv c) @@ -209,7 +214,7 @@ processCommand c (connId, cmd) = case cmd of newConn :: AgentMonad m => AgentClient -> ConnId -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c) newConn c connId cMode = do - srv <- getSMPServer + srv <- getSMPServer c (rq, qUri) <- newRcvQueue c srv g <- asks idsDrg let cData = ConnData {connId} @@ -262,7 +267,7 @@ joinConn c connId (CRContactUri (ConnReqUriData _ agentVRange (qUri :| _))) cInf createReplyQueue :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () createReplyQueue c connId sq = do - srv <- getSMPServer + srv <- getSMPServer c (rq, qUri) <- newRcvQueue c srv -- TODO reply queue version should be the same as send queue, ignoring it in v1 let qInfo = toVersionT qUri SMP.smpClientVersion @@ -488,9 +493,15 @@ deleteConnection' c connId = removeSubscription c connId withStore (`deleteConn` connId) -getSMPServer :: AgentMonad m => m SMPServer -getSMPServer = - asks (smpServers . config) >>= \case +-- | Change servers to be used for creating new queues, in Reader monad +setSMPServers' :: forall m. AgentMonad m => AgentClient -> NonEmpty SMPServer -> m () +setSMPServers' c servers = do + atomically $ writeTVar (smpServers c) servers + +getSMPServer :: AgentMonad m => AgentClient -> m SMPServer +getSMPServer c = do + smpServers <- readTVarIO $ smpServers c + case smpServers of srv :| [] -> pure srv servers -> do gen <- asks randomServer diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c2f87f764..140bcb20b 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -47,6 +47,7 @@ import Data.Bifunctor (first) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (isNothing) @@ -76,6 +77,7 @@ data AgentClient = AgentClient { rcvQ :: TBQueue (ATransmission 'Client), subQ :: TBQueue (ATransmission 'Agent), msgQ :: TBQueue SMPServerTransmission, + smpServers :: TVar (NonEmpty SMPServer), smpClients :: TVar (Map SMPServer SMPClientVar), subscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)), pendingSubscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)), @@ -97,6 +99,7 @@ newAgentClient agentEnv = do rcvQ <- newTBQueue qSize subQ <- newTBQueue qSize msgQ <- newTBQueue qSize + smpServers <- newTVar $ initialSMPServers (config agentEnv) smpClients <- newTVar M.empty subscrSrvrs <- newTVar M.empty pendingSubscrSrvrs <- newTVar M.empty @@ -108,7 +111,7 @@ newAgentClient agentEnv = do asyncClients <- newTVar [] clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1) lock <- newTMVar () - return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, asyncClients, clientId, agentEnv, smpSubscriber = undefined, lock} + return AgentClient {rcvQ, subQ, msgQ, smpServers, smpClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, asyncClients, clientId, agentEnv, smpSubscriber = undefined, lock} -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index ba54d00bc..a1f03cf6f 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -29,7 +29,7 @@ import UnliftIO.STM data AgentConfig = AgentConfig { tcpPort :: ServiceName, - smpServers :: NonEmpty SMPServer, + initialSMPServers :: NonEmpty SMPServer, cmdSignAlg :: C.SignAlg, connIdBytes :: Int, tbqSize :: Natural, @@ -48,7 +48,7 @@ defaultAgentConfig :: AgentConfig defaultAgentConfig = AgentConfig { tcpPort = "5224", - smpServers = undefined, -- TODO move it elsewhere? + initialSMPServers = undefined, -- TODO move it elsewhere? cmdSignAlg = C.SignAlg C.SEd448, connIdBytes = 12, tbqSize = 64, diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 0705ad697..0d148bf04 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -399,6 +399,10 @@ instance StrEncoding SMPServer where SrvLoc host port <- strP pure SMPServer {host, port, keyHash} +instance ToJSON SMPServer where + toJSON = strToJSON + toEncoding = strToJEncoding + data SrvLoc = SrvLoc HostName ServiceName deriving (Eq, Ord, Show) diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index 4cf4f308a..8c144509e 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -157,7 +157,7 @@ cfg :: AgentConfig cfg = defaultAgentConfig { tcpPort = agentTestPort, - smpServers = L.fromList ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001"], + initialSMPServers = L.fromList ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001"], tbqSize = 1, dbFile = testDB, smpCfg = @@ -174,7 +174,7 @@ cfg = withSmpAgentThreadOn_ :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ServiceName, ServiceName, String) -> m () -> (ThreadId -> m a) -> m a withSmpAgentThreadOn_ t (port', smpPort', db') afterProcess = - let cfg' = cfg {tcpPort = port', dbFile = db', smpServers = L.fromList [SMPServer "localhost" smpPort' testKeyHash]} + let cfg' = cfg {tcpPort = port', dbFile = db', initialSMPServers = L.fromList [SMPServer "localhost" smpPort' testKeyHash]} in serverBracket (\started -> runSMPAgentBlocking t started cfg') afterProcess