receiving messages and remaining client functions (#15)

* SMPClient queues for messages and notifications

* style

* SMPClient: put all messages (and uncorrelated server commands) to provided TBQueue
This commit is contained in:
Evgeny Poberezkin
2021-01-15 15:54:43 +00:00
committed by Efim Poberezkin
parent 5f59fcc969
commit 3efb15ecb3
3 changed files with 73 additions and 21 deletions
+12 -6
View File
@@ -48,7 +48,7 @@ connectClient :: MonadUnliftIO m => Handle -> AgentClient -> m ()
connectClient h c = race_ (send h c) (receive h c)
runClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
runClient c = race_ (processSmp c) (client c)
runClient c = race_ (subscriber c) (client c)
receive :: MonadUnliftIO m => Handle -> AgentClient -> m ()
receive h AgentClient {rcvQ, sndQ} =
@@ -95,7 +95,7 @@ processCommand ::
AgentClient ->
ATransmission 'Client ->
m ()
processCommand AgentClient {sndQ, smpClients} (corrId, connAlias, cmd) =
processCommand AgentClient {sndQ, msgQ, smpClients} (corrId, connAlias, cmd) =
case cmd of
NEW smpServer -> createNewConnection smpServer
JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode
@@ -164,7 +164,7 @@ processCommand AgentClient {sndQ, smpClients} (corrId, connAlias, cmd) =
newSMPClient :: m SMPClient
newSMPClient = do
cfg <- asks $ smpCfg . config
c <- liftIO (getSMPClient srv cfg) `E.catch` replyError (BROKER smpErrTCPConnection)
c <- liftIO (getSMPClient srv cfg msgQ) `E.catch` replyError (BROKER smpErrTCPConnection)
atomically . modifyTVar smpClients $ M.insert srv c
return c
@@ -177,8 +177,14 @@ processCommand AgentClient {sndQ, smpClients} (corrId, connAlias, cmd) =
respond :: ACommand 'Agent -> m ()
respond c = atomically $ writeTBQueue sndQ (corrId, connAlias, c)
processSmp :: MonadUnliftIO m => AgentClient -> m ()
processSmp AgentClient {respQ} = forever $ do
subscriber :: MonadUnliftIO m => AgentClient -> m ()
subscriber AgentClient {msgQ} = forever $ do
-- TODO this will only process messages and notifications
(_, (_smpCorrId, _qId, _cmdOrErr)) <- atomically $ readTBQueue respQ
(_srv, _qId, _cmd) <- atomically $ readTBQueue msgQ
-- case respOrErr of
-- Right (Cmd _ (MSG msgId ts msgBody)) ->
-- writeTBQueue messageQ (qId, Message {msgId, ts, msgBody})
-- Right (Cmd _ END) -> writeTBQueue endSubQ qId
-- -- TODO maybe have one more queue to write unexpected responses
-- _ -> return ()
return ()
+3 -5
View File
@@ -14,7 +14,6 @@ import Numeric.Natural
import Simplex.Messaging.Agent.Store.SQLite
import Simplex.Messaging.Agent.Transmission
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Server.Transmission as SMP
import UnliftIO.STM
data AgentConfig = AgentConfig
@@ -34,8 +33,7 @@ data Env = Env
data AgentClient = AgentClient
{ rcvQ :: TBQueue (ATransmission Client),
sndQ :: TBQueue (ATransmission Agent),
-- TODO rename, respQ is only for messages and notifications, not for responses
respQ :: TBQueue SMP.TransmissionOrError,
msgQ :: TBQueue SMPServerTransmission,
smpClients :: TVar (Map SMPServer SMPClient)
}
@@ -43,9 +41,9 @@ newAgentClient :: Natural -> STM AgentClient
newAgentClient qSize = do
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
respQ <- newTBQueue qSize
msgQ <- newTBQueue qSize
smpClients <- newTVar M.empty
return AgentClient {rcvQ, sndQ, respQ, smpClients}
return AgentClient {rcvQ, sndQ, msgQ, smpClients}
newEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
newEnv config = do
+58 -10
View File
@@ -1,3 +1,4 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
@@ -9,11 +10,17 @@ module Simplex.Messaging.Client
( SMPClient,
getSMPClient,
createSMPQueue,
subscribeSMPQueue,
secureSMPQueue,
sendSMPMessage,
ackSMPMessage,
sendSMPCommand,
suspendSMPQueue,
deleteSMPQueue,
SMPClientError (..),
SMPClientConfig (..),
smpDefaultConfig,
SMPServerTransmission,
)
where
@@ -21,6 +28,7 @@ import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Trans.Class
import Control.Monad.Trans.Except
import qualified Data.ByteString.Char8 as B
import Data.Map.Strict (Map)
@@ -36,12 +44,16 @@ import System.IO
data SMPClient = SMPClient
{ action :: Async (),
smpServer :: SMPServer,
clientCorrId :: TVar Natural,
sentCommands :: TVar (Map CorrId Request),
sndQ :: TBQueue Transmission,
rcvQ :: TBQueue TransmissionOrError
rcvQ :: TBQueue TransmissionOrError,
msgQ :: TBQueue SMPServerTransmission
}
type SMPServerTransmission = (SMPServer, RecipientId, Cmd)
data SMPClientConfig = SMPClientConfig
{ qSize :: Natural,
defaultPort :: ServiceName
@@ -55,14 +67,20 @@ data Request = Request
responseVar :: TMVar (Either SMPClientError Cmd)
}
getSMPClient :: SMPServer -> SMPClientConfig -> IO SMPClient
getSMPClient SMPServer {host, port} SMPClientConfig {qSize, defaultPort} = do
c <-
atomically $
SMPClient undefined <$> newTVar 0 <*> newTVar M.empty <*> newTBQueue qSize <*> newTBQueue qSize
getSMPClient :: SMPServer -> SMPClientConfig -> TBQueue SMPServerTransmission -> IO SMPClient
getSMPClient smpServer@SMPServer {host, port} SMPClientConfig {qSize, defaultPort} msgQ = do
c <- atomically mkSMPClient
action <- async $ runTCPClient host (fromMaybe defaultPort port) (client c)
return c {action}
where
mkSMPClient :: STM SMPClient
mkSMPClient = do
clientCorrId <- newTVar 0
sentCommands <- newTVar M.empty
sndQ <- newTBQueue qSize
rcvQ <- newTBQueue qSize
return SMPClient {action = undefined, smpServer, clientCorrId, sentCommands, sndQ, rcvQ, msgQ}
client :: SMPClient -> Handle -> IO ()
client c h = do
_line <- getLn h -- "Welcome to SMP"
@@ -80,7 +98,9 @@ getSMPClient SMPServer {host, port} SMPClientConfig {qSize, defaultPort} = do
(_, (corrId, qId, respOrErr)) <- readTBQueue rcvQ
cs <- readTVar sentCommands
case M.lookup corrId cs of
Nothing -> return () -- TODO send to message channel or error channel
Nothing -> case respOrErr of
Right resp -> writeTBQueue msgQ (smpServer, qId, resp)
Left _ -> return ()
Just Request {queueId, responseVar} -> do
modifyTVar sentCommands $ M.delete corrId
putTMVar responseVar $
@@ -101,14 +121,42 @@ data SMPClientError
deriving (Eq, Show, Exception)
createSMPQueue :: SMPClient -> RecipientKey -> ExceptT SMPClientError IO (RecipientId, SenderId)
createSMPQueue c rKey = do
createSMPQueue c rKey =
sendSMPCommand c "" "" (Cmd SRecipient $ NEW rKey) >>= \case
Cmd _ (IDS rId sId) -> return (rId, sId)
_ -> throwE SMPUnexpectedResponse
subscribeSMPQueue :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO ()
subscribeSMPQueue c@SMPClient {smpServer, msgQ} rKey rId =
sendSMPCommand c rKey rId (Cmd SRecipient SUB) >>= \case
Cmd _ OK -> return ()
cmd@(Cmd _ MSG {}) ->
lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd)
_ -> throwE SMPUnexpectedResponse
secureSMPQueue :: SMPClient -> RecipientKey -> QueueId -> SenderKey -> ExceptT SMPClientError IO ()
secureSMPQueue c rKey rId senderKey = okSMPCommand (Cmd SRecipient $ KEY senderKey) c rKey rId
sendSMPMessage :: SMPClient -> SenderKey -> QueueId -> MsgBody -> ExceptT SMPClientError IO ()
sendSMPMessage c sKey qId msg = do
sendSMPCommand c sKey qId (Cmd SSender $ SEND msg) >>= \case
sendSMPMessage c sKey sId msg = okSMPCommand (Cmd SSender $ SEND msg) c sKey sId
ackSMPMessage :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO ()
ackSMPMessage c@SMPClient {smpServer, msgQ} rKey rId =
sendSMPCommand c rKey rId (Cmd SRecipient ACK) >>= \case
Cmd _ OK -> return ()
cmd@(Cmd _ MSG {}) ->
lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd)
_ -> throwE SMPUnexpectedResponse
suspendSMPQueue :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO ()
suspendSMPQueue = okSMPCommand $ Cmd SRecipient OFF
deleteSMPQueue :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO ()
deleteSMPQueue = okSMPCommand $ Cmd SRecipient DEL
okSMPCommand :: Cmd -> SMPClient -> PrivateKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd c pKey qId =
sendSMPCommand c pKey qId cmd >>= \case
Cmd _ OK -> return ()
_ -> throwE SMPUnexpectedResponse