From 3c1834f93fface47cb0a0439cb3596f4082b08db Mon Sep 17 00:00:00 2001 From: Efim Poberezkin Date: Wed, 30 Dec 2020 18:54:17 +0400 Subject: [PATCH] implement addServer command --- src/Simplex/Messaging/Agent.hs | 25 ++++++++++--- src/Simplex/Messaging/Agent/Env/SQLite.hs | 9 +++-- src/Simplex/Messaging/Agent/Store.hs | 4 ++ src/Simplex/Messaging/Agent/Store/SQLite.hs | 37 +++++++++++++++++-- .../Messaging/Agent/Store/SQLite/Schema.hs | 4 +- 5 files changed, 65 insertions(+), 14 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 9e8158423..dd0096147 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -13,15 +13,19 @@ import Control.Monad.IO.Unlift import Control.Monad.Reader import Crypto.Random import qualified Data.ByteString.Char8 as B +import Data.Int import qualified Data.Map as M +import Data.Maybe (fromMaybe) import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.ServerClient (ServerClient (..), newServerClient) +import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Transmission import Simplex.Messaging.Server (randomBytes) import Simplex.Messaging.Server.Transmission (Cmd (..), CorrId (..), SParty (..)) import qualified Simplex.Messaging.Server.Transmission as SMP import Simplex.Messaging.Transport import UnliftIO.Async +import UnliftIO.Exception import UnliftIO.IO import UnliftIO.STM @@ -54,19 +58,30 @@ send :: MonadUnliftIO m => Handle -> AgentClient -> m () send h AgentClient {sndQ} = forever $ atomically (readTBQueue sndQ) >>= tPut h client :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () -client AgentClient {rcvQ, sndQ, respQ, commands} = forever $ do +client AgentClient {rcvQ, sndQ, respQ, servers, commands} = forever $ do t@(corrId, cAlias, cmd) <- atomically $ readTBQueue rcvQ processCommand t cmd >>= \case Left e -> atomically $ writeTBQueue sndQ (corrId, cAlias, ERR e) Right _ -> return () where + handler :: SomeException -> m (Either StoreError Int64) + handler e = do + liftIO (print e) + return $ Right 1 + processCommand :: ATransmission 'Client -> ACommand 'Client -> m (Either ErrorType ()) processCommand t = \case - NEW SMPServer {host, port, keyHash} (AckMode mode) -> do + NEW server@SMPServer {host, port, keyHash} (AckMode mode) -> do cfg <- asks $ smpConfig . config - srv <- newServerClient cfg respQ host port - t <- mkSmpNEW t - atomically $ writeTBQueue (smpSndQ srv) t + maybeServer <- atomically $ M.lookup (host, fromMaybe "5223" port) <$> readTVar servers + srv <- case maybeServer of + Nothing -> do + conn <- asks db + _serverId <- addServer conn server `catch` handler + newServerClient cfg respQ host port + Just s -> return s + _t <- mkSmpNEW t + atomically $ writeTBQueue (smpSndQ srv) _t liftIO $ putStrLn "sending NEW to server" liftIO $ print t return $ Right () diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index c684add8f..ffc922cdc 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -19,6 +19,7 @@ import Simplex.Messaging.Agent.Transmission import Simplex.Messaging.Server.Transmission (PublicKey) import qualified Simplex.Messaging.Server.Transmission as SMP import UnliftIO.STM +import Simplex.Messaging.Agent.Store.SQLite data AgentConfig = AgentConfig { tcpPort :: ServiceName, @@ -31,7 +32,7 @@ data AgentConfig = AgentConfig data Env = Env { config :: AgentConfig, idsDrg :: TVar ChaChaDRG, - db :: DB.Connection + db :: SQLiteStore } data AgentClient = AgentClient @@ -62,10 +63,10 @@ newAgentClient qSize = do commands <- newTVar M.empty return AgentClient {rcvQ, sndQ, respQ, servers, commands} -openDB :: MonadUnliftIO m => AgentConfig -> m DB.Connection +openDB :: MonadUnliftIO m => AgentConfig -> m SQLiteStore openDB AgentConfig {dbFile} = liftIO $ do - db <- DB.open dbFile - createSchema db + db <- SQLiteStore <$> DB.open dbFile + createSchema $ conn db return db newEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 580a41c4f..2eca846c3 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -12,6 +12,7 @@ import Data.Kind import Data.Time.Clock (UTCTime) import Simplex.Messaging.Agent.Transmission import Simplex.Messaging.Server.Transmission (Encoded, PublicKey, QueueId) +import Data.Int (Int64) data ReceiveQueue = ReceiveQueue { server :: SMPServer, @@ -68,7 +69,10 @@ data DeliveryStatus | MDConfirmed -- SMP: OK received / ACK sent | MDAcknowledged AckStatus -- SAMP: RCVD sent to agent client / ACK received from agent client and sent to the server +type SMPServerId = Int64 + class MonadAgentStore s m where + addServer :: s -> SMPServer -> m (Either StoreError SMPServerId) createRcvConn :: s -> Maybe ConnAlias -> ReceiveQueue -> m (Either StoreError (Connection CReceive)) createSndConn :: s -> Maybe ConnAlias -> SendQueue -> m (Either StoreError (Connection CSend)) getConn :: s -> ConnAlias -> m (Either StoreError SomeConn) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index a9de4aa8d..e4f7b261a 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -1,18 +1,47 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} + module Simplex.Messaging.Agent.Store.SQLite where +import Control.Monad.IO.Unlift +import Database.SQLite.Simple (NamedParam (..)) import qualified Database.SQLite.Simple as DB +import Multiline (s) +import Simplex.Messaging.Agent.Store +import Simplex.Messaging.Agent.Transmission + +addServerQuery :: DB.Query +addServerQuery = + [s| + INSERT INTO servers (host_address, port, key_hash) + VALUES (:host_address, :port, :key_hash) + ON CONFLICT(host_address, port) DO UPDATE SET + host_address=excluded.host_address, + port=excluded.port, + key_hash=excluded.key_hash; + |] + +newtype SQLiteStore = SQLiteStore {conn :: DB.Connection} + +instance MonadUnliftIO m => MonadAgentStore SQLiteStore m where + addServer :: SQLiteStore -> SMPServer -> m (Either StoreError SMPServerId) + addServer store SMPServer {host, port, keyHash} = liftIO $ do + DB.executeNamed (conn store) addServerQuery [":host_address" := host, ":port" := port, ":key_hash" := keyHash] + Right <$> DB.lastInsertRowId (conn store) --- instance MonadUnliftIO m => MonadQueueStore DB.Connection m where -- createRcvConn :: DB.Connection -> Maybe ConnAlias -> ReceiveQueue -> m (Either StoreError (Connection CReceive)) -- createRcvConn conn connAlias q = do -- id <- query conn "INSERT ..." -- query conn "INSERT ..." - - -- sqlite queries to create server, queue and connection -- *** step 1 - insert server before create request to server + -- ! "INSERT OR REPLACE INTO" with autoincrement apparently would change id, -- ! so going with "ON CONFLICT UPDATE" here @@ -24,6 +53,7 @@ import qualified Database.SQLite.Simple as DB -- key_hash=excluded.key_hash; -- *** step 2 - insert queue and connection after server's response + -- BEGIN TRANSACTION; -- INSERT INTO recipient_queues ( @@ -61,4 +91,5 @@ import qualified Database.SQLite.Simple as DB -- ); -- COMMIT; + -- *** diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Schema.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Schema.hs index 1188514b9..e7a0b2d9e 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Schema.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Schema.hs @@ -17,7 +17,7 @@ servers = key_hash BLOB, UNIQUE (host_address, port) ) - |] + |] -- TODO unique constraints on (server_id, rcv_id) and (server_id, snd_id) recipientQueues :: Query @@ -35,7 +35,7 @@ recipientQueues = status TEXT, ack_mode INTEGER ) - |] + |] senderQueues :: Query senderQueues =