mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-07 15:22:03 +00:00
tests: block on tcp server creation (#99)
Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
|
||||
module Simplex.Messaging.Agent
|
||||
( runSMPAgent,
|
||||
runSMPAgentBlocking,
|
||||
getSMPAgentClient,
|
||||
runSMPAgentClient,
|
||||
)
|
||||
@@ -43,10 +44,13 @@ import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
runSMPAgent :: (MonadRandom m, MonadUnliftIO m) => AgentConfig -> m ()
|
||||
runSMPAgent cfg@AgentConfig {tcpPort} = runReaderT smpAgent =<< newSMPAgentEnv cfg
|
||||
runSMPAgent cfg = newEmptyTMVarIO >>= (`runSMPAgentBlocking` cfg)
|
||||
|
||||
runSMPAgentBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> AgentConfig -> m ()
|
||||
runSMPAgentBlocking started cfg@AgentConfig {tcpPort} = runReaderT smpAgent =<< newSMPAgentEnv cfg
|
||||
where
|
||||
smpAgent :: (MonadUnliftIO m', MonadReader Env m') => m' ()
|
||||
smpAgent = runTCPServer tcpPort $ \h -> do
|
||||
smpAgent = runTCPServer started tcpPort $ \h -> do
|
||||
liftIO $ putLn h "Welcome to SMP v0.2.0 agent"
|
||||
c <- getSMPAgentClient
|
||||
logConnection c True
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
-- TODO move randomBytes to another module
|
||||
module Simplex.Messaging.Server (runSMPServer, randomBytes) where
|
||||
module Simplex.Messaging.Server (runSMPServer, runSMPServerBlocking, randomBytes) where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad
|
||||
@@ -39,14 +39,17 @@ import UnliftIO.IO
|
||||
import UnliftIO.STM
|
||||
|
||||
runSMPServer :: (MonadRandom m, MonadUnliftIO m) => ServerConfig -> m ()
|
||||
runSMPServer cfg@ServerConfig {tcpPort} = do
|
||||
runSMPServer cfg = newEmptyTMVarIO >>= (`runSMPServerBlocking` cfg)
|
||||
|
||||
runSMPServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> ServerConfig -> m ()
|
||||
runSMPServerBlocking started cfg@ServerConfig {tcpPort} = do
|
||||
env <- newEnv cfg
|
||||
runReaderT smpServer env
|
||||
where
|
||||
smpServer :: (MonadUnliftIO m, MonadReader Env m) => m ()
|
||||
smpServer = do
|
||||
s <- asks server
|
||||
race_ (runTCPServer tcpPort runClient) (serverThread s)
|
||||
race_ (runTCPServer started tcpPort runClient) (serverThread s)
|
||||
|
||||
serverThread :: MonadUnliftIO m => Server -> m ()
|
||||
serverThread Server {subscribedQ, subscribers} = forever . atomically $ do
|
||||
|
||||
@@ -24,10 +24,10 @@ import qualified UnliftIO.Exception as E
|
||||
import qualified UnliftIO.IO as IO
|
||||
import UnliftIO.STM
|
||||
|
||||
runTCPServer :: MonadUnliftIO m => ServiceName -> (Handle -> m ()) -> m ()
|
||||
runTCPServer port server = do
|
||||
runTCPServer :: MonadUnliftIO m => TMVar Bool -> ServiceName -> (Handle -> m ()) -> m ()
|
||||
runTCPServer started port server = do
|
||||
clients <- newTVarIO S.empty
|
||||
E.bracket (liftIO $ startTCPServer port) (liftIO . closeServer clients) $ \sock -> forever $ do
|
||||
E.bracket (liftIO $ startTCPServer started port) (liftIO . closeServer clients) \sock -> forever $ do
|
||||
h <- liftIO $ acceptTCPConn sock
|
||||
tid <- forkFinally (server h) (const $ IO.hClose h)
|
||||
atomically . modifyTVar clients $ S.insert tid
|
||||
@@ -35,8 +35,8 @@ runTCPServer port server = do
|
||||
closeServer :: TVar (Set ThreadId) -> Socket -> IO ()
|
||||
closeServer clients sock = readTVarIO clients >>= mapM_ killThread >> close sock
|
||||
|
||||
startTCPServer :: ServiceName -> IO Socket
|
||||
startTCPServer port = withSocketsDo $ resolve >>= open
|
||||
startTCPServer :: TMVar Bool -> ServiceName -> IO Socket
|
||||
startTCPServer started port = withSocketsDo $ resolve >>= open >>= setStarted
|
||||
where
|
||||
resolve =
|
||||
let hints = defaultHints {addrFlags = [AI_PASSIVE], addrSocketType = Stream}
|
||||
@@ -48,6 +48,7 @@ startTCPServer port = withSocketsDo $ resolve >>= open
|
||||
bind sock $ addrAddress addr
|
||||
listen sock 1024
|
||||
return sock
|
||||
setStarted sock = atomically (putTMVar started True) >> pure sock
|
||||
|
||||
acceptTCPConn :: Socket -> IO Handle
|
||||
acceptTCPConn sock = accept sock >>= getSocketHandle . fst
|
||||
|
||||
Reference in New Issue
Block a user