diff --git a/apps/smp-agent/Main.hs b/apps/smp-agent/Main.hs index df918fe5c..e5f33a219 100644 --- a/apps/smp-agent/Main.hs +++ b/apps/smp-agent/Main.hs @@ -1,7 +1,10 @@ +{-# LANGUAGE DuplicateRecordFields #-} + module Main where import Simplex.Messaging.Agent (runSMPAgent) import Simplex.Messaging.Agent.Env.SQLite +import Simplex.Messaging.Agent.ServerClient cfg :: AgentConfig cfg = @@ -9,10 +12,16 @@ cfg = { tcpPort = "5224", tbqSize = 16, connIdBytes = 12, - dbFile = "smp-agent.db" + dbFile = "smp-agent.db", + smpConfig = + ServerClientConfig + { tcpPort = "5223", + tbqSize = 16, + corrIdBytes = 4 + } } main :: IO () main = do - putStrLn $ "SMP agent listening on port " ++ tcpPort cfg + putStrLn $ "SMP agent listening on port " ++ tcpPort (cfg :: AgentConfig) runSMPAgent cfg diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 80bea40bf..9e8158423 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1,6 +1,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -12,9 +13,13 @@ import Control.Monad.IO.Unlift import Control.Monad.Reader import Crypto.Random import qualified Data.ByteString.Char8 as B +import qualified Data.Map as M import Simplex.Messaging.Agent.Env.SQLite +import Simplex.Messaging.Agent.ServerClient (ServerClient (..), newServerClient) import Simplex.Messaging.Agent.Transmission -import Simplex.Messaging.Server.Transmission (CorrId (..)) +import Simplex.Messaging.Server (randomBytes) +import Simplex.Messaging.Server.Transmission (Cmd (..), CorrId (..), SParty (..)) +import qualified Simplex.Messaging.Server.Transmission as SMP import Simplex.Messaging.Transport import UnliftIO.Async import UnliftIO.IO @@ -35,7 +40,7 @@ runSMPAgent cfg@AgentConfig {tcpPort} = do connectClient :: MonadUnliftIO m => Handle -> AgentClient -> m () connectClient h c = race_ (send h c) (receive h c) -runClient :: MonadUnliftIO m => AgentClient -> m () +runClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () runClient c = race_ (processSmp c) (client c) receive :: MonadUnliftIO m => Handle -> AgentClient -> m () @@ -48,17 +53,48 @@ receive h AgentClient {rcvQ, sndQ} = send :: MonadUnliftIO m => Handle -> AgentClient -> m () send h AgentClient {sndQ} = forever $ atomically (readTBQueue sndQ) >>= tPut h -client :: forall m. MonadUnliftIO m => AgentClient -> m () -client AgentClient {rcvQ, sndQ} = forever $ do - (corrId, cAlias, cmd) <- atomically (readTBQueue rcvQ) - processCommand cmd >>= \case +client :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () +client AgentClient {rcvQ, sndQ, respQ, commands} = forever $ do + t@(corrId, cAlias, cmd) <- atomically $ readTBQueue rcvQ + processCommand t cmd >>= \case Left e -> atomically $ writeTBQueue sndQ (corrId, cAlias, ERR e) Right _ -> return () where - processCommand :: ACommand 'Client -> m (Either ErrorType ()) - processCommand _ = return $ Left PROHIBITED + processCommand :: ATransmission 'Client -> ACommand 'Client -> m (Either ErrorType ()) + processCommand t = \case + NEW SMPServer {host, port, keyHash} (AckMode mode) -> do + cfg <- asks $ smpConfig . config + srv <- newServerClient cfg respQ host port + t <- mkSmpNEW t + atomically $ writeTBQueue (smpSndQ srv) t + liftIO $ putStrLn "sending NEW to server" + liftIO $ print t + return $ Right () + _ -> return $ Left PROHIBITED + + mkSmpNEW :: ATransmission 'Client -> m SMP.Transmission + mkSmpNEW t = do + g <- asks idsDrg + smpCorrId <- atomically $ CorrId <$> randomBytes 4 g + recipientKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair + let recipientPrivateKey = recipientKey + toSMP = ("", (smpCorrId, "", Cmd SRecipient $ SMP.NEW recipientKey)) + req = + Request + { fromClient = t, + toSMP, + state = NEWRequestState {recipientKey, recipientPrivateKey} + } + atomically . modifyTVar commands $ M.insert smpCorrId req + return toSMP processSmp :: MonadUnliftIO m => AgentClient -> m () -processSmp AgentClient {respQ, sndQ} = forever . atomically $ do - readTBQueue respQ - writeTBQueue sndQ (CorrId B.empty, B.empty, ERR UNKNOWN) +processSmp AgentClient {respQ, sndQ, commands} = forever $ do + (_, (smpCorrId, qId, cmdOrErr)) <- atomically $ readTBQueue respQ + liftIO $ putStrLn "received from server" + liftIO $ print (smpCorrId, qId, cmdOrErr) + req <- atomically $ M.lookup smpCorrId <$> readTVar commands + atomically $ case req of -- TODO empty correlation ID is ok - it can be a message + Nothing -> writeTBQueue sndQ ("", "", ERR $ SMP smpErrCorrelationId) + Just Request {fromClient = (corrId, cAlias, cmd), toSMP, state} -> do + writeTBQueue sndQ (corrId, cAlias, ERR UNKNOWN) diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index cbe906902..c684add8f 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -12,8 +12,11 @@ import qualified Data.Map.Strict as M import qualified Database.SQLite.Simple as DB import Network.Socket (HostName, ServiceName) import Numeric.Natural +import Simplex.Messaging.Agent.ServerClient +import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite.Schema import Simplex.Messaging.Agent.Transmission +import Simplex.Messaging.Server.Transmission (PublicKey) import qualified Simplex.Messaging.Server.Transmission as SMP import UnliftIO.STM @@ -21,7 +24,8 @@ data AgentConfig = AgentConfig { tcpPort :: ServiceName, tbqSize :: Natural, connIdBytes :: Int, - dbFile :: String + dbFile :: String, + smpConfig :: ServerClientConfig } data Env = Env @@ -33,13 +37,20 @@ data Env = Env data AgentClient = AgentClient { rcvQ :: TBQueue (ATransmission Client), sndQ :: TBQueue (ATransmission Agent), - respQ :: TBQueue (), - servers :: Map (HostName, ServiceName) ServerClient + respQ :: TBQueue SMP.TransmissionOrError, + servers :: TVar (Map (HostName, ServiceName) ServerClient), + commands :: TVar (Map SMP.CorrId Request) } -data ServerClient = ServerClient - { sndQ :: TBQueue SMP.Transmission, - commands :: Map SMP.QueueId (TBQueue SMP.Cmd) +data Request = Request + { fromClient :: ATransmission Client, + toSMP :: SMP.Transmission, + state :: RequestState + } + +data RequestState = NEWRequestState + { recipientKey :: PublicKey, + recipientPrivateKey :: PrivateKey } newAgentClient :: Natural -> STM AgentClient @@ -47,12 +58,9 @@ newAgentClient qSize = do rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize respQ <- newTBQueue qSize - return AgentClient {rcvQ, sndQ, respQ, servers = M.empty} - -newServerClient :: Natural -> STM ServerClient -newServerClient qSize = do - sndQ <- newTBQueue qSize - return ServerClient {sndQ, commands = M.empty} + servers <- newTVar M.empty + commands <- newTVar M.empty + return AgentClient {rcvQ, sndQ, respQ, servers, commands} openDB :: MonadUnliftIO m => AgentConfig -> m DB.Connection openDB AgentConfig {dbFile} = liftIO $ do diff --git a/src/Simplex/Messaging/Agent/ServerClient.hs b/src/Simplex/Messaging/Agent/ServerClient.hs new file mode 100644 index 000000000..872c2c46e --- /dev/null +++ b/src/Simplex/Messaging/Agent/ServerClient.hs @@ -0,0 +1,54 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Simplex.Messaging.Agent.ServerClient where + +import Control.Monad.IO.Unlift +import Data.Maybe +import Network.Socket (HostName, ServiceName) +import Numeric.Natural +import Simplex.Messaging.Agent.Store +import qualified Simplex.Messaging.Server.Transmission as SMP +import Simplex.Messaging.Transport +import UnliftIO.Async +import UnliftIO.IO +import UnliftIO.STM + +data ServerClientConfig = ServerClientConfig + { tcpPort :: ServiceName, + tbqSize :: Natural, + corrIdBytes :: Natural + } + +data ServerClient = ServerClient + { smpSndQ :: TBQueue SMP.Transmission, + smpRcvQ :: TBQueue SMP.TransmissionOrError + -- srvA :: Async () + } + +newServerClient :: + forall m. + MonadUnliftIO m => + ServerClientConfig -> + TBQueue SMP.TransmissionOrError -> + HostName -> + Maybe ServiceName -> + m ServerClient +newServerClient cfg smpRcvQ host port = do + smpSndQ <- atomically . newTBQueue $ tbqSize cfg + let c = ServerClient {smpSndQ, smpRcvQ} + _srvA <- async $ runClient (fromMaybe (tcpPort cfg) port) c + return c + where + runClient :: ServiceName -> ServerClient -> m () + runClient p c = + runTCPClient host p $ \h -> do + _line <- getLn h -- "Welcome to SMP" + -- TODO test connection failure + race_ (send h c) (receive h) + + send :: Handle -> ServerClient -> m () + send h ServerClient {smpSndQ} = atomically (readTBQueue smpSndQ) >>= SMP.tPut h + + receive :: Handle -> m () + receive h = SMP.tGet SMP.fromServer h >>= atomically . writeTBQueue smpRcvQ diff --git a/src/Simplex/Messaging/Agent/Transmission.hs b/src/Simplex/Messaging/Agent/Transmission.hs index 9a2b611dd..84dcb226a 100644 --- a/src/Simplex/Messaging/Agent/Transmission.hs +++ b/src/Simplex/Messaging/Agent/Transmission.hs @@ -1,5 +1,4 @@ {-# LANGUAGE DataKinds #-} -{-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} @@ -23,6 +22,7 @@ import Data.Time.Clock (UTCTime) import Data.Type.Equality import Data.Typeable () import Network.Socket +import Numeric.Natural import Simplex.Messaging.Server.Transmission (CorrId (..), Encoded, MsgBody, PublicKey, QueueId, errBadParameters, errMessageBody) import Simplex.Messaging.Transport import System.IO @@ -123,7 +123,7 @@ data MsgStatus = MsgOk | MsgError MsgErrorType data MsgErrorType = MsgSkipped AgentMsgId AgentMsgId | MsgBadId AgentMsgId | MsgBadHash deriving (Show) -data ErrorType = UNKNOWN | PROHIBITED | SYNTAX Int | SIZE -- etc. TODO SYNTAX Natural +data ErrorType = UNKNOWN | PROHIBITED | SYNTAX Int | SMP Natural | SIZE -- etc. TODO SYNTAX Natural deriving (Show) data AckStatus = AckOk | AckError AckErrorType @@ -138,6 +138,9 @@ errBadInvitation = 10 errNoConnAlias :: Int errNoConnAlias = 11 +smpErrCorrelationId :: Natural +smpErrCorrelationId = 1 + parseCommand :: ByteString -> Either ErrorType ACmd parseCommand command = case B.words command of ["NEW", srv] -> newConn srv . Right $ AckMode On diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 2f7b07988..433f673fa 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -10,7 +10,8 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} -module Simplex.Messaging.Server (runSMPServer) where +-- TODO move randomBytes to another module +module Simplex.Messaging.Server (runSMPServer, randomBytes) where import Control.Concurrent.STM (stateTVar) import Control.Monad diff --git a/src/Simplex/Messaging/Server/Transmission.hs b/src/Simplex/Messaging/Server/Transmission.hs index 74d5fdeba..1222b08c8 100644 --- a/src/Simplex/Messaging/Server/Transmission.hs +++ b/src/Simplex/Messaging/Server/Transmission.hs @@ -145,7 +145,7 @@ serializeCommand = \case type Encoded = ByteString -- newtype to avoid accidentally changing order of transmission parts -newtype CorrId = CorrId {bs :: ByteString} deriving (Eq, Show) +newtype CorrId = CorrId {bs :: ByteString} deriving (Eq, Ord, Show) instance IsString CorrId where fromString = CorrId . fromString