server: make main SMP server queues unbounded (#802)

This commit is contained in:
Evgeny Poberezkin
2023-07-14 21:07:45 +01:00
committed by GitHub
parent 3fee468051
commit 1901e96ecc
4 changed files with 14 additions and 14 deletions

View File

@@ -122,7 +122,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
serverThread ::
forall s.
Server ->
(Server -> TBQueue (QueueId, Client)) ->
(Server -> TQueue (QueueId, Client)) ->
(Server -> TMap QueueId Client) ->
(Client -> TMap QueueId s) ->
(s -> IO ()) ->
@@ -134,7 +134,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
where
updateSubscribers :: STM (Maybe (QueueId, Client))
updateSubscribers = do
(qId, clnt) <- readTBQueue $ subQ s
(qId, clnt) <- readTQueue $ subQ s
let clientToBeNotified = \c' ->
if sameClientSession clnt c'
then pure Nothing
@@ -477,7 +477,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
where
newSub :: m (TVar Sub)
newSub = time "SUB newSub" . atomically $ do
writeTBQueue subscribedQ (rId, clnt)
writeTQueue subscribedQ (rId, clnt)
sub <- newTVar =<< newSubscription NoSub
TM.insert rId sub subscriptions
pure sub
@@ -522,7 +522,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
subscribeNotifications :: m (Transmission BrokerMsg)
subscribeNotifications = time "NSUB" . atomically $ do
unlessM (TM.member queueId ntfSubscriptions) $ do
writeTBQueue ntfSubscribedQ (queueId, clnt)
writeTQueue ntfSubscribedQ (queueId, clnt)
TM.insert queueId () ntfSubscriptions
pure ok

View File

@@ -39,7 +39,7 @@ import UnliftIO.STM
data ServerConfig = ServerConfig
{ transports :: [(ServiceName, ATransport)],
tbqSize :: Natural,
serverTbqSize :: Natural,
-- serverTbqSize :: Natural,
msgQueueQuota :: Int,
queueIdBytes :: Int,
msgIdBytes :: Int,
@@ -103,9 +103,9 @@ data Env = Env
}
data Server = Server
{ subscribedQ :: TBQueue (RecipientId, Client),
{ subscribedQ :: TQueue (RecipientId, Client),
subscribers :: TMap RecipientId Client,
ntfSubscribedQ :: TBQueue (NotifierId, Client),
ntfSubscribedQ :: TQueue (NotifierId, Client),
notifiers :: TMap NotifierId Client
}
@@ -127,11 +127,11 @@ data Sub = Sub
delivered :: TMVar MsgId
}
newServer :: Natural -> STM Server
newServer qSize = do
subscribedQ <- newTBQueue qSize
newServer :: STM Server
newServer = do
subscribedQ <- newTQueue
subscribers <- TM.empty
ntfSubscribedQ <- newTBQueue qSize
ntfSubscribedQ <- newTQueue
notifiers <- TM.empty
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers}
@@ -152,7 +152,7 @@ newSubscription subThread = do
newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env
newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, storeLogFile} = do
server <- atomically $ newServer (serverTbqSize config)
server <- atomically newServer
queueStore <- atomically newQueueStore
msgStore <- atomically newMsgStore
idsDrg <- drgNew >>= newTVarIO

View File

@@ -166,7 +166,7 @@ smpServerCLI cfgPath logPath =
ServerConfig
{ transports = iniTransports ini,
tbqSize = 64,
serverTbqSize = 1024,
-- serverTbqSize = 1024,
msgQueueQuota = 128,
queueIdBytes = 24,
msgIdBytes = 24, -- must be at least 24 bytes, it is used as 192-bit nonce for XSalsa20