From 80aa56cbcce92f4b61cda06a965eef3d0f640df1 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Thu, 9 Oct 2025 18:33:27 +0100 Subject: [PATCH] agent: track which queues need subscribing for iOS NSE (#1657) * agent: track which queues need subscribing for iOS NSE * fix down migration * fix, cleanup --- simplexmq.cabal | 2 + src/Simplex/Messaging/Agent.hs | 31 +++++---- src/Simplex/Messaging/Agent/Client.hs | 1 + .../Messaging/Agent/Store/AgentStore.hs | 68 +++++++++++-------- .../Agent/Store/Postgres/Migrations/App.hs | 4 +- .../M20251009_queue_to_subscribe.hs | 23 +++++++ .../Agent/Store/SQLite/Migrations/App.hs | 4 +- .../M20251009_queue_to_subscribe.hs | 20 ++++++ .../Store/SQLite/Migrations/agent_schema.sql | 2 + tests/AgentTests/FunctionalAPITests.hs | 4 +- tests/AgentTests/SQLiteTests.hs | 14 ++-- 11 files changed, 121 insertions(+), 52 deletions(-) create mode 100644 src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs create mode 100644 src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251009_queue_to_subscribe.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index cfe61a802..cf72107d4 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -163,6 +163,7 @@ library Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete + Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe else exposed-modules: Simplex.Messaging.Agent.Store.SQLite @@ -210,6 +211,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250203_msg_bodies Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe if flag(client_postgres) || flag(server_postgres) exposed-modules: Simplex.Messaging.Agent.Store.Postgres diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7cdc35353..cd0d092a4 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -152,7 +152,7 @@ import Data.Functor.Identity import Data.Int (Int64) import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IM -import Data.List (find) +import Data.List (find, sortOn) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) @@ -445,8 +445,8 @@ subscribeConnections c = withAgentEnv c . subscribeConnections' c {-# INLINE subscribeConnections #-} -- | Subscribe to all connections -subscribeAllConnections :: AgentClient -> AE () -subscribeAllConnections c = withAgentEnv c $ subscribeAllConnections' c +subscribeAllConnections :: AgentClient -> Bool -> Maybe UserId -> AE () +subscribeAllConnections c = withAgentEnv c .: subscribeAllConnections' c -- | Get messages for connections (GET commands) getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta))) @@ -972,7 +972,7 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userData_ clientData pqInitKey ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing (rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e atomically $ incSMPServerStat c userId srv connCreated - rq' <- withStore c $ \db -> updateNewConnRcv db connId rq + rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId mapM_ (newQueueNtfSubscription c rq') ntfServer_ pure (rq', qUri) @@ -1224,7 +1224,7 @@ createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientV (rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode atomically $ incSMPServerStat c userId (qServer rq) connCreated let qInfo = toVersionT qUri smpClientVersion - rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq + rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId mapM_ (newQueueNtfSubscription c rq') ntfServer_ pure (qInfo, clientServiceId rq') @@ -1357,25 +1357,30 @@ subscribeConnections_ c conns = do when (actual /= expected) . atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected) -subscribeAllConnections' :: AgentClient -> AM () -subscribeAllConnections' c = do - userSrvs <- withStore' c getSubscriptionServers - maxPending <- asks $ maxPendingSubscriptions . config - currPending <- newTVarIO 0 +subscribeAllConnections' :: AgentClient -> Bool -> Maybe UserId -> AM () +subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do + userSrvs <- withStore' c (`getSubscriptionServers` onlyNeeded) unless (null userSrvs) $ do - rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs + maxPending <- asks $ maxPendingSubscriptions . config + currPending <- newTVarIO 0 + let userSrvs' = case activeUserId_ of + Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs + Nothing -> userSrvs + rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs' let (errs, oks) = partitionEithers rs logInfo $ "subscribed " <> tshow (sum oks) <> " queues" forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",) + withStore' c unsetQueuesToSubscribe resumeAllDelivery resumeAllCommands c where + handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e) subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int) subscribeUserServer maxPending currPending (userId, srv) = do atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry tryAllErrors' $ do qs <- withStore' c $ \db -> do - qs <- getUserServerRcvQueueSubs db userId srv + qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction pure qs let n = length qs @@ -2097,7 +2102,7 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq -- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions. (q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId} - rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' + rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' SMSubscribe lift $ addNewQueueSubscription c rq'' tSess sessId void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))] rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 88d5a5bce..33be6a8be 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -158,6 +158,7 @@ module Simplex.Messaging.Agent.Client unsafeWithStore, storeError, notifySub, + notifySub', userServers, pickServer, getNextServer, diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 212d65584..a9334c331 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Store.AgentStore createSndConn, getSubscriptionServers, getUserServerRcvQueueSubs, + unsetQueuesToSubscribe, getConn, getDeletedConn, getConns, @@ -392,15 +393,15 @@ createNewConn db gVar cData cMode = do fst <$$> createConn_ gVar cData (\connId -> createConnRecord db connId cData cMode) -- TODO [certs rcv] store clientServiceId from NewRcvQueue -updateNewConnRcv :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue) -updateNewConnRcv db connId rq = +updateNewConnRcv :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue) +updateNewConnRcv db connId rq subMode = getConn db connId $>>= \case (SomeConn _ NewConnection {}) -> updateConn (SomeConn _ RcvConnection {}) -> updateConn -- to allow retries (SomeConn c _) -> pure . Left . SEBadConnType "updateNewConnRcv" $ connType c where updateConn :: IO (Either StoreError RcvQueue) - updateConn = Right <$> addConnRcvQueue_ db connId rq + updateConn = Right <$> addConnRcvQueue_ db connId rq subMode updateNewConnSnd :: DB.Connection -> ConnId -> NewSndQueue -> IO (Either StoreError SndQueue) updateNewConnSnd db connId sq = @@ -482,25 +483,25 @@ upgradeRcvConnToDuplex db connId sq = (SomeConn c _) -> pure . Left . SEBadConnType "upgradeRcvConnToDuplex" $ connType c -- TODO [certs rcv] store clientServiceId from NewRcvQueue -upgradeSndConnToDuplex :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue) -upgradeSndConnToDuplex db connId rq = +upgradeSndConnToDuplex :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue) +upgradeSndConnToDuplex db connId rq subMode = getConn db connId >>= \case - Right (SomeConn _ SndConnection {}) -> Right <$> addConnRcvQueue_ db connId rq + Right (SomeConn _ SndConnection {}) -> Right <$> addConnRcvQueue_ db connId rq subMode Right (SomeConn c _) -> pure . Left . SEBadConnType "upgradeSndConnToDuplex" $ connType c _ -> pure $ Left SEConnNotFound -- TODO [certs rcv] store clientServiceId from NewRcvQueue -addConnRcvQueue :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue) -addConnRcvQueue db connId rq = +addConnRcvQueue :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue) +addConnRcvQueue db connId rq subMode = getConn db connId >>= \case - Right (SomeConn _ DuplexConnection {}) -> Right <$> addConnRcvQueue_ db connId rq + Right (SomeConn _ DuplexConnection {}) -> Right <$> addConnRcvQueue_ db connId rq subMode Right (SomeConn c _) -> pure . Left . SEBadConnType "addConnRcvQueue" $ connType c _ -> pure $ Left SEConnNotFound -addConnRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> IO RcvQueue -addConnRcvQueue_ db connId rq@RcvQueue {server} = do +addConnRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO RcvQueue +addConnRcvQueue_ db connId rq@RcvQueue {server} subMode = do serverKeyHash_ <- createServer_ db server - insertRcvQueue_ db connId rq serverKeyHash_ + insertRcvQueue_ db connId rq subMode serverKeyHash_ addConnSndQueue :: DB.Connection -> ConnId -> NewSndQueue -> IO (Either StoreError SndQueue) addConnSndQueue db connId sq = @@ -1983,8 +1984,8 @@ upsertNtfServer_ db ProtocolServer {host, port, keyHash} = do -- * createRcvConn helpers -insertRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> Maybe C.KeyHash -> IO RcvQueue -insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do +insertRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> Maybe C.KeyHash -> IO RcvQueue +insertRcvQueue_ db connId' rq@RcvQueue {..} subMode serverKeyHash_ = do -- to preserve ID if the queue already exists. -- possibly, it can be done in one query. currQId_ <- maybeFirstRow fromOnly $ DB.query db "SELECT rcv_queue_id FROM rcv_queues WHERE conn_id = ? AND host = ? AND port = ? AND snd_id = ?" (connId', host server, port server, sndId) @@ -1994,19 +1995,20 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do [sql| INSERT INTO rcv_queues ( host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, - snd_id, queue_mode, status, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version, server_key_hash, + snd_id, queue_mode, status, to_subscribe, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version, server_key_hash, link_id, link_key, link_priv_sig_key, link_enc_fixed_data, ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret - ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?); + ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?); |] ( (host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret) - :. (sndId, queueMode, status, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_) + :. (sndId, queueMode, status, BI toSubscribe, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_) :. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink) :. ntfCredsFields ) -- TODO [certs rcv] save client service pure (rq :: NewRcvQueue) {connId = connId', dbQueueId = qId, clientService = Nothing} where + toSubscribe = subMode == SMOnlyCreate ntfCredsFields = case clientNtfCreds of Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} -> (Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) @@ -2053,27 +2055,37 @@ newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1) -- * subscribe all connections -getSubscriptionServers :: DB.Connection -> IO [(UserId, SMPServer)] -getSubscriptionServers db = - map toUserServer - <$> DB.query_ - db +getSubscriptionServers :: DB.Connection -> Bool -> IO [(UserId, SMPServer)] +getSubscriptionServers db onlyNeeded = + map toUserServer <$> DB.query_ db (select <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0") + where + select = [sql| SELECT DISTINCT c.user_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash) FROM rcv_queues q JOIN servers s ON q.host = s.host AND q.port = s.port JOIN connections c ON q.conn_id = c.conn_id - WHERE c.deleted = 0 AND q.deleted = 0 |] - where + toSubscribe + | onlyNeeded = " WHERE q.to_subscribe = 1 AND " + | otherwise = " WHERE " toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer) toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash) -getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub] -getUserServerRcvQueueSubs db userId srv = - map toRcvQueueSub <$> DB.query db (rcvQueueSubQuery <> condition) (userId, host srv, port srv) +getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> IO [RcvQueueSub] +getUserServerRcvQueueSubs db userId srv onlyNeeded = + map toRcvQueueSub + <$> DB.query + db + (rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?") + (userId, host srv, port srv) where - condition = " WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?" + toSubscribe + | onlyNeeded = " WHERE q.to_subscribe = 1 AND " + | otherwise = " WHERE " + +unsetQueuesToSubscribe :: DB.Connection -> IO () +unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" -- * getConn helpers diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs index 565a06760..ecf0e1377 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs @@ -8,6 +8,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete +import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe import Simplex.Messaging.Agent.Store.Shared (Migration (..)) schemaMigrations :: [(String, Text, Maybe Text)] @@ -15,7 +16,8 @@ schemaMigrations = [ ("20241210_initial", m20241210_initial, Nothing), ("20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies), ("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links), - ("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete) + ("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete), + ("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs new file mode 100644 index 000000000..8a0bf5862 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs @@ -0,0 +1,23 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe where + +import Data.Text (Text) +import qualified Data.Text as T +import Text.RawString.QQ (r) + +m20251009_queue_to_subscribe :: Text +m20251009_queue_to_subscribe = + T.pack + [r| +ALTER TABLE rcv_queues ADD COLUMN to_subscribe SMALLINT NOT NULL DEFAULT 0; +CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe); +|] + +down_m20251009_queue_to_subscribe :: Text +down_m20251009_queue_to_subscribe = + T.pack + [r| +DROP INDEX idx_rcv_queues_to_subscribe; +ALTER TABLE rcv_queues DROP COLUMN to_subscribe; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs index 9d5d65ea7..73e931393 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs @@ -44,6 +44,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20241224_ratchet_e2e_snd import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250203_msg_bodies import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe import Simplex.Messaging.Agent.Store.Shared (Migration (..)) schemaMigrations :: [(String, Query, Maybe Query)] @@ -87,7 +88,8 @@ schemaMigrations = ("m20241224_ratchet_e2e_snd_params", m20241224_ratchet_e2e_snd_params, Just down_m20241224_ratchet_e2e_snd_params), ("m20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies), ("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links), - ("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete) + ("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete), + ("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251009_queue_to_subscribe.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251009_queue_to_subscribe.hs new file mode 100644 index 000000000..76ee208df --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251009_queue_to_subscribe.hs @@ -0,0 +1,20 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20251009_queue_to_subscribe :: Query +m20251009_queue_to_subscribe = + [sql| +ALTER TABLE rcv_queues ADD COLUMN to_subscribe INTEGER NOT NULL DEFAULT 0; +CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe); +|] + +down_m20251009_queue_to_subscribe :: Query +down_m20251009_queue_to_subscribe = + [sql| +DROP INDEX idx_rcv_queues_to_subscribe; +ALTER TABLE rcv_queues DROP COLUMN to_subscribe; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 89a3bf52d..0a3439124 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -47,6 +47,7 @@ CREATE TABLE rcv_queues( ntf_private_key BLOB, ntf_id BLOB, rcv_ntf_dh_secret BLOB, + to_subscribe INTEGER NOT NULL DEFAULT 0, rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL), rcv_primary INTEGER CHECK(rcv_primary NOT NULL), replace_rcv_queue_id INTEGER NULL, @@ -437,6 +438,7 @@ CREATE TABLE inv_short_links( FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE ); CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id); +CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe); CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id); CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id); CREATE INDEX idx_snd_message_deliveries ON snd_message_deliveries( diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 39af85977..bc6b7e6f0 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -2437,7 +2437,7 @@ testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = do liftIO $ S.fromList (cs1 ++ cs2) `shouldBe` S.fromList cs subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () subscribe c cs = do - subscribeAllConnections c + subscribeAllConnections c False Nothing liftIO $ up c cs delete :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () delete c cs = do @@ -2469,7 +2469,7 @@ testBatchedPendingMessages nCreate nMsgs = replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False withB $ \b -> runRight_ $ do let aIds = map fst conns - subscribeAllConnections b + subscribeAllConnections b False Nothing ("", "", UP _ aIds') <- nGet b liftIO $ S.fromList aIds' `shouldBe` S.fromList aIds replicateM_ nMsgs $ do diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index b3681965f..53adf653a 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -261,7 +261,7 @@ sndQueue1 = createRcvConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> NewRcvQueue -> SConnectionMode c -> IO (Either StoreError (ConnId, RcvQueue)) createRcvConn db g cData rq cMode = runExceptT $ do connId <- ExceptT $ createNewConn db g cData cMode - rq' <- ExceptT $ updateNewConnRcv db connId rq + rq' <- ExceptT $ updateNewConnRcv db connId rq SMSubscribe pure (connId, rq') testCreateRcvConn :: SpecWith DBStore @@ -307,7 +307,7 @@ testCreateSndConn = dbQueueId `shouldBe` DBEntityId 1 getConn db "conn1" `shouldReturn` Right (SomeConn SCSnd (SndConnection cData1 sq)) - Right rq@RcvQueue {dbQueueId = dbQueueId'} <- upgradeSndConnToDuplex db "conn1" rcvQueue1 + Right rq@RcvQueue {dbQueueId = dbQueueId'} <- upgradeSndConnToDuplex db "conn1" rcvQueue1 SMSubscribe dbQueueId' `shouldBe` DBEntityId 1 getConn db "conn1" `shouldReturn` Right (SomeConn SCDuplex (DuplexConnection cData1 [rq] [sq])) @@ -319,7 +319,7 @@ testCreateSndConnRandomID = Right (connId, sq) <- createSndConn db g cData1 {connId = ""} sndQueue1 getConn db connId `shouldReturn` Right (SomeConn SCSnd (SndConnection cData1 {connId} sq)) - Right (rq@RcvQueue {dbQueueId = dbQueueId'}) <- upgradeSndConnToDuplex db connId rcvQueue1 + Right (rq@RcvQueue {dbQueueId = dbQueueId'}) <- upgradeSndConnToDuplex db connId rcvQueue1 SMSubscribe dbQueueId' `shouldBe` DBEntityId 1 getConn db connId `shouldReturn` Right (SomeConn SCDuplex (DuplexConnection cData1 {connId} [rq] [sq])) @@ -418,7 +418,7 @@ testUpgradeRcvConnToDuplex = } upgradeRcvConnToDuplex db "conn1" anotherSndQueue `shouldReturn` Left (SEBadConnType "upgradeRcvConnToDuplex" CSnd) - _ <- upgradeSndConnToDuplex db "conn1" rcvQueue1 + _ <- upgradeSndConnToDuplex db "conn1" rcvQueue1 SMSubscribe upgradeRcvConnToDuplex db "conn1" anotherSndQueue `shouldReturn` Left (SEBadConnType "upgradeRcvConnToDuplex" CDuplex) @@ -451,10 +451,10 @@ testUpgradeSndConnToDuplex = clientNtfCreds = Nothing, deleteErrors = 0 } - upgradeSndConnToDuplex db "conn1" anotherRcvQueue + upgradeSndConnToDuplex db "conn1" anotherRcvQueue SMSubscribe `shouldReturn` Left (SEBadConnType "upgradeSndConnToDuplex" CRcv) _ <- upgradeRcvConnToDuplex db "conn1" sndQueue1 - upgradeSndConnToDuplex db "conn1" anotherRcvQueue + upgradeSndConnToDuplex db "conn1" anotherRcvQueue SMSubscribe `shouldReturn` Left (SEBadConnType "upgradeSndConnToDuplex" CDuplex) testSetRcvQueueStatus :: SpecWith DBStore @@ -681,7 +681,7 @@ testGetPendingServerCommand st = do Right (Just PendingCommand {corrId}) <- getPendingServerCommand db connId Nothing corrId `shouldBe` "2" - Right _ <- updateNewConnRcv db connId rcvQueue1 + Right _ <- updateNewConnRcv db connId rcvQueue1 SMSubscribe Right Nothing <- getPendingServerCommand db connId $ Just smpServer1 Right () <- createCommand db "3" connId (Just smpServer1) command corruptCmd db "3" connId