diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7d046eb4c..ac9497f87 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1597,8 +1597,7 @@ subscribeAllConnections' :: AgentClient -> Bool -> Maybe UserId -> AM () subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do userSrvs <- withStore' c (`getSubscriptionServers` onlyNeeded) unless (null userSrvs) $ do - maxPending <- asks $ maxPendingSubscriptions . config - currPending <- newTVarIO 0 + batchSize <- asks $ subsBatchSize . config let userSrvs' = case activeUserId_ of Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs Nothing -> userSrvs @@ -1610,7 +1609,7 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do -- On successful service subscription, only unassociated queues will be subscribed. userSrvs2 <- withStore' c $ \db -> mapM (getService db useServices) userSrvs' userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2 - rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3 + rs <- lift $ mapConcurrently (subscribeUserServer batchSize) userSrvs3 let (errs, oks) = partitionEithers rs logInfo $ "subscribed " <> tshow (sum oks) <> " queues" forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",) @@ -1647,18 +1646,16 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do unassocQueues :: AM Bool unassocQueues = False <$ withStore' c (\db -> removeRcvServiceAssocs db userId srv) _ -> pure False - subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int) - subscribeUserServer maxPending currPending ((userId, srv), hasService) = do - atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry - tryAllErrors' $ do - qs <- withStore' c $ \db -> do - qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded hasService - unless (null qs) $ atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction - pure qs - let n = length qs - unless (null qs) $ lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n) - pure n + subscribeUserServer :: Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int) + subscribeUserServer batchSize ((userId, srv), hasService) = tryAllErrors' $ loop 0 Nothing where + loop !n cursor_ = do + qs <- withStore' c $ \db -> getUserServerRcvQueueSubs db userId srv onlyNeeded hasService batchSize cursor_ + if null qs then pure n else do + lift $ subscribe qs + let n' = n + length qs + lastRcvId = Just $ queueId $ last qs + if length qs < batchSize then pure n' else loop n' lastRcvId subscribe qs = do rs <- subscribeUserServerQueues c userId srv qs ns <- asks ntfSupervisor diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index f154a88f4..8f331991e 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1661,9 +1661,15 @@ checkQueues c = fmap partitionEithers . mapM checkQueue resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' () resubscribeSessQueues _ _ [] = pure () resubscribeSessQueues c tSess qs = do + batchSize <- asks $ subsBatchSize . config (errs, qs_) <- checkQueues c qs - forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c True (tSess, qs') + subscribeChunks $ toChunks batchSize qs_ forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId) + where + subscribeChunks [] = pure () + subscribeChunks (qs' : rest) = do + (_, active) <- subscribeSessQueues_ c True (tSess, qs') + when active $ subscribeChunks rest subscribeSessQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool) subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c NRMBackground qs diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 900323ede..af2842e1a 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -169,7 +169,7 @@ data AgentConfig = AgentConfig ntfBatchSize :: Int, ntfSubFirstCheckInterval :: NominalDiffTime, ntfSubCheckInterval :: NominalDiffTime, - maxPendingSubscriptions :: Int, + subsBatchSize :: Int, caCertificateFile :: FilePath, privateKeyFile :: FilePath, certificateFile :: FilePath, @@ -242,7 +242,7 @@ defaultAgentConfig = ntfBatchSize = 150, ntfSubFirstCheckInterval = nominalDay, ntfSubCheckInterval = 3 * nominalDay, - maxPendingSubscriptions = 35000, + subsBatchSize = 1350, -- CA certificate private key is not needed for initialization -- ! we do not generate these caCertificateFile = "/etc/opt/simplex-agent/ca.crt", diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index f6d1daebe..a392b9335 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -2336,14 +2336,14 @@ getSubscriptionServers db onlyNeeded = toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash) -- TODO [certs rcv] check index for getting queues with service present -getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> ServiceAssoc -> IO [RcvQueueSub] -getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService = - map toRcvQueueSub - <$> DB.query - db - (rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?" <> serviceCond) - (userId, h, p, kh) +getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> ServiceAssoc -> Int -> Maybe SMP.RecipientId -> IO [RcvQueueSub] +getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService limit cursor_ = + map toRcvQueueSub <$> case cursor_ of + Nothing -> DB.query db (q <> orderLimit) (userId, h, p, kh, limit) + Just cursor -> DB.query db (q <> " AND q.rcv_id > ? " <> orderLimit) (userId, h, p, kh, cursor, limit) where + q = rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?" <> serviceCond + orderLimit = " ORDER BY q.rcv_id LIMIT ?" toSubscribe | onlyNeeded = " WHERE q.to_subscribe = 1 AND " | otherwise = " WHERE "