mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 16:15:12 +00:00
send NEW command to SMP server (response is not received for some reason...)
This commit is contained in:
+11
-2
@@ -1,7 +1,10 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
|
||||
module Main where
|
||||
|
||||
import Simplex.Messaging.Agent (runSMPAgent)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.ServerClient
|
||||
|
||||
cfg :: AgentConfig
|
||||
cfg =
|
||||
@@ -9,10 +12,16 @@ cfg =
|
||||
{ tcpPort = "5224",
|
||||
tbqSize = 16,
|
||||
connIdBytes = 12,
|
||||
dbFile = "smp-agent.db"
|
||||
dbFile = "smp-agent.db",
|
||||
smpConfig =
|
||||
ServerClientConfig
|
||||
{ tcpPort = "5223",
|
||||
tbqSize = 16,
|
||||
corrIdBytes = 4
|
||||
}
|
||||
}
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
putStrLn $ "SMP agent listening on port " ++ tcpPort cfg
|
||||
putStrLn $ "SMP agent listening on port " ++ tcpPort (cfg :: AgentConfig)
|
||||
runSMPAgent cfg
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
@@ -12,9 +13,13 @@ import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.Map as M
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.ServerClient (ServerClient (..), newServerClient)
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Server.Transmission (CorrId (..))
|
||||
import Simplex.Messaging.Server (randomBytes)
|
||||
import Simplex.Messaging.Server.Transmission (Cmd (..), CorrId (..), SParty (..))
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import Simplex.Messaging.Transport
|
||||
import UnliftIO.Async
|
||||
import UnliftIO.IO
|
||||
@@ -35,7 +40,7 @@ runSMPAgent cfg@AgentConfig {tcpPort} = do
|
||||
connectClient :: MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
connectClient h c = race_ (send h c) (receive h c)
|
||||
|
||||
runClient :: MonadUnliftIO m => AgentClient -> m ()
|
||||
runClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
runClient c = race_ (processSmp c) (client c)
|
||||
|
||||
receive :: MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
@@ -48,17 +53,48 @@ receive h AgentClient {rcvQ, sndQ} =
|
||||
send :: MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
send h AgentClient {sndQ} = forever $ atomically (readTBQueue sndQ) >>= tPut h
|
||||
|
||||
client :: forall m. MonadUnliftIO m => AgentClient -> m ()
|
||||
client AgentClient {rcvQ, sndQ} = forever $ do
|
||||
(corrId, cAlias, cmd) <- atomically (readTBQueue rcvQ)
|
||||
processCommand cmd >>= \case
|
||||
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
client AgentClient {rcvQ, sndQ, respQ, commands} = forever $ do
|
||||
t@(corrId, cAlias, cmd) <- atomically $ readTBQueue rcvQ
|
||||
processCommand t cmd >>= \case
|
||||
Left e -> atomically $ writeTBQueue sndQ (corrId, cAlias, ERR e)
|
||||
Right _ -> return ()
|
||||
where
|
||||
processCommand :: ACommand 'Client -> m (Either ErrorType ())
|
||||
processCommand _ = return $ Left PROHIBITED
|
||||
processCommand :: ATransmission 'Client -> ACommand 'Client -> m (Either ErrorType ())
|
||||
processCommand t = \case
|
||||
NEW SMPServer {host, port, keyHash} (AckMode mode) -> do
|
||||
cfg <- asks $ smpConfig . config
|
||||
srv <- newServerClient cfg respQ host port
|
||||
t <- mkSmpNEW t
|
||||
atomically $ writeTBQueue (smpSndQ srv) t
|
||||
liftIO $ putStrLn "sending NEW to server"
|
||||
liftIO $ print t
|
||||
return $ Right ()
|
||||
_ -> return $ Left PROHIBITED
|
||||
|
||||
mkSmpNEW :: ATransmission 'Client -> m SMP.Transmission
|
||||
mkSmpNEW t = do
|
||||
g <- asks idsDrg
|
||||
smpCorrId <- atomically $ CorrId <$> randomBytes 4 g
|
||||
recipientKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let recipientPrivateKey = recipientKey
|
||||
toSMP = ("", (smpCorrId, "", Cmd SRecipient $ SMP.NEW recipientKey))
|
||||
req =
|
||||
Request
|
||||
{ fromClient = t,
|
||||
toSMP,
|
||||
state = NEWRequestState {recipientKey, recipientPrivateKey}
|
||||
}
|
||||
atomically . modifyTVar commands $ M.insert smpCorrId req
|
||||
return toSMP
|
||||
|
||||
processSmp :: MonadUnliftIO m => AgentClient -> m ()
|
||||
processSmp AgentClient {respQ, sndQ} = forever . atomically $ do
|
||||
readTBQueue respQ
|
||||
writeTBQueue sndQ (CorrId B.empty, B.empty, ERR UNKNOWN)
|
||||
processSmp AgentClient {respQ, sndQ, commands} = forever $ do
|
||||
(_, (smpCorrId, qId, cmdOrErr)) <- atomically $ readTBQueue respQ
|
||||
liftIO $ putStrLn "received from server"
|
||||
liftIO $ print (smpCorrId, qId, cmdOrErr)
|
||||
req <- atomically $ M.lookup smpCorrId <$> readTVar commands
|
||||
atomically $ case req of -- TODO empty correlation ID is ok - it can be a message
|
||||
Nothing -> writeTBQueue sndQ ("", "", ERR $ SMP smpErrCorrelationId)
|
||||
Just Request {fromClient = (corrId, cAlias, cmd), toSMP, state} -> do
|
||||
writeTBQueue sndQ (corrId, cAlias, ERR UNKNOWN)
|
||||
|
||||
@@ -12,8 +12,11 @@ import qualified Data.Map.Strict as M
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Network.Socket (HostName, ServiceName)
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.ServerClient
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Schema
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Server.Transmission (PublicKey)
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import UnliftIO.STM
|
||||
|
||||
@@ -21,7 +24,8 @@ data AgentConfig = AgentConfig
|
||||
{ tcpPort :: ServiceName,
|
||||
tbqSize :: Natural,
|
||||
connIdBytes :: Int,
|
||||
dbFile :: String
|
||||
dbFile :: String,
|
||||
smpConfig :: ServerClientConfig
|
||||
}
|
||||
|
||||
data Env = Env
|
||||
@@ -33,13 +37,20 @@ data Env = Env
|
||||
data AgentClient = AgentClient
|
||||
{ rcvQ :: TBQueue (ATransmission Client),
|
||||
sndQ :: TBQueue (ATransmission Agent),
|
||||
respQ :: TBQueue (),
|
||||
servers :: Map (HostName, ServiceName) ServerClient
|
||||
respQ :: TBQueue SMP.TransmissionOrError,
|
||||
servers :: TVar (Map (HostName, ServiceName) ServerClient),
|
||||
commands :: TVar (Map SMP.CorrId Request)
|
||||
}
|
||||
|
||||
data ServerClient = ServerClient
|
||||
{ sndQ :: TBQueue SMP.Transmission,
|
||||
commands :: Map SMP.QueueId (TBQueue SMP.Cmd)
|
||||
data Request = Request
|
||||
{ fromClient :: ATransmission Client,
|
||||
toSMP :: SMP.Transmission,
|
||||
state :: RequestState
|
||||
}
|
||||
|
||||
data RequestState = NEWRequestState
|
||||
{ recipientKey :: PublicKey,
|
||||
recipientPrivateKey :: PrivateKey
|
||||
}
|
||||
|
||||
newAgentClient :: Natural -> STM AgentClient
|
||||
@@ -47,12 +58,9 @@ newAgentClient qSize = do
|
||||
rcvQ <- newTBQueue qSize
|
||||
sndQ <- newTBQueue qSize
|
||||
respQ <- newTBQueue qSize
|
||||
return AgentClient {rcvQ, sndQ, respQ, servers = M.empty}
|
||||
|
||||
newServerClient :: Natural -> STM ServerClient
|
||||
newServerClient qSize = do
|
||||
sndQ <- newTBQueue qSize
|
||||
return ServerClient {sndQ, commands = M.empty}
|
||||
servers <- newTVar M.empty
|
||||
commands <- newTVar M.empty
|
||||
return AgentClient {rcvQ, sndQ, respQ, servers, commands}
|
||||
|
||||
openDB :: MonadUnliftIO m => AgentConfig -> m DB.Connection
|
||||
openDB AgentConfig {dbFile} = liftIO $ do
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Agent.ServerClient where
|
||||
|
||||
import Control.Monad.IO.Unlift
|
||||
import Data.Maybe
|
||||
import Network.Socket (HostName, ServiceName)
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import Simplex.Messaging.Transport
|
||||
import UnliftIO.Async
|
||||
import UnliftIO.IO
|
||||
import UnliftIO.STM
|
||||
|
||||
data ServerClientConfig = ServerClientConfig
|
||||
{ tcpPort :: ServiceName,
|
||||
tbqSize :: Natural,
|
||||
corrIdBytes :: Natural
|
||||
}
|
||||
|
||||
data ServerClient = ServerClient
|
||||
{ smpSndQ :: TBQueue SMP.Transmission,
|
||||
smpRcvQ :: TBQueue SMP.TransmissionOrError
|
||||
-- srvA :: Async ()
|
||||
}
|
||||
|
||||
newServerClient ::
|
||||
forall m.
|
||||
MonadUnliftIO m =>
|
||||
ServerClientConfig ->
|
||||
TBQueue SMP.TransmissionOrError ->
|
||||
HostName ->
|
||||
Maybe ServiceName ->
|
||||
m ServerClient
|
||||
newServerClient cfg smpRcvQ host port = do
|
||||
smpSndQ <- atomically . newTBQueue $ tbqSize cfg
|
||||
let c = ServerClient {smpSndQ, smpRcvQ}
|
||||
_srvA <- async $ runClient (fromMaybe (tcpPort cfg) port) c
|
||||
return c
|
||||
where
|
||||
runClient :: ServiceName -> ServerClient -> m ()
|
||||
runClient p c =
|
||||
runTCPClient host p $ \h -> do
|
||||
_line <- getLn h -- "Welcome to SMP"
|
||||
-- TODO test connection failure
|
||||
race_ (send h c) (receive h)
|
||||
|
||||
send :: Handle -> ServerClient -> m ()
|
||||
send h ServerClient {smpSndQ} = atomically (readTBQueue smpSndQ) >>= SMP.tPut h
|
||||
|
||||
receive :: Handle -> m ()
|
||||
receive h = SMP.tGet SMP.fromServer h >>= atomically . writeTBQueue smpRcvQ
|
||||
@@ -1,5 +1,4 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
@@ -23,6 +22,7 @@ import Data.Time.Clock (UTCTime)
|
||||
import Data.Type.Equality
|
||||
import Data.Typeable ()
|
||||
import Network.Socket
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Server.Transmission (CorrId (..), Encoded, MsgBody, PublicKey, QueueId, errBadParameters, errMessageBody)
|
||||
import Simplex.Messaging.Transport
|
||||
import System.IO
|
||||
@@ -123,7 +123,7 @@ data MsgStatus = MsgOk | MsgError MsgErrorType
|
||||
data MsgErrorType = MsgSkipped AgentMsgId AgentMsgId | MsgBadId AgentMsgId | MsgBadHash
|
||||
deriving (Show)
|
||||
|
||||
data ErrorType = UNKNOWN | PROHIBITED | SYNTAX Int | SIZE -- etc. TODO SYNTAX Natural
|
||||
data ErrorType = UNKNOWN | PROHIBITED | SYNTAX Int | SMP Natural | SIZE -- etc. TODO SYNTAX Natural
|
||||
deriving (Show)
|
||||
|
||||
data AckStatus = AckOk | AckError AckErrorType
|
||||
@@ -138,6 +138,9 @@ errBadInvitation = 10
|
||||
errNoConnAlias :: Int
|
||||
errNoConnAlias = 11
|
||||
|
||||
smpErrCorrelationId :: Natural
|
||||
smpErrCorrelationId = 1
|
||||
|
||||
parseCommand :: ByteString -> Either ErrorType ACmd
|
||||
parseCommand command = case B.words command of
|
||||
["NEW", srv] -> newConn srv . Right $ AckMode On
|
||||
|
||||
@@ -10,7 +10,8 @@
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Server (runSMPServer) where
|
||||
-- TODO move randomBytes to another module
|
||||
module Simplex.Messaging.Server (runSMPServer, randomBytes) where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad
|
||||
|
||||
@@ -145,7 +145,7 @@ serializeCommand = \case
|
||||
type Encoded = ByteString
|
||||
|
||||
-- newtype to avoid accidentally changing order of transmission parts
|
||||
newtype CorrId = CorrId {bs :: ByteString} deriving (Eq, Show)
|
||||
newtype CorrId = CorrId {bs :: ByteString} deriving (Eq, Ord, Show)
|
||||
|
||||
instance IsString CorrId where
|
||||
fromString = CorrId . fromString
|
||||
|
||||
Reference in New Issue
Block a user