mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-15 03:05:08 +00:00
make sizes of IDs a server configuration
This commit is contained in:
+11
-5
@@ -15,9 +15,15 @@ import Numeric.Natural
|
||||
import Transmission
|
||||
import UnliftIO.STM
|
||||
|
||||
data Env = Env
|
||||
data Config = Config
|
||||
{ tcpPort :: ServiceName,
|
||||
queueSize :: Natural,
|
||||
connIdBytes :: Int,
|
||||
msgIdBytes :: Int
|
||||
}
|
||||
|
||||
data Env = Env
|
||||
{ config :: Config,
|
||||
server :: Server,
|
||||
connStore :: STMConnStore,
|
||||
msgStore :: STMMsgStore,
|
||||
@@ -48,10 +54,10 @@ newClient qSize = do
|
||||
sndQ <- newTBQueue qSize
|
||||
return Client {connections, rcvQ, sndQ}
|
||||
|
||||
newEnv :: (MonadUnliftIO m, MonadRandom m) => String -> Natural -> m Env
|
||||
newEnv tcpPort queueSize = do
|
||||
server <- atomically $ newServer queueSize
|
||||
newEnv :: (MonadUnliftIO m, MonadRandom m) => Config -> m Env
|
||||
newEnv config = do
|
||||
server <- atomically $ newServer (queueSize config)
|
||||
connStore <- atomically newConnStore
|
||||
msgStore <- atomically newMsgStore
|
||||
idsDrg <- drgNew >>= newTVarIO
|
||||
return Env {tcpPort, queueSize, server, connStore, msgStore, idsDrg}
|
||||
return Env {config, server, connStore, msgStore, idsDrg}
|
||||
|
||||
+11
-9
@@ -1,16 +1,18 @@
|
||||
module Main where
|
||||
|
||||
import Network.Socket (ServiceName)
|
||||
import Numeric.Natural
|
||||
import Env.STM
|
||||
import Server (runSMPServer)
|
||||
|
||||
port :: ServiceName
|
||||
port = "5223"
|
||||
|
||||
queuePerClient :: Natural
|
||||
queuePerClient = 16
|
||||
cfg :: Config
|
||||
cfg =
|
||||
Config
|
||||
{ tcpPort = "5223",
|
||||
queueSize = 16,
|
||||
connIdBytes = 12,
|
||||
msgIdBytes = 6
|
||||
}
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
putStrLn $ "Listening on port " ++ port
|
||||
runSMPServer port queuePerClient
|
||||
putStrLn $ "Listening on port " ++ tcpPort cfg
|
||||
runSMPServer cfg
|
||||
|
||||
+15
-11
@@ -25,8 +25,6 @@ import Data.Time.Clock
|
||||
import Env.STM
|
||||
import MsgStore
|
||||
import MsgStore.STM (MsgQueue)
|
||||
import Network.Socket
|
||||
import Numeric.Natural
|
||||
import Transmission
|
||||
import Transport
|
||||
import UnliftIO.Async
|
||||
@@ -34,15 +32,15 @@ import UnliftIO.Concurrent
|
||||
import UnliftIO.IO
|
||||
import UnliftIO.STM
|
||||
|
||||
runSMPServer :: (MonadRandom m, MonadUnliftIO m) => ServiceName -> Natural -> m ()
|
||||
runSMPServer port queueSize = do
|
||||
env <- newEnv port queueSize
|
||||
runSMPServer :: (MonadRandom m, MonadUnliftIO m) => Config -> m ()
|
||||
runSMPServer cfg@Config {tcpPort} = do
|
||||
env <- newEnv cfg
|
||||
runReaderT smpServer env
|
||||
where
|
||||
smpServer :: (MonadUnliftIO m, MonadReader Env m) => m ()
|
||||
smpServer = do
|
||||
s <- asks server
|
||||
race_ (runTCPServer port runClient) (serverThread s)
|
||||
race_ (runTCPServer tcpPort runClient) (serverThread s)
|
||||
|
||||
serverThread :: MonadUnliftIO m => Server -> m ()
|
||||
serverThread Server {subscribedQ, connections} = forever . atomically $ do
|
||||
@@ -56,7 +54,7 @@ runSMPServer port queueSize = do
|
||||
runClient :: (MonadUnliftIO m, MonadReader Env m) => Handle -> m ()
|
||||
runClient h = do
|
||||
putLn h "Welcome to SMP"
|
||||
q <- asks queueSize
|
||||
q <- asks $ queueSize . config
|
||||
c <- atomically $ newClient q
|
||||
s <- asks server
|
||||
raceAny_ [send h c, client c s, receive h c]
|
||||
@@ -142,7 +140,9 @@ client clnt@Client {connections, rcvQ, sndQ} Server {subscribedQ} =
|
||||
void $ subscribeConn rId
|
||||
return $ IDS rId sId
|
||||
Left e -> return $ ERR e
|
||||
getIds = liftM2 (,) (randomId 16) (randomId 16)
|
||||
getIds = do
|
||||
n <- asks $ connIdBytes . config
|
||||
liftM2 (,) (randomId n) (randomId n)
|
||||
|
||||
subscribeConn :: RecipientId -> m Signed
|
||||
subscribeConn rId = do
|
||||
@@ -166,14 +166,18 @@ client clnt@Client {connections, rcvQ, sndQ} Server {subscribedQ} =
|
||||
getConn st SSender sId
|
||||
>>= fmap (mkSigned sId) . either (return . ERR) storeMessage
|
||||
where
|
||||
mkMessage :: m Message
|
||||
mkMessage = do
|
||||
msgId <- asks (msgIdBytes . config) >>= randomId
|
||||
ts <- liftIO getCurrentTime
|
||||
return $ Message {msgId, ts, msgBody}
|
||||
|
||||
storeMessage :: Connection -> m (Command 'Broker)
|
||||
storeMessage c = case status c of
|
||||
ConnActive -> do
|
||||
ms <- asks msgStore
|
||||
q <- getMsgQueue ms (recipientId c)
|
||||
msgId <- randomId 8
|
||||
ts <- liftIO getCurrentTime
|
||||
writeMsg q $ Message {msgId, ts, msgBody}
|
||||
mkMessage >>= writeMsg q
|
||||
return OK
|
||||
ConnOff -> return $ ERR AUTH
|
||||
|
||||
|
||||
+10
-4
@@ -5,8 +5,8 @@ module SMPClient where
|
||||
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
import Env.STM
|
||||
import Network.Socket
|
||||
import Numeric.Natural
|
||||
import Server
|
||||
import Transmission
|
||||
import Transport
|
||||
@@ -29,13 +29,19 @@ testPort = "5000"
|
||||
testHost :: HostName
|
||||
testHost = "localhost"
|
||||
|
||||
queueSize :: Natural
|
||||
queueSize = 2
|
||||
cfg :: Config
|
||||
cfg =
|
||||
Config
|
||||
{ tcpPort = testPort,
|
||||
queueSize = 1,
|
||||
connIdBytes = 12,
|
||||
msgIdBytes = 6
|
||||
}
|
||||
|
||||
runSmpTest :: (MonadUnliftIO m, MonadRandom m) => (Handle -> m a) -> m a
|
||||
runSmpTest test =
|
||||
E.bracket
|
||||
(forkIO $ runSMPServer testPort queueSize)
|
||||
(forkIO $ runSMPServer cfg)
|
||||
(liftIO . killThread)
|
||||
\_ -> testSMPClient "localhost" testPort test
|
||||
|
||||
|
||||
Reference in New Issue
Block a user