agent: use primary key index in setRcvServiceAssocs (#1783)

* agent: use primary key index in setRcvServiceAssocs

Previous WHERE rcv_id = ? did not match the (host, port, rcv_id)
primary key prefix and fell back to a table scan via
idx_rcv_queues_client_notice_id. With ~390k rows per queue, each
update in a 1350-row batch scanned the whole table, yielding ~290s
per batch and a multi-hour rcv-services migration.

* agent: pass SMPServer explicitly to setRcvServiceAssocs

Avoid extracting host/port from the first queue inside setRcvServiceAssocs.
The caller already has SMPServer in scope (from tSess) and the call chain
is short, so threading it through is simpler than inspecting the list.
Removes the empty-list guard from setRcvServiceAssocs (it remains in
processRcvServiceAssocs).
This commit is contained in:
sh
2026-05-20 12:56:55 +00:00
committed by GitHub
parent fb2d0a4c3d
commit 118a8e89bb
3 changed files with 16 additions and 10 deletions
+1 -1
View File
@@ -3106,7 +3106,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
unless (null connIds) $ do
notify' "" $ UP srv connIds
atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds
readTVarIO serviceRQs >>= processRcvServiceAssocs c
readTVarIO serviceRQs >>= processRcvServiceAssocs c srv
where
withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' ()
withRcvConn rId a = do
+5 -5
View File
@@ -1692,7 +1692,7 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
unless (null notices) $ takeTMVar $ clientNoticesLock c
pure r
unless (null serviceQs) $ void $
processRcvServiceAssocs c serviceQs `runReaderT` agentEnv c
processRcvServiceAssocs c srv serviceQs `runReaderT` agentEnv c
unless (null notices) $ void $
(processClientNotices c tSess notices `runReaderT` agentEnv c)
`E.finally` atomically (putTMVar (clientNoticesLock c) ())
@@ -1714,10 +1714,10 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
tSess = transportSession' smp
sessId = sessionId $ thParams smp
processRcvServiceAssocs :: SMPQueue q => AgentClient -> [q] -> AM' ()
processRcvServiceAssocs _ [] = pure ()
processRcvServiceAssocs c serviceQs =
withStore' c (`setRcvServiceAssocs` serviceQs) `catchAllErrors'` \e -> do
processRcvServiceAssocs :: SMPQueue q => AgentClient -> SMPServer -> [q] -> AM' ()
processRcvServiceAssocs _ _ [] = pure ()
processRcvServiceAssocs c srv serviceQs =
withStore' c (\db -> setRcvServiceAssocs db srv serviceQs) `catchAllErrors'` \e -> do
logError $ "processRcvServiceAssocs error: " <> tshow e
notifySub' c "" $ ERR e
@@ -2399,12 +2399,18 @@ unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do
unsetQueuesToSubscribe :: DB.Connection -> IO ()
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"
setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO ()
setRcvServiceAssocs db rqs = do
setRcvServiceAssocs :: SMPQueue q => DB.Connection -> SMPServer -> [q] -> IO ()
setRcvServiceAssocs db ProtocolServer {host, port} rqs =
#if defined(dbPostgres)
DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs)
DB.execute
db
"UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id IN ?"
(host, port, In (map queueId rqs))
#else
DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs
DB.executeMany
db
"UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id = ?"
(map (\q -> (host, port, queueId q)) rqs)
#endif
removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO ()