diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index dcd02ca5e..4a8eb1467 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -122,7 +122,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do serverThread :: forall s. Server -> - (Server -> TBQueue (QueueId, Client)) -> + (Server -> TQueue (QueueId, Client)) -> (Server -> TMap QueueId Client) -> (Client -> TMap QueueId s) -> (s -> IO ()) -> @@ -134,7 +134,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do where updateSubscribers :: STM (Maybe (QueueId, Client)) updateSubscribers = do - (qId, clnt) <- readTBQueue $ subQ s + (qId, clnt) <- readTQueue $ subQ s let clientToBeNotified = \c' -> if sameClientSession clnt c' then pure Nothing @@ -477,7 +477,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv where newSub :: m (TVar Sub) newSub = time "SUB newSub" . atomically $ do - writeTBQueue subscribedQ (rId, clnt) + writeTQueue subscribedQ (rId, clnt) sub <- newTVar =<< newSubscription NoSub TM.insert rId sub subscriptions pure sub @@ -522,7 +522,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv subscribeNotifications :: m (Transmission BrokerMsg) subscribeNotifications = time "NSUB" . atomically $ do unlessM (TM.member queueId ntfSubscriptions) $ do - writeTBQueue ntfSubscribedQ (queueId, clnt) + writeTQueue ntfSubscribedQ (queueId, clnt) TM.insert queueId () ntfSubscriptions pure ok diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 609438848..812bb462b 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -39,7 +39,7 @@ import UnliftIO.STM data ServerConfig = ServerConfig { transports :: [(ServiceName, ATransport)], tbqSize :: Natural, - serverTbqSize :: Natural, + -- serverTbqSize :: Natural, msgQueueQuota :: Int, queueIdBytes :: Int, msgIdBytes :: Int, @@ -103,9 +103,9 @@ data Env = Env } data Server = Server - { subscribedQ :: TBQueue (RecipientId, Client), + { subscribedQ :: TQueue (RecipientId, Client), subscribers :: TMap RecipientId Client, - ntfSubscribedQ :: TBQueue (NotifierId, Client), + ntfSubscribedQ :: TQueue (NotifierId, Client), notifiers :: TMap NotifierId Client } @@ -127,11 +127,11 @@ data Sub = Sub delivered :: TMVar MsgId } -newServer :: Natural -> STM Server -newServer qSize = do - subscribedQ <- newTBQueue qSize +newServer :: STM Server +newServer = do + subscribedQ <- newTQueue subscribers <- TM.empty - ntfSubscribedQ <- newTBQueue qSize + ntfSubscribedQ <- newTQueue notifiers <- TM.empty return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers} @@ -152,7 +152,7 @@ newSubscription subThread = do newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, storeLogFile} = do - server <- atomically $ newServer (serverTbqSize config) + server <- atomically newServer queueStore <- atomically newQueueStore msgStore <- atomically newMsgStore idsDrg <- drgNew >>= newTVarIO diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index df15cebda..b1d92991b 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -166,7 +166,7 @@ smpServerCLI cfgPath logPath = ServerConfig { transports = iniTransports ini, tbqSize = 64, - serverTbqSize = 1024, + -- serverTbqSize = 1024, msgQueueQuota = 128, queueIdBytes = 24, msgIdBytes = 24, -- must be at least 24 bytes, it is used as 192-bit nonce for XSalsa20 diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index c49b65ee6..470fcf215 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -82,7 +82,7 @@ cfg = ServerConfig { transports = undefined, tbqSize = 1, - serverTbqSize = 1, + -- serverTbqSize = 1, msgQueueQuota = 4, queueIdBytes = 24, msgIdBytes = 24,