mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-03 19:41:26 +00:00
Merge branch 'master' into rcv-services
This commit is contained in:
@@ -1597,8 +1597,7 @@ subscribeAllConnections' :: AgentClient -> Bool -> Maybe UserId -> AM ()
|
||||
subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
|
||||
userSrvs <- withStore' c (`getSubscriptionServers` onlyNeeded)
|
||||
unless (null userSrvs) $ do
|
||||
maxPending <- asks $ maxPendingSubscriptions . config
|
||||
currPending <- newTVarIO 0
|
||||
batchSize <- asks $ subsBatchSize . config
|
||||
let userSrvs' = case activeUserId_ of
|
||||
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
|
||||
Nothing -> userSrvs
|
||||
@@ -1610,7 +1609,7 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
|
||||
-- On successful service subscription, only unassociated queues will be subscribed.
|
||||
userSrvs2 <- withStore' c $ \db -> mapM (getService db useServices) userSrvs'
|
||||
userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2
|
||||
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3
|
||||
rs <- lift $ mapConcurrently (subscribeUserServer batchSize) userSrvs3
|
||||
let (errs, oks) = partitionEithers rs
|
||||
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
|
||||
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
|
||||
@@ -1647,18 +1646,16 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
|
||||
unassocQueues :: AM Bool
|
||||
unassocQueues = False <$ withStore' c (\db -> removeRcvServiceAssocs db userId srv)
|
||||
_ -> pure False
|
||||
subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int)
|
||||
subscribeUserServer maxPending currPending ((userId, srv), hasService) = do
|
||||
atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry
|
||||
tryAllErrors' $ do
|
||||
qs <- withStore' c $ \db -> do
|
||||
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded hasService
|
||||
unless (null qs) $ atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
|
||||
pure qs
|
||||
let n = length qs
|
||||
unless (null qs) $ lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n)
|
||||
pure n
|
||||
subscribeUserServer :: Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int)
|
||||
subscribeUserServer batchSize ((userId, srv), hasService) = tryAllErrors' $ loop 0 Nothing
|
||||
where
|
||||
loop !n cursor_ = do
|
||||
qs <- withStore' c $ \db -> getUserServerRcvQueueSubs db userId srv onlyNeeded hasService batchSize cursor_
|
||||
if null qs then pure n else do
|
||||
lift $ subscribe qs
|
||||
let n' = n + length qs
|
||||
lastRcvId = Just $ queueId $ last qs
|
||||
if length qs < batchSize then pure n' else loop n' lastRcvId
|
||||
subscribe qs = do
|
||||
rs <- subscribeUserServerQueues c userId srv qs
|
||||
ns <- asks ntfSupervisor
|
||||
|
||||
@@ -1661,9 +1661,15 @@ checkQueues c = fmap partitionEithers . mapM checkQueue
|
||||
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
|
||||
resubscribeSessQueues _ _ [] = pure ()
|
||||
resubscribeSessQueues c tSess qs = do
|
||||
batchSize <- asks $ subsBatchSize . config
|
||||
(errs, qs_) <- checkQueues c qs
|
||||
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c True (tSess, qs')
|
||||
subscribeChunks $ toChunks batchSize qs_
|
||||
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
|
||||
where
|
||||
subscribeChunks [] = pure ()
|
||||
subscribeChunks (qs' : rest) = do
|
||||
(_, active) <- subscribeSessQueues_ c True (tSess, qs')
|
||||
when active $ subscribeChunks rest
|
||||
|
||||
subscribeSessQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
|
||||
subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c NRMBackground qs
|
||||
|
||||
@@ -169,7 +169,7 @@ data AgentConfig = AgentConfig
|
||||
ntfBatchSize :: Int,
|
||||
ntfSubFirstCheckInterval :: NominalDiffTime,
|
||||
ntfSubCheckInterval :: NominalDiffTime,
|
||||
maxPendingSubscriptions :: Int,
|
||||
subsBatchSize :: Int,
|
||||
caCertificateFile :: FilePath,
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath,
|
||||
@@ -242,7 +242,7 @@ defaultAgentConfig =
|
||||
ntfBatchSize = 150,
|
||||
ntfSubFirstCheckInterval = nominalDay,
|
||||
ntfSubCheckInterval = 3 * nominalDay,
|
||||
maxPendingSubscriptions = 35000,
|
||||
subsBatchSize = 1350,
|
||||
-- CA certificate private key is not needed for initialization
|
||||
-- ! we do not generate these
|
||||
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
|
||||
|
||||
@@ -2336,14 +2336,14 @@ getSubscriptionServers db onlyNeeded =
|
||||
toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash)
|
||||
|
||||
-- TODO [certs rcv] check index for getting queues with service present
|
||||
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> ServiceAssoc -> IO [RcvQueueSub]
|
||||
getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService =
|
||||
map toRcvQueueSub
|
||||
<$> DB.query
|
||||
db
|
||||
(rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?" <> serviceCond)
|
||||
(userId, h, p, kh)
|
||||
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> ServiceAssoc -> Int -> Maybe SMP.RecipientId -> IO [RcvQueueSub]
|
||||
getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService limit cursor_ =
|
||||
map toRcvQueueSub <$> case cursor_ of
|
||||
Nothing -> DB.query db (q <> orderLimit) (userId, h, p, kh, limit)
|
||||
Just cursor -> DB.query db (q <> " AND q.rcv_id > ? " <> orderLimit) (userId, h, p, kh, cursor, limit)
|
||||
where
|
||||
q = rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?" <> serviceCond
|
||||
orderLimit = " ORDER BY q.rcv_id LIMIT ?"
|
||||
toSubscribe
|
||||
| onlyNeeded = " WHERE q.to_subscribe = 1 AND "
|
||||
| otherwise = " WHERE "
|
||||
|
||||
Reference in New Issue
Block a user