Files
simplexmq/src/Simplex/Messaging/Client.hs
2022-04-11 10:24:02 +01:00

386 lines
15 KiB
Haskell

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
-- |
-- Module : Simplex.Messaging.Client
-- Copyright : (c) simplex.chat
-- License : AGPL-3
--
-- Maintainer : chat@simplex.chat
-- Stability : experimental
-- Portability : non-portable
--
-- This module provides a functional client API for SMP protocol.
--
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
module Simplex.Messaging.Client
( -- * Connect (disconnect) client to (from) SMP server
SMPClient,
getSMPClient,
closeSMPClient,
-- * SMP protocol command functions
createSMPQueue,
subscribeSMPQueue,
subscribeSMPQueueNotifications,
secureSMPQueue,
enableSMPQueueNotifications,
sendSMPMessage,
ackSMPMessage,
suspendSMPQueue,
deleteSMPQueue,
sendSMPCommand,
-- * Supporting types and client configuration
SMPClientError (..),
SMPClientConfig (..),
smpDefaultConfig,
SMPServerTransmission,
)
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Trans.Class
import Control.Monad.Trans.Except
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Maybe (fromMaybe)
import Network.Socket (ServiceName)
import Numeric.Natural
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TLS, TProxy, Transport (..), TransportError, clientHandshake)
import Simplex.Messaging.Transport.Client (runTransportClient)
import Simplex.Messaging.Transport.KeepAlive
import Simplex.Messaging.Transport.WebSockets (WS)
import Simplex.Messaging.Util (bshow, liftError, raceAny_)
import System.Timeout (timeout)
-- | 'SMPClient' is a handle used to send commands to a specific SMP server.
--
-- The type is exported abstractly: neither the constructor nor the field
-- selectors are part of the module's export list.
--
-- Use 'getSMPClient' to connect to an SMP server and create a client handle.
data SMPClient = SMPClient
  { -- | thread running the transport client; cancelled by 'closeSMPClient'
    action :: Async (),
    -- | set to True once the transport handshake has succeeded
    connected :: TVar Bool,
    -- | session ID taken from the transport handle after the handshake
    sessionId :: ByteString,
    -- | server this client is connected to
    smpServer :: SMPServer,
    -- | default timeout for command responses (microseconds)
    tcpTimeout :: Int,
    -- | remaining number of ping failures before the client gives up;
    -- reset to the configured limit after each successful ping
    smpPingFailures :: TVar Int,
    -- | counter used to generate correlation IDs for commands
    clientCorrId :: TVar Natural,
    -- | commands that were sent and are waiting for responses, by correlation ID
    sentCommands :: TMap CorrId Request,
    -- | transmissions waiting to be sent to the server
    sndQ :: TBQueue SentRawTransmission,
    -- | transmissions received from the server, not yet processed
    rcvQ :: TBQueue (SignedTransmission BrokerMsg),
    -- | queue (passed to 'getSMPClient') for server messages and notifications
    msgQ :: TBQueue SMPServerTransmission
  }
-- | Type synonym for a transmission from some SMP server queue:
-- the server, the queue ID and the broker message.
type SMPServerTransmission = (SMPServer, RecipientId, BrokerMsg)
-- | SMP client configuration.
data SMPClientConfig = SMPClientConfig
  { -- | size of the TBQueues used for commands sent to and transmissions received from the server
    qSize :: Natural,
    -- | port and transport to use when the port is not specified in SMPServer
    defaultTransport :: (ServiceName, ATransport),
    -- | timeout for establishing the connection and, by default, for command responses (microseconds)
    tcpTimeout :: Int,
    -- | TCP keep-alive options, Nothing to skip enabling keep-alive
    tcpKeepAlive :: Maybe KeepAliveOpts,
    -- | delay between SMP ping commands (microseconds)
    smpPing :: Int,
    -- | timeout for the response to an SMP ping (microseconds)
    smpPingTimeout :: Int,
    -- | number of failed pings (counted since the last successful one)
    -- after which the client terminates
    smpPingFailLimit :: Int
  }
-- | Default SMP client configuration: TLS transport on port 5223,
-- TCP keep-alive enabled, pings every 5 minutes.
smpDefaultConfig :: SMPClientConfig
smpDefaultConfig =
  SMPClientConfig
    { defaultTransport = ("5223", transport @TLS),
      qSize = 64,
      tcpKeepAlive = Just defaultKeepAliveOpts,
      tcpTimeout = 5_000_000, -- 5 sec
      smpPing = 300_000_000, -- 5 min
      smpPingTimeout = 10_000_000, -- 10 sec
      smpPingFailLimit = 3
    }
-- | A sent command that is waiting for the server response.
data Request = Request
  { -- | queue ID the command was sent with; the response is expected to carry the same ID
    queueId :: QueueId,
    -- | filled with the response (or an error) by the client's processing thread
    responseVar :: TMVar Response
  }
-- | Result delivered to a pending 'Request': a client error or the broker message.
type Response = Either SMPClientError BrokerMsg
-- | Connects to 'SMPServer' using passed client configuration
-- and queue for messages and notifications.
--
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'SMPServerTransmission' includes server information.
--
-- The last argument is an action that is run when the client threads
-- terminate after a successful transport handshake.
--
-- Returns 'SMPNetworkError' if the handshake result is not available within
-- @tcpTimeout@ or the transport client terminates before the handshake,
-- and 'SMPTransportError' if the handshake itself fails.
getSMPClient :: SMPServer -> SMPClientConfig -> TBQueue SMPServerTransmission -> IO () -> IO (Either SMPClientError SMPClient)
getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, smpPingTimeout, tcpKeepAlive, smpPing, smpPingFailLimit} msgQ disconnected =
  atomically mkSMPClient >>= runClient useTransport
  where
    -- Creates the client state; 'action' and 'sessionId' are only known
    -- after the connection is established and are filled in by 'runClient'.
    mkSMPClient :: STM SMPClient
    mkSMPClient = do
      connected <- newTVar False
      smpPingFailures <- newTVar smpPingFailLimit
      clientCorrId <- newTVar 0
      sentCommands <- TM.empty
      sndQ <- newTBQueue qSize
      rcvQ <- newTBQueue qSize
      return
        SMPClient
          { action = undefined,
            sessionId = undefined,
            connected,
            smpServer,
            tcpTimeout,
            smpPingFailures,
            clientCorrId,
            sentCommands,
            sndQ,
            rcvQ,
            msgQ
          }

    -- Starts the transport client in a separate thread and waits (up to
    -- tcpTimeout) for the handshake result passed via thVar.
    runClient :: (ServiceName, ATransport) -> SMPClient -> IO (Either SMPClientError SMPClient)
    runClient (port', ATransport t) c = do
      thVar <- newEmptyTMVarIO
      action <-
        async $
          runTransportClient (host smpServer) port' (keyHash smpServer) tcpKeepAlive (client t c thVar)
            -- tryPutTMVar rather than putTMVar: if the wait below timed out,
            -- the handshake result is never taken from thVar, and a blocking
            -- put would keep this thread blocked forever on exit
            `finally` atomically (void . tryPutTMVar thVar $ Left SMPNetworkError)
      th_ <- tcpTimeout `timeout` atomically (takeTMVar thVar)
      case th_ of
        Just (Right THandle {sessionId}) -> pure $ Right c {action, sessionId}
        Just (Left e) -> pure $ Left e
        Nothing -> do
          -- the caller does not receive the client handle, so nothing else
          -- could ever close this connection; 'cancel' waits for the thread
          -- to quit, so it is run in a separate thread to return promptly
          void . async $ cancel action
          pure $ Left SMPNetworkError

    -- Port "80" selects WebSockets, any other explicit port selects TLS,
    -- an empty port selects the configured default port and transport.
    useTransport :: (ServiceName, ATransport)
    useTransport = case port smpServer of
      "" -> defaultTransport cfg
      "80" -> ("80", transport @WS)
      p -> (p, transport @TLS)

    -- Performs the handshake, reports its result via thVar and, on success,
    -- runs the client threads until any of them terminates.
    client :: forall c. Transport c => TProxy c -> SMPClient -> TMVar (Either SMPClientError (THandle c)) -> c -> IO ()
    client _ c thVar h =
      runExceptT (clientHandshake h $ keyHash smpServer) >>= \case
        Left e -> atomically . putTMVar thVar . Left $ SMPTransportError e
        Right th@THandle {sessionId} -> do
          atomically $ do
            writeTVar (connected c) True
            putTMVar thVar $ Right th
          let c' = c {sessionId} :: SMPClient
          raceAny_ [send c' th, process c', receive c' th, ping c']
            `finally` disconnected

    -- Writes queued transmissions to the transport.
    send :: Transport c => SMPClient -> THandle c -> IO ()
    send SMPClient {sndQ} h = forever $ atomically (readTBQueue sndQ) >>= tPut h

    -- Reads transmissions from the transport into rcvQ.
    receive :: Transport c => SMPClient -> THandle c -> IO ()
    receive SMPClient {rcvQ} h = forever $ tGet h >>= atomically . writeTBQueue rcvQ

    -- Sends PING every smpPing microseconds; a successful ping resets the
    -- failure counter, a failed one decrements it, and the error is thrown
    -- (terminating the client threads) when the counter reaches zero.
    ping :: SMPClient -> IO ()
    ping c@SMPClient {smpPingFailures} = forever $ do
      threadDelay smpPing
      runExceptT (sendSMPCommand c Nothing "" PING $ Just smpPingTimeout) >>= \case
        Right _ -> atomically $ writeTVar smpPingFailures smpPingFailLimit
        Left e -> do
          n <- atomically $ stateTVar smpPingFailures $ \n -> (n - 1, n - 1)
          when (n == 0) $ throwIO e

    -- Routes received transmissions: those with a correlation ID of a pending
    -- command complete that command, all others are passed to msgQ.
    process :: SMPClient -> IO ()
    process SMPClient {rcvQ, sentCommands} = forever $ do
      (_, _, (corrId, qId, respOrErr)) <- atomically $ readTBQueue rcvQ
      if B.null $ bs corrId
        then sendMsg qId respOrErr
        else do
          atomically (TM.lookup corrId sentCommands) >>= \case
            Nothing -> sendMsg qId respOrErr
            Just Request {queueId, responseVar} -> atomically $ do
              TM.delete corrId sentCommands
              putTMVar responseVar $
                -- the response must be for the same queue as the command
                if queueId == qId
                  then case respOrErr of
                    Left e -> Left $ SMPResponseError e
                    Right (ERR e) -> Left $ SMPServerError e
                    Right r -> Right r
                  else Left SMPUnexpectedResponse

    sendMsg :: QueueId -> Either ErrorType BrokerMsg -> IO ()
    sendMsg qId = \case
      Right cmd -> atomically $ writeTBQueue msgQ (smpServer, qId, cmd)
      -- TODO send everything else to errQ and log in agent
      _ -> return ()
-- | Disconnects SMP client from the server and terminates client threads
-- by cancelling the client's connection thread.
closeSMPClient :: SMPClient -> IO ()
closeSMPClient SMPClient {action} = uninterruptibleCancel action
-- | SMP client error type.
data SMPClientError
  = -- | Correctly parsed SMP server ERR response.
    -- This error is forwarded to the agent client as `ERR SMP err`.
    SMPServerError ErrorType
  | -- | Invalid server response that failed to parse.
    -- Forwarded to the agent client as `ERR BROKER RESPONSE`.
    SMPResponseError ErrorType
  | -- | The response differs from what is expected for the sent SMP command
    -- (e.g. server should respond `IDS` or `ERR` to `NEW` command),
    -- or it was received for a different queue than the command was sent for.
    -- Forwarded to the agent client as `ERR BROKER UNEXPECTED`.
    SMPUnexpectedResponse
  | -- | The response to the command was not received within the timeout.
    -- Forwarded to the agent client as `ERR BROKER TIMEOUT`.
    SMPResponseTimeout
  | -- | Failure to establish TCP connection, including the connection
    -- not being established within the configured timeout.
    -- Forwarded to the agent client as `ERR BROKER NETWORK`.
    SMPNetworkError
  | -- | TCP transport handshake or some other transport error.
    -- Forwarded to the agent client as `ERR BROKER TRANSPORT e`.
    SMPTransportError TransportError
  | -- | Error when cryptographically "signing" the command.
    SMPSignatureError C.CryptoError
  deriving (Eq, Show, Exception)
-- | Create a new SMP queue.
--
-- Sends @NEW@ signed with the recipient's private key and returns the queue
-- IDs and keys from the @IDS@ response; any other response is an error.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#create-queue-command
createSMPQueue ::
  SMPClient ->
  RcvPrivateSignKey ->
  RcvPublicVerifyKey ->
  RcvPublicDhKey ->
  ExceptT SMPClientError IO QueueIdsKeys
createSMPQueue c rpKey rKey dhKey = do
  resp <- sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey) Nothing
  case resp of
    IDS qik -> pure qik
    _ -> throwE SMPUnexpectedResponse
-- | Subscribe to the SMP queue.
--
-- The server may respond to @SUB@ with a message instead of @OK@;
-- such a message is passed on to the client's message queue.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#subscribe-to-queue
subscribeSMPQueue :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT SMPClientError IO ()
subscribeSMPQueue c@SMPClient {smpServer, msgQ} rpKey rId = do
  resp <- sendSMPCommand c (Just rpKey) rId SUB Nothing
  case resp of
    OK -> pure ()
    MSG {} -> lift $ atomically (writeTBQueue msgQ (smpServer, rId, resp))
    _ -> throwE SMPUnexpectedResponse
-- | Subscribe to the SMP queue notifications.
--
-- Sends @NSUB@ and expects @OK@ in response.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#subscribe-to-queue-notifications
subscribeSMPQueueNotifications :: SMPClient -> NtfPrivateSignKey -> NotifierId -> ExceptT SMPClientError IO ()
subscribeSMPQueueNotifications c npKey nId = okSMPCommand NSUB c npKey nId
-- | Secure the SMP queue by adding a sender public key.
--
-- Sends @KEY@ with the sender's key and expects @OK@ in response.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#secure-queue-command
secureSMPQueue :: SMPClient -> RcvPrivateSignKey -> RecipientId -> SndPublicVerifyKey -> ExceptT SMPClientError IO ()
secureSMPQueue c rpKey rId senderKey = okSMPCommand keyCmd c rpKey rId
  where
    keyCmd = KEY senderKey
-- | Enable notifications for the queue for push notifications server.
--
-- Sends @NKEY@ with the notifier's key and returns the notifier ID from
-- the @NID@ response; any other response is an error.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#enable-notifications-command
enableSMPQueueNotifications :: SMPClient -> RcvPrivateSignKey -> RecipientId -> NtfPublicVerifyKey -> ExceptT SMPClientError IO NotifierId
enableSMPQueueNotifications c rpKey rId notifierKey = do
  resp <- sendSMPCommand c (Just rpKey) rId (NKEY notifierKey) Nothing
  case resp of
    NID nId -> pure nId
    _ -> throwE SMPUnexpectedResponse
-- | Send SMP message.
--
-- The command is signed only when the sender's private key is passed;
-- @OK@ is expected in response.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#send-message
sendSMPMessage :: SMPClient -> Maybe SndPrivateSignKey -> SenderId -> MsgBody -> ExceptT SMPClientError IO ()
sendSMPMessage c spKey sId msg = do
  resp <- sendSMPCommand c spKey sId (SEND msg) Nothing
  case resp of
    OK -> pure ()
    _ -> throwE SMPUnexpectedResponse
-- | Acknowledge message delivery (server deletes the message).
--
-- The server may respond to @ACK@ with the next message instead of @OK@;
-- such a message is passed on to the client's message queue.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#acknowledge-message-delivery
ackSMPMessage :: SMPClient -> RcvPrivateSignKey -> QueueId -> ExceptT SMPClientError IO ()
ackSMPMessage c@SMPClient {smpServer, msgQ} rpKey rId = do
  resp <- sendSMPCommand c (Just rpKey) rId ACK Nothing
  case resp of
    OK -> pure ()
    MSG {} -> lift $ atomically (writeTBQueue msgQ (smpServer, rId, resp))
    _ -> throwE SMPUnexpectedResponse
