agent: track which queues need subscribing for iOS NSE (#1657)

* agent: track which queues need subscribing for iOS NSE

* fix down migration

* fix, cleanup
This commit is contained in:
Evgeny
2025-10-09 18:33:27 +01:00
committed by GitHub
parent 9cda20381f
commit 80aa56cbcc
11 changed files with 121 additions and 52 deletions

View File

@@ -163,6 +163,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
@@ -210,6 +211,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250203_msg_bodies
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
if flag(client_postgres) || flag(server_postgres)
exposed-modules:
Simplex.Messaging.Agent.Store.Postgres

View File

@@ -152,7 +152,7 @@ import Data.Functor.Identity
import Data.Int (Int64)
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.List (find)
import Data.List (find, sortOn)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
@@ -445,8 +445,8 @@ subscribeConnections c = withAgentEnv c . subscribeConnections' c
{-# INLINE subscribeConnections #-}
-- | Subscribe to all connections
subscribeAllConnections :: AgentClient -> AE ()
subscribeAllConnections c = withAgentEnv c $ subscribeAllConnections' c
subscribeAllConnections :: AgentClient -> Bool -> Maybe UserId -> AE ()
subscribeAllConnections c = withAgentEnv c .: subscribeAllConnections' c
-- | Get messages for connections (GET commands)
getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
@@ -972,7 +972,7 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userData_ clientData pqInitKey
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
(rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
atomically $ incSMPServerStat c userId srv connCreated
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
mapM_ (newQueueNtfSubscription c rq') ntfServer_
pure (rq', qUri)
@@ -1224,7 +1224,7 @@ createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientV
(rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
atomically $ incSMPServerStat c userId (qServer rq) connCreated
let qInfo = toVersionT qUri smpClientVersion
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
mapM_ (newQueueNtfSubscription c rq') ntfServer_
pure (qInfo, clientServiceId rq')
@@ -1357,25 +1357,30 @@ subscribeConnections_ c conns = do
when (actual /= expected) . atomically $
writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected)
subscribeAllConnections' :: AgentClient -> AM ()
subscribeAllConnections' c = do
userSrvs <- withStore' c getSubscriptionServers
maxPending <- asks $ maxPendingSubscriptions . config
currPending <- newTVarIO 0
subscribeAllConnections' :: AgentClient -> Bool -> Maybe UserId -> AM ()
subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
userSrvs <- withStore' c (`getSubscriptionServers` onlyNeeded)
unless (null userSrvs) $ do
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs
maxPending <- asks $ maxPendingSubscriptions . config
currPending <- newTVarIO 0
let userSrvs' = case activeUserId_ of
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
Nothing -> userSrvs
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs'
let (errs, oks) = partitionEithers rs
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
withStore' c unsetQueuesToSubscribe
resumeAllDelivery
resumeAllCommands c
where
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int)
subscribeUserServer maxPending currPending (userId, srv) = do
atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry
tryAllErrors' $ do
qs <- withStore' c $ \db -> do
qs <- getUserServerRcvQueueSubs db userId srv
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded
atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
pure qs
let n = length qs
@@ -2097,7 +2102,7 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
-- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions.
(q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq'
rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' SMSubscribe
lift $ addNewQueueSubscription c rq'' tSess sessId
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD

View File

@@ -158,6 +158,7 @@ module Simplex.Messaging.Agent.Client
unsafeWithStore,
storeError,
notifySub,
notifySub',
userServers,
pickServer,
getNextServer,

View File

@@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
createSndConn,
getSubscriptionServers,
getUserServerRcvQueueSubs,
unsetQueuesToSubscribe,
getConn,
getDeletedConn,
getConns,
@@ -392,15 +393,15 @@ createNewConn db gVar cData cMode = do
fst <$$> createConn_ gVar cData (\connId -> createConnRecord db connId cData cMode)
-- TODO [certs rcv] store clientServiceId from NewRcvQueue
updateNewConnRcv :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue)
updateNewConnRcv db connId rq =
updateNewConnRcv :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue)
updateNewConnRcv db connId rq subMode =
getConn db connId $>>= \case
(SomeConn _ NewConnection {}) -> updateConn
(SomeConn _ RcvConnection {}) -> updateConn -- to allow retries
(SomeConn c _) -> pure . Left . SEBadConnType "updateNewConnRcv" $ connType c
where
updateConn :: IO (Either StoreError RcvQueue)
updateConn = Right <$> addConnRcvQueue_ db connId rq
updateConn = Right <$> addConnRcvQueue_ db connId rq subMode
updateNewConnSnd :: DB.Connection -> ConnId -> NewSndQueue -> IO (Either StoreError SndQueue)
updateNewConnSnd db connId sq =
@@ -482,25 +483,25 @@ upgradeRcvConnToDuplex db connId sq =
(SomeConn c _) -> pure . Left . SEBadConnType "upgradeRcvConnToDuplex" $ connType c
-- TODO [certs rcv] store clientServiceId from NewRcvQueue
upgradeSndConnToDuplex :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue)
upgradeSndConnToDuplex db connId rq =
upgradeSndConnToDuplex :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue)
upgradeSndConnToDuplex db connId rq subMode =
getConn db connId >>= \case
Right (SomeConn _ SndConnection {}) -> Right <$> addConnRcvQueue_ db connId rq
Right (SomeConn _ SndConnection {}) -> Right <$> addConnRcvQueue_ db connId rq subMode
Right (SomeConn c _) -> pure . Left . SEBadConnType "upgradeSndConnToDuplex" $ connType c
_ -> pure $ Left SEConnNotFound
-- TODO [certs rcv] store clientServiceId from NewRcvQueue
addConnRcvQueue :: DB.Connection -> ConnId -> NewRcvQueue -> IO (Either StoreError RcvQueue)
addConnRcvQueue db connId rq =
addConnRcvQueue :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO (Either StoreError RcvQueue)
addConnRcvQueue db connId rq subMode =
getConn db connId >>= \case
Right (SomeConn _ DuplexConnection {}) -> Right <$> addConnRcvQueue_ db connId rq
Right (SomeConn _ DuplexConnection {}) -> Right <$> addConnRcvQueue_ db connId rq subMode
Right (SomeConn c _) -> pure . Left . SEBadConnType "addConnRcvQueue" $ connType c
_ -> pure $ Left SEConnNotFound
addConnRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> IO RcvQueue
addConnRcvQueue_ db connId rq@RcvQueue {server} = do
addConnRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO RcvQueue
addConnRcvQueue_ db connId rq@RcvQueue {server} subMode = do
serverKeyHash_ <- createServer_ db server
insertRcvQueue_ db connId rq serverKeyHash_
insertRcvQueue_ db connId rq subMode serverKeyHash_
addConnSndQueue :: DB.Connection -> ConnId -> NewSndQueue -> IO (Either StoreError SndQueue)
addConnSndQueue db connId sq =
@@ -1983,8 +1984,8 @@ upsertNtfServer_ db ProtocolServer {host, port, keyHash} = do
-- * createRcvConn helpers
insertRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> Maybe C.KeyHash -> IO RcvQueue
insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do
insertRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> Maybe C.KeyHash -> IO RcvQueue
insertRcvQueue_ db connId' rq@RcvQueue {..} subMode serverKeyHash_ = do
-- to preserve ID if the queue already exists.
-- possibly, it can be done in one query.
currQId_ <- maybeFirstRow fromOnly $ DB.query db "SELECT rcv_queue_id FROM rcv_queues WHERE conn_id = ? AND host = ? AND port = ? AND snd_id = ?" (connId', host server, port server, sndId)
@@ -1994,19 +1995,20 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do
[sql|
INSERT INTO rcv_queues
( host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret,
snd_id, queue_mode, status, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version, server_key_hash,
snd_id, queue_mode, status, to_subscribe, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version, server_key_hash,
link_id, link_key, link_priv_sig_key, link_enc_fixed_data,
ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
|]
( (host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
:. (sndId, queueMode, status, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
:. (sndId, queueMode, status, BI toSubscribe, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
:. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
:. ntfCredsFields
)
-- TODO [certs rcv] save client service
pure (rq :: NewRcvQueue) {connId = connId', dbQueueId = qId, clientService = Nothing}
where
toSubscribe = subMode == SMOnlyCreate
ntfCredsFields = case clientNtfCreds of
Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} ->
(Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret)
@@ -2053,27 +2055,37 @@ newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1)
-- * subscribe all connections
getSubscriptionServers :: DB.Connection -> IO [(UserId, SMPServer)]
getSubscriptionServers db =
map toUserServer
<$> DB.query_
db
getSubscriptionServers :: DB.Connection -> Bool -> IO [(UserId, SMPServer)]
getSubscriptionServers db onlyNeeded =
map toUserServer <$> DB.query_ db (select <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0")
where
select =
[sql|
SELECT DISTINCT c.user_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash)
FROM rcv_queues q
JOIN servers s ON q.host = s.host AND q.port = s.port
JOIN connections c ON q.conn_id = c.conn_id
WHERE c.deleted = 0 AND q.deleted = 0
|]
where
toSubscribe
| onlyNeeded = " WHERE q.to_subscribe = 1 AND "
| otherwise = " WHERE "
toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer)
toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash)
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
getUserServerRcvQueueSubs db userId srv =
map toRcvQueueSub <$> DB.query db (rcvQueueSubQuery <> condition) (userId, host srv, port srv)
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> IO [RcvQueueSub]
getUserServerRcvQueueSubs db userId srv onlyNeeded =
map toRcvQueueSub
<$> DB.query
db
(rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?")
(userId, host srv, port srv)
where
condition = " WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?"
toSubscribe
| onlyNeeded = " WHERE q.to_subscribe = 1 AND "
| otherwise = " WHERE "
unsetQueuesToSubscribe :: DB.Connection -> IO ()
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"
-- * getConn helpers

View File

@@ -8,6 +8,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Text, Maybe Text)]
@@ -15,7 +16,8 @@ schemaMigrations =
[ ("20241210_initial", m20241210_initial, Nothing),
("20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies),
("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete)
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
]
-- | The list of migrations in ascending order by date

View File

@@ -0,0 +1,23 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe where
import Data.Text (Text)
import qualified Data.Text as T
import Text.RawString.QQ (r)
m20251009_queue_to_subscribe :: Text
m20251009_queue_to_subscribe =
T.pack
[r|
ALTER TABLE rcv_queues ADD COLUMN to_subscribe SMALLINT NOT NULL DEFAULT 0;
CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe);
|]
down_m20251009_queue_to_subscribe :: Text
down_m20251009_queue_to_subscribe =
T.pack
[r|
DROP INDEX idx_rcv_queues_to_subscribe;
ALTER TABLE rcv_queues DROP COLUMN to_subscribe;
|]

View File

@@ -44,6 +44,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20241224_ratchet_e2e_snd
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250203_msg_bodies
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Query, Maybe Query)]
@@ -87,7 +88,8 @@ schemaMigrations =
("m20241224_ratchet_e2e_snd_params", m20241224_ratchet_e2e_snd_params, Just down_m20241224_ratchet_e2e_snd_params),
("m20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies),
("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete)
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
]
-- | The list of migrations in ascending order by date

View File

@@ -0,0 +1,20 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20251009_queue_to_subscribe :: Query
m20251009_queue_to_subscribe =
[sql|
ALTER TABLE rcv_queues ADD COLUMN to_subscribe INTEGER NOT NULL DEFAULT 0;
CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe);
|]
down_m20251009_queue_to_subscribe :: Query
down_m20251009_queue_to_subscribe =
[sql|
DROP INDEX idx_rcv_queues_to_subscribe;
ALTER TABLE rcv_queues DROP COLUMN to_subscribe;
|]

View File

@@ -47,6 +47,7 @@ CREATE TABLE rcv_queues(
ntf_private_key BLOB,
ntf_id BLOB,
rcv_ntf_dh_secret BLOB,
to_subscribe INTEGER NOT NULL DEFAULT 0,
rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL),
rcv_primary INTEGER CHECK(rcv_primary NOT NULL),
replace_rcv_queue_id INTEGER NULL,
@@ -437,6 +438,7 @@ CREATE TABLE inv_short_links(
FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE
);
CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id);
CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe);
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id);
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);
CREATE INDEX idx_snd_message_deliveries ON snd_message_deliveries(

View File

@@ -2437,7 +2437,7 @@ testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = do
liftIO $ S.fromList (cs1 ++ cs2) `shouldBe` S.fromList cs
subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
subscribe c cs = do
subscribeAllConnections c
subscribeAllConnections c False Nothing
liftIO $ up c cs
delete :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
delete c cs = do
@@ -2469,7 +2469,7 @@ testBatchedPendingMessages nCreate nMsgs =
replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False
withB $ \b -> runRight_ $ do
let aIds = map fst conns
subscribeAllConnections b
subscribeAllConnections b False Nothing
("", "", UP _ aIds') <- nGet b
liftIO $ S.fromList aIds' `shouldBe` S.fromList aIds
replicateM_ nMsgs $ do

View File

@@ -261,7 +261,7 @@ sndQueue1 =
createRcvConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> NewRcvQueue -> SConnectionMode c -> IO (Either StoreError (ConnId, RcvQueue))
createRcvConn db g cData rq cMode = runExceptT $ do
connId <- ExceptT $ createNewConn db g cData cMode
rq' <- ExceptT $ updateNewConnRcv db connId rq
rq' <- ExceptT $ updateNewConnRcv db connId rq SMSubscribe
pure (connId, rq')
testCreateRcvConn :: SpecWith DBStore
@@ -307,7 +307,7 @@ testCreateSndConn =
dbQueueId `shouldBe` DBEntityId 1
getConn db "conn1"
`shouldReturn` Right (SomeConn SCSnd (SndConnection cData1 sq))
Right rq@RcvQueue {dbQueueId = dbQueueId'} <- upgradeSndConnToDuplex db "conn1" rcvQueue1
Right rq@RcvQueue {dbQueueId = dbQueueId'} <- upgradeSndConnToDuplex db "conn1" rcvQueue1 SMSubscribe
dbQueueId' `shouldBe` DBEntityId 1
getConn db "conn1"
`shouldReturn` Right (SomeConn SCDuplex (DuplexConnection cData1 [rq] [sq]))
@@ -319,7 +319,7 @@ testCreateSndConnRandomID =
Right (connId, sq) <- createSndConn db g cData1 {connId = ""} sndQueue1
getConn db connId
`shouldReturn` Right (SomeConn SCSnd (SndConnection cData1 {connId} sq))
Right (rq@RcvQueue {dbQueueId = dbQueueId'}) <- upgradeSndConnToDuplex db connId rcvQueue1
Right (rq@RcvQueue {dbQueueId = dbQueueId'}) <- upgradeSndConnToDuplex db connId rcvQueue1 SMSubscribe
dbQueueId' `shouldBe` DBEntityId 1
getConn db connId
`shouldReturn` Right (SomeConn SCDuplex (DuplexConnection cData1 {connId} [rq] [sq]))
@@ -418,7 +418,7 @@ testUpgradeRcvConnToDuplex =
}
upgradeRcvConnToDuplex db "conn1" anotherSndQueue
`shouldReturn` Left (SEBadConnType "upgradeRcvConnToDuplex" CSnd)
_ <- upgradeSndConnToDuplex db "conn1" rcvQueue1
_ <- upgradeSndConnToDuplex db "conn1" rcvQueue1 SMSubscribe
upgradeRcvConnToDuplex db "conn1" anotherSndQueue
`shouldReturn` Left (SEBadConnType "upgradeRcvConnToDuplex" CDuplex)
@@ -451,10 +451,10 @@ testUpgradeSndConnToDuplex =
clientNtfCreds = Nothing,
deleteErrors = 0
}
upgradeSndConnToDuplex db "conn1" anotherRcvQueue
upgradeSndConnToDuplex db "conn1" anotherRcvQueue SMSubscribe
`shouldReturn` Left (SEBadConnType "upgradeSndConnToDuplex" CRcv)
_ <- upgradeRcvConnToDuplex db "conn1" sndQueue1
upgradeSndConnToDuplex db "conn1" anotherRcvQueue
upgradeSndConnToDuplex db "conn1" anotherRcvQueue SMSubscribe
`shouldReturn` Left (SEBadConnType "upgradeSndConnToDuplex" CDuplex)
testSetRcvQueueStatus :: SpecWith DBStore
@@ -681,7 +681,7 @@ testGetPendingServerCommand st = do
Right (Just PendingCommand {corrId}) <- getPendingServerCommand db connId Nothing
corrId `shouldBe` "2"
Right _ <- updateNewConnRcv db connId rcvQueue1
Right _ <- updateNewConnRcv db connId rcvQueue1 SMSubscribe
Right Nothing <- getPendingServerCommand db connId $ Just smpServer1
Right () <- createCommand db "3" connId (Just smpServer1) command
corruptCmd db "3" connId