diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index 05161bde1..0f14d7ca0 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -37,6 +37,7 @@ serverConfig :: ServerConfig serverConfig = ServerConfig { tbqSize = 16, + msgQueueQuota = 256, queueIdBytes = 12, msgIdBytes = 6, -- below parameters are set based on ini file /etc/opt/simplex/smp-server.ini diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 77fd5f622..a3be20c1f 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -412,6 +412,7 @@ runSrvMsgDelivery c@AgentClient {subQ} srv = do withRetryInterval ri $ \loop -> do sendAgentMessage c sq msgBody `catchError` \case + SMP SMP.QUOTA -> loop e@SMP {} -> notify connId $ MERR mId e _ -> loop notify connId $ SENT mId diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 3d438d540..85d6e8369 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -192,6 +192,8 @@ data ErrorType CMD CommandError | -- | command authorization error - bad signature or non-existing SMP queue AUTH + | -- | SMP queue capacity is exceeded on the server + QUOTA | -- | ACK command is sent without message to be acknowledged NO_MSG | -- | internal server error diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 221793d2f..85eb66447 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -296,16 +296,19 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = QueueActive -> do ms <- asks msgStore msg <- mkMessage + quota <- asks $ msgQueueQuota . config atomically $ do - q <- getMsgQueue ms (recipientId qr) - writeMsg q msg - return ok + q <- getMsgQueue ms (recipientId qr) quota + isFull q >>= \case + False -> writeMsg q msg $> ok + True -> pure $ err QUOTA deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m Transmission deliverMessage tryPeek rId = \case Sub {subThread = NoSub} -> do ms <- asks msgStore - q <- atomically $ getMsgQueue ms rId + quota <- asks $ msgQueueQuota . config + q <- atomically $ getMsgQueue ms rId quota atomically (tryPeek q) >>= \case Nothing -> forkSub q $> ok Just msg -> atomically setDelivered $> mkResp corrId rId (msgCmd msg) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 83282f03f..5b640db7d 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -25,6 +25,7 @@ import UnliftIO.STM data ServerConfig = ServerConfig { transports :: [(ServiceName, ATransport)], tbqSize :: Natural, + msgQueueQuota :: Natural, queueIdBytes :: Int, msgIdBytes :: Int, storeLog :: Maybe (StoreLog 'ReadMode), diff --git a/src/Simplex/Messaging/Server/MsgStore.hs b/src/Simplex/Messaging/Server/MsgStore.hs index e2cb8791a..3d729af60 100644 --- a/src/Simplex/Messaging/Server/MsgStore.hs +++ b/src/Simplex/Messaging/Server/MsgStore.hs @@ -3,6 +3,7 @@ module Simplex.Messaging.Server.MsgStore where import Data.Time.Clock +import Numeric.Natural import Simplex.Messaging.Protocol (Encoded, MsgBody, RecipientId) data Message = Message @@ -12,10 +13,11 @@ data Message = Message } class MonadMsgStore s q m | s -> q where - getMsgQueue :: s -> RecipientId -> m q + getMsgQueue :: s -> RecipientId -> Natural -> m q delMsgQueue :: s -> RecipientId -> m () class MonadMsgQueue q m where + isFull :: q -> m Bool writeMsg :: q -> Message -> m () -- non blocking tryPeekMsg :: q -> m (Maybe Message) -- non blocking peekMsg :: q -> m Message -- blocking diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index f5b0e670f..6d0fb63a0 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -8,11 +8,12 @@ module Simplex.Messaging.Server.MsgStore.STM where import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Numeric.Natural import Simplex.Messaging.Protocol (RecipientId) import Simplex.Messaging.Server.MsgStore import UnliftIO.STM -newtype MsgQueue = MsgQueue {msgQueue :: TQueue Message} +newtype MsgQueue = MsgQueue {msgQueue :: TBQueue Message} newtype MsgStoreData = MsgStoreData {messages :: Map RecipientId MsgQueue} @@ -22,13 +23,13 @@ newMsgStore :: STM STMMsgStore newMsgStore = newTVar $ MsgStoreData M.empty instance MonadMsgStore STMMsgStore MsgQueue STM where - getMsgQueue :: STMMsgStore -> RecipientId -> STM MsgQueue - getMsgQueue store rId = do + getMsgQueue :: STMMsgStore -> RecipientId -> Natural -> STM MsgQueue + getMsgQueue store rId quota = do m <- messages <$> readTVar store maybe (newQ m) return $ M.lookup rId m where newQ m' = do - q <- MsgQueue <$> newTQueue + q <- MsgQueue <$> newTBQueue quota writeTVar store . MsgStoreData $ M.insert rId q m' return q @@ -37,15 +38,18 @@ instance MonadMsgStore STMMsgStore MsgQueue STM where modifyTVar store $ MsgStoreData . M.delete rId . messages instance MonadMsgQueue MsgQueue STM where + isFull :: MsgQueue -> STM Bool + isFull = isFullTBQueue . msgQueue + writeMsg :: MsgQueue -> Message -> STM () - writeMsg = writeTQueue . msgQueue + writeMsg = writeTBQueue . msgQueue tryPeekMsg :: MsgQueue -> STM (Maybe Message) - tryPeekMsg = tryPeekTQueue . msgQueue + tryPeekMsg = tryPeekTBQueue . msgQueue peekMsg :: MsgQueue -> STM Message - peekMsg = peekTQueue . msgQueue + peekMsg = peekTBQueue . msgQueue -- atomic delete (== read) last and peek next message if available tryDelPeekMsg :: MsgQueue -> STM (Maybe Message) - tryDelPeekMsg (MsgQueue q) = tryReadTQueue q >> tryPeekTQueue q + tryDelPeekMsg (MsgQueue q) = tryReadTBQueue q >> tryPeekTBQueue q diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 5b7dad9c2..5bd38d61e 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -57,6 +57,7 @@ cfg = ServerConfig { transports = undefined, tbqSize = 1, + msgQueueQuota = 4, queueIdBytes = 12, msgIdBytes = 6, storeLog = Nothing,