From 3efb15ecb33ec0d4f42f9bcef9087bad96193cd5 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Fri, 15 Jan 2021 15:54:43 +0000 Subject: [PATCH] receiving messages and remaining client functions (#15) * SMPClient queues for messages and notifications * style * SMPClient: put all messages (and uncorrelated server commands) to provided TBQueue --- src/Simplex/Messaging/Agent.hs | 18 ++++-- src/Simplex/Messaging/Agent/Env/SQLite.hs | 8 +-- src/Simplex/Messaging/Client.hs | 68 +++++++++++++++++++---- 3 files changed, 73 insertions(+), 21 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index b703ed6c4..ea07a1670 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -48,7 +48,7 @@ connectClient :: MonadUnliftIO m => Handle -> AgentClient -> m () connectClient h c = race_ (send h c) (receive h c) runClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () -runClient c = race_ (processSmp c) (client c) +runClient c = race_ (subscriber c) (client c) receive :: MonadUnliftIO m => Handle -> AgentClient -> m () receive h AgentClient {rcvQ, sndQ} = @@ -95,7 +95,7 @@ processCommand :: AgentClient -> ATransmission 'Client -> m () -processCommand AgentClient {sndQ, smpClients} (corrId, connAlias, cmd) = +processCommand AgentClient {sndQ, msgQ, smpClients} (corrId, connAlias, cmd) = case cmd of NEW smpServer -> createNewConnection smpServer JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode @@ -164,7 +164,7 @@ processCommand AgentClient {sndQ, smpClients} (corrId, connAlias, cmd) = newSMPClient :: m SMPClient newSMPClient = do cfg <- asks $ smpCfg . config - c <- liftIO (getSMPClient srv cfg) `E.catch` replyError (BROKER smpErrTCPConnection) + c <- liftIO (getSMPClient srv cfg msgQ) `E.catch` replyError (BROKER smpErrTCPConnection) atomically . modifyTVar smpClients $ M.insert srv c return c @@ -177,8 +177,14 @@ processCommand AgentClient {sndQ, smpClients} (corrId, connAlias, cmd) = respond :: ACommand 'Agent -> m () respond c = atomically $ writeTBQueue sndQ (corrId, connAlias, c) -processSmp :: MonadUnliftIO m => AgentClient -> m () -processSmp AgentClient {respQ} = forever $ do +subscriber :: MonadUnliftIO m => AgentClient -> m () +subscriber AgentClient {msgQ} = forever $ do -- TODO this will only process messages and notifications - (_, (_smpCorrId, _qId, _cmdOrErr)) <- atomically $ readTBQueue respQ + (_srv, _qId, _cmd) <- atomically $ readTBQueue msgQ + -- case respOrErr of + -- Right (Cmd _ (MSG msgId ts msgBody)) -> + -- writeTBQueue messageQ (qId, Message {msgId, ts, msgBody}) + -- Right (Cmd _ END) -> writeTBQueue endSubQ qId + -- -- TODO maybe have one more queue to write unexpected responses + -- _ -> return () return () diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 89f84fc48..76b61ee3c 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -14,7 +14,6 @@ import Numeric.Natural import Simplex.Messaging.Agent.Store.SQLite import Simplex.Messaging.Agent.Transmission import Simplex.Messaging.Client -import qualified Simplex.Messaging.Server.Transmission as SMP import UnliftIO.STM data AgentConfig = AgentConfig @@ -34,8 +33,7 @@ data Env = Env data AgentClient = AgentClient { rcvQ :: TBQueue (ATransmission Client), sndQ :: TBQueue (ATransmission Agent), - -- TODO rename, respQ is only for messages and notifications, not for responses - respQ :: TBQueue SMP.TransmissionOrError, + msgQ :: TBQueue SMPServerTransmission, smpClients :: TVar (Map SMPServer SMPClient) } @@ -43,9 +41,9 @@ newAgentClient :: Natural -> STM AgentClient newAgentClient qSize = do rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize - respQ <- newTBQueue qSize + msgQ <- newTBQueue qSize smpClients <- newTVar M.empty - return AgentClient {rcvQ, sndQ, respQ, smpClients} + return AgentClient {rcvQ, sndQ, msgQ, smpClients} newEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env newEnv config = do diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index f13d33bad..ac80d6c74 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} @@ -9,11 +10,17 @@ module Simplex.Messaging.Client ( SMPClient, getSMPClient, createSMPQueue, + subscribeSMPQueue, + secureSMPQueue, sendSMPMessage, + ackSMPMessage, sendSMPCommand, + suspendSMPQueue, + deleteSMPQueue, SMPClientError (..), SMPClientConfig (..), smpDefaultConfig, + SMPServerTransmission, ) where @@ -21,6 +28,7 @@ import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception import Control.Monad +import Control.Monad.Trans.Class import Control.Monad.Trans.Except import qualified Data.ByteString.Char8 as B import Data.Map.Strict (Map) @@ -36,12 +44,16 @@ import System.IO data SMPClient = SMPClient { action :: Async (), + smpServer :: SMPServer, clientCorrId :: TVar Natural, sentCommands :: TVar (Map CorrId Request), sndQ :: TBQueue Transmission, - rcvQ :: TBQueue TransmissionOrError + rcvQ :: TBQueue TransmissionOrError, + msgQ :: TBQueue SMPServerTransmission } +type SMPServerTransmission = (SMPServer, RecipientId, Cmd) + data SMPClientConfig = SMPClientConfig { qSize :: Natural, defaultPort :: ServiceName @@ -55,14 +67,20 @@ data Request = Request responseVar :: TMVar (Either SMPClientError Cmd) } -getSMPClient :: SMPServer -> SMPClientConfig -> IO SMPClient -getSMPClient SMPServer {host, port} SMPClientConfig {qSize, defaultPort} = do - c <- - atomically $ - SMPClient undefined <$> newTVar 0 <*> newTVar M.empty <*> newTBQueue qSize <*> newTBQueue qSize +getSMPClient :: SMPServer -> SMPClientConfig -> TBQueue SMPServerTransmission -> IO SMPClient +getSMPClient smpServer@SMPServer {host, port} SMPClientConfig {qSize, defaultPort} msgQ = do + c <- atomically mkSMPClient action <- async $ runTCPClient host (fromMaybe defaultPort port) (client c) return c {action} where + mkSMPClient :: STM SMPClient + mkSMPClient = do + clientCorrId <- newTVar 0 + sentCommands <- newTVar M.empty + sndQ <- newTBQueue qSize + rcvQ <- newTBQueue qSize + return SMPClient {action = undefined, smpServer, clientCorrId, sentCommands, sndQ, rcvQ, msgQ} + client :: SMPClient -> Handle -> IO () client c h = do _line <- getLn h -- "Welcome to SMP" @@ -80,7 +98,9 @@ getSMPClient SMPServer {host, port} SMPClientConfig {qSize, defaultPort} = do (_, (corrId, qId, respOrErr)) <- readTBQueue rcvQ cs <- readTVar sentCommands case M.lookup corrId cs of - Nothing -> return () -- TODO send to message channel or error channel + Nothing -> case respOrErr of + Right resp -> writeTBQueue msgQ (smpServer, qId, resp) + Left _ -> return () Just Request {queueId, responseVar} -> do modifyTVar sentCommands $ M.delete corrId putTMVar responseVar $ @@ -101,14 +121,42 @@ data SMPClientError deriving (Eq, Show, Exception) createSMPQueue :: SMPClient -> RecipientKey -> ExceptT SMPClientError IO (RecipientId, SenderId) -createSMPQueue c rKey = do +createSMPQueue c rKey = sendSMPCommand c "" "" (Cmd SRecipient $ NEW rKey) >>= \case Cmd _ (IDS rId sId) -> return (rId, sId) _ -> throwE SMPUnexpectedResponse +subscribeSMPQueue :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO () +subscribeSMPQueue c@SMPClient {smpServer, msgQ} rKey rId = + sendSMPCommand c rKey rId (Cmd SRecipient SUB) >>= \case + Cmd _ OK -> return () + cmd@(Cmd _ MSG {}) -> + lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd) + _ -> throwE SMPUnexpectedResponse + +secureSMPQueue :: SMPClient -> RecipientKey -> QueueId -> SenderKey -> ExceptT SMPClientError IO () +secureSMPQueue c rKey rId senderKey = okSMPCommand (Cmd SRecipient $ KEY senderKey) c rKey rId + sendSMPMessage :: SMPClient -> SenderKey -> QueueId -> MsgBody -> ExceptT SMPClientError IO () -sendSMPMessage c sKey qId msg = do - sendSMPCommand c sKey qId (Cmd SSender $ SEND msg) >>= \case +sendSMPMessage c sKey sId msg = okSMPCommand (Cmd SSender $ SEND msg) c sKey sId + +ackSMPMessage :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO () +ackSMPMessage c@SMPClient {smpServer, msgQ} rKey rId = + sendSMPCommand c rKey rId (Cmd SRecipient ACK) >>= \case + Cmd _ OK -> return () + cmd@(Cmd _ MSG {}) -> + lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd) + _ -> throwE SMPUnexpectedResponse + +suspendSMPQueue :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO () +suspendSMPQueue = okSMPCommand $ Cmd SRecipient OFF + +deleteSMPQueue :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO () +deleteSMPQueue = okSMPCommand $ Cmd SRecipient DEL + +okSMPCommand :: Cmd -> SMPClient -> PrivateKey -> QueueId -> ExceptT SMPClientError IO () +okSMPCommand cmd c pKey qId = + sendSMPCommand c pKey qId cmd >>= \case Cmd _ OK -> return () _ -> throwE SMPUnexpectedResponse