mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 16:15:12 +00:00
implement addServer command
This commit is contained in:
@@ -13,15 +13,19 @@ import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Int
|
||||
import qualified Data.Map as M
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.ServerClient (ServerClient (..), newServerClient)
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Server (randomBytes)
|
||||
import Simplex.Messaging.Server.Transmission (Cmd (..), CorrId (..), SParty (..))
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import Simplex.Messaging.Transport
|
||||
import UnliftIO.Async
|
||||
import UnliftIO.Exception
|
||||
import UnliftIO.IO
|
||||
import UnliftIO.STM
|
||||
|
||||
@@ -54,19 +58,30 @@ send :: MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
send h AgentClient {sndQ} = forever $ atomically (readTBQueue sndQ) >>= tPut h
|
||||
|
||||
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
client AgentClient {rcvQ, sndQ, respQ, commands} = forever $ do
|
||||
client AgentClient {rcvQ, sndQ, respQ, servers, commands} = forever $ do
|
||||
t@(corrId, cAlias, cmd) <- atomically $ readTBQueue rcvQ
|
||||
processCommand t cmd >>= \case
|
||||
Left e -> atomically $ writeTBQueue sndQ (corrId, cAlias, ERR e)
|
||||
Right _ -> return ()
|
||||
where
|
||||
handler :: SomeException -> m (Either StoreError Int64)
|
||||
handler e = do
|
||||
liftIO (print e)
|
||||
return $ Right 1
|
||||
|
||||
processCommand :: ATransmission 'Client -> ACommand 'Client -> m (Either ErrorType ())
|
||||
processCommand t = \case
|
||||
NEW SMPServer {host, port, keyHash} (AckMode mode) -> do
|
||||
NEW server@SMPServer {host, port, keyHash} (AckMode mode) -> do
|
||||
cfg <- asks $ smpConfig . config
|
||||
srv <- newServerClient cfg respQ host port
|
||||
t <- mkSmpNEW t
|
||||
atomically $ writeTBQueue (smpSndQ srv) t
|
||||
maybeServer <- atomically $ M.lookup (host, fromMaybe "5223" port) <$> readTVar servers
|
||||
srv <- case maybeServer of
|
||||
Nothing -> do
|
||||
conn <- asks db
|
||||
_serverId <- addServer conn server `catch` handler
|
||||
newServerClient cfg respQ host port
|
||||
Just s -> return s
|
||||
_t <- mkSmpNEW t
|
||||
atomically $ writeTBQueue (smpSndQ srv) _t
|
||||
liftIO $ putStrLn "sending NEW to server"
|
||||
liftIO $ print t
|
||||
return $ Right ()
|
||||
|
||||
@@ -19,6 +19,7 @@ import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Server.Transmission (PublicKey)
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import UnliftIO.STM
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
|
||||
data AgentConfig = AgentConfig
|
||||
{ tcpPort :: ServiceName,
|
||||
@@ -31,7 +32,7 @@ data AgentConfig = AgentConfig
|
||||
data Env = Env
|
||||
{ config :: AgentConfig,
|
||||
idsDrg :: TVar ChaChaDRG,
|
||||
db :: DB.Connection
|
||||
db :: SQLiteStore
|
||||
}
|
||||
|
||||
data AgentClient = AgentClient
|
||||
@@ -62,10 +63,10 @@ newAgentClient qSize = do
|
||||
commands <- newTVar M.empty
|
||||
return AgentClient {rcvQ, sndQ, respQ, servers, commands}
|
||||
|
||||
openDB :: MonadUnliftIO m => AgentConfig -> m DB.Connection
|
||||
openDB :: MonadUnliftIO m => AgentConfig -> m SQLiteStore
|
||||
openDB AgentConfig {dbFile} = liftIO $ do
|
||||
db <- DB.open dbFile
|
||||
createSchema db
|
||||
db <- SQLiteStore <$> DB.open dbFile
|
||||
createSchema $ conn db
|
||||
return db
|
||||
|
||||
newEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
|
||||
|
||||
@@ -12,6 +12,7 @@ import Data.Kind
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Server.Transmission (Encoded, PublicKey, QueueId)
|
||||
import Data.Int (Int64)
|
||||
|
||||
data ReceiveQueue = ReceiveQueue
|
||||
{ server :: SMPServer,
|
||||
@@ -68,7 +69,10 @@ data DeliveryStatus
|
||||
| MDConfirmed -- SMP: OK received / ACK sent
|
||||
| MDAcknowledged AckStatus -- SAMP: RCVD sent to agent client / ACK received from agent client and sent to the server
|
||||
|
||||
type SMPServerId = Int64
|
||||
|
||||
class MonadAgentStore s m where
|
||||
addServer :: s -> SMPServer -> m (Either StoreError SMPServerId)
|
||||
createRcvConn :: s -> Maybe ConnAlias -> ReceiveQueue -> m (Either StoreError (Connection CReceive))
|
||||
createSndConn :: s -> Maybe ConnAlias -> SendQueue -> m (Either StoreError (Connection CSend))
|
||||
getConn :: s -> ConnAlias -> m (Either StoreError SomeConn)
|
||||
|
||||
@@ -1,18 +1,47 @@
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE InstanceSigs #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite where
|
||||
|
||||
import Control.Monad.IO.Unlift
|
||||
import Database.SQLite.Simple (NamedParam (..))
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Multiline (s)
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
|
||||
addServerQuery :: DB.Query
|
||||
addServerQuery =
|
||||
[s|
|
||||
INSERT INTO servers (host_address, port, key_hash)
|
||||
VALUES (:host_address, :port, :key_hash)
|
||||
ON CONFLICT(host_address, port) DO UPDATE SET
|
||||
host_address=excluded.host_address,
|
||||
port=excluded.port,
|
||||
key_hash=excluded.key_hash;
|
||||
|]
|
||||
|
||||
newtype SQLiteStore = SQLiteStore {conn :: DB.Connection}
|
||||
|
||||
instance MonadUnliftIO m => MonadAgentStore SQLiteStore m where
|
||||
addServer :: SQLiteStore -> SMPServer -> m (Either StoreError SMPServerId)
|
||||
addServer store SMPServer {host, port, keyHash} = liftIO $ do
|
||||
DB.executeNamed (conn store) addServerQuery [":host_address" := host, ":port" := port, ":key_hash" := keyHash]
|
||||
Right <$> DB.lastInsertRowId (conn store)
|
||||
|
||||
-- instance MonadUnliftIO m => MonadQueueStore DB.Connection m where
|
||||
-- createRcvConn :: DB.Connection -> Maybe ConnAlias -> ReceiveQueue -> m (Either StoreError (Connection CReceive))
|
||||
-- createRcvConn conn connAlias q = do
|
||||
-- id <- query conn "INSERT ..."
|
||||
-- query conn "INSERT ..."
|
||||
|
||||
|
||||
|
||||
-- sqlite queries to create server, queue and connection
|
||||
|
||||
-- *** step 1 - insert server before create request to server
|
||||
|
||||
-- ! "INSERT OR REPLACE INTO" with autoincrement apparently would change id,
|
||||
-- ! so going with "ON CONFLICT UPDATE" here
|
||||
|
||||
@@ -24,6 +53,7 @@ import qualified Database.SQLite.Simple as DB
|
||||
-- key_hash=excluded.key_hash;
|
||||
|
||||
-- *** step 2 - insert queue and connection after server's response
|
||||
|
||||
-- BEGIN TRANSACTION;
|
||||
|
||||
-- INSERT INTO recipient_queues (
|
||||
@@ -61,4 +91,5 @@ import qualified Database.SQLite.Simple as DB
|
||||
-- );
|
||||
|
||||
-- COMMIT;
|
||||
|
||||
-- ***
|
||||
|
||||
@@ -17,7 +17,7 @@ servers =
|
||||
key_hash BLOB,
|
||||
UNIQUE (host_address, port)
|
||||
)
|
||||
|]
|
||||
|]
|
||||
|
||||
-- TODO unique constraints on (server_id, rcv_id) and (server_id, snd_id)
|
||||
recipientQueues :: Query
|
||||
@@ -35,7 +35,7 @@ recipientQueues =
|
||||
status TEXT,
|
||||
ack_mode INTEGER
|
||||
)
|
||||
|]
|
||||
|]
|
||||
|
||||
senderQueues :: Query
|
||||
senderQueues =
|
||||
|
||||
Reference in New Issue
Block a user