smp server: batch processing of subscription messages

This commit is contained in:
Evgeny @ SimpleX Chat
2026-03-28 12:02:01 +00:00
parent 3134d6206d
commit 2dca962fc6
7 changed files with 291 additions and 14 deletions
@@ -0,0 +1,152 @@
# Server: batched SUB command processing
Implementation plan for Part 1 of [RFC 2026-03-28-subscription-performance](../rfcs/2026-03-28-subscription-performance.md).
## Current state
When a batch of ~135 SUB commands arrives, the server already batches:
- Queue record lookups (`getQueueRecs` in `receive`, Server.hs:1151)
- Command verification (`verifyLoadedQueue`, Server.hs:1152)
But command processing is per-command (`foldrM process` in `client`, Server.hs:1372-1375). Each SUB calls `subscribeQueueAndDeliver` which calls `tryPeekMsg` - one DB query per queue. For Postgres, that's ~135 individual `SELECT ... FROM messages WHERE recipient_id = ? ORDER BY message_id ASC LIMIT 1` queries per batch.
## Goal
Replace ~135 individual message peek queries with 1 batched query per batch. No protocol changes.
## Implementation
### Step 1: Add `tryPeekMsgs` to MsgStoreClass
File: `src/Simplex/Messaging/Server/MsgStore/Types.hs`
Add to `MsgStoreClass`:
```haskell
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
```
Returns a map from recipient ID to earliest pending message for each queue that has one. Queues with no messages are absent from the map.
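The intended semantics can be sketched with a self-contained simulation over an in-memory store (all names here — `MockStore`, `tryPeekMsgSim`, `tryPeekMsgsSim` — are hypothetical stand-ins for the real `MsgStoreClass` types):

```haskell
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)

-- Mock store: each queue's pending messages, oldest first.
type MockStore = M.Map String [String]

-- Per-queue peek: the earliest pending message, if any.
tryPeekMsgSim :: MockStore -> String -> Maybe String
tryPeekMsgSim store q = case M.lookup q store of
  Just (m : _) -> Just m
  _            -> Nothing

-- Batched peek: queues with no pending messages are simply
-- absent from the resulting map, as the typeclass doc requires.
tryPeekMsgsSim :: MockStore -> [String] -> M.Map String String
tryPeekMsgsSim store qs =
  M.fromList $ catMaybes [(,) q <$> tryPeekMsgSim store q | q <- qs]

main :: IO ()
main = print (tryPeekMsgsSim store ["q1", "q2", "q3"])
  where
    store = M.fromList [("q1", ["m1", "m2"]), ("q2", [])]
```

Here `q2` (empty queue) and `q3` (unknown queue) both fall out of the map; callers distinguish "no message" by key absence rather than by a `Nothing` value.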
### Step 2: Parameterize `deliver` to accept pre-fetched message
File: `src/Simplex/Messaging/Server.hs`
Currently `deliver` (inside `subscribeQueueAndDeliver`, line 1641) calls `tryPeekMsg ms q`. Add a parameter for an optional pre-fetched message:
```haskell
deliver :: Maybe Message -> (Bool, Maybe Sub) -> M s ResponseAndMessage
deliver prefetchedMsg (hasSub, sub_) = do
stats <- asks serverStats
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
msg_ <- maybe (tryPeekMsg ms q) (pure . Just) prefetchedMsg
...
```
When `Nothing` is passed, falls back to individual `tryPeekMsg` (existing behavior). When `Just msg` is passed, uses it directly (batched path).
### Step 3: Pre-fetch messages before the processing loop
File: `src/Simplex/Messaging/Server.hs`
Currently (lines 1372-1375):
```haskell
forever $
atomically (readTBQueue rcvQ)
>>= foldrM process ([], [])
>>= \(rs_, msgs) -> ...
```
Add a pre-fetch step before the existing loop:
```haskell
forever $ do
batch <- atomically (readTBQueue rcvQ)
msgMap <- prefetchMsgs batch
foldrM (process msgMap) ([], []) batch
>>= \(rs_, msgs) -> ...
```
`prefetchMsgs` scans the batch, collects queues from SUB commands that have a verified queue (`q_ = Just (q, _)`), calls `tryPeekMsgs` once, returns the map. For batches with no SUBs it returns an empty map (no DB call).
`process` passes the looked-up message (or Nothing) through to `processCommand` and down to `deliver`.
The `foldrM process` loop, `processCommand`, `subscribeQueueAndDeliver`, and all other command handlers stay structurally the same. Only `deliver` gains one parameter, and the `client` loop gains one pre-fetch call.
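The filtering that `prefetchMsgs` performs can be illustrated with simplified types (`Cmd`, `Batch`, and `subQueues` below are hypothetical; the real code pattern-matches on `VerifiedTransmission s` as shown in the diff):

```haskell
-- Simplified command and batch types for illustration only.
data Cmd = SUB | ACK | GET deriving (Eq, Show)

-- (verified queue id if verification succeeded, command)
type Batch = [(Maybe String, Cmd)]

-- Collect queues to pre-fetch: only SUB commands whose queue was
-- verified; failed verifications and non-SUB commands are skipped.
subQueues :: Batch -> [String]
subQueues batch = [q | (Just q, SUB) <- batch]

main :: IO ()
main = print (subQueues [(Just "q1", SUB), (Nothing, SUB), (Just "q2", ACK), (Just "q3", SUB)])
```

A batch of all ACKs yields an empty list, so no batched DB call is made — matching the "empty pre-fetch" edge case in Step 6.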
### Step 4: Review
Review the typeclass signature and server usage. Confirm the interface has the right shape before implementing store backends.
### Step 5: Implement for each store backend
#### Postgres
File: `src/Simplex/Messaging/Server/MsgStore/Postgres.hs`
Single query using `DISTINCT ON`:
```sql
SELECT DISTINCT ON (recipient_id)
recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
WHERE recipient_id IN ?
ORDER BY recipient_id, message_id ASC
```
Build `Map RecipientId Message` from results.
#### STM
File: `src/Simplex/Messaging/Server/MsgStore/STM.hs`
Loop over queues, call `tryPeekMsg` for each, collect into map.
#### Journal
File: `src/Simplex/Messaging/Server/MsgStore/Journal.hs`
Loop over queues, call `tryPeekMsg` for each, collect into map.
### Step 6: Handle edge cases
1. **Mixed batches**: `prefetchMsgs` collects only SUB queues. Non-SUB commands get Nothing for the pre-fetched message and process unchanged.
2. **Already-subscribed queues**: Include in pre-fetch - `deliver` is called for re-SUBs too (delivers pending message).
3. **Service subscriptions**: The pre-fetch doesn't care about service state. `sharedSubscribeQueue` handles service association in STM; message peek is the same.
4. **Error queues**: Verification errors from `receive` are Left values in the batch. `prefetchMsgs` only looks at Right values with SUB commands.
5. **Empty pre-fetch**: If batch has no SUBs (e.g., all ACKs), `prefetchMsgs` returns empty map, no DB call made.
### Step 7: Batch other commands (future, not in scope)
The same pattern (pre-fetch before loop, parameterize handler) can extend to:
- `ACK` with `tryDelPeekMsg` - batch delete+peek
- `GET` with `tryPeekMsg` - same map lookup
Lower priority since these don't have the N-at-once pattern of subscriptions.
## File changes summary
| File | Change |
|---|---|
| `src/Simplex/Messaging/Server/MsgStore/Types.hs` | Add `tryPeekMsgs` to typeclass |
| `src/Simplex/Messaging/Server/MsgStore/Postgres.hs` | Implement `tryPeekMsgs` with batch SQL |
| `src/Simplex/Messaging/Server/MsgStore/STM.hs` | Implement `tryPeekMsgs` as loop |
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Implement `tryPeekMsgs` as loop |
| `src/Simplex/Messaging/Server.hs` | Add `prefetchMsgs`, parameterize `deliver` |
## Testing
1. Existing server tests must pass unchanged (correctness preserved).
2. Add a test that subscribes a batch of queues (some with pending messages, some without) and verifies all get correct SOK + MSG responses.
3. Prometheus metrics: existing `qSub` stat should still increment correctly.
## Performance expectation
For 300K queues across ~2200 batches:
- Before: ~300K individual DB queries
- After: ~2200 batched DB queries (one per batch of ~135)
- ~136x reduction in DB round-trips
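A back-of-envelope check of these figures (batch size 135 assumed; the exact factor depends on rounding, hence the "~" above):

```haskell
queues, batchSize :: Int
queues = 300000
batchSize = 135

-- Ceiling division: number of batches needed to cover all queues.
batches :: Int
batches = (queues + batchSize - 1) `div` batchSize

-- Round-trips before: one query per queue; after: one per batch.
reduction :: Double
reduction = fromIntegral queues / fromIntegral batches

main :: IO ()
main = putStrLn (show batches ++ " batches, ~" ++ show (round reduction :: Int) ++ "x fewer DB round-trips")
```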
@@ -0,0 +1,90 @@
# Subscription performance
This is an implementation RFC addressing subscription performance bottlenecks in both the SMP router and the agent; it requires no protocol changes.

## Problem
Subscribing large numbers of queues is slow. A messaging client with ~300K queues per router across 3 routers takes over 1 hour to subscribe. For comparison, the NTF server with ~1M queues per router across 12 routers took 20-30 minutes (prior to NTF client services, now in master).
Even on fast networks (cloud VMs), a client with 1.1M active subscriptions needed ~1.5M attempts (commands sent) to fully subscribe — a ~36% retry rate caused by the timeout cascade described below.
### Root causes
#### 1. Router: per-command processing in batches
Batch verification and queue lookups are already done efficiently for the whole batch in `Server.hs`. But `processCommand` is called per-command in a loop - each SUB does its own individual DB query for message peek/delivery. With ~135 SUBs per batch (current SMP version), that's 135 individual DB queries per batch instead of 1 batched query.
For 300K queues, that's ~2200 batches x 135 queries = ~300K individual DB queries on the router, which is the dominant bottleneck when using PostgreSQL storage.
NSUB is cheaper because it just registers for notifications without message delivery - no per-queue DB query.
#### 2. Agent: all queues read and sent at once
`getUserServerRcvQueueSubs` reads all queues for a `(userId, server)` pair in one query with no LIMIT. For 300K queues, the entire result set is loaded into memory, then all ~2200 batches are queued to send without waiting for responses.
The NTF server agent uses cursor-style reading with configurable batch sizes (900 subs per chunk, 90K per DB fetch) and waits for each chunk to be processed before fetching the next.
#### 3. No backpressure on sends
`nonBlockingWriteTBQueue` bypasses the `sndQ` bound by forking a thread when the queue is full. All batches are queued immediately, and all their response timers start simultaneously. A 30-second per-response timeout means later batches time out not because the router is slow to respond to them specifically, but because they're waiting in the router's receive queue behind thousands of earlier commands.
This causes cascading timeouts: timed-out responses trigger `resubscribeSMPSession`, which retries all pending subs. Three consecutive timeouts can trigger connection drop via the monitor thread, causing a full reconnection and retry of everything.
## Solution
### Part 1: Router - batched command processing
Move the per-command processing loop inside command handlers so that commands of the same type within a batch can be processed together.
Current flow:
```
receive batch -> verify all -> lookup queues all -> for each command: processCommand (individual DB query)
```
Proposed flow:
```
receive batch -> verify all -> lookup queues all -> group by command type -> process group:
SUB group: one batched message peek query for all queues
NSUB group: batch registration (already cheap, but can batch DB writes)
other commands: process individually as before
```
For SUB, the batched processing would:
1. Collect all queue IDs from the SUB group
2. Perform a single DB query to peek messages for all queues
3. Distribute results back to individual responses
This reduces ~135 DB queries per batch to 1, cutting router-side DB load by ~100x for subscriptions.
Commands where batching doesn't matter (SEND, ACK, KEY, etc.) continue to be processed individually.
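The group-by-command-type step amounts to partitioning the batch; a minimal sketch with hypothetical types (`Cmd` and `splitBatch` are illustrative, not the server's actual types):

```haskell
import Data.List (partition)

data Cmd = SUB | NSUB | SEND | ACK deriving (Eq, Show)

-- Split a batch into the SUB group (processed together with one
-- batched peek query) and the rest (processed per-command as before).
splitBatch :: [(String, Cmd)] -> ([(String, Cmd)], [(String, Cmd)])
splitBatch = partition ((== SUB) . snd)

main :: IO ()
main = print (splitBatch [("q1", SUB), ("q2", SEND), ("q3", SUB), ("q4", ACK)])
```

Ordering within each group is preserved by `partition`, so responses can still be paired back to their transmissions by position.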
### Part 2: Agent - cursor-based subscription with backpressure
Replace the all-at-once fetch-and-send pattern with cursor-style batching, similar to what the NTF server agent does.
Changes to `subscribeUserServer`:
1. Fetch queues in fixed-size batches (e.g., configurable, default ~1000) using LIMIT/OFFSET or cursor-based pagination.
2. Send each batch and wait for responses before sending the next.
3. Remove the use of `nonBlockingWriteTBQueue` for subscription batches - use blocking writes or structured backpressure so response timers don't start until the batch is actually sent.
This ensures:
- Memory usage is bounded (not 300K queue records in memory at once)
- Response timeouts are meaningful (timer starts when the router receives the batch, not when it's queued locally)
- Retries are scoped to the failed batch, not all pending subs
- Works on slow/lossy networks by naturally pacing sends
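The cursor loop in step 1 can be sketched over an in-memory list (all names are hypothetical; the real code would run a DB query with `LIMIT` and a key cursor, and wait for each batch's responses before fetching the next):

```haskell
-- One page: up to `limit` queue ids strictly after the cursor.
fetchPage :: Int -> Int -> [Int] -> [Int]
fetchPage limit cursor = take limit . dropWhile (<= cursor)

-- Drain all queues page by page; memory holds one page at a time.
subscribeAll :: Int -> [Int] -> [[Int]]
subscribeAll limit allQueues = go 0
  where
    go cursor = case fetchPage limit cursor allQueues of
      []   -> []
      page -> page : go (last page)  -- advance cursor past this page
```

The cursor strictly increases each iteration, so the loop terminates even if new queues are added concurrently after the cursor position.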
### Part 3: Response timeout for batches
The current per-response 30-second timeout doesn't account for batch processing time. Options:
1. **Stagger deadlines**: later responses in a batch get proportionally more time. The `rcvConcurrency` field was designed for this but is never used.
2. **Per-batch timeout**: instead of timing individual responses, timeout the entire batch with a budget proportional to batch size.
3. **No timeout for subscription responses**: since subscriptions are sent as batches with backpressure (Part 2), and the connection is monitored by pings, individual response timeouts may not be needed. A subscription that doesn't get a response will be retried on reconnect.
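Option 1 (stagger deadlines) reduces to simple arithmetic; a sketch with illustrative numbers (30s base, 1s of extra budget per earlier expected response — both assumptions, not values from the code):

```haskell
-- The i-th expected response in a batch gets a later deadline,
-- accounting for the router processing earlier responses first.
staggeredDeadline :: Int -> Int -> Int -> Int
staggeredDeadline base perResponse i = base + i * perResponse

main :: IO ()
main = print (map (staggeredDeadline 30 1) [0, 50, 134])
```

With these numbers the last response in a 135-command batch gets 164s rather than 30s, which is the kind of budget `rcvConcurrency` was presumably meant to feed.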
## Priority and ordering
Part 1 (router batching) gives the biggest improvement and is independent of Parts 2/3.
Part 2 (agent cursor + backpressure) eliminates the retry cascade and is critical for slow networks.
Part 3 (timeout handling) is a refinement that can be addressed after Parts 1 and 2.
@@ -1366,14 +1366,20 @@ client
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
let THandleParams {thVersion} = thParams'
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
process t acc@(rs, msgs) =
process msgMap t acc@(rs, msgs) =
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
<$> processCommand clntServiceId thVersion t
forever $
atomically (readTBQueue rcvQ)
>>= foldrM process ([], [])
<$> processCommand clntServiceId thVersion msgMap t
forever $ do
batch <- atomically (readTBQueue rcvQ)
msgMap <- prefetchMsgs batch
foldrM (process msgMap) ([], []) batch
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
where
prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message))
prefetchMsgs batch =
let subQueues = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch]
in liftIO $ runExceptT $ tryPeekMsgs ms subQueues
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
PRXY srv auth -> ifM allowProxy getRelay (pure $ Just $ ERR $ PROXY BASIC_AUTH)
@@ -1454,8 +1460,8 @@ client
mkIncProxyStats ps psOwn own sel = do
incStat $ sel ps
when own $ incStat $ sel psOwn
processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
Cmd SSender command -> case command of
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
@@ -1479,7 +1485,7 @@ client
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
Cmd SRecipient command ->
case command of
SUB -> withQueue' subscribeQueueAndDeliver
SUB -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId <$> msgMap)
GET -> withQueue getMessage
ACK msgId -> withQueue $ acknowledgeMsg msgId
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1620,8 +1626,8 @@ client
suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
subscribeQueueAndDeliver q qr@QueueRec {rcvServiceId} =
subscribeQueueAndDeliver :: Either ErrorType (Maybe Message) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
subscribeQueueAndDeliver prefetchedMsg q qr@QueueRec {rcvServiceId} =
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
Nothing ->
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
@@ -1642,7 +1648,7 @@ client
deliver (hasSub, sub_) = do
stats <- asks serverStats
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
msg_ <- tryPeekMsg ms q
msg_ <- liftEither prefetchedMsg
msg' <- forM msg_ $ \msg -> liftIO $ do
ts <- getSystemSeconds
sub <- maybe (atomically getSub) pure sub_
@@ -2087,7 +2093,7 @@ client
-- rejectOrVerify filters allowed commands, no need to repeat it here.
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
-- `fst` removes empty message that is only returned for `SUB` command
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'')
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
-- encode response
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right
@@ -64,7 +64,7 @@ import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (sort)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
@@ -672,6 +672,9 @@ instance MsgStoreClass (JournalMsgStore s) where
atomically $ writeTVar tipMsg $ Just (Just ml)
pure $ Just msg
tryPeekMsgs st qs =
M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs
tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
void $
@@ -41,7 +41,7 @@ import Data.List (intersperse)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import Data.Time.Clock.System (SystemTime (..))
import Database.PostgreSQL.Simple (Binary (..), Only (..), (:.) (..))
import Database.PostgreSQL.Simple (Binary (..), In (..), Only (..), (:.) (..))
import qualified Database.PostgreSQL.Simple as DB
import qualified Database.PostgreSQL.Simple.Copy as DB
import Database.PostgreSQL.Simple.SqlQQ (sql)
@@ -246,6 +246,25 @@ instance MsgStoreClass PostgresMsgStore where
tryPeekMsg ms q = isolateQueue ms q "tryPeekMsg" $ tryPeekMsg_ q ()
{-# INLINE tryPeekMsg #-}
tryPeekMsgs :: PostgresMsgStore -> [PostgresQueue] -> ExceptT ErrorType IO (Map RecipientId Message)
tryPeekMsgs _ms [] = pure M.empty
tryPeekMsgs ms qs =
uninterruptibleMask_ $
withDB' "tryPeekMsgs" (queueStore_ ms) $ \db ->
M.fromList . map toRcvMsg <$>
DB.query
db
[sql|
SELECT DISTINCT ON (recipient_id)
recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
WHERE recipient_id IN ?
ORDER BY recipient_id, message_id ASC
|]
(Only (In (map recipientId' qs)))
where
toRcvMsg (Only rId :. msg) = (rId, toMessage msg)
tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg ms q msgId =
uninterruptibleMask_ $
@@ -24,6 +24,7 @@ import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Data.Text (Text)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
@@ -176,6 +177,9 @@ instance MsgStoreClass STMMsgStore where
tryPeekMsg_ _ = tryPeekTQueue . msgTQueue
{-# INLINE tryPeekMsg_ #-}
tryPeekMsgs st qs =
M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs
tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()
tryDeleteMsg_ _ STMMsgQueue {msgTQueue = q, size} _logState =
tryReadTQueue q >>= \case
@@ -41,6 +41,7 @@ import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import Data.Kind
import Data.Map.Strict (Map)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Time.Clock.System (SystemTime (systemSeconds))
@@ -87,6 +88,8 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
isolateQueue :: s -> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
unsafeRunStore :: StoreQueue s -> Text -> StoreMonad s a -> IO a
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
-- default implementations are overridden for PostgreSQL storage of messages
tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure