smp: batch queue association updates on subscriptions (#1760)

* smp: batch queue association updates on subscriptions

* refactor to fused batching

* simpler

* batch assoc functions

* clean up

* fix

---------

Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
Evgeny
2026-05-08 09:36:35 +01:00
committed by GitHub
parent ef3339ae4f
commit 8bd3193280
6 changed files with 214 additions and 42 deletions
@@ -0,0 +1,126 @@
# Server: batch queue service associations
When a batch of SUB or NSUB commands arrives from a service client, each command that needs a new or removed service association calls `setQueueService` individually - one DB write per command. For 135 commands per batch, that's 135 individual `UPDATE msg_queues` queries.
## Goal
Reduce association updates to at most 2 DB queries per batch (one for rcv associations, one for ntf associations), using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated.
Also fuse message pre-fetch and association batching into a single batch preparation step with a clean contract.
## Contract
```haskell
prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))))
```
`Left e` = batch-level failure (message pre-fetch or association query failed entirely). All SUBs/NSUBs in the batch get this error.
`Right map` = per-queue results as a tuple:
- `Maybe Message` - pre-fetched message for SUB queues, `Nothing` for NSUB or no message
- `Maybe (Either ErrorType ())` - association result. `Nothing` = no update needed. `Just (Right ())` = update succeeded. `Just (Left e)` = update failed for this queue.
One map, one lookup per queue. `processCommand` passes both values to `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`.
Queues not in the map (non-SUB/NSUB commands, failed verification) are not affected.
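A minimal sketch of how a caller might read this contract for a single SUB queue. The type synonyms, `SubOutcome`, and `subOutcome` are hypothetical stand-ins for illustration only; they are not part of the server code.

```haskell
import qualified Data.Map.Strict as M

-- Simplified stand-ins for the server types (assumptions for this sketch).
type RecipientId = String
type Message = String
data ErrorType = AUTH | INTERNAL deriving (Eq, Show)

type Prepared = M.Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))

-- Hypothetical summary of what the batch result means for one SUB queue.
data SubOutcome
  = BatchFailed ErrorType   -- Left e: every SUB/NSUB in the batch gets this error
  | AssocFailed ErrorType   -- Just (Left e): the update failed for this queue
  | Proceed (Maybe Message) -- update succeeded or was not needed
  deriving (Eq, Show)

subOutcome :: Either ErrorType Prepared -> RecipientId -> SubOutcome
subOutcome (Left e) _ = BatchFailed e
subOutcome (Right m) rId = case M.lookup rId m of
  Just (_, Just (Left e)) -> AssocFailed e
  Just (msg_, _) -> Proceed msg_
  Nothing -> Proceed Nothing -- queue not in the map: not affected
```

One lookup yields both the pre-fetched message and the association result, which is the point of using a single map.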
## prepareBatch implementation
One accumulating fold over the batch, collecting three lists:
- `subMsgQs :: [StoreQueue s]` - SUB queues for message pre-fetch
- `rcvAssocQs :: [StoreQueue s]` - SUB queues needing `rcv_service_id` update (`clntServiceId /= rcvServiceId qr`)
- `ntfAssocQs :: [StoreQueue s]` - NSUB queues needing `ntf_service_id` update (`clntServiceId /= ntfServiceId` from `NtfCreds`)
Classification reads from the already-loaded `QueueRec` in `VerifiedTransmission` - no extra DB query.
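The classification fold could look roughly like the sketch below. `Queue`, `Cmd`, and `classify` are simplified, hypothetical stand-ins: the real code pattern-matches on `VerifiedTransmission` and reads the service IDs from `QueueRec`.

```haskell
type ServiceId = String

-- Hypothetical, flattened view of a verified transmission.
data Cmd = SUB | NSUB | OTHER deriving (Eq, Show)

data Queue = Queue
  { qId :: String
  , rcvServiceId :: Maybe ServiceId
  , ntfServiceId :: Maybe ServiceId
  }
  deriving (Eq, Show)

-- One right fold producing (subMsgQs, rcvAssocQs, ntfAssocQs), in batch order.
classify :: Maybe ServiceId -> [(Queue, Cmd)] -> ([Queue], [Queue], [Queue])
classify clntServiceId = foldr step ([], [], [])
  where
    step (q, cmd) (msgQs, rcvQs, ntfQs) = case cmd of
      SUB
        -- every SUB queue is pre-fetched; only mismatched ones need an update
        | clntServiceId /= rcvServiceId q -> (q : msgQs, q : rcvQs, ntfQs)
        | otherwise -> (q : msgQs, rcvQs, ntfQs)
      NSUB
        | clntServiceId /= ntfServiceId q -> (msgQs, rcvQs, q : ntfQs)
        | otherwise -> (msgQs, rcvQs, ntfQs)
      OTHER -> (msgQs, rcvQs, ntfQs)
```

Because the comparison is `/=` on `Maybe ServiceId`, the same test covers both adding an association (client has a service, queue has none or a different one) and removing it (client has none, queue has one).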
Then three store calls (each skipped if its list is empty):
1. `tryPeekMsgs ms subMsgQs` -> `Map RecipientId Message`
2. `setRcvQueueServices (queueStore ms) clntServiceId rcvAssocQs` -> `Set RecipientId`
3. `setNtfQueueServices (queueStore ms) clntServiceId ntfAssocQs` -> `Set RecipientId`
Then one pass to merge results into `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`:
- For each SUB queue: `(M.lookup rId msgMap, assocResult rId rcvUpdated rcvAssocQs)`
- For each NSUB queue: `(Nothing, assocResult rId ntfUpdated ntfAssocQs)`
Where `assocResult rId updated assocQs` is `Nothing` if the queue is not in `assocQs` (no update needed). Otherwise it is `Just (Right ())` if `rId` is in `updated`, and `Just (Left AUTH)` if it is not.
If any of the three calls fails entirely, return `Left e`.
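A sketch of `assocResult` and the merge pass under simplifying assumptions: queues are represented only by their `RecipientId`, and the "needed update" lists are passed as sets. `mergeResults` and its argument layout are hypothetical.

```haskell
import qualified Data.Map.Strict as M
import qualified Data.Set as S

-- Simplified stand-ins for the server types (assumptions for this sketch).
type RecipientId = String
type Message = String
data ErrorType = AUTH deriving (Eq, Show)

-- Nothing: no update was needed; Just (Right ()): updated; Just (Left AUTH): not updated.
assocResult :: RecipientId -> S.Set RecipientId -> S.Set RecipientId -> Maybe (Either ErrorType ())
assocResult rId updated needed
  | rId `S.notMember` needed = Nothing
  | rId `S.member` updated = Just (Right ())
  | otherwise = Just (Left AUTH)

-- Hypothetical merge: (SUB ids, message map, rcv needed, rcv updated) and
-- (NSUB ids, ntf needed, ntf updated) are combined into one map.
mergeResults
  :: ([RecipientId], M.Map RecipientId Message, S.Set RecipientId, S.Set RecipientId)
  -> ([RecipientId], S.Set RecipientId, S.Set RecipientId)
  -> M.Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))
mergeResults (subIds, msgMap, rcvNeeded, rcvUpdated) (nsubIds, ntfNeeded, ntfUpdated) =
  M.fromList $
    [(rId, (M.lookup rId msgMap, assocResult rId rcvUpdated rcvNeeded)) | rId <- subIds]
      <> [(rId, (Nothing, assocResult rId ntfUpdated ntfNeeded)) | rId <- nsubIds]
```

A queue that needed an update but is missing from the `RETURNING` set maps to `Left AUTH`, so a deleted queue fails on its own without failing the rest of the batch.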
## Store interface
Replace the polymorphic `setQueueServices` with two plain functions in `QueueStoreClass`:
```haskell
setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
```
No `SParty p` polymorphism. Each function knows its column.
### Postgres implementation
`setRcvQueueServices`:
```sql
UPDATE msg_queues SET rcv_service_id = ?
WHERE recipient_id IN ? AND deleted_at IS NULL
RETURNING recipient_id
```
`setNtfQueueServices`:
```sql
UPDATE msg_queues SET ntf_service_id = ?
WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL
RETURNING recipient_id
```
After each batch query, for each queue in the returned set:
1. Read QueueRec TVar, update with new serviceId
2. Write store log entry
### STM implementation
Loop over queues, call existing per-item logic, collect succeeded `RecipientId`s into a Set.
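As an illustration of the loop described here, a generic sketch that runs a per-item action and keeps the IDs of the items that succeeded. `setServicesLoop` and its parameters are hypothetical; the per-item action stands in for the existing single-queue logic.

```haskell
import Control.Monad (filterM)
import Data.Either (isRight)
import qualified Data.Set as S

-- Run the per-item update for each queue and collect the IDs that succeeded.
setServicesLoop :: Ord k => (q -> k) -> (q -> IO (Either e ())) -> [q] -> IO (S.Set k)
setServicesLoop getId setOne qs =
  S.fromList . map getId <$> filterM (fmap isRight . setOne) qs
```

This keeps the same `Set RecipientId` result shape as the Postgres variant, so `prepareBatch` would not need to know which store it is talking to.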
## Downstream changes in Server.hs
### processCommand
Gains one parameter: `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`.
SUB case: `M.lookup entId prepared` gives `Just (msg_, assocResult)` or `Nothing`. Pass both to `subscribeQueueAndDeliver`.
NSUB case: `M.lookup entId prepared` gives `Just (Nothing, assocResult)` or `Nothing`. Pass `assocResult` to `subscribeNotifications`.
Forwarded commands: pass `M.empty`.
### subscribeQueueAndDeliver
Takes `Maybe Message` and `Maybe (Either ErrorType ())` as before. No change in how it uses them.
### sharedSubscribeQueue
Takes `Maybe (Either ErrorType ())`. On paths needing association update:
- `Just (Left e)` -> return error
- `Just (Right ())` -> skip `setQueueService`, proceed with STM work
- `Nothing` -> no update needed, proceed with existing logic
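The three cases can be sketched as a small dispatcher. `applyAssocResult` is a hypothetical helper, and the two action arguments are placeholders for the STM-only subscription work and for whatever the existing path does.

```haskell
data ErrorType = AUTH | INTERNAL deriving (Eq, Show)

applyAssocResult
  :: Monad m
  => Maybe (Either ErrorType ()) -- batch association result for this queue
  -> m a                         -- STM work only, association already stored
  -> m (Either ErrorType a)      -- existing logic, used when no batch result exists
  -> m (Either ErrorType a)
applyAssocResult assoc stmWork existing = case assoc of
  Just (Left e) -> pure (Left e)       -- batch update failed for this queue
  Just (Right ()) -> Right <$> stmWork -- skip the per-queue store write
  Nothing -> existing
```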
## Implementation order (top-down)
1. Define the `prepareBatch` contract and thread one map through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` (Server.hs)
2. Implement `prepareBatch` with the fold, three calls, and merge (Server.hs)
3. Add `setRcvQueueServices` and `setNtfQueueServices` to `QueueStoreClass` (Types.hs)
4. Implement for Postgres with batch `UPDATE ... RETURNING` (Postgres.hs)
5. Implement for STM as loop (STM.hs)
6. Implement for Journal as delegation (Journal.hs)
At step 2, store functions can initially be stubs returning empty sets. Steps 3-6 fill in the real implementations.
## Files changed
| File | Change |
|---|---|
| `src/Simplex/Messaging/Server.hs` | `prepareBatch` with fold + merge; one map parameter through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` |
| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setRcvQueueServices`, `setNtfQueueServices` to `QueueStoreClass` |
| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement with batch `UPDATE ... RETURNING` + per-item TVar/log updates |
| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement as loop |
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Delegate to underlying store |
+53 -41
@@ -1366,19 +1366,37 @@ client
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
let THandleParams {thVersion} = thParams'
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
process msgMap t acc@(rs, msgs) =
process batchSubs t acc@(rs, msgs) =
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
<$> processCommand clntServiceId thVersion msgMap t
<$> processCommand clntServiceId thVersion batchSubs t
forever $ do
batch <- atomically (readTBQueue rcvQ)
msgMap <- prefetchMsgs batch
foldrM (process msgMap) ([], []) batch
batchSubs <- prepareBatchSubs clntServiceId batch
foldrM (process batchSubs) ([], []) batch
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
where
prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message))
prefetchMsgs batch =
let subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch]
in if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs
prepareBatchSubs ::
Maybe ServiceId ->
NonEmpty (VerifiedTransmission s) ->
M s (Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())))
prepareBatchSubs clntServiceId_ batch = do
let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldr partitionSubs ([], [], []) batch
partitionSubs t (msgQs, rcvQs, ntfQs) = case t of
(Just (q, qr), (_, _, Cmd SRecipient SUB))
| clntServiceId_ /= rcvServiceId qr -> (q : msgQs, q : rcvQs, ntfQs)
| otherwise -> (q : msgQs, rcvQs, ntfQs)
(Just (q, qr), (_, _, Cmd SNotifier NSUB))
| clntServiceId_ /= (notifier qr >>= ntfServiceId) -> (msgQs, rcvQs, q : ntfQs)
_ -> (msgQs, rcvQs, ntfQs)
liftIO $ runExceptT $ do
rcvAssocs <- ifNotNull rcvAssocQs $ setService SRecipientService clntServiceId_
ntfAssocs <- ifNotNull ntfAssocQs $ setService SNotifierService clntServiceId_
msgs <- ifNotNull subMsgQs $ tryPeekMsgs ms
pure (msgs, rcvAssocs, ntfAssocs)
where
ifNotNull qs f = if null qs then pure M.empty else f qs
setService :: (PartyI p, ServiceParty p) => SParty p -> Maybe ServiceId -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId (Either ErrorType ()))
setService party sId = ExceptT . setQueueServices (queueStore ms) party sId
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
@@ -1460,8 +1478,8 @@ client
mkIncProxyStats ps psOwn own sel = do
incStat $ sel ps
when own $ incStat $ sel psOwn
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion batchSubs (q_, (corrId, entId, cmd)) = case cmd of
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
Cmd SSender command -> case command of
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
@@ -1472,7 +1490,9 @@ client
LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr
LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr
Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of
Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications q ntfCreds
Just (q, QueueRec {notifier = Just ntfCreds}) ->
either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds)
$ batchSubs >>= \(_, _, ntfAssocs) -> sequence (M.lookup (recipientId q) ntfAssocs)
_ -> pure $ ERR INTERNAL
Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of
Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash)
@@ -1485,9 +1505,9 @@ client
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
Cmd SRecipient command ->
case command of
SUB -> case msgMap of
SUB -> case batchSubs >>= \(msgs, rcvAssocs, _) -> sequence (M.lookup entId rcvAssocs) $> msgs of
Left e -> pure $ Just (err e, Nothing)
Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
Right msgs -> withQueue' $ subscribeQueueAndDeliver $ M.lookup entId msgs
GET -> withQueue getMessage
ACK msgId -> withQueue $ acknowledgeMsg msgId
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1632,9 +1652,7 @@ client
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
Nothing ->
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
Left e -> pure (err e, Nothing)
Right s -> deliver s
deliver =<< sharedSubscribeQueue q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
Just s@Sub {subThread} -> do
stats <- asks serverStats
case subThread of
@@ -1735,26 +1753,22 @@ client
else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)
subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
subscribeNotifications q NtfCreds {ntfServiceId} =
sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case
Left e -> pure $ ERR e
Right (hasSub, _) -> do
when (isNothing clntServiceId) $
asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
pure $ SOK clntServiceId
subscribeNotifications q NtfCreds {ntfServiceId} = do
(hasSub, _) <- sharedSubscribeQueue q ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices
when (isNothing clntServiceId) $
asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
pure $ SOK clntServiceId
sharedSubscribeQueue ::
(PartyI p, ServiceParty p) =>
StoreQueue s ->
SParty p ->
Maybe ServiceId ->
ServerSubscribers s ->
(Client s -> TMap QueueId sub) ->
(Client s -> TVar (Int64, IdsHash)) ->
STM sub ->
(ServerStats -> ServiceStats) ->
M s (Either ErrorType (Bool, Maybe sub))
sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
M s (Bool, Maybe sub)
sharedSubscribeQueue q queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
stats <- asks serverStats
let incSrvStat sel = incStat $ sel $ servicesSel stats
writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId)
@@ -1768,25 +1782,23 @@ client
incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat srvAssocDuplicate
pure $ Right (hasSub, Nothing)
| otherwise -> runExceptT $ do
-- new or updated queue-service association
ExceptT $ setQueueService (queueStore ms) q party (Just serviceId)
pure (hasSub, Nothing)
| otherwise -> do
-- association already done in prepareBatchSubs
hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub
atomically writeSub
liftIO $ do
unless hasSub $ incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
unless hasSub $ incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
pure (hasSub, Nothing)
where
hasServiceSub = ((0 /=) . fst) <$> readTVar (clientServiceSubs clnt)
-- This function is used when queue association with the service is created.
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDs hash
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDs hash
Nothing -> case queueServiceId of
Just _ -> runExceptT $ do
ExceptT $ setQueueService (queueStore ms) q party Nothing
liftIO $ incSrvStat srvAssocRemoved
Just _ -> do
-- unassociation already done in prepareBatchSubs
incSrvStat srvAssocRemoved
-- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions.
-- For notification service it can only be Just if storage and session states diverge.
r <- atomically $ getSubscription >>= newSub
@@ -1795,7 +1807,7 @@ client
Nothing -> do
r@(hasSub, _) <- atomically $ getSubscription >>= newSub
unless hasSub $ atomically writeSub
pure $ Right r
pure r
where
getSubscription = TM.lookup entId $ clientSubs clnt
newSub = \case
@@ -2094,7 +2106,7 @@ client
-- rejectOrVerify filters allowed commands, no need to repeat it here.
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
-- `fst` removes empty message that is only returned for `SUB` command
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right (M.empty, M.empty, M.empty)) t'')
-- encode response
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right
@@ -353,6 +353,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
{-# INLINE getCreateService #-}
setQueueService = withQS setQueueService
{-# INLINE setQueueService #-}
setQueueServices = withQS setQueueServices
{-# INLINE setQueueServices #-}
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
{-# INLINE getQueueNtfServices #-}
getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s))
@@ -91,7 +91,7 @@ import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>), ($>>=))
import System.Exit (exitFailure)
import System.IO (IOMode (..), hFlush, stdout)
import UnliftIO.STM
@@ -504,6 +504,32 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
atomically $ writeTVar (queueRec sq) $ Just q'
withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId
setQueueServices :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (M.Map RecipientId (Either ErrorType ())))
setQueueServices _ _ _ [] = pure $ Right M.empty
setQueueServices st party serviceId qs = E.uninterruptibleMask_ $ runExceptT $ do
updated <- S.fromList <$> withDB' "setQueueServices" st (\db ->
map fromOnly <$> DB.query db updateQuery (serviceId, In (map recipientId qs)))
results <- liftIO $ forM qs $ \sq -> do
let rId = recipientId sq
(rId,) <$> if S.member rId updated
then readQueueRecIO (queueRec sq) $>>= \q -> do
atomically $ writeTVar (queueRec sq) $ Just $ updateRec q
withLog "setQueueServices" st $ \sl -> logQueueService sl rId party serviceId
pure $ Right ()
else pure $ Left AUTH
pure $ M.fromList results
where
updateQuery = case party of
SRecipientService ->
"UPDATE msg_queues SET rcv_service_id = ? WHERE recipient_id IN ? AND deleted_at IS NULL RETURNING recipient_id"
SNotifierService ->
"UPDATE msg_queues SET ntf_service_id = ? WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL RETURNING recipient_id"
updateRec q = case party of
SRecipientService -> q {rcvServiceId = serviceId}
SNotifierService -> case notifier q of
Just nc -> q {notifier = Just nc {ntfServiceId = serviceId}}
Nothing -> q
getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do
snIds <-
@@ -337,6 +337,10 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
mapM_ (removeServiceQueue st serviceSel qId) prevSrvId
mapM_ (addServiceQueue st serviceSel qId) serviceId
setQueueServices st party serviceId qs = Right . M.fromList <$> mapM setQueue qs
where
setQueue sq = (recipientId sq,) <$> setQueueService st sq party serviceId
getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getQueueNtfServices st ntfs = do
ss <- readTVarIO (services st)
@@ -16,6 +16,7 @@ import Control.Concurrent.STM
import Control.Monad
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.Text (Text)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
@@ -51,6 +52,7 @@ class StoreQueueClass q => QueueStoreClass q s where
deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec)
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (Map RecipientId (Either ErrorType ())))
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash))