mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 01:35:22 +00:00
Receive messages (#17)
* framework to parse and process agent messages * update SMPClient functions to accept private key * process messages (WIP) * agent: create reply queue and send REPLY message with qInfo * refactor agent commands * refactor processSMPTransmission * agent: logic to process REPLY message
This commit is contained in:
committed by
Efim Poberezkin
parent
c1f15c9d93
commit
10fb667ff3
+210
-73
@@ -15,7 +15,9 @@ import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.Map as M
|
||||
import Data.Time.Clock
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
@@ -23,10 +25,11 @@ import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Client
|
||||
import Simplex.Messaging.Server (randomBytes)
|
||||
import Simplex.Messaging.Server.Transmission (PublicKey)
|
||||
import Simplex.Messaging.Server.Transmission (PrivateKey, PublicKey)
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import Simplex.Messaging.Transport
|
||||
import UnliftIO.Async
|
||||
import UnliftIO.Concurrent
|
||||
import UnliftIO.Exception (SomeException)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.IO
|
||||
@@ -95,96 +98,230 @@ processCommand ::
|
||||
AgentClient ->
|
||||
ATransmission 'Client ->
|
||||
m ()
|
||||
processCommand AgentClient {sndQ, msgQ, smpClients} (corrId, connAlias, cmd) =
|
||||
processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
case cmd of
|
||||
NEW smpServer -> createNewConnection smpServer
|
||||
JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode
|
||||
_ -> throwError PROHIBITED
|
||||
where
|
||||
createNewConnection :: SMPServer -> m ()
|
||||
createNewConnection smpServer = do
|
||||
c <- getSMPServerClient smpServer
|
||||
createNewConnection server = do
|
||||
-- TODO create connection alias if not passed
|
||||
-- make connAlias Maybe?
|
||||
(rcvQueue, qInfo) <- newReceiveQueue server
|
||||
withStore $ \st -> createRcvConn st connAlias rcvQueue
|
||||
respond $ INV qInfo
|
||||
|
||||
joinConnection :: SMPQueueInfo -> ReplyMode -> m ()
|
||||
joinConnection qInfo replyMode = do
|
||||
-- TODO create connection alias if not passed
|
||||
-- make connAlias Maybe?
|
||||
(sndQueue, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> createSndConn st connAlias sndQueue
|
||||
sendConfirmation c sndQueue senderKey
|
||||
sendHello c sndQueue
|
||||
sendReplyQueue sndQueue replyMode
|
||||
respond OK
|
||||
|
||||
sendReplyQueue :: SendQueue -> ReplyMode -> m ()
|
||||
sendReplyQueue sndQueue = \case
|
||||
ReplyOff -> return ()
|
||||
ReplyOn srv -> do
|
||||
(rcvQueue, qInfo) <- newReceiveQueue srv
|
||||
withStore $ \st -> addRcvQueue st connAlias rcvQueue
|
||||
sendAgentMessage sndQueue $ REPLY qInfo
|
||||
|
||||
newReceiveQueue :: SMPServer -> m (ReceiveQueue, SMPQueueInfo)
|
||||
newReceiveQueue server = do
|
||||
smp <- getSMPServerClient c server
|
||||
g <- asks idsDrg
|
||||
recipientKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let rcvPrivateKey = recipientKey
|
||||
(recipientId, senderId) <- liftSMP $ createSMPQueue c recipientKey
|
||||
(rcvId, sId) <- liftSMP $ createSMPQueue smp rcvPrivateKey recipientKey
|
||||
encryptKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let decryptKey = encryptKey
|
||||
withStore $ \st ->
|
||||
createRcvConn st connAlias $
|
||||
ReceiveQueue
|
||||
{ server = smpServer,
|
||||
rcvId = recipientId,
|
||||
rcvPrivateKey,
|
||||
sndId = Just senderId,
|
||||
sndKey = Nothing,
|
||||
decryptKey,
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
respond . INV $ SMPQueueInfo smpServer senderId encryptKey
|
||||
rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server,
|
||||
rcvId,
|
||||
rcvPrivateKey,
|
||||
sndId = Just sId,
|
||||
sndKey = Nothing,
|
||||
decryptKey,
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
return (rcvQueue, SMPQueueInfo server sId encryptKey)
|
||||
|
||||
joinConnection :: SMPQueueInfo -> ReplyMode -> m ()
|
||||
joinConnection (SMPQueueInfo smpServer senderId encryptKey) _ = do
|
||||
c <- getSMPServerClient smpServer
|
||||
g <- asks idsDrg
|
||||
senderKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
verifyKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
-- TODO create connection with NEW status, it will be upgraded to CONFIRMED status once SMP server replies OK to SEND
|
||||
msg <- mkConfirmation encryptKey senderKey
|
||||
let sndPrivateKey = senderKey
|
||||
signKey = verifyKey
|
||||
withStore $ \st ->
|
||||
createSndConn st connAlias $
|
||||
SendQueue
|
||||
{ server = smpServer,
|
||||
sndId = senderId,
|
||||
sndPrivateKey,
|
||||
encryptKey,
|
||||
signKey,
|
||||
-- verifyKey,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
liftSMP $ sendSMPMessage c "" senderId msg
|
||||
withStore $ \st -> updateQueueStatus st connAlias SND Confirmed
|
||||
respond OK
|
||||
sendAgentMessage :: SendQueue -> AMessage -> m ()
|
||||
sendAgentMessage SendQueue {server, sndId, sndPrivateKey, encryptKey} agentMsg = do
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkAgentMessage encryptKey agentMsg
|
||||
liftSMP $ sendSMPMessage smp sndPrivateKey sndId msg
|
||||
|
||||
replyError :: ErrorType -> SomeException -> m a
|
||||
replyError err e = do
|
||||
respond :: ACommand 'Agent -> m ()
|
||||
respond resp = atomically $ writeTBQueue sndQ (corrId, connAlias, resp)
|
||||
|
||||
subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
subscriber c@AgentClient {msgQ} = forever $ do
|
||||
-- TODO this will only process messages and notifications
|
||||
t <- atomically $ readTBQueue msgQ
|
||||
runExceptT (processSMPTransmission c t) >>= \case
|
||||
Left e -> liftIO $ print e
|
||||
Right _ -> return ()
|
||||
|
||||
processSMPTransmission ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SMPServerTransmission ->
|
||||
m ()
|
||||
processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
case cmd of
|
||||
SMP.MSG _msgId _ts msgBody -> do
|
||||
-- TODO deduplicate with previously received
|
||||
(connAlias, rcvQueue@ReceiveQueue {decryptKey, status}) <- withStore $ \st -> getReceiveQueue st srv rId
|
||||
agentMsg <- liftEither . parseSMPMessage =<< decryptMessage decryptKey msgBody
|
||||
case agentMsg of
|
||||
SMPConfirmation senderKey -> do
|
||||
case status of
|
||||
New ->
|
||||
-- TODO currently it automatically allows whoever sends the confirmation
|
||||
-- Commands CONF and LET are not implemented yet
|
||||
-- They are probably not needed in v0.2?
|
||||
-- TODO notification that connection confirmed?
|
||||
secureQueue rcvQueue senderKey
|
||||
s ->
|
||||
-- TODO maybe send notification to the user
|
||||
liftIO . putStrLn $ "unexpected SMP confirmation, queue status " <> show s
|
||||
SMPMessage {agentMessage} ->
|
||||
case agentMessage of
|
||||
HELLO _verifyKey _ -> return ()
|
||||
REPLY qInfo -> do
|
||||
(sndQueue, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> addSndQueue st connAlias sndQueue
|
||||
sendConfirmation c sndQueue senderKey
|
||||
sendHello c sndQueue
|
||||
atomically $ writeTBQueue sndQ ("", connAlias, CON)
|
||||
A_MSG _msgBody -> return ()
|
||||
return ()
|
||||
SMP.END -> return ()
|
||||
_ -> liftIO $ do
|
||||
putStrLn "unexpected response"
|
||||
print cmd
|
||||
where
|
||||
secureQueue :: ReceiveQueue -> SMP.SenderKey -> m ()
|
||||
secureQueue ReceiveQueue {rcvId, rcvPrivateKey} senderKey = do
|
||||
withStore $ \st -> updateReceiveQueueStatus st rcvId Confirmed
|
||||
-- TODO update sender key in the store
|
||||
smp <- getSMPServerClient c srv
|
||||
liftSMP $ secureSMPQueue smp rcvPrivateKey rId senderKey
|
||||
withStore $ \st -> updateReceiveQueueStatus st rcvId Secured
|
||||
|
||||
decryptMessage :: MonadUnliftIO m => PrivateKey -> ByteString -> m ByteString
|
||||
decryptMessage _decryptKey = return
|
||||
|
||||
getSMPServerClient ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SMPServer ->
|
||||
m SMPClient
|
||||
getSMPServerClient AgentClient {smpClients, msgQ} srv =
|
||||
atomically (M.lookup srv <$> readTVar smpClients)
|
||||
>>= maybe newSMPClient return
|
||||
where
|
||||
newSMPClient :: m SMPClient
|
||||
newSMPClient = do
|
||||
cfg <- asks $ smpCfg . config
|
||||
c <- liftIO (getSMPClient srv cfg msgQ) `E.catch` throwErr (BROKER smpErrTCPConnection)
|
||||
atomically . modifyTVar smpClients $ M.insert srv c
|
||||
return c
|
||||
|
||||
throwErr :: ErrorType -> SomeException -> m a
|
||||
throwErr err e = do
|
||||
liftIO . putStrLn $ "Exception: " ++ show e -- TODO remove
|
||||
throwError err
|
||||
|
||||
getSMPServerClient :: SMPServer -> m SMPClient
|
||||
getSMPServerClient srv =
|
||||
atomically (M.lookup srv <$> readTVar smpClients)
|
||||
>>= maybe newSMPClient return
|
||||
where
|
||||
newSMPClient :: m SMPClient
|
||||
newSMPClient = do
|
||||
cfg <- asks $ smpCfg . config
|
||||
c <- liftIO (getSMPClient srv cfg msgQ) `E.catch` replyError (BROKER smpErrTCPConnection)
|
||||
atomically . modifyTVar smpClients $ M.insert srv c
|
||||
return c
|
||||
newSendQueue ::
|
||||
(MonadUnliftIO m, MonadReader Env m) => SMPQueueInfo -> m (SendQueue, SMP.SenderKey)
|
||||
newSendQueue (SMPQueueInfo smpServer senderId encryptKey) = do
|
||||
g <- asks idsDrg
|
||||
senderKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
verifyKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let sndPrivateKey = senderKey
|
||||
signKey = verifyKey
|
||||
sndQueue =
|
||||
SendQueue
|
||||
{ server = smpServer,
|
||||
sndId = senderId,
|
||||
sndPrivateKey,
|
||||
encryptKey,
|
||||
signKey,
|
||||
-- verifyKey,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
return (sndQueue, senderKey)
|
||||
|
||||
mkConfirmation :: PublicKey -> PublicKey -> m SMP.MsgBody
|
||||
mkConfirmation _encKey senderKey = do
|
||||
let msg = "KEY " <> senderKey <> "\r\n\r\n"
|
||||
sendConfirmation ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SendQueue ->
|
||||
SMP.SenderKey ->
|
||||
m ()
|
||||
sendConfirmation c SendQueue {server, sndId} senderKey = do
|
||||
-- TODO send initial confirmation with signature - change in SMP server
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkConfirmation
|
||||
liftSMP $ sendSMPMessage smp "" sndId msg
|
||||
withStore $ \st -> updateSendQueueStatus st sndId Confirmed
|
||||
where
|
||||
mkConfirmation :: m SMP.MsgBody
|
||||
mkConfirmation = do
|
||||
let msg = serializeSMPMessage $ SMPConfirmation senderKey
|
||||
-- TODO encryption
|
||||
return msg
|
||||
|
||||
respond :: ACommand 'Agent -> m ()
|
||||
respond c = atomically $ writeTBQueue sndQ (corrId, connAlias, c)
|
||||
sendHello ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SendQueue ->
|
||||
m ()
|
||||
sendHello c SendQueue {server, sndId, sndPrivateKey, encryptKey} = do
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkHello "" $ AckMode On -- TODO verifyKey
|
||||
_send smp 20 msg
|
||||
withStore $ \st -> updateSendQueueStatus st sndId Active
|
||||
where
|
||||
mkHello :: PublicKey -> AckMode -> m ByteString
|
||||
mkHello verifyKey ackMode =
|
||||
mkAgentMessage encryptKey $ HELLO verifyKey ackMode
|
||||
|
||||
subscriber :: MonadUnliftIO m => AgentClient -> m ()
|
||||
subscriber AgentClient {msgQ} = forever $ do
|
||||
-- TODO this will only process messages and notifications
|
||||
(_srv, _qId, _cmd) <- atomically $ readTBQueue msgQ
|
||||
-- case respOrErr of
|
||||
-- Right (Cmd _ (MSG msgId ts msgBody)) ->
|
||||
-- writeTBQueue messageQ (qId, Message {msgId, ts, msgBody})
|
||||
-- Right (Cmd _ END) -> writeTBQueue endSubQ qId
|
||||
-- -- TODO maybe have one more queue to write unexpected responses
|
||||
-- _ -> return ()
|
||||
return ()
|
||||
_send :: SMPClient -> Int -> ByteString -> m ()
|
||||
_send _ 0 _ = throwError INTERNAL -- TODO different error
|
||||
_send smp retry msg = do
|
||||
liftSMP (sendSMPMessage smp sndPrivateKey sndId msg)
|
||||
`catchError` ( \case
|
||||
SMP SMP.AUTH -> do
|
||||
liftIO $ threadDelay 100000
|
||||
_send smp (retry - 1) msg
|
||||
_ -> throwError INTERNAL -- TODO wrap client error in some constructor
|
||||
)
|
||||
|
||||
mkAgentMessage :: (MonadUnliftIO m) => PrivateKey -> AMessage -> m ByteString
|
||||
mkAgentMessage _encKey agentMessage = do
|
||||
agentTimestamp <- liftIO getCurrentTime
|
||||
let msg =
|
||||
serializeSMPMessage
|
||||
SMPMessage
|
||||
{ agentMsgId = 0,
|
||||
agentTimestamp,
|
||||
previousMsgHash = "",
|
||||
agentMessage
|
||||
}
|
||||
-- TODO encryption
|
||||
return msg
|
||||
|
||||
@@ -15,13 +15,13 @@ import Data.Time.Clock (UTCTime)
|
||||
import Data.Type.Equality
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Server.Transmission (PrivateKey, PublicKey, QueueId)
|
||||
import Simplex.Messaging.Server.Transmission (PrivateKey, PublicKey, RecipientId, SenderId)
|
||||
|
||||
data ReceiveQueue = ReceiveQueue
|
||||
{ server :: SMPServer,
|
||||
rcvId :: QueueId,
|
||||
rcvId :: RecipientId,
|
||||
rcvPrivateKey :: PrivateKey,
|
||||
sndId :: Maybe QueueId,
|
||||
sndId :: Maybe SenderId,
|
||||
sndKey :: Maybe PublicKey,
|
||||
decryptKey :: PrivateKey,
|
||||
verifyKey :: Maybe PublicKey,
|
||||
@@ -32,7 +32,7 @@ data ReceiveQueue = ReceiveQueue
|
||||
|
||||
data SendQueue = SendQueue
|
||||
{ server :: SMPServer,
|
||||
sndId :: QueueId,
|
||||
sndId :: SenderId,
|
||||
sndPrivateKey :: PrivateKey,
|
||||
encryptKey :: PublicKey,
|
||||
signKey :: PrivateKey,
|
||||
@@ -98,11 +98,14 @@ class Monad m => MonadAgentStore s m where
|
||||
createRcvConn :: s -> ConnAlias -> ReceiveQueue -> m ()
|
||||
createSndConn :: s -> ConnAlias -> SendQueue -> m ()
|
||||
getConn :: s -> ConnAlias -> m SomeConn
|
||||
getReceiveQueue :: s -> SMPServer -> RecipientId -> m (ConnAlias, ReceiveQueue)
|
||||
deleteConn :: s -> ConnAlias -> m ()
|
||||
addSndQueue :: s -> ConnAlias -> SendQueue -> m ()
|
||||
addRcvQueue :: s -> ConnAlias -> ReceiveQueue -> m ()
|
||||
removeSndAuth :: s -> ConnAlias -> m ()
|
||||
updateQueueStatus :: s -> ConnAlias -> QueueDirection -> QueueStatus -> m ()
|
||||
updateReceiveQueueStatus :: s -> RecipientId -> QueueStatus -> m ()
|
||||
updateSendQueueStatus :: s -> SenderId -> QueueStatus -> m ()
|
||||
createMsg :: s -> ConnAlias -> QueueDirection -> AgentMsgId -> AMessage -> m ()
|
||||
getLastMsg :: s -> ConnAlias -> QueueDirection -> m MessageDelivery
|
||||
getMsg :: s -> ConnAlias -> QueueDirection -> AgentMsgId -> m MessageDelivery
|
||||
|
||||
@@ -34,6 +34,7 @@ import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Schema
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Server.Transmission (RecipientId, SenderId)
|
||||
import Simplex.Messaging.Util
|
||||
import Text.Read
|
||||
import qualified UnliftIO.Exception as E
|
||||
@@ -382,6 +383,9 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
return $ SomeConn SCSend (SendConnection connAlias sndQ)
|
||||
_ -> throwError SEBadConn
|
||||
|
||||
getReceiveQueue :: SQLiteStore -> SMPServer -> RecipientId -> m (ConnAlias, ReceiveQueue)
|
||||
getReceiveQueue _st _smpServer _recipientId = throwError SEInternal
|
||||
|
||||
-- TODO make transactional
|
||||
addSndQueue :: SQLiteStore -> ConnAlias -> SendQueue -> m ()
|
||||
addSndQueue st connAlias sndQueue =
|
||||
@@ -438,6 +442,12 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
Just qId -> updateSndQueueStatus st qId status
|
||||
Nothing -> throwError SEBadQueueDirection
|
||||
|
||||
updateReceiveQueueStatus :: SQLiteStore -> RecipientId -> QueueStatus -> m ()
|
||||
updateReceiveQueueStatus _st _rId _status = throwError SENotImplemented
|
||||
|
||||
updateSendQueueStatus :: SQLiteStore -> SenderId -> QueueStatus -> m ()
|
||||
updateSendQueueStatus _st _sId _status = throwError SENotImplemented
|
||||
|
||||
-- TODO decrease duplication of queue direction checks?
|
||||
createMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> AMessage -> m ()
|
||||
createMsg st connAlias qDirection agentMsgId msg = do
|
||||
@@ -445,23 +455,23 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
RCV -> do
|
||||
(rcvQId, _) <- getConnection st connAlias
|
||||
case rcvQId of
|
||||
Just _ -> insertMsg st connAlias qDirection agentMsgId $ serializeMsg msg
|
||||
Just _ -> insertMsg st connAlias qDirection agentMsgId $ serializeAgentMessage msg
|
||||
Nothing -> throwError SEBadQueueDirection
|
||||
SND -> do
|
||||
(_, sndQId) <- getConnection st connAlias
|
||||
case sndQId of
|
||||
Just _ -> insertMsg st connAlias qDirection agentMsgId $ serializeMsg msg
|
||||
Just _ -> insertMsg st connAlias qDirection agentMsgId $ serializeAgentMessage msg
|
||||
Nothing -> throwError SEBadQueueDirection
|
||||
|
||||
getLastMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> m MessageDelivery
|
||||
getLastMsg _st _connAlias _dir = throwError SEInternal
|
||||
getLastMsg _st _connAlias _dir = throwError SENotImplemented
|
||||
|
||||
getMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> m MessageDelivery
|
||||
getMsg _st _connAlias _dir _msgId = throwError SEInternal
|
||||
getMsg _st _connAlias _dir _msgId = throwError SENotImplemented
|
||||
|
||||
-- TODO missing status parameter?
|
||||
updateMsgStatus :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> m ()
|
||||
updateMsgStatus _st _connAlias _dir _msgId = throwError SEInternal
|
||||
updateMsgStatus _st _connAlias _dir _msgId = throwError SENotImplemented
|
||||
|
||||
deleteMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> m ()
|
||||
deleteMsg _st _connAlias _dir _msgId = throwError SEInternal
|
||||
deleteMsg _st _connAlias _dir _msgId = throwError SENotImplemented
|
||||
|
||||
@@ -13,4 +13,5 @@ data StoreError
|
||||
| SEBadConnType ConnType
|
||||
| SEBadQueueStatus
|
||||
| SEBadQueueDirection
|
||||
| SENotImplemented -- TODO remove
|
||||
deriving (Eq, Show, Exception)
|
||||
|
||||
@@ -20,6 +20,7 @@ import qualified Data.ByteString.Char8 as B
|
||||
import Data.Kind
|
||||
import Data.List.Split (splitOn)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Data.Time.ISO8601
|
||||
import Data.Type.Equality
|
||||
import Data.Typeable ()
|
||||
import Network.Socket
|
||||
@@ -69,12 +70,14 @@ data ACmd where
|
||||
deriving instance Show ACmd
|
||||
|
||||
data ACommand (p :: AParty) where
|
||||
NEW :: SMPServer -> ACommand Client
|
||||
NEW :: SMPServer -> ACommand Client -- response INV
|
||||
INV :: SMPQueueInfo -> ACommand Agent
|
||||
JOIN :: SMPQueueInfo -> ReplyMode -> ACommand Client
|
||||
CON :: ACommand Agent
|
||||
CONF :: OtherPartyId -> ACommand Agent
|
||||
LET :: OtherPartyId -> ACommand Client
|
||||
JOIN :: SMPQueueInfo -> ReplyMode -> ACommand Client -- response OK
|
||||
CON :: ACommand Agent -- notification that connection is established
|
||||
-- TODO currently it automatically allows whoever sends the confirmation
|
||||
READY :: ACommand Agent
|
||||
-- CONF :: OtherPartyId -> ACommand Agent
|
||||
-- LET :: OtherPartyId -> ACommand Client
|
||||
SUB :: SubMode -> ACommand Client
|
||||
END :: ACommand Agent
|
||||
-- QST :: QueueDirection -> ACommand Client
|
||||
@@ -92,13 +95,34 @@ deriving instance Show (ACommand p)
|
||||
|
||||
type Message = ByteString
|
||||
|
||||
data SMPMessage
|
||||
= SMPConfirmation PublicKey
|
||||
| SMPMessage
|
||||
{ agentMsgId :: Integer,
|
||||
agentTimestamp :: UTCTime,
|
||||
previousMsgHash :: ByteString,
|
||||
agentMessage :: AMessage
|
||||
}
|
||||
|
||||
data AMessage where
|
||||
HELLO :: VerificationKey -> AckMode -> AMessage
|
||||
REPLY :: SMPQueueInfo -> AMessage
|
||||
A_MSG :: MsgBody -> AMessage
|
||||
|
||||
parseMessage :: Message -> Either ErrorType AMessage
|
||||
parseMessage msg = case B.words msg of
|
||||
parseSMPMessage :: ByteString -> Either ErrorType SMPMessage
|
||||
parseSMPMessage _ = Left INTERNAL
|
||||
|
||||
serializeSMPMessage :: SMPMessage -> ByteString
|
||||
serializeSMPMessage = \case
|
||||
SMPConfirmation sKey -> "KEY " <> sKey <> "\r\n\r\n"
|
||||
SMPMessage {agentMsgId, agentTimestamp, previousMsgHash, agentMessage} ->
|
||||
"\r\n" <> messageHeader agentMsgId agentTimestamp previousMsgHash <> "\r\n" <> serializeAgentMessage agentMessage
|
||||
where
|
||||
messageHeader agentMsgId agentTimestamp previousMsgHash =
|
||||
B.unwords [B.pack $ show agentMsgId, B.pack (formatISO8601Millis agentTimestamp), encode previousMsgHash]
|
||||
|
||||
parseAgentMessage :: ByteString -> Either ErrorType AMessage
|
||||
parseAgentMessage msg = case B.words msg of
|
||||
["HELLO", key, ackMode] -> HELLO key <$> parseAckMode ackMode
|
||||
["REPLY", qInfo] -> REPLY <$> parseSmpQueueInfo qInfo
|
||||
["A_MSG", msgBody] -> Right $ A_MSG msgBody
|
||||
@@ -137,11 +161,11 @@ getMode mode = case mode of
|
||||
errParams :: Either ErrorType a
|
||||
errParams = Left $ SYNTAX errBadParameters
|
||||
|
||||
serializeMsg :: AMessage -> Message
|
||||
serializeMsg = \case
|
||||
serializeAgentMessage :: AMessage -> ByteString
|
||||
serializeAgentMessage = \case
|
||||
HELLO _verKey _ackMode -> "HELLO" -- TODO
|
||||
REPLY qInfo -> "REPLY" <> serializeSmpQueueInfo qInfo
|
||||
A_MSG msgBody -> "A_MSG" <> msgBody -- ? whitespaces missing
|
||||
REPLY qInfo -> "REPLY " <> serializeSmpQueueInfo qInfo
|
||||
A_MSG msgBody -> "A_MSG " <> msgBody -- ? whitespaces missing
|
||||
|
||||
serializeSmpQueueInfo :: SMPQueueInfo -> ByteString
|
||||
serializeSmpQueueInfo (SMPQueueInfo srv qId ek) = "smp::" <> serializeServer srv <> "::" <> encode qId <> "::" <> encode ek
|
||||
|
||||
@@ -52,7 +52,7 @@ data SMPClient = SMPClient
|
||||
msgQ :: TBQueue SMPServerTransmission
|
||||
}
|
||||
|
||||
type SMPServerTransmission = (SMPServer, RecipientId, Cmd)
|
||||
type SMPServerTransmission = (SMPServer, RecipientId, Command 'Broker)
|
||||
|
||||
data SMPClientConfig = SMPClientConfig
|
||||
{ qSize :: Natural,
|
||||
@@ -99,8 +99,8 @@ getSMPClient smpServer@SMPServer {host, port} SMPClientConfig {qSize, defaultPor
|
||||
cs <- readTVar sentCommands
|
||||
case M.lookup corrId cs of
|
||||
Nothing -> case respOrErr of
|
||||
Right resp -> writeTBQueue msgQ (smpServer, qId, resp)
|
||||
Left _ -> return ()
|
||||
Right (Cmd SBroker cmd) -> writeTBQueue msgQ (smpServer, qId, cmd)
|
||||
_ -> return ()
|
||||
Just Request {queueId, responseVar} -> do
|
||||
modifyTVar sentCommands $ M.delete corrId
|
||||
putTMVar responseVar $
|
||||
@@ -120,31 +120,32 @@ data SMPClientError
|
||||
| SMPClientError
|
||||
deriving (Eq, Show, Exception)
|
||||
|
||||
createSMPQueue :: SMPClient -> RecipientKey -> ExceptT SMPClientError IO (RecipientId, SenderId)
|
||||
createSMPQueue c rKey =
|
||||
createSMPQueue :: SMPClient -> PrivateKey -> RecipientKey -> ExceptT SMPClientError IO (RecipientId, SenderId)
|
||||
createSMPQueue c _rpKey rKey =
|
||||
-- TODO add signing this request too - requires changes in the server
|
||||
sendSMPCommand c "" "" (Cmd SRecipient $ NEW rKey) >>= \case
|
||||
Cmd _ (IDS rId sId) -> return (rId, sId)
|
||||
_ -> throwE SMPUnexpectedResponse
|
||||
|
||||
subscribeSMPQueue :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO ()
|
||||
subscribeSMPQueue c@SMPClient {smpServer, msgQ} rKey rId =
|
||||
sendSMPCommand c rKey rId (Cmd SRecipient SUB) >>= \case
|
||||
subscribeSMPQueue :: SMPClient -> PrivateKey -> RecipientId -> ExceptT SMPClientError IO ()
|
||||
subscribeSMPQueue c@SMPClient {smpServer, msgQ} rpKey rId =
|
||||
sendSMPCommand c rpKey rId (Cmd SRecipient SUB) >>= \case
|
||||
Cmd _ OK -> return ()
|
||||
cmd@(Cmd _ MSG {}) ->
|
||||
Cmd _ cmd@MSG {} ->
|
||||
lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd)
|
||||
_ -> throwE SMPUnexpectedResponse
|
||||
|
||||
secureSMPQueue :: SMPClient -> RecipientKey -> QueueId -> SenderKey -> ExceptT SMPClientError IO ()
|
||||
secureSMPQueue c rKey rId senderKey = okSMPCommand (Cmd SRecipient $ KEY senderKey) c rKey rId
|
||||
secureSMPQueue :: SMPClient -> PrivateKey -> RecipientId -> SenderKey -> ExceptT SMPClientError IO ()
|
||||
secureSMPQueue c rpKey rId senderKey = okSMPCommand (Cmd SRecipient $ KEY senderKey) c rpKey rId
|
||||
|
||||
sendSMPMessage :: SMPClient -> SenderKey -> QueueId -> MsgBody -> ExceptT SMPClientError IO ()
|
||||
sendSMPMessage c sKey sId msg = okSMPCommand (Cmd SSender $ SEND msg) c sKey sId
|
||||
sendSMPMessage :: SMPClient -> PrivateKey -> SenderId -> MsgBody -> ExceptT SMPClientError IO ()
|
||||
sendSMPMessage c spKey sId msg = okSMPCommand (Cmd SSender $ SEND msg) c spKey sId
|
||||
|
||||
ackSMPMessage :: SMPClient -> RecipientKey -> QueueId -> ExceptT SMPClientError IO ()
|
||||
ackSMPMessage c@SMPClient {smpServer, msgQ} rKey rId =
|
||||
sendSMPCommand c rKey rId (Cmd SRecipient ACK) >>= \case
|
||||
ackSMPMessage c@SMPClient {smpServer, msgQ} rpKey rId =
|
||||
sendSMPCommand c rpKey rId (Cmd SRecipient ACK) >>= \case
|
||||
Cmd _ OK -> return ()
|
||||
cmd@(Cmd _ MSG {}) ->
|
||||
Cmd _ cmd@MSG {} ->
|
||||
lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd)
|
||||
_ -> throwE SMPUnexpectedResponse
|
||||
|
||||
|
||||
Reference in New Issue
Block a user