From 927ff230daa2ce5a8200640f069989ccb4fd2682 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sun, 28 Feb 2021 14:59:29 +0000 Subject: [PATCH] Separate db connection for each TCP client connection (#60) * use separate db connections for each TCP client connection * refactor atomically, increast delay in tests * increase test delay for SMP server to start * increase SMP ping frequency * remove comment * separate SQLite connection per thread, to support multi-threaded mode * remove redundant import --- src/Simplex/Messaging/Agent.hs | 85 +++++++++++---------- src/Simplex/Messaging/Agent/Client.hs | 11 ++- src/Simplex/Messaging/Agent/Env/SQLite.hs | 5 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 14 +++- tests/AgentTests/SQLiteTests.hs | 2 +- tests/SMPAgentClient.hs | 4 +- tests/SMPClient.hs | 3 +- 7 files changed, 65 insertions(+), 59 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 4dfee0849..9893b1d47 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -28,7 +28,7 @@ import Data.Time.Clock import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Store -import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore) +import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore, connectSQLiteStore) import Simplex.Messaging.Agent.Transmission import Simplex.Messaging.Client (SMPServerTransmission) import qualified Simplex.Messaging.Crypto as C @@ -55,9 +55,9 @@ runSMPAgent cfg@AgentConfig {tcpPort} = runReaderT smpAgent =<< newSMPAgentEnv c getSMPAgentClient :: (MonadUnliftIO m, MonadReader Env m) => m AgentClient getSMPAgentClient = do - q <- asks $ tbqSize . config n <- asks clientCounter - atomically $ newAgentClient n q + cfg <- asks config + atomically $ newAgentClient n cfg connectClient :: MonadUnliftIO m => Handle -> AgentClient -> m () connectClient h c = race_ (send h c) (receive h c) @@ -68,7 +68,11 @@ logConnection c connected = in logInfo $ T.unwords ["client", showText (clientId c), event, "Agent"] runSMPAgentClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () -runSMPAgentClient c = race_ (subscriber c) (client c) +runSMPAgentClient c = do + db <- asks $ dbFile . config + s1 <- connectSQLiteStore db + s2 <- connectSQLiteStore db + race_ (subscriber c s1) (client c s2) receive :: forall m. MonadUnliftIO m => Handle -> AgentClient -> m () receive h c@AgentClient {rcvQ, sndQ} = forever $ do @@ -92,28 +96,27 @@ logClient :: MonadUnliftIO m => AgentClient -> ByteString -> ATransmission a -> logClient AgentClient {clientId} dir (CorrId corrId, cAlias, cmd) = do logInfo . decodeUtf8 $ B.unwords [B.pack $ show clientId, dir, "A :", corrId, cAlias, B.takeWhile (/= ' ') $ serializeCommand cmd] -client :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () -client c@AgentClient {rcvQ, sndQ} = forever $ do +client :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> SQLiteStore -> m () +client c@AgentClient {rcvQ, sndQ} st = forever $ do t@(corrId, cAlias, _) <- atomically $ readTBQueue rcvQ - runExceptT (processCommand c t) >>= \case + runExceptT (processCommand c st t) >>= \case Left e -> atomically $ writeTBQueue sndQ (corrId, cAlias, ERR e) Right _ -> return () withStore :: AgentMonad m => - (forall m'. (MonadUnliftIO m', MonadError StoreError m') => SQLiteStore -> m' a) -> + (forall m'. (MonadUnliftIO m', MonadError StoreError m') => m' a) -> m a withStore action = do - store <- asks db - runExceptT (action store `E.catch` handleInternal) >>= \case + runExceptT (action `E.catch` handleInternal) >>= \case Right c -> return c Left _ -> throwError STORE where handleInternal :: (MonadError StoreError m') => SomeException -> m' a handleInternal _ = throwError SEInternal -processCommand :: forall m. AgentMonad m => AgentClient -> ATransmission 'Client -> m () -processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = +processCommand :: forall m. AgentMonad m => AgentClient -> SQLiteStore -> ATransmission 'Client -> m () +processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) = case cmd of NEW smpServer -> createNewConnection smpServer JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode @@ -127,7 +130,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = -- TODO create connection alias if not passed -- make connAlias Maybe? (rq, qInfo) <- newReceiveQueue c server connAlias - withStore $ \st -> createRcvConn st rq + withStore $ createRcvConn st rq respond $ INV qInfo joinConnection :: SMPQueueInfo -> ReplyMode -> m () @@ -135,8 +138,8 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = -- TODO create connection alias if not passed -- make connAlias Maybe? (sq, senderKey, verifyKey) <- newSendQueue qInfo connAlias - withStore $ \st -> createSndConn st sq - connectToSendQueue c sq senderKey verifyKey + withStore $ createSndConn st sq + connectToSendQueue c st sq senderKey verifyKey case replyMode of ReplyOn -> sendReplyQInfo srv sq ReplyVia srv' -> sendReplyQInfo srv' sq @@ -145,7 +148,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = subscribeConnection :: m () subscribeConnection = - withStore (`getConn` connAlias) >>= \case + withStore (getConn st connAlias) >>= \case SomeConn _ (DuplexConnection _ rq _) -> subscribe rq SomeConn _ (RcvConnection _ rq) -> subscribe rq -- TODO possibly there should be a separate error type trying @@ -156,7 +159,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = sendMessage :: MsgBody -> m () sendMessage msgBody = - withStore (`getConn` connAlias) >>= \case + withStore (getConn st connAlias) >>= \case SomeConn _ (DuplexConnection _ _ sq) -> sendMsg sq SomeConn _ (SndConnection _ sq) -> sendMsg sq -- TODO possibly there should be a separate error type trying @@ -165,13 +168,13 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = where sendMsg sq = do senderTs <- liftIO getCurrentTime - senderId <- withStore $ \st -> createSndMsg st connAlias msgBody senderTs + senderId <- withStore $ createSndMsg st connAlias msgBody senderTs sendAgentMessage c sq senderTs $ A_MSG msgBody respond $ SENT senderId suspendConnection :: m () suspendConnection = - withStore (`getConn` connAlias) >>= \case + withStore (getConn st connAlias) >>= \case SomeConn _ (DuplexConnection _ rq _) -> suspend rq SomeConn _ (RcvConnection _ rq) -> suspend rq _ -> throwError PROHIBITED @@ -180,7 +183,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = deleteConnection :: m () deleteConnection = - withStore (`getConn` connAlias) >>= \case + withStore (getConn st connAlias) >>= \case SomeConn _ (DuplexConnection _ rq _) -> delete rq SomeConn _ (RcvConnection _ rq) -> delete rq _ -> throwError PROHIBITED @@ -188,30 +191,30 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) = delete rq = do deleteQueue c rq removeSubscription c connAlias - withStore (`deleteConn` connAlias) + withStore (deleteConn st connAlias) respond OK sendReplyQInfo :: SMPServer -> SndQueue -> m () sendReplyQInfo srv sq = do (rq, qInfo) <- newReceiveQueue c srv connAlias - withStore $ \st -> upgradeSndConnToDuplex st connAlias rq + withStore $ upgradeSndConnToDuplex st connAlias rq senderTs <- liftIO getCurrentTime sendAgentMessage c sq senderTs $ REPLY qInfo respond :: ACommand 'Agent -> m () respond resp = atomically $ writeTBQueue sndQ (corrId, connAlias, resp) -subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () -subscriber c@AgentClient {msgQ} = forever $ do +subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> SQLiteStore -> m () +subscriber c@AgentClient {msgQ} st = forever $ do -- TODO this will only process messages and notifications t <- atomically $ readTBQueue msgQ - runExceptT (processSMPTransmission c t) >>= \case + runExceptT (processSMPTransmission c st t) >>= \case Left e -> liftIO $ print e Right _ -> return () -processSMPTransmission :: forall m. AgentMonad m => AgentClient -> SMPServerTransmission -> m () -processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do - rq@RcvQueue {connAlias, decryptKey, status} <- withStore $ \st -> getRcvQueue st srv rId +processSMPTransmission :: forall m. AgentMonad m => AgentClient -> SQLiteStore -> SMPServerTransmission -> m () +processSMPTransmission c@AgentClient {sndQ} st (srv, rId, cmd) = do + rq@RcvQueue {connAlias, decryptKey, status} <- withStore $ getRcvQueue st srv rId case cmd of SMP.MSG srvMsgId srvTs msgBody -> do -- TODO deduplicate with previously received @@ -222,13 +225,11 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do case status of New -> do -- TODO currently it automatically allows whoever sends the confirmation - -- Commands CONF and LET are not implemented yet - -- They are probably not needed in v0.2? - -- TODO notification that connection confirmed? - withStore $ \st -> setRcvQueueStatus st rq Confirmed - -- TODO update sender key in the store + -- Commands CONF and LET are not supported in v0.2 + withStore $ setRcvQueueStatus st rq Confirmed + -- TODO update sender key in the store? secureQueue c rq senderKey - withStore $ \st -> setRcvQueueStatus st rq Secured + withStore $ setRcvQueueStatus st rq Secured sendAck c rq s -> -- TODO maybe send notification to the user @@ -238,21 +239,21 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do HELLO _verifyKey _ -> do logServer "<--" c srv rId "MSG " -- TODO send status update to the user? - withStore $ \st -> setRcvQueueStatus st rq Active + withStore $ setRcvQueueStatus st rq Active sendAck c rq REPLY qInfo -> do logServer "<--" c srv rId "MSG " -- TODO move senderKey inside SndQueue (sq, senderKey, verifyKey) <- newSendQueue qInfo connAlias - withStore $ \st -> upgradeRcvConnToDuplex st connAlias sq - connectToSendQueue c sq senderKey verifyKey + withStore $ upgradeRcvConnToDuplex st connAlias sq + connectToSendQueue c st sq senderKey verifyKey notify connAlias CON sendAck c rq A_MSG body -> do logServer "<--" c srv rId "MSG " -- TODO check message status recipientTs <- liftIO getCurrentTime - recipientId <- withStore $ \st -> createRcvMsg st connAlias body recipientTs senderMsgId senderTimestamp srvMsgId srvTs + recipientId <- withStore $ createRcvMsg st connAlias body recipientTs senderMsgId senderTimestamp srvMsgId srvTs notify connAlias $ MSG { m_status = MsgOk, @@ -272,12 +273,12 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do notify :: ConnAlias -> ACommand 'Agent -> m () notify connAlias msg = atomically $ writeTBQueue sndQ ("", connAlias, msg) -connectToSendQueue :: AgentMonad m => AgentClient -> SndQueue -> SenderPublicKey -> VerificationKey -> m () -connectToSendQueue c sq senderKey verifyKey = do +connectToSendQueue :: AgentMonad m => AgentClient -> SQLiteStore -> SndQueue -> SenderPublicKey -> VerificationKey -> m () +connectToSendQueue c st sq senderKey verifyKey = do sendConfirmation c sq senderKey - withStore $ \st -> setSndQueueStatus st sq Confirmed + withStore $ setSndQueueStatus st sq Confirmed sendHello c sq verifyKey - withStore $ \st -> setSndQueueStatus st sq Active + withStore $ setSndQueueStatus st sq Active decryptMessage :: (MonadUnliftIO m, MonadError AgentErrorType m) => DecryptionKey -> ByteString -> m ByteString decryptMessage decryptKey msg = liftError CRYPTO $ C.decrypt decryptKey msg diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index b49c294d7..4a313c55d 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -41,7 +41,6 @@ import Data.Set (Set) import qualified Data.Set as S import Data.Text.Encoding import Data.Time.Clock -import Numeric.Natural import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Transmission @@ -65,11 +64,11 @@ data AgentClient = AgentClient clientId :: Int } -newAgentClient :: TVar Int -> Natural -> STM AgentClient -newAgentClient cc qSize = do - rcvQ <- newTBQueue qSize - sndQ <- newTBQueue qSize - msgQ <- newTBQueue qSize +newAgentClient :: TVar Int -> AgentConfig -> STM AgentClient +newAgentClient cc AgentConfig {tbqSize} = do + rcvQ <- newTBQueue tbqSize + sndQ <- newTBQueue tbqSize + msgQ <- newTBQueue tbqSize smpClients <- newTVar M.empty subscrSrvrs <- newTVar M.empty subscrConns <- newTVar M.empty diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index ae2d6053b..1c7dedb51 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -25,13 +25,12 @@ data AgentConfig = AgentConfig data Env = Env { config :: AgentConfig, idsDrg :: TVar ChaChaDRG, - db :: SQLiteStore, clientCounter :: TVar Int } newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env newSMPAgentEnv config = do idsDrg <- drgNew >>= newTVarIO - db <- newSQLiteStore $ dbFile config + _ <- createSQLiteStore $ dbFile config clientCounter <- newTVarIO 0 - return Env {config, idsDrg, db, clientCounter} + return Env {config, idsDrg, clientCounter} diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 9912c304c..c64d07767 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -13,7 +13,8 @@ module Simplex.Messaging.Agent.Store.SQLite ( SQLiteStore (..), - newSQLiteStore, + createSQLiteStore, + connectSQLiteStore, ) where @@ -45,10 +46,15 @@ data SQLiteStore = SQLiteStore dbConn :: DB.Connection } -newSQLiteStore :: MonadUnliftIO m => String -> m SQLiteStore -newSQLiteStore dbFilename = do +createSQLiteStore :: MonadUnliftIO m => String -> m SQLiteStore +createSQLiteStore dbFilename = do + store <- connectSQLiteStore dbFilename + liftIO . createSchema $ dbConn store + return store + +connectSQLiteStore :: MonadUnliftIO m => String -> m SQLiteStore +connectSQLiteStore dbFilename = do dbConn <- liftIO $ DB.open dbFilename - liftIO $ createSchema dbConn return SQLiteStore {dbFilename, dbConn} instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteStore m where diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 92b122ace..1f168191f 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -31,7 +31,7 @@ withStore = before createStore . after removeStore -- Randomize DB file name to avoid SQLite IO errors supposedly caused by asynchronous -- IO operations on multiple similarly named files; error seems to be environment specific r <- randomIO :: IO Word32 - newSQLiteStore $ testDB <> show r + createSQLiteStore $ testDB <> show r removeStore :: SQLiteStore -> IO () removeStore store = do diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index afee18fe5..14cb55def 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -118,7 +118,7 @@ cfg = { qSize = 1, defaultPort = testPort, tcpTimeout = 500_000, - smpPing = 2_000_000 + smpPing = 30_000_000 } } @@ -136,7 +136,7 @@ withSmpAgent = withSmpAgentOn (agentTestPort, testDB) testSMPAgentClientOn :: MonadUnliftIO m => ServiceName -> (Handle -> m a) -> m a testSMPAgentClientOn port' client = do - threadDelay 200_000 -- TODO hack: thread delay for SMP agent to start + threadDelay 250_000 -- TODO hack: thread delay for SMP agent to start runTCPClient agentTestHost port' $ \h -> do line <- liftIO $ getLn h if line == "Welcome to SMP v0.2.0 agent" diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 274257a64..99f6004f2 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BlockArguments #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -24,7 +25,7 @@ testPort = "5000" testSMPClient :: MonadUnliftIO m => (Handle -> m a) -> m a testSMPClient client = do - threadDelay 20000 -- TODO hack: thread delay for SMP server to start + threadDelay 50_000 -- TODO hack: thread delay for SMP server to start runTCPClient testHost testPort $ \h -> do line <- liftIO $ getLn h if line == "Welcome to SMP v0.2.0"