mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
Logging (#24)
* logging with simple-logger * refactor Agent.Client * move logging to Agent.Client * clean up * log command name only
This commit is contained in:
committed by
Efim Poberezkin
parent
c30a4cd1ff
commit
d82c286a54
@@ -2,6 +2,7 @@
|
||||
|
||||
module Main where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Simplex.Messaging.Agent (runSMPAgent)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Client (smpDefaultConfig)
|
||||
@@ -16,7 +17,12 @@ cfg =
|
||||
smpCfg = smpDefaultConfig
|
||||
}
|
||||
|
||||
logCfg :: LogConfig
|
||||
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
putStrLn $ "SMP agent listening on port " ++ tcpPort (cfg :: AgentConfig)
|
||||
runSMPAgent cfg
|
||||
setLogLevel LogInfo -- LogError
|
||||
withGlobalLogging logCfg $
|
||||
runSMPAgent cfg
|
||||
|
||||
@@ -22,6 +22,7 @@ dependencies:
|
||||
- iso8601-time == 0.1.*
|
||||
- mtl
|
||||
- network == 3.1.*
|
||||
- simple-logger == 0.1.*
|
||||
- sqlite-simple == 0.4.*
|
||||
- stm
|
||||
- template-haskell == 2.15.*
|
||||
|
||||
@@ -10,26 +10,26 @@
|
||||
|
||||
module Simplex.Messaging.Agent (runSMPAgent) where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.Map as M
|
||||
import Data.Time.Clock
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Text.Encoding
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Client
|
||||
import Simplex.Messaging.Client (SMPServerTransmission)
|
||||
import Simplex.Messaging.Server (randomBytes)
|
||||
import Simplex.Messaging.Server.Transmission (PrivateKey, PublicKey)
|
||||
import Simplex.Messaging.Server.Transmission (CorrId (..), PrivateKey)
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import Simplex.Messaging.Transport
|
||||
import UnliftIO.Async
|
||||
import UnliftIO.Concurrent
|
||||
import UnliftIO.Exception (SomeException)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.IO
|
||||
@@ -44,24 +44,40 @@ runSMPAgent cfg@AgentConfig {tcpPort} = do
|
||||
smpAgent = runTCPServer tcpPort $ \h -> do
|
||||
liftIO $ putLn h "Welcome to SMP v0.2.0 agent"
|
||||
q <- asks $ tbqSize . config
|
||||
c <- atomically $ newAgentClient q
|
||||
n <- asks clientCounter
|
||||
c <- atomically $ newAgentClient n q
|
||||
logInfo $ "client " <> showText (clientId c) <> " connected to Agent"
|
||||
race_ (connectClient h c) (runClient c)
|
||||
|
||||
connectClient :: MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
connectClient h c = race_ (send h c) (receive h c)
|
||||
connectClient h c = do
|
||||
race_ (send h c) (receive h c)
|
||||
logInfo $ "client " <> showText (clientId c) <> " disconnected from Agent"
|
||||
|
||||
runClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
runClient c = race_ (subscriber c) (client c)
|
||||
|
||||
receive :: MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
receive h AgentClient {rcvQ, sndQ} =
|
||||
forever $
|
||||
tGet SClient h >>= \(corrId, cAlias, command) -> atomically $ case command of
|
||||
Right cmd -> writeTBQueue rcvQ (corrId, cAlias, cmd)
|
||||
Left e -> writeTBQueue sndQ (corrId, cAlias, ERR e)
|
||||
receive :: forall m. MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
receive h c@AgentClient {rcvQ, sndQ} = forever $ do
|
||||
(corrId, cAlias, cmdOrErr) <- tGet SClient h
|
||||
case cmdOrErr of
|
||||
Right cmd -> write rcvQ (corrId, cAlias, cmd)
|
||||
Left e -> write sndQ (corrId, cAlias, ERR e)
|
||||
where
|
||||
write :: TBQueue (ATransmission p) -> ATransmission p -> m ()
|
||||
write q t = do
|
||||
logClient c "-->" t
|
||||
atomically $ writeTBQueue q t
|
||||
|
||||
send :: MonadUnliftIO m => Handle -> AgentClient -> m ()
|
||||
send h AgentClient {sndQ} = forever $ atomically (readTBQueue sndQ) >>= tPut h
|
||||
send h c@AgentClient {sndQ} = forever $ do
|
||||
t <- atomically $ readTBQueue sndQ
|
||||
tPut h t
|
||||
logClient c "<--" t
|
||||
|
||||
logClient :: MonadUnliftIO m => AgentClient -> ByteString -> ATransmission a -> m ()
|
||||
logClient AgentClient {clientId} dir (CorrId corrId, cAlias, cmd) = do
|
||||
logInfo . decodeUtf8 $ B.unwords [B.pack $ show clientId, dir, "A :", corrId, cAlias, B.takeWhile (/= ' ') $ serializeCommand cmd]
|
||||
|
||||
client :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
client c@AgentClient {rcvQ, sndQ} = forever $ do
|
||||
@@ -71,7 +87,7 @@ client c@AgentClient {rcvQ, sndQ} = forever $ do
|
||||
Right _ -> return ()
|
||||
|
||||
withStore ::
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentMonad m =>
|
||||
(forall m'. (MonadUnliftIO m', MonadError StoreError m') => SQLiteStore -> m' a) ->
|
||||
m a
|
||||
withStore action = do
|
||||
@@ -83,21 +99,7 @@ withStore action = do
|
||||
handleInternal :: (MonadError StoreError m') => SomeException -> m' a
|
||||
handleInternal _ = throwError SEInternal
|
||||
|
||||
liftSMP :: (MonadUnliftIO m, MonadError ErrorType m) => ExceptT SMPClientError IO a -> m a
|
||||
liftSMP action =
|
||||
liftIO (first smpClientError <$> runExceptT action) >>= liftEither
|
||||
where
|
||||
smpClientError :: SMPClientError -> ErrorType
|
||||
smpClientError = \case
|
||||
SMPServerError e -> SMP e
|
||||
_ -> INTERNAL -- TODO handle other errors
|
||||
|
||||
processCommand ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
ATransmission 'Client ->
|
||||
m ()
|
||||
processCommand :: forall m. AgentMonad m => AgentClient -> ATransmission 'Client -> m ()
|
||||
processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
case cmd of
|
||||
NEW smpServer -> createNewConnection smpServer
|
||||
@@ -110,21 +112,20 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
createNewConnection server = do
|
||||
-- TODO create connection alias if not passed
|
||||
-- make connAlias Maybe?
|
||||
(rcvQueue, qInfo) <- newReceiveQueue server
|
||||
withStore $ \st -> createRcvConn st connAlias rcvQueue
|
||||
(rq, qInfo) <- newReceiveQueue c server
|
||||
withStore $ \st -> createRcvConn st connAlias rq
|
||||
respond $ INV qInfo
|
||||
|
||||
joinConnection :: SMPQueueInfo -> ReplyMode -> m ()
|
||||
joinConnection qInfo@(SMPQueueInfo srv _ _) replyMode = do
|
||||
-- TODO create connection alias if not passed
|
||||
-- make connAlias Maybe?
|
||||
(sndQueue, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> createSndConn st connAlias sndQueue
|
||||
sendConfirmation c sndQueue senderKey
|
||||
sendHello c sndQueue
|
||||
(sq, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> createSndConn st connAlias sq
|
||||
connectToSendQueue c sq senderKey
|
||||
case replyMode of
|
||||
ReplyOn -> sendReplyQInfo srv sndQueue
|
||||
ReplyVia srv' -> sendReplyQInfo srv' sndQueue
|
||||
ReplyOn -> sendReplyQInfo srv sq
|
||||
ReplyVia srv' -> sendReplyQInfo srv' sq
|
||||
ReplyOff -> return ()
|
||||
respond CON
|
||||
|
||||
@@ -137,7 +138,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
_ -> throwError PROHIBITED -- NOT_READY ?
|
||||
where
|
||||
sendMsg sq = do
|
||||
sendAgentMessage sq $ A_MSG msgBody
|
||||
sendAgentMessage c sq $ A_MSG msgBody
|
||||
-- TODO respond $ SENT aMsgId
|
||||
respond OK
|
||||
|
||||
@@ -154,38 +155,9 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
|
||||
sendReplyQInfo :: SMPServer -> SendQueue -> m ()
|
||||
sendReplyQInfo srv sq = do
|
||||
(rq, qInfo) <- newReceiveQueue srv
|
||||
(rq, qInfo) <- newReceiveQueue c srv
|
||||
withStore $ \st -> addRcvQueue st connAlias rq
|
||||
sendAgentMessage sq $ REPLY qInfo
|
||||
|
||||
newReceiveQueue :: SMPServer -> m (ReceiveQueue, SMPQueueInfo)
|
||||
newReceiveQueue server = do
|
||||
smp <- getSMPServerClient c server
|
||||
g <- asks idsDrg
|
||||
recipientKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let rcvPrivateKey = recipientKey
|
||||
(rcvId, sId) <- liftSMP $ createSMPQueue smp rcvPrivateKey recipientKey
|
||||
encryptKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let decryptKey = encryptKey
|
||||
rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server,
|
||||
rcvId,
|
||||
rcvPrivateKey,
|
||||
sndId = Just sId,
|
||||
sndKey = Nothing,
|
||||
decryptKey,
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
return (rcvQueue, SMPQueueInfo server sId encryptKey)
|
||||
|
||||
sendAgentMessage :: SendQueue -> AMessage -> m ()
|
||||
sendAgentMessage SendQueue {server, sndId, sndPrivateKey, encryptKey} agentMsg = do
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkAgentMessage encryptKey agentMsg
|
||||
liftSMP $ sendSMPMessage smp sndPrivateKey sndId msg
|
||||
sendAgentMessage c sq $ REPLY qInfo
|
||||
|
||||
respond :: ACommand 'Agent -> m ()
|
||||
respond resp = atomically $ writeTBQueue sndQ (corrId, connAlias, resp)
|
||||
@@ -198,87 +170,68 @@ subscriber c@AgentClient {msgQ} = forever $ do
|
||||
Left e -> liftIO $ print e
|
||||
Right _ -> return ()
|
||||
|
||||
processSMPTransmission ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SMPServerTransmission ->
|
||||
m ()
|
||||
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> SMPServerTransmission -> m ()
|
||||
processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
case cmd of
|
||||
SMP.MSG _ srvTs msgBody -> do
|
||||
-- TODO deduplicate with previously received
|
||||
(connAlias, rcvQueue@ReceiveQueue {decryptKey, status}) <- withStore $ \st -> getReceiveQueue st srv rId
|
||||
(connAlias, rq@ReceiveQueue {decryptKey, status}) <- withStore $ \st -> getReceiveQueue st srv rId
|
||||
agentMsg <- liftEither . parseSMPMessage =<< decryptMessage decryptKey msgBody
|
||||
case agentMsg of
|
||||
SMPConfirmation senderKey -> do
|
||||
logServer "<--" c srv rId "MSG <KEY>"
|
||||
case status of
|
||||
New -> do
|
||||
-- TODO currently it automatically allows whoever sends the confirmation
|
||||
-- Commands CONF and LET are not implemented yet
|
||||
-- They are probably not needed in v0.2?
|
||||
-- TODO notification that connection confirmed?
|
||||
secureQueue rcvQueue senderKey
|
||||
sendAck c rcvQueue
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Confirmed
|
||||
-- TODO update sender key in the store
|
||||
secureQueue c rq senderKey
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Secured
|
||||
sendAck c rq
|
||||
s ->
|
||||
-- TODO maybe send notification to the user
|
||||
liftIO . putStrLn $ "unexpected SMP confirmation, queue status " <> show s
|
||||
SMPMessage {agentMessage, agentMsgId, agentTimestamp} ->
|
||||
case agentMessage of
|
||||
HELLO _verifyKey _ -> do
|
||||
logServer "<--" c srv rId "MSG <HELLO>"
|
||||
-- TODO send status update to the user?
|
||||
withStore $ \st -> updateRcvQueueStatus st rcvQueue Active
|
||||
sendAck c rcvQueue
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Active
|
||||
sendAck c rq
|
||||
REPLY qInfo -> do
|
||||
(sndQueue, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> addSndQueue st connAlias sndQueue
|
||||
sendConfirmation c sndQueue senderKey
|
||||
sendHello c sndQueue
|
||||
atomically $ writeTBQueue sndQ ("", connAlias, CON)
|
||||
sendAck c rcvQueue
|
||||
logServer "<--" c srv rId "MSG <REPLY>"
|
||||
-- TODO move senderKey inside SendQueue
|
||||
(sq, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> addSndQueue st connAlias sq
|
||||
connectToSendQueue c sq senderKey
|
||||
notify connAlias CON
|
||||
sendAck c rq
|
||||
A_MSG body -> do
|
||||
logServer "<--" c srv rId "MSG <MSG>"
|
||||
-- TODO check message status
|
||||
let msg = MSG agentMsgId agentTimestamp srvTs MsgOk body
|
||||
atomically $ writeTBQueue sndQ ("", connAlias, msg)
|
||||
notify connAlias $ MSG agentMsgId agentTimestamp srvTs MsgOk body
|
||||
return ()
|
||||
SMP.END -> return ()
|
||||
_ -> liftIO $ do
|
||||
putStrLn "unexpected response"
|
||||
print cmd
|
||||
SMP.END -> do
|
||||
logServer "<--" c srv rId "END"
|
||||
return ()
|
||||
_ -> logServer "<--" c srv rId $ "unexpected:" <> (B.pack . show) cmd
|
||||
where
|
||||
secureQueue :: ReceiveQueue -> SMP.SenderKey -> m ()
|
||||
secureQueue rq@ReceiveQueue {rcvPrivateKey} senderKey = do
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Confirmed
|
||||
-- TODO update sender key in the store
|
||||
smp <- getSMPServerClient c srv
|
||||
liftSMP $ secureSMPQueue smp rcvPrivateKey rId senderKey
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Secured
|
||||
notify :: ConnAlias -> ACommand 'Agent -> m ()
|
||||
notify connAlias msg = atomically $ writeTBQueue sndQ ("", connAlias, msg)
|
||||
|
||||
connectToSendQueue :: AgentMonad m => AgentClient -> SendQueue -> SMP.SenderKey -> m ()
|
||||
connectToSendQueue c sq senderKey = do
|
||||
sendConfirmation c sq senderKey
|
||||
withStore $ \st -> updateSndQueueStatus st sq Confirmed
|
||||
sendHello c sq
|
||||
withStore $ \st -> updateSndQueueStatus st sq Active
|
||||
|
||||
decryptMessage :: MonadUnliftIO m => PrivateKey -> ByteString -> m ByteString
|
||||
decryptMessage _decryptKey = return
|
||||
|
||||
getSMPServerClient ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SMPServer ->
|
||||
m SMPClient
|
||||
getSMPServerClient AgentClient {smpClients, msgQ} srv =
|
||||
atomically (M.lookup srv <$> readTVar smpClients)
|
||||
>>= maybe newSMPClient return
|
||||
where
|
||||
newSMPClient :: m SMPClient
|
||||
newSMPClient = do
|
||||
cfg <- asks $ smpCfg . config
|
||||
c <- liftIO (getSMPClient srv cfg msgQ) `E.catch` throwErr (BROKER smpErrTCPConnection)
|
||||
atomically . modifyTVar smpClients $ M.insert srv c
|
||||
return c
|
||||
|
||||
throwErr :: ErrorType -> SomeException -> m a
|
||||
throwErr err e = do
|
||||
liftIO . putStrLn $ "Exception: " ++ show e -- TODO remove
|
||||
throwError err
|
||||
|
||||
newSendQueue ::
|
||||
(MonadUnliftIO m, MonadReader Env m) => SMPQueueInfo -> m (SendQueue, SMP.SenderKey)
|
||||
newSendQueue (SMPQueueInfo smpServer senderId encryptKey) = do
|
||||
@@ -299,69 +252,3 @@ newSendQueue (SMPQueueInfo smpServer senderId encryptKey) = do
|
||||
ackMode = AckMode On
|
||||
}
|
||||
return (sndQueue, senderKey)
|
||||
|
||||
sendConfirmation ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SendQueue ->
|
||||
SMP.SenderKey ->
|
||||
m ()
|
||||
sendConfirmation c sq@SendQueue {server, sndId} senderKey = do
|
||||
-- TODO send initial confirmation with signature - change in SMP server
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkConfirmation
|
||||
liftSMP $ sendSMPMessage smp "" sndId msg
|
||||
withStore $ \st -> updateSndQueueStatus st sq Confirmed
|
||||
where
|
||||
mkConfirmation :: m SMP.MsgBody
|
||||
mkConfirmation = do
|
||||
let msg = serializeSMPMessage $ SMPConfirmation senderKey
|
||||
-- TODO encryption
|
||||
return msg
|
||||
|
||||
sendHello ::
|
||||
forall m.
|
||||
(MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) =>
|
||||
AgentClient ->
|
||||
SendQueue ->
|
||||
m ()
|
||||
sendHello c sq@SendQueue {server, sndId, sndPrivateKey, encryptKey} = do
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkHello "5678" $ AckMode On -- TODO verifyKey
|
||||
_send smp 20 msg
|
||||
withStore $ \st -> updateSndQueueStatus st sq Active
|
||||
where
|
||||
mkHello :: PublicKey -> AckMode -> m ByteString
|
||||
mkHello verifyKey ackMode =
|
||||
mkAgentMessage encryptKey $ HELLO verifyKey ackMode
|
||||
|
||||
_send :: SMPClient -> Int -> ByteString -> m ()
|
||||
_send _ 0 _ = throwError INTERNAL -- TODO different error
|
||||
_send smp retry msg = do
|
||||
liftSMP (sendSMPMessage smp sndPrivateKey sndId msg)
|
||||
`catchError` ( \case
|
||||
SMP SMP.AUTH -> do
|
||||
liftIO $ threadDelay 100000
|
||||
_send smp (retry - 1) msg
|
||||
_ -> throwError INTERNAL -- TODO wrap client error in some constructor
|
||||
)
|
||||
|
||||
sendAck :: (MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m) => AgentClient -> ReceiveQueue -> m ()
|
||||
sendAck c ReceiveQueue {server, rcvId, rcvPrivateKey} = do
|
||||
smp <- getSMPServerClient c server
|
||||
liftSMP $ ackSMPMessage smp rcvPrivateKey rcvId
|
||||
|
||||
mkAgentMessage :: (MonadUnliftIO m) => PrivateKey -> AMessage -> m ByteString
|
||||
mkAgentMessage _encKey agentMessage = do
|
||||
agentTimestamp <- liftIO getCurrentTime
|
||||
let msg =
|
||||
serializeSMPMessage
|
||||
SMPMessage
|
||||
{ agentMsgId = 0,
|
||||
agentTimestamp,
|
||||
previousMsgHash = "1234", -- TODO hash of the previous message
|
||||
agentMessage
|
||||
}
|
||||
-- TODO encryption
|
||||
return msg
|
||||
|
||||
208
src/Simplex/Messaging/Agent/Client.hs
Normal file
208
src/Simplex/Messaging/Agent/Client.hs
Normal file
@@ -0,0 +1,208 @@
|
||||
{-# LANGUAGE ConstraintKinds #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Client
|
||||
( AgentClient (..),
|
||||
newAgentClient,
|
||||
AgentMonad,
|
||||
getSMPServerClient,
|
||||
newReceiveQueue,
|
||||
sendConfirmation,
|
||||
sendHello,
|
||||
secureQueue,
|
||||
sendAgentMessage,
|
||||
sendAck,
|
||||
logServer,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Control.Monad.Trans.Except
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Base64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Text.Encoding
|
||||
import Data.Time.Clock
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Client
|
||||
import Simplex.Messaging.Server (randomBytes)
|
||||
import Simplex.Messaging.Server.Transmission (PrivateKey, PublicKey, SenderKey)
|
||||
import qualified Simplex.Messaging.Server.Transmission as SMP
|
||||
import UnliftIO.Concurrent
|
||||
import UnliftIO.Exception (SomeException)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
data AgentClient = AgentClient
|
||||
{ rcvQ :: TBQueue (ATransmission 'Client),
|
||||
sndQ :: TBQueue (ATransmission 'Agent),
|
||||
msgQ :: TBQueue SMPServerTransmission,
|
||||
smpClients :: TVar (Map SMPServer SMPClient),
|
||||
clientId :: Int
|
||||
}
|
||||
|
||||
newAgentClient :: TVar Int -> Natural -> STM AgentClient
|
||||
newAgentClient cc qSize = do
|
||||
rcvQ <- newTBQueue qSize
|
||||
sndQ <- newTBQueue qSize
|
||||
msgQ <- newTBQueue qSize
|
||||
smpClients <- newTVar M.empty
|
||||
clientId <- (+ 1) <$> readTVar cc
|
||||
writeTVar cc clientId
|
||||
return AgentClient {rcvQ, sndQ, msgQ, smpClients, clientId}
|
||||
|
||||
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError ErrorType m)
|
||||
|
||||
getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPServer -> m SMPClient
|
||||
getSMPServerClient AgentClient {smpClients, msgQ} srv =
|
||||
atomically (M.lookup srv <$> readTVar smpClients)
|
||||
>>= maybe newSMPClient return
|
||||
where
|
||||
newSMPClient :: m SMPClient
|
||||
newSMPClient = do
|
||||
cfg <- asks $ smpCfg . config
|
||||
c <- liftIO (getSMPClient srv cfg msgQ) `E.catch` throwErr (BROKER smpErrTCPConnection)
|
||||
atomically . modifyTVar smpClients $ M.insert srv c
|
||||
return c
|
||||
|
||||
throwErr :: ErrorType -> SomeException -> m a
|
||||
throwErr err e = do
|
||||
liftIO . putStrLn $ "Exception: " ++ show e -- TODO remove
|
||||
throwError err
|
||||
|
||||
withSMP :: forall a m. AgentMonad m => AgentClient -> SMPServer -> (SMPClient -> ExceptT SMPClientError IO a) -> m a
|
||||
withSMP c srv action =
|
||||
(getSMPServerClient c srv >>= runAction) `catchError` logServerError
|
||||
where
|
||||
runAction :: SMPClient -> m a
|
||||
runAction smp =
|
||||
liftIO (first smpClientError <$> runExceptT (action smp))
|
||||
>>= liftEither
|
||||
|
||||
smpClientError :: SMPClientError -> ErrorType
|
||||
smpClientError = \case
|
||||
SMPServerError e -> SMP e
|
||||
-- TODO handle other errors
|
||||
_ -> INTERNAL
|
||||
|
||||
logServerError :: ErrorType -> m a
|
||||
logServerError e = do
|
||||
logServer "<--" c srv "" $ (B.pack . show) e
|
||||
throwError e
|
||||
|
||||
withLogSMP :: AgentMonad m => AgentClient -> SMPServer -> SMP.QueueId -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> m a
|
||||
withLogSMP c srv qId cmdStr action = do
|
||||
logServer "-->" c srv qId cmdStr
|
||||
res <- withSMP c srv action
|
||||
logServer "<--" c srv qId "OK"
|
||||
return res
|
||||
|
||||
newReceiveQueue :: AgentMonad m => AgentClient -> SMPServer -> m (ReceiveQueue, SMPQueueInfo)
|
||||
newReceiveQueue c srv = do
|
||||
g <- asks idsDrg
|
||||
recipientKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let rcvPrivateKey = recipientKey
|
||||
logServer "-->" c srv "" "NEW"
|
||||
(rcvId, sId) <- withSMP c srv $ \smp -> createSMPQueue smp rcvPrivateKey recipientKey
|
||||
logServer "<--" c srv "" $ B.unwords ["IDS", logSecret rcvId, logSecret sId]
|
||||
encryptKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
let decryptKey = encryptKey
|
||||
rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = srv,
|
||||
rcvId,
|
||||
rcvPrivateKey,
|
||||
sndId = Just sId,
|
||||
sndKey = Nothing,
|
||||
decryptKey,
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
return (rcvQueue, SMPQueueInfo srv sId encryptKey)
|
||||
|
||||
logServer :: AgentMonad m => ByteString -> AgentClient -> SMPServer -> SMP.QueueId -> ByteString -> m ()
|
||||
logServer dir AgentClient {clientId} SMPServer {host, port} qId cmdStr =
|
||||
logInfo . decodeUtf8 $ B.unwords ["A", "(" <> (B.pack . show) clientId <> ")", dir, server, ":", logSecret qId, cmdStr]
|
||||
where
|
||||
server = B.pack $ host <> maybe "" (":" <>) port
|
||||
|
||||
logSecret :: ByteString -> ByteString
|
||||
logSecret bs = encode $ B.take 3 bs
|
||||
|
||||
sendConfirmation :: forall m. AgentMonad m => AgentClient -> SendQueue -> SenderKey -> m ()
|
||||
sendConfirmation c SendQueue {server, sndId} senderKey = do
|
||||
-- TODO send initial confirmation with signature - change in SMP server
|
||||
msg <- mkConfirmation
|
||||
withLogSMP c server sndId "SEND <KEY>" $ \smp ->
|
||||
sendSMPMessage smp "" sndId msg
|
||||
where
|
||||
mkConfirmation :: m SMP.MsgBody
|
||||
mkConfirmation = do
|
||||
let msg = serializeSMPMessage $ SMPConfirmation senderKey
|
||||
-- TODO encryption
|
||||
return msg
|
||||
|
||||
sendHello :: forall m. AgentMonad m => AgentClient -> SendQueue -> m ()
|
||||
sendHello c SendQueue {server, sndId, sndPrivateKey, encryptKey} = do
|
||||
msg <- mkHello "5678" $ AckMode On -- TODO verifyKey
|
||||
withLogSMP c server sndId "SEND <HELLO> (retrying)" $
|
||||
send 20 msg
|
||||
where
|
||||
mkHello :: PublicKey -> AckMode -> m ByteString
|
||||
mkHello verifyKey ackMode =
|
||||
mkAgentMessage encryptKey $ HELLO verifyKey ackMode
|
||||
|
||||
send :: Int -> ByteString -> SMPClient -> ExceptT SMPClientError IO ()
|
||||
send 0 _ _ = throwE SMPResponseTimeout -- TODO different error
|
||||
send retry msg smp =
|
||||
sendSMPMessage smp sndPrivateKey sndId msg `catchE` \case
|
||||
SMPServerError SMP.AUTH -> do
|
||||
threadDelay 100000
|
||||
send (retry - 1) msg smp
|
||||
e -> throwE e
|
||||
|
||||
secureQueue :: AgentMonad m => AgentClient -> ReceiveQueue -> SenderKey -> m ()
|
||||
secureQueue c ReceiveQueue {server, rcvId, rcvPrivateKey} senderKey =
|
||||
withLogSMP c server rcvId "KEY <key>" $ \smp ->
|
||||
secureSMPQueue smp rcvPrivateKey rcvId senderKey
|
||||
|
||||
sendAck :: AgentMonad m => AgentClient -> ReceiveQueue -> m ()
|
||||
sendAck c ReceiveQueue {server, rcvId, rcvPrivateKey} =
|
||||
withLogSMP c server rcvId "ACK" $ \smp ->
|
||||
ackSMPMessage smp rcvPrivateKey rcvId
|
||||
|
||||
sendAgentMessage :: AgentMonad m => AgentClient -> SendQueue -> AMessage -> m ()
|
||||
sendAgentMessage c SendQueue {server, sndId, sndPrivateKey, encryptKey} agentMsg = do
|
||||
msg <- mkAgentMessage encryptKey agentMsg
|
||||
withLogSMP c server sndId "SEND <message>" $ \smp ->
|
||||
sendSMPMessage smp sndPrivateKey sndId msg
|
||||
|
||||
mkAgentMessage :: MonadUnliftIO m => PrivateKey -> AMessage -> m ByteString
|
||||
mkAgentMessage _encKey agentMessage = do
|
||||
agentTimestamp <- liftIO getCurrentTime
|
||||
let msg =
|
||||
serializeSMPMessage
|
||||
SMPMessage
|
||||
{ agentMsgId = 0,
|
||||
agentTimestamp,
|
||||
previousMsgHash = "1234", -- TODO hash of the previous message
|
||||
agentMessage
|
||||
}
|
||||
-- TODO encryption
|
||||
return msg
|
||||
@@ -7,12 +7,9 @@ module Simplex.Messaging.Agent.Env.SQLite where
|
||||
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Network.Socket
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Client
|
||||
import UnliftIO.STM
|
||||
|
||||
@@ -27,26 +24,13 @@ data AgentConfig = AgentConfig
|
||||
data Env = Env
|
||||
{ config :: AgentConfig,
|
||||
idsDrg :: TVar ChaChaDRG,
|
||||
db :: SQLiteStore
|
||||
db :: SQLiteStore,
|
||||
clientCounter :: TVar Int
|
||||
}
|
||||
|
||||
data AgentClient = AgentClient
|
||||
{ rcvQ :: TBQueue (ATransmission Client),
|
||||
sndQ :: TBQueue (ATransmission Agent),
|
||||
msgQ :: TBQueue SMPServerTransmission,
|
||||
smpClients :: TVar (Map SMPServer SMPClient)
|
||||
}
|
||||
|
||||
newAgentClient :: Natural -> STM AgentClient
|
||||
newAgentClient qSize = do
|
||||
rcvQ <- newTBQueue qSize
|
||||
sndQ <- newTBQueue qSize
|
||||
msgQ <- newTBQueue qSize
|
||||
smpClients <- newTVar M.empty
|
||||
return AgentClient {rcvQ, sndQ, msgQ, smpClients}
|
||||
|
||||
newEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
|
||||
newEnv config = do
|
||||
idsDrg <- drgNew >>= newTVarIO
|
||||
db <- newSQLiteStore $ dbFile config
|
||||
return Env {config, idsDrg, db}
|
||||
clientCounter <- newTVarIO 0
|
||||
return Env {config, idsDrg, db, clientCounter}
|
||||
|
||||
@@ -37,6 +37,7 @@ packages:
|
||||
extra-deps:
|
||||
- sqlite-simple-0.4.18.0@sha256:3ceea56375c0a3590c814e411a4eb86943f8d31b93b110ca159c90689b6b39e5,3002
|
||||
- direct-sqlite-2.3.26@sha256:04e835402f1508abca383182023e4e2b9b86297b8533afbd4e57d1a5652e0c23,3718
|
||||
- simple-logger-0.1.0@sha256:be8ede4bd251a9cac776533bae7fb643369ebd826eb948a9a18df1a8dd252ff8,1079
|
||||
# - network-run-0.2.4@sha256:7dbb06def522dab413bce4a46af476820bffdff2071974736b06f52f4ab57c96,885
|
||||
# - git: https://github.com/commercialhaskell/stack.git
|
||||
# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a
|
||||
|
||||
Reference in New Issue
Block a user