mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-01 03:05:57 +00:00
* agent schema/methods/types/store methods for notifications tokens * register notification token on the server * agent commands for notification tokens * refactor initial servers from AgentConfig * agent store functions for notification tokens * server STM store methods for tokens * fix protocol client for ntfs (use generic handshake), minimal server and agent tests * server command to verify ntf token
82 lines
3.6 KiB
Haskell
82 lines
3.6 KiB
Haskell
{-# LANGUAGE FlexibleContexts #-}
|
|
{-# LANGUAGE NamedFieldPuns #-}
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
{-# LANGUAGE ScopedTypeVariables #-}
|
|
|
|
module Simplex.Messaging.Agent.Server
|
|
( -- * SMP agent over TCP
|
|
runSMPAgent,
|
|
runSMPAgentBlocking,
|
|
)
|
|
where
|
|
|
|
import Control.Logger.Simple (logInfo)
|
|
import Control.Monad.Except
|
|
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
|
import Control.Monad.Reader
|
|
import Crypto.Random (MonadRandom)
|
|
import Data.ByteString.Char8 (ByteString)
|
|
import qualified Data.ByteString.Char8 as B
|
|
import Data.Text.Encoding (decodeUtf8)
|
|
import Simplex.Messaging.Agent
|
|
import Simplex.Messaging.Agent.Env.SQLite
|
|
import Simplex.Messaging.Agent.Protocol
|
|
import Simplex.Messaging.Transport (ATransport (..), TProxy, Transport (..), simplexMQVersion)
|
|
import Simplex.Messaging.Transport.Server (loadTLSServerParams, runTransportServer)
|
|
import Simplex.Messaging.Util (bshow)
|
|
import UnliftIO.Async (race_)
|
|
import qualified UnliftIO.Exception as E
|
|
import UnliftIO.STM
|
|
|
|
-- | Runs an SMP agent as a TCP service using passed configuration.
|
|
--
|
|
-- See a full agent executable here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-agent/Main.hs
|
|
runSMPAgent :: (MonadRandom m, MonadUnliftIO m) => ATransport -> AgentConfig -> InitialAgentServers -> m ()
|
|
runSMPAgent t cfg initServers = do
|
|
started <- newEmptyTMVarIO
|
|
runSMPAgentBlocking t started cfg initServers
|
|
|
|
-- | Runs an SMP agent as a TCP service using passed configuration with signalling.
|
|
--
|
|
-- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True)
|
|
-- and when it is disconnected from the TCP socket once the server thread is killed (False).
|
|
runSMPAgentBlocking :: (MonadRandom m, MonadUnliftIO m) => ATransport -> TMVar Bool -> AgentConfig -> InitialAgentServers -> m ()
|
|
runSMPAgentBlocking (ATransport t) started cfg@AgentConfig {tcpPort, caCertificateFile, certificateFile, privateKeyFile} initServers = do
|
|
runReaderT (smpAgent t) =<< newSMPAgentEnv cfg
|
|
where
|
|
smpAgent :: forall c m'. (Transport c, MonadUnliftIO m', MonadReader Env m') => TProxy c -> m' ()
|
|
smpAgent _ = do
|
|
-- tlsServerParams is not in Env to avoid breaking functional API w/t key and certificate generation
|
|
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile
|
|
runTransportServer started tcpPort tlsServerParams $ \(h :: c) -> do
|
|
liftIO . putLn h $ "Welcome to SMP agent v" <> B.pack simplexMQVersion
|
|
c <- getAgentClient initServers
|
|
logConnection c True
|
|
race_ (connectClient h c) (runAgentClient c)
|
|
`E.finally` disconnectAgentClient c
|
|
|
|
connectClient :: Transport c => MonadUnliftIO m => c -> AgentClient -> m ()
|
|
connectClient h c = race_ (send h c) (receive h c)
|
|
|
|
receive :: forall c m. (Transport c, MonadUnliftIO m) => c -> AgentClient -> m ()
|
|
receive h c@AgentClient {rcvQ, subQ} = forever $ do
|
|
(corrId, connId, cmdOrErr) <- tGet SClient h
|
|
case cmdOrErr of
|
|
Right cmd -> write rcvQ (corrId, connId, cmd)
|
|
Left e -> write subQ (corrId, connId, ERR e)
|
|
where
|
|
write :: TBQueue (ATransmission p) -> ATransmission p -> m ()
|
|
write q t = do
|
|
logClient c "-->" t
|
|
atomically $ writeTBQueue q t
|
|
|
|
send :: (Transport c, MonadUnliftIO m) => c -> AgentClient -> m ()
|
|
send h c@AgentClient {subQ} = forever $ do
|
|
t <- atomically $ readTBQueue subQ
|
|
tPut h t
|
|
logClient c "<--" t
|
|
|
|
logClient :: MonadUnliftIO m => AgentClient -> ByteString -> ATransmission a -> m ()
|
|
logClient AgentClient {clientId} dir (corrId, connId, cmd) = do
|
|
logInfo . decodeUtf8 $ B.unwords [bshow clientId, dir, "A :", corrId, connId, B.takeWhile (/= ' ') $ serializeCommand cmd]
|