Sending messages end to end (#21)

* duplex connection end-to-end (working, debug logs)

* agent: send, receive, acknowledge messages

* logging proposal

* logging: client/server (dis)connections

* agent scenario testing framework

* add tests, remove logs

* clean up
This commit is contained in:
Evgeny Poberezkin
2021-01-20 21:01:54 +00:00
committed by GitHub
parent 1a40b91331
commit ddbf00fc46
10 changed files with 292 additions and 72 deletions
+48 -11
View File
@@ -42,7 +42,7 @@ runSMPAgent cfg@AgentConfig {tcpPort} = do
where
smpAgent :: (MonadUnliftIO m', MonadReader Env m') => m' ()
smpAgent = runTCPServer tcpPort $ \h -> do
putLn h "Welcome to SMP v0.2.0 agent"
liftIO $ putLn h "Welcome to SMP v0.2.0 agent"
q <- asks $ tbqSize . config
c <- atomically $ newAgentClient q
race_ (connectClient h c) (runClient c)
@@ -102,6 +102,8 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
case cmd of
NEW smpServer -> createNewConnection smpServer
JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode
SEND msgBody -> sendMessage msgBody
ACK aMsgId -> ackMessage aMsgId
_ -> throwError PROHIBITED
where
createNewConnection :: SMPServer -> m ()
@@ -126,11 +128,35 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
ReplyOff -> return ()
respond CON
sendMessage :: SMP.MsgBody -> m ()
sendMessage msgBody =
withStore (`getConn` connAlias) >>= \case
SomeConn _ (DuplexConnection _ _ sq) -> sendMsg sq
SomeConn _ (SendConnection _ sq) -> sendMsg sq
-- TODO possibly there should be a separate error type trying to send the message to the connection without SendQueue
_ -> throwError PROHIBITED -- NOT_READY ?
where
sendMsg sq = do
sendAgentMessage sq $ A_MSG msgBody
-- TODO respond $ SENT aMsgId
respond OK
ackMessage :: AgentMsgId -> m ()
ackMessage _aMsgId =
withStore (`getConn` connAlias) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> ackMsg rq
SomeConn _ (ReceiveConnection _ rq) -> ackMsg rq
-- TODO possibly there should be a separate error type trying to send the message to the connection without SendQueue
-- NOT_READY ?
_ -> throwError PROHIBITED
where
ackMsg rq = sendAck c rq >> respond OK
sendReplyQInfo :: SMPServer -> SendQueue -> m ()
sendReplyQInfo srv sndQueue = do
(rcvQueue, qInfo) <- newReceiveQueue srv
withStore $ \st -> addRcvQueue st connAlias rcvQueue
sendAgentMessage sndQueue $ REPLY qInfo
sendReplyQInfo srv sq = do
(rq, qInfo) <- newReceiveQueue srv
withStore $ \st -> addRcvQueue st connAlias rq
sendAgentMessage sq $ REPLY qInfo
newReceiveQueue :: SMPServer -> m (ReceiveQueue, SMPQueueInfo)
newReceiveQueue server = do
@@ -180,34 +206,40 @@ processSMPTransmission ::
m ()
processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
case cmd of
SMP.MSG _msgId _ts msgBody -> do
SMP.MSG _ srvTs msgBody -> do
-- TODO deduplicate with previously received
(connAlias, rcvQueue@ReceiveQueue {decryptKey, status}) <- withStore $ \st -> getReceiveQueue st srv rId
agentMsg <- liftEither . parseSMPMessage =<< decryptMessage decryptKey msgBody
case agentMsg of
SMPConfirmation senderKey -> do
case status of
New ->
New -> do
-- TODO currently it automatically allows whoever sends the confirmation
-- Commands CONF and LET are not implemented yet
-- They are probably not needed in v0.2?
-- TODO notification that connection confirmed?
secureQueue rcvQueue senderKey
sendAck c rcvQueue
s ->
-- TODO maybe send notification to the user
liftIO . putStrLn $ "unexpected SMP confirmation, queue status " <> show s
SMPMessage {agentMessage} ->
SMPMessage {agentMessage, agentMsgId, agentTimestamp} ->
case agentMessage of
HELLO _verifyKey _ -> do
-- TODO send status update to the user?
withStore $ \st -> updateRcvQueueStatus st rcvQueue Active
sendAck c rcvQueue
REPLY qInfo -> do
(sndQueue, senderKey) <- newSendQueue qInfo
withStore $ \st -> addSndQueue st connAlias sndQueue
sendConfirmation c sndQueue senderKey
sendHello c sndQueue
atomically $ writeTBQueue sndQ ("", connAlias, CON)
A_MSG _msgBody -> return ()
sendAck c rcvQueue
A_MSG body -> do
-- TODO check message status
let msg = MSG agentMsgId agentTimestamp srvTs MsgOk body
atomically $ writeTBQueue sndQ ("", connAlias, msg)
return ()
SMP.END -> return ()
_ -> liftIO $ do
@@ -296,7 +328,7 @@ sendHello ::
m ()
sendHello c sq@SendQueue {server, sndId, sndPrivateKey, encryptKey} = do
smp <- getSMPServerClient c server
msg <- mkHello "" $ AckMode On -- TODO verifyKey
msg <- mkHello "5678" $ AckMode On -- TODO verifyKey
_send smp 20 msg
withStore $ \st -> updateSndQueueStatus st sq Active
where
@@ -315,6 +347,11 @@ sendHello c sq@SendQueue {server, sndId, sndPrivateKey, encryptKey} = do
_ -> throwError INTERNAL -- TODO wrap client error in some constructor
)
sendAck :: (MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) => AgentClient -> ReceiveQueue -> m ()
sendAck c ReceiveQueue {server, rcvId, rcvPrivateKey} = do
smp <- getSMPServerClient c server
liftSMP $ ackSMPMessage smp rcvPrivateKey rcvId
mkAgentMessage :: (MonadUnliftIO m) => PrivateKey -> AMessage -> m ByteString
mkAgentMessage _encKey agentMessage = do
agentTimestamp <- liftIO getCurrentTime
@@ -323,7 +360,7 @@ mkAgentMessage _encKey agentMessage = do
SMPMessage
{ agentMsgId = 0,
agentTimestamp,
previousMsgHash = "",
previousMsgHash = "1234", -- TODO hash of the previous message
agentMessage
}
-- TODO encryption
+47 -15
View File
@@ -130,12 +130,12 @@ parseSMPMessage = parse (smpMessageP <* A.endOfLine) $ SYNTAX errBadMessage
smpClientMessageP =
SMPMessage
<$> A.decimal <* A.space
<*> tsIso8601P <* A.space
<*> tsISO8601P <* A.space
<*> base64P <* A.endOfLine
<*> agentMessageP
tsIso8601P :: Parser UTCTime
tsIso8601P = maybe (fail "timestamp") pure . parseISO8601 . B.unpack =<< A.takeTill (== ' ')
tsISO8601P :: Parser UTCTime
tsISO8601P = maybe (fail "timestamp") pure . parseISO8601 . B.unpack =<< A.takeTill (== ' ')
serializeSMPMessage :: SMPMessage -> ByteString
serializeSMPMessage = \case
@@ -164,7 +164,7 @@ agentMessageP =
smpQueueInfoP :: Parser SMPQueueInfo
smpQueueInfoP =
SMPQueueInfo <$> ("smp::" *> smpServerP <* "::") <*> (base64P <* "::") <*> base64P
"smp::" *> (SMPQueueInfo <$> smpServerP <* "::" <*> base64P <* "::" <*> base64P)
smpServerP :: Parser SMPServer
smpServerP = SMPServer <$> server <*> port <*> msgHash
@@ -190,12 +190,12 @@ errParams = Left $ SYNTAX errBadParameters
serializeAgentMessage :: AMessage -> ByteString
serializeAgentMessage = \case
HELLO _verKey _ackMode -> "HELLO" -- TODO
HELLO verifyKey ackMode -> "HELLO " <> encode verifyKey <> if ackMode == AckMode Off then " NO_ACK" else ""
REPLY qInfo -> "REPLY " <> serializeSmpQueueInfo qInfo
A_MSG msgBody -> "A_MSG " <> msgBody
A_MSG body -> "MSG " <> serializeMsg body <> "\n"
serializeSmpQueueInfo :: SMPQueueInfo -> ByteString
serializeSmpQueueInfo (SMPQueueInfo srv qId ek) = "smp::" <> serializeServer srv <> "::" <> encode qId <> "::" <> encode ek
serializeSmpQueueInfo (SMPQueueInfo srv qId ek) = B.intercalate "::" ["smp", serializeServer srv, encode qId, encode ek]
serializeServer :: SMPServer -> ByteString
serializeServer SMPServer {host, port, keyHash} = B.pack $ host <> maybe "" (':' :) port <> maybe "" (('#' :) . B.unpack) keyHash
@@ -233,7 +233,7 @@ data QueueDirection = SND | RCV deriving (Show)
data QueueStatus = New | Confirmed | Secured | Active | Disabled
deriving (Eq, Show, Read)
type AgentMsgId = Int
type AgentMsgId = Integer
data MsgStatus = MsgOk | MsgError MsgErrorType
deriving (Show)
@@ -243,7 +243,6 @@ data MsgErrorType = MsgSkipped AgentMsgId AgentMsgId | MsgBadId AgentMsgId | Msg
data ErrorType
= UNKNOWN
| UNSUPPORTED -- TODO remove once all commands implemented
| PROHIBITED
| SYNTAX Int
| BROKER Natural
@@ -291,16 +290,30 @@ parseCommandP =
"NEW " *> newCmd
<|> "INV " *> invResp
<|> "JOIN " *> joinCmd
<|> "SEND " *> sendCmd
<|> "MSG " *> message
<|> "ACK " *> acknowledge
-- <|> "ERR " *> agentError - TODO
<|> "CON" $> ACmd SAgent CON
<|> "OK" $> ACmd SAgent OK
where
newCmd = ACmd SClient . NEW <$> smpServerP
invResp = ACmd SAgent . INV <$> smpQueueInfoP
joinCmd = ACmd SClient <$> (JOIN <$> smpQueueInfoP <*> replyMode)
sendCmd = ACmd SClient <$> (SEND <$> A.takeByteString)
message =
let sp = A.space; msgId = A.decimal <* sp; ts = tsISO8601P <* sp; body = A.takeByteString
in ACmd SAgent <$> (MSG <$> msgId <*> ts <*> ts <*> status <* sp <*> body)
acknowledge = ACmd SClient <$> (ACK <$> A.decimal)
replyMode =
" NO_REPLY" $> ReplyOff
<|> A.space *> (ReplyVia <$> smpServerP)
<|> pure ReplyOn
status = "OK" $> MsgOk <|> "ERR " *> (MsgError <$> msgErrorType)
msgErrorType =
"ID " *> (MsgBadId <$> A.decimal)
<|> "NO_ID " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal)
<|> "HASH" $> MsgBadHash
parseCommand :: ByteString -> Either ErrorType ACmd
parseCommand = parse parseCommandP $ SYNTAX errBadCommand
@@ -310,6 +323,10 @@ serializeCommand = \case
NEW srv -> "NEW " <> serializeServer srv
INV qInfo -> "INV " <> serializeSmpQueueInfo qInfo
JOIN qInfo rMode -> "JOIN " <> serializeSmpQueueInfo qInfo <> replyMode rMode
SEND msgBody -> "SEND " <> serializeMsg msgBody
MSG aMsgId aTs ts st body ->
B.unwords ["MSG", B.pack $ show aMsgId, B.pack $ formatISO8601Millis aTs, B.pack $ formatISO8601Millis ts, msgStatus st, serializeMsg body]
ACK aMsgId -> "ACK " <> B.pack (show aMsgId)
CON -> "CON"
ERR e -> "ERR " <> B.pack (show e)
OK -> "OK"
@@ -320,14 +337,27 @@ serializeCommand = \case
ReplyOff -> " NO_REPLY"
ReplyVia srv -> " " <> serializeServer srv
ReplyOn -> ""
msgStatus :: MsgStatus -> ByteString
msgStatus = \case
MsgOk -> "OK"
MsgError e ->
"ERR" <> case e of
MsgSkipped fromMsgId toMsgId ->
B.unwords ["NO_ID", B.pack $ show fromMsgId, B.pack $ show toMsgId]
MsgBadId aMsgId -> "ID " <> B.pack (show aMsgId)
MsgBadHash -> "HASH"
tPutRaw :: MonadIO m => Handle -> ARawTransmission -> m ()
-- TODO - save function as in the server Transmission - re-use?
serializeMsg :: ByteString -> ByteString
serializeMsg body = B.pack (show $ B.length body) <> "\n" <> body
tPutRaw :: Handle -> ARawTransmission -> IO ()
tPutRaw h (corrId, connAlias, command) = do
putLn h corrId
putLn h connAlias
putLn h command
tGetRaw :: MonadIO m => Handle -> m ARawTransmission
tGetRaw :: Handle -> IO ARawTransmission
tGetRaw h = do
corrId <- getLn h
connAlias <- getLn h
@@ -335,11 +365,12 @@ tGetRaw h = do
return (corrId, connAlias, command)
tPut :: MonadIO m => Handle -> ATransmission p -> m ()
tPut h (corrId, connAlias, command) = tPutRaw h (bs corrId, connAlias, serializeCommand command)
tPut h (corrId, connAlias, command) =
liftIO $ tPutRaw h (bs corrId, connAlias, serializeCommand command)
-- | get client and agent transmissions
tGet :: forall m p. MonadIO m => SAParty p -> Handle -> m (ATransmissionOrError p)
tGet party h = tGetRaw h >>= tParseLoadBody
tGet party h = liftIO (tGetRaw h) >>= tParseLoadBody
where
tParseLoadBody :: ARawTransmission -> m (ATransmissionOrError p)
tParseLoadBody t@(corrId, connAlias, command) = do
@@ -370,13 +401,14 @@ tGet party h = tGetRaw h >>= tParseLoadBody
MSG agentMsgId srvTS agentTS status body -> MSG agentMsgId srvTS agentTS status <$$> getMsgBody body
cmd -> return $ Right cmd
-- TODO refactor with server
getMsgBody :: MsgBody -> m (Either ErrorType MsgBody)
getMsgBody msgBody =
case B.unpack msgBody of
':' : body -> return . Right $ B.pack body
str -> case readMaybe str :: Maybe Int of
Just size -> do
body <- getBytes h size
Just size -> liftIO $ do
body <- B.hGet h size
s <- getLn h
return $ if B.null s then Right body else Left SIZE
Nothing -> return . Left $ SYNTAX errMessageBody
+8 -7
View File
@@ -94,14 +94,15 @@ getSMPClient smpServer@SMPServer {host, port} SMPClientConfig {qSize, defaultPor
receive SMPClient {rcvQ} h = forever $ tGet fromServer h >>= atomically . writeTBQueue rcvQ
process :: SMPClient -> IO ()
process SMPClient {rcvQ, sentCommands} = forever . atomically $ do
(_, (corrId, qId, respOrErr)) <- readTBQueue rcvQ
cs <- readTVar sentCommands
process SMPClient {rcvQ, sentCommands} = forever $ do
(_, (corrId, qId, respOrErr)) <- atomically $ readTBQueue rcvQ
cs <- readTVarIO sentCommands
case M.lookup corrId cs of
Nothing -> case respOrErr of
Right (Cmd SBroker cmd) -> writeTBQueue msgQ (smpServer, qId, cmd)
_ -> return ()
Just Request {queueId, responseVar} -> do
Nothing -> do
case respOrErr of
Right (Cmd SBroker cmd) -> atomically $ writeTBQueue msgQ (smpServer, qId, cmd)
_ -> return ()
Just Request {queueId, responseVar} -> atomically $ do
modifyTVar sentCommands $ M.delete corrId
putTMVar responseVar $
if queueId == qId
+1 -1
View File
@@ -58,7 +58,7 @@ runSMPServer cfg@ServerConfig {tcpPort} = do
runClient :: (MonadUnliftIO m, MonadReader Env m) => Handle -> m ()
runClient h = do
putLn h "Welcome to SMP v0.2.0"
liftIO $ putLn h "Welcome to SMP v0.2.0"
q <- asks $ tbqSize . config
c <- atomically $ newClient q
s <- asks server
+7 -6
View File
@@ -191,14 +191,14 @@ errNoQueueId = 5
errMessageBody :: Int
errMessageBody = 6
tPutRaw :: MonadIO m => Handle -> RawTransmission -> m ()
tPutRaw :: Handle -> RawTransmission -> IO ()
tPutRaw h (signature, corrId, queueId, command) = do
putLn h signature
putLn h corrId
putLn h queueId
putLn h command
tGetRaw :: MonadIO m => Handle -> m RawTransmission
tGetRaw :: Handle -> IO RawTransmission
tGetRaw h = do
signature <- getLn h
corrId <- getLn h
@@ -207,7 +207,8 @@ tGetRaw h = do
return (signature, corrId, queueId, command)
tPut :: MonadIO m => Handle -> Transmission -> m ()
tPut h (signature, (corrId, queueId, command)) = tPutRaw h (encode signature, bs corrId, encode queueId, serializeCommand command)
tPut h (signature, (corrId, queueId, command)) =
liftIO $ tPutRaw h (encode signature, bs corrId, encode queueId, serializeCommand command)
fromClient :: Cmd -> Either ErrorType Cmd
fromClient = \case
@@ -223,7 +224,7 @@ fromServer = \case
-- `fromParty` is used to limit allowed senders - `fromClient` or `fromServer` should be used
tGet :: forall m. MonadIO m => (Cmd -> Either ErrorType Cmd) -> Handle -> m TransmissionOrError
tGet fromParty h = do
(signature, corrId, queueId, command) <- tGetRaw h
(signature, corrId, queueId, command) <- liftIO $ tGetRaw h
let decodedTransmission = liftM2 (,corrId,,command) (decode signature) (decode queueId)
either (const $ tError corrId) tParseLoadBody decodedTransmission
where
@@ -272,8 +273,8 @@ tGet fromParty h = do
case B.unpack msgBody of
':' : body -> return . Right $ B.pack body
str -> case readMaybe str :: Maybe Int of
Just size -> do
body <- getBytes h size
Just size -> liftIO $ do
body <- B.hGet h size
s <- getLn h
return $ if B.null s then Right body else Left SIZE
Nothing -> return . Left $ SYNTAX errMessageBody
+16 -18
View File
@@ -2,6 +2,7 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Simplex.Messaging.Transport where
@@ -20,8 +21,8 @@ import UnliftIO.Exception (Exception, IOException)
import qualified UnliftIO.Exception as E
import qualified UnliftIO.IO as IO
startTCPServer :: MonadIO m => ServiceName -> m Socket
startTCPServer port = liftIO . withSocketsDo $ resolve >>= open
startTCPServer :: ServiceName -> IO Socket
startTCPServer port = withSocketsDo $ resolve >>= open
where
resolve = do
let hints = defaultHints {addrFlags = [AI_PASSIVE], addrSocketType = Stream}
@@ -36,18 +37,18 @@ startTCPServer port = liftIO . withSocketsDo $ resolve >>= open
runTCPServer :: MonadUnliftIO m => ServiceName -> (Handle -> m ()) -> m ()
runTCPServer port server =
E.bracket (startTCPServer port) (liftIO . close) $ \sock -> forever $ do
h <- acceptTCPConn sock
E.bracket (liftIO $ startTCPServer port) (liftIO . close) $ \sock -> forever $ do
h <- liftIO $ acceptTCPConn sock
forkFinally (server h) (const $ IO.hClose h)
acceptTCPConn :: MonadIO m => Socket -> m Handle
acceptTCPConn sock = liftIO $ do
acceptTCPConn :: Socket -> IO Handle
acceptTCPConn sock = do
(conn, _) <- accept sock
getSocketHandle conn
startTCPClient :: MonadUnliftIO m => HostName -> ServiceName -> m Handle
startTCPClient :: HostName -> ServiceName -> IO Handle
startTCPClient host port =
liftIO . withSocketsDo $
withSocketsDo $
resolve >>= foldM tryOpen (Left err) >>= either E.throwIO return
where
err :: IOException
@@ -70,22 +71,19 @@ startTCPClient host port =
runTCPClient :: MonadUnliftIO m => HostName -> ServiceName -> (Handle -> m a) -> m a
runTCPClient host port client = do
h <- startTCPClient host port
h <- liftIO $ startTCPClient host port
client h `E.finally` IO.hClose h
getSocketHandle :: MonadIO m => Socket -> m Handle
getSocketHandle conn = liftIO $ do
getSocketHandle :: Socket -> IO Handle
getSocketHandle conn = do
h <- socketToHandle conn ReadWriteMode
hSetBinaryMode h True
hSetNewlineMode h NewlineMode {inputNL = CRLF, outputNL = CRLF}
hSetBuffering h LineBuffering
return h
putLn :: MonadIO m => Handle -> ByteString -> m ()
putLn h = liftIO . hPutStrLn h . B.unpack
putLn :: Handle -> ByteString -> IO ()
putLn h = B.hPut h . (<> "\r\n")
getLn :: MonadIO m => Handle -> m ByteString
getLn h = B.pack <$> liftIO (hGetLine h)
getBytes :: MonadIO m => Handle -> Int -> m ByteString
getBytes h = liftIO . B.hGet h
getLn :: Handle -> IO ByteString
getLn h = B.pack <$> hGetLine h