-- | Irreversibly suspend SMP queue.
-- The existing messages from the queue will still be delivered.
--
-- Sends @OFF@ and expects @OK@ in response.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#suspend-queue
suspendSMPQueue :: SMPClient -> RcvPrivateSignKey -> QueueId -> ExceptT SMPClientError IO ()
suspendSMPQueue c rpKey rId = okSMPCommand OFF c rpKey rId
-- | Irreversibly delete SMP queue and all messages in it.
--
-- Sends @DEL@ and expects @OK@ in response.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#delete-queue
deleteSMPQueue :: SMPClient -> RcvPrivateSignKey -> QueueId -> ExceptT SMPClientError IO ()
deleteSMPQueue c rpKey rId = okSMPCommand DEL c rpKey rId
-- Sends a signed command for which the only successful response is OK;
-- any other (non-error) response results in SMPUnexpectedResponse.
okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateSignKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd c pKey qId = do
  resp <- sendSMPCommand c (Just pKey) qId cmd Nothing
  case resp of
    OK -> pure ()
    _ -> throwE SMPUnexpectedResponse
-- | Send SMP command and wait for the server response.
--
-- The transmission is signed when a private key is passed. The last argument
-- overrides the client's default response timeout (microseconds); when the
-- response does not arrive in time the result is 'SMPResponseTimeout'.
--
-- TODO sign all requests (SEND of SMP confirmation would be signed with the same key that is passed to the recipient)
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateSignKey -> QueueId -> Command p -> Maybe Int -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand SMPClient {sndQ, sentCommands, clientCorrId, sessionId, tcpTimeout} pKey qId cmd cmdTimeout_ = do
  corrId <- lift $ atomically nextCorrId
  signed <- signed_ $ encodeTransmission sessionId (corrId, qId, cmd)
  -- registering the request and waiting for the response are two separate
  -- STM transactions: a single one would block, as the transmission only
  -- becomes visible to the sending thread once the first one commits
  ExceptT $ do
    respVar <- atomically $ enqueue corrId signed
    fromMaybe (Left SMPResponseTimeout) <$> timeout responseTimeout (atomically $ takeTMVar respVar)
  where
    responseTimeout :: Int
    responseTimeout = fromMaybe tcpTimeout cmdTimeout_

    -- correlation IDs are the successive values of the client's counter
    nextCorrId :: STM CorrId
    nextCorrId = CorrId . bshow <$> stateTVar clientCorrId (\n -> (n, n + 1))

    signed_ :: ByteString -> ExceptT SMPClientError IO SentRawTransmission
    signed_ t = maybe (pure (Nothing, t)) withSignature pKey
      where
        withSignature pk = (\sig -> (Just sig, t)) <$> liftError SMPSignatureError (C.sign pk t)

    -- registers the pending request and queues the transmission for sending
    -- NOTE(review): the request is only removed from sentCommands when a
    -- response with its correlation ID is processed; after a timeout it
    -- appears to stay in the map - confirm whether this is intended
    enqueue :: CorrId -> SentRawTransmission -> STM (TMVar Response)
    enqueue corrId t = do
      respVar <- newEmptyTMVar
      TM.insert corrId (Request qId respVar) sentCommands
      writeTBQueue sndQ t
      pure respVar