Separate db connection for each TCP client connection (#60)

* use separate db connections for each TCP client connection

* refactor atomically, increast delay in tests

* increase test delay for SMP server to start

* increase SMP ping frequency

* remove comment

* separate SQLite connection per thread, to support multi-threaded mode

* remove redundant import
This commit is contained in:
Evgeny Poberezkin
2021-02-28 14:59:29 +00:00
committed by GitHub
parent 7570ef9e22
commit 927ff230da
7 changed files with 65 additions and 59 deletions
+43 -42
View File
@@ -28,7 +28,7 @@ import Data.Time.Clock
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore)
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore, connectSQLiteStore)
import Simplex.Messaging.Agent.Transmission
import Simplex.Messaging.Client (SMPServerTransmission)
import qualified Simplex.Messaging.Crypto as C
@@ -55,9 +55,9 @@ runSMPAgent cfg@AgentConfig {tcpPort} = runReaderT smpAgent =<< newSMPAgentEnv c
getSMPAgentClient :: (MonadUnliftIO m, MonadReader Env m) => m AgentClient
getSMPAgentClient = do
q <- asks $ tbqSize . config
n <- asks clientCounter
atomically $ newAgentClient n q
cfg <- asks config
atomically $ newAgentClient n cfg
connectClient :: MonadUnliftIO m => Handle -> AgentClient -> m ()
connectClient h c = race_ (send h c) (receive h c)
@@ -68,7 +68,11 @@ logConnection c connected =
in logInfo $ T.unwords ["client", showText (clientId c), event, "Agent"]
runSMPAgentClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
runSMPAgentClient c = race_ (subscriber c) (client c)
runSMPAgentClient c = do
db <- asks $ dbFile . config
s1 <- connectSQLiteStore db
s2 <- connectSQLiteStore db
race_ (subscriber c s1) (client c s2)
receive :: forall m. MonadUnliftIO m => Handle -> AgentClient -> m ()
receive h c@AgentClient {rcvQ, sndQ} = forever $ do
@@ -92,28 +96,27 @@ logClient :: MonadUnliftIO m => AgentClient -> ByteString -> ATransmission a ->
logClient AgentClient {clientId} dir (CorrId corrId, cAlias, cmd) = do
logInfo . decodeUtf8 $ B.unwords [B.pack $ show clientId, dir, "A :", corrId, cAlias, B.takeWhile (/= ' ') $ serializeCommand cmd]
client :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
client c@AgentClient {rcvQ, sndQ} = forever $ do
client :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> SQLiteStore -> m ()
client c@AgentClient {rcvQ, sndQ} st = forever $ do
t@(corrId, cAlias, _) <- atomically $ readTBQueue rcvQ
runExceptT (processCommand c t) >>= \case
runExceptT (processCommand c st t) >>= \case
Left e -> atomically $ writeTBQueue sndQ (corrId, cAlias, ERR e)
Right _ -> return ()
withStore ::
AgentMonad m =>
(forall m'. (MonadUnliftIO m', MonadError StoreError m') => SQLiteStore -> m' a) ->
(forall m'. (MonadUnliftIO m', MonadError StoreError m') => m' a) ->
m a
withStore action = do
store <- asks db
runExceptT (action store `E.catch` handleInternal) >>= \case
runExceptT (action `E.catch` handleInternal) >>= \case
Right c -> return c
Left _ -> throwError STORE
where
handleInternal :: (MonadError StoreError m') => SomeException -> m' a
handleInternal _ = throwError SEInternal
processCommand :: forall m. AgentMonad m => AgentClient -> ATransmission 'Client -> m ()
processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
processCommand :: forall m. AgentMonad m => AgentClient -> SQLiteStore -> ATransmission 'Client -> m ()
processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) =
case cmd of
NEW smpServer -> createNewConnection smpServer
JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode
@@ -127,7 +130,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
-- TODO create connection alias if not passed
-- make connAlias Maybe?
(rq, qInfo) <- newReceiveQueue c server connAlias
withStore $ \st -> createRcvConn st rq
withStore $ createRcvConn st rq
respond $ INV qInfo
joinConnection :: SMPQueueInfo -> ReplyMode -> m ()
@@ -135,8 +138,8 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
-- TODO create connection alias if not passed
-- make connAlias Maybe?
(sq, senderKey, verifyKey) <- newSendQueue qInfo connAlias
withStore $ \st -> createSndConn st sq
connectToSendQueue c sq senderKey verifyKey
withStore $ createSndConn st sq
connectToSendQueue c st sq senderKey verifyKey
case replyMode of
ReplyOn -> sendReplyQInfo srv sq
ReplyVia srv' -> sendReplyQInfo srv' sq
@@ -145,7 +148,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
subscribeConnection :: m ()
subscribeConnection =
withStore (`getConn` connAlias) >>= \case
withStore (getConn st connAlias) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> subscribe rq
SomeConn _ (RcvConnection _ rq) -> subscribe rq
-- TODO possibly there should be a separate error type trying
@@ -156,7 +159,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
sendMessage :: MsgBody -> m ()
sendMessage msgBody =
withStore (`getConn` connAlias) >>= \case
withStore (getConn st connAlias) >>= \case
SomeConn _ (DuplexConnection _ _ sq) -> sendMsg sq
SomeConn _ (SndConnection _ sq) -> sendMsg sq
-- TODO possibly there should be a separate error type trying
@@ -165,13 +168,13 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
where
sendMsg sq = do
senderTs <- liftIO getCurrentTime
senderId <- withStore $ \st -> createSndMsg st connAlias msgBody senderTs
senderId <- withStore $ createSndMsg st connAlias msgBody senderTs
sendAgentMessage c sq senderTs $ A_MSG msgBody
respond $ SENT senderId
suspendConnection :: m ()
suspendConnection =
withStore (`getConn` connAlias) >>= \case
withStore (getConn st connAlias) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> suspend rq
SomeConn _ (RcvConnection _ rq) -> suspend rq
_ -> throwError PROHIBITED
@@ -180,7 +183,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
deleteConnection :: m ()
deleteConnection =
withStore (`getConn` connAlias) >>= \case
withStore (getConn st connAlias) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> delete rq
SomeConn _ (RcvConnection _ rq) -> delete rq
_ -> throwError PROHIBITED
@@ -188,30 +191,30 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
delete rq = do
deleteQueue c rq
removeSubscription c connAlias
withStore (`deleteConn` connAlias)
withStore (deleteConn st connAlias)
respond OK
sendReplyQInfo :: SMPServer -> SndQueue -> m ()
sendReplyQInfo srv sq = do
(rq, qInfo) <- newReceiveQueue c srv connAlias
withStore $ \st -> upgradeSndConnToDuplex st connAlias rq
withStore $ upgradeSndConnToDuplex st connAlias rq
senderTs <- liftIO getCurrentTime
sendAgentMessage c sq senderTs $ REPLY qInfo
respond :: ACommand 'Agent -> m ()
respond resp = atomically $ writeTBQueue sndQ (corrId, connAlias, resp)
subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
subscriber c@AgentClient {msgQ} = forever $ do
subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> SQLiteStore -> m ()
subscriber c@AgentClient {msgQ} st = forever $ do
-- TODO this will only process messages and notifications
t <- atomically $ readTBQueue msgQ
runExceptT (processSMPTransmission c t) >>= \case
runExceptT (processSMPTransmission c st t) >>= \case
Left e -> liftIO $ print e
Right _ -> return ()
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> SMPServerTransmission -> m ()
processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
rq@RcvQueue {connAlias, decryptKey, status} <- withStore $ \st -> getRcvQueue st srv rId
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> SQLiteStore -> SMPServerTransmission -> m ()
processSMPTransmission c@AgentClient {sndQ} st (srv, rId, cmd) = do
rq@RcvQueue {connAlias, decryptKey, status} <- withStore $ getRcvQueue st srv rId
case cmd of
SMP.MSG srvMsgId srvTs msgBody -> do
-- TODO deduplicate with previously received
@@ -222,13 +225,11 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
case status of
New -> do
-- TODO currently it automatically allows whoever sends the confirmation
-- Commands CONF and LET are not implemented yet
-- They are probably not needed in v0.2?
-- TODO notification that connection confirmed?
withStore $ \st -> setRcvQueueStatus st rq Confirmed
-- TODO update sender key in the store
-- Commands CONF and LET are not supported in v0.2
withStore $ setRcvQueueStatus st rq Confirmed
-- TODO update sender key in the store?
secureQueue c rq senderKey
withStore $ \st -> setRcvQueueStatus st rq Secured
withStore $ setRcvQueueStatus st rq Secured
sendAck c rq
s ->
-- TODO maybe send notification to the user
@@ -238,21 +239,21 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
HELLO _verifyKey _ -> do
logServer "<--" c srv rId "MSG <HELLO>"
-- TODO send status update to the user?
withStore $ \st -> setRcvQueueStatus st rq Active
withStore $ setRcvQueueStatus st rq Active
sendAck c rq
REPLY qInfo -> do
logServer "<--" c srv rId "MSG <REPLY>"
-- TODO move senderKey inside SndQueue
(sq, senderKey, verifyKey) <- newSendQueue qInfo connAlias
withStore $ \st -> upgradeRcvConnToDuplex st connAlias sq
connectToSendQueue c sq senderKey verifyKey
withStore $ upgradeRcvConnToDuplex st connAlias sq
connectToSendQueue c st sq senderKey verifyKey
notify connAlias CON
sendAck c rq
A_MSG body -> do
logServer "<--" c srv rId "MSG <MSG>"
-- TODO check message status
recipientTs <- liftIO getCurrentTime
recipientId <- withStore $ \st -> createRcvMsg st connAlias body recipientTs senderMsgId senderTimestamp srvMsgId srvTs
recipientId <- withStore $ createRcvMsg st connAlias body recipientTs senderMsgId senderTimestamp srvMsgId srvTs
notify connAlias $
MSG
{ m_status = MsgOk,
@@ -272,12 +273,12 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
notify :: ConnAlias -> ACommand 'Agent -> m ()
notify connAlias msg = atomically $ writeTBQueue sndQ ("", connAlias, msg)
connectToSendQueue :: AgentMonad m => AgentClient -> SndQueue -> SenderPublicKey -> VerificationKey -> m ()
connectToSendQueue c sq senderKey verifyKey = do
connectToSendQueue :: AgentMonad m => AgentClient -> SQLiteStore -> SndQueue -> SenderPublicKey -> VerificationKey -> m ()
connectToSendQueue c st sq senderKey verifyKey = do
sendConfirmation c sq senderKey
withStore $ \st -> setSndQueueStatus st sq Confirmed
withStore $ setSndQueueStatus st sq Confirmed
sendHello c sq verifyKey
withStore $ \st -> setSndQueueStatus st sq Active
withStore $ setSndQueueStatus st sq Active
decryptMessage :: (MonadUnliftIO m, MonadError AgentErrorType m) => DecryptionKey -> ByteString -> m ByteString
decryptMessage decryptKey msg = liftError CRYPTO $ C.decrypt decryptKey msg
+5 -6
View File
@@ -41,7 +41,6 @@ import Data.Set (Set)
import qualified Data.Set as S
import Data.Text.Encoding
import Data.Time.Clock
import Numeric.Natural
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Transmission
@@ -65,11 +64,11 @@ data AgentClient = AgentClient
clientId :: Int
}
newAgentClient :: TVar Int -> Natural -> STM AgentClient
newAgentClient cc qSize = do
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
msgQ <- newTBQueue qSize
newAgentClient :: TVar Int -> AgentConfig -> STM AgentClient
newAgentClient cc AgentConfig {tbqSize} = do
rcvQ <- newTBQueue tbqSize
sndQ <- newTBQueue tbqSize
msgQ <- newTBQueue tbqSize
smpClients <- newTVar M.empty
subscrSrvrs <- newTVar M.empty
subscrConns <- newTVar M.empty
+2 -3
View File
@@ -25,13 +25,12 @@ data AgentConfig = AgentConfig
data Env = Env
{ config :: AgentConfig,
idsDrg :: TVar ChaChaDRG,
db :: SQLiteStore,
clientCounter :: TVar Int
}
newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
newSMPAgentEnv config = do
idsDrg <- drgNew >>= newTVarIO
db <- newSQLiteStore $ dbFile config
_ <- createSQLiteStore $ dbFile config
clientCounter <- newTVarIO 0
return Env {config, idsDrg, db, clientCounter}
return Env {config, idsDrg, clientCounter}
+10 -4
View File
@@ -13,7 +13,8 @@
module Simplex.Messaging.Agent.Store.SQLite
( SQLiteStore (..),
newSQLiteStore,
createSQLiteStore,
connectSQLiteStore,
)
where
@@ -45,10 +46,15 @@ data SQLiteStore = SQLiteStore
dbConn :: DB.Connection
}
newSQLiteStore :: MonadUnliftIO m => String -> m SQLiteStore
newSQLiteStore dbFilename = do
createSQLiteStore :: MonadUnliftIO m => String -> m SQLiteStore
createSQLiteStore dbFilename = do
store <- connectSQLiteStore dbFilename
liftIO . createSchema $ dbConn store
return store
connectSQLiteStore :: MonadUnliftIO m => String -> m SQLiteStore
connectSQLiteStore dbFilename = do
dbConn <- liftIO $ DB.open dbFilename
liftIO $ createSchema dbConn
return SQLiteStore {dbFilename, dbConn}
instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteStore m where
+1 -1
View File
@@ -31,7 +31,7 @@ withStore = before createStore . after removeStore
-- Randomize DB file name to avoid SQLite IO errors supposedly caused by asynchronous
-- IO operations on multiple similarly named files; error seems to be environment specific
r <- randomIO :: IO Word32
newSQLiteStore $ testDB <> show r
createSQLiteStore $ testDB <> show r
removeStore :: SQLiteStore -> IO ()
removeStore store = do
+2 -2
View File
@@ -118,7 +118,7 @@ cfg =
{ qSize = 1,
defaultPort = testPort,
tcpTimeout = 500_000,
smpPing = 2_000_000
smpPing = 30_000_000
}
}
@@ -136,7 +136,7 @@ withSmpAgent = withSmpAgentOn (agentTestPort, testDB)
testSMPAgentClientOn :: MonadUnliftIO m => ServiceName -> (Handle -> m a) -> m a
testSMPAgentClientOn port' client = do
threadDelay 200_000 -- TODO hack: thread delay for SMP agent to start
threadDelay 250_000 -- TODO hack: thread delay for SMP agent to start
runTCPClient agentTestHost port' $ \h -> do
line <- liftIO $ getLn h
if line == "Welcome to SMP v0.2.0 agent"
+2 -1
View File
@@ -1,4 +1,5 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
@@ -24,7 +25,7 @@ testPort = "5000"
testSMPClient :: MonadUnliftIO m => (Handle -> m a) -> m a
testSMPClient client = do
threadDelay 20000 -- TODO hack: thread delay for SMP server to start
threadDelay 50_000 -- TODO hack: thread delay for SMP server to start
runTCPClient testHost testPort $ \h -> do
line <- liftIO $ getLn h
if line == "Welcome to SMP v0.2.0"