agent: subscribe all connections (#1655)

* agent: subscribe all connections

* query, version

* BoolInt

* add query to errors

* Revert "add query to errors"

This reverts commit 32a1f7fe11.

* fix optional field

* version

* limit number of in-flight subscriptions to 35000
This commit is contained in:
Evgeny
2025-10-09 13:43:48 +01:00
committed by GitHub
parent 318ddf692a
commit 9cda20381f
9 changed files with 273 additions and 121 deletions
+112 -47
View File
@@ -69,6 +69,7 @@ module Simplex.Messaging.Agent
rejectContact,
subscribeConnection,
subscribeConnections,
subscribeAllConnections,
getConnectionMessages,
getNotificationConns,
resubscribeConnection,
@@ -132,6 +133,7 @@ module Simplex.Messaging.Agent
)
where
import Control.Concurrent.STM (retry)
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
@@ -227,6 +229,7 @@ import Simplex.RemoteControl.Client
import Simplex.RemoteControl.Invitation
import Simplex.RemoteControl.Types
import System.Mem.Weak (deRefWeak)
import UnliftIO.Async (mapConcurrently)
import UnliftIO.Concurrent (forkFinally, forkIO, killThread, mkWeakThreadId, threadDelay)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
@@ -441,6 +444,10 @@ subscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either AgentE
subscribeConnections c = withAgentEnv c . subscribeConnections' c
{-# INLINE subscribeConnections #-}
-- | Subscribe to all connections
subscribeAllConnections :: AgentClient -> AE ()
subscribeAllConnections c = withAgentEnv c $ subscribeAllConnections' c
-- | Get messages for connections (GET commands)
getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
getConnectionMessages c = withAgentEnv' c . getConnectionMessages' c
@@ -1272,7 +1279,7 @@ subscribeConnections_ c conns = do
let (subRs, cs) = foldr partitionResultsConns ([], []) conns
resumeDelivery cs
resumeConnCmds c $ map fst cs
rcvRs <- lift $ connResults <$> subscribeQueues c (concatMap rcvQueues cs) False
rcvRs <- lift $ connResults <$> subscribeQueues c False (concatMap rcvQueues cs)
rcvRs' <- storeClientServiceAssocs rcvRs
ns <- asks ntfSupervisor
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs
@@ -1328,19 +1335,21 @@ subscribeConnections_ c conns = do
where
groupConnIds oks (connId, SomeConn _ conn) acc@(csCreate, csDelete)
| connId `S.notMember` oks = acc
| enableNtfs (toConnData conn) = (connId : csCreate, csDelete)
| enableNtfs = (connId : csCreate, csDelete)
| otherwise = (csCreate, connId : csDelete)
where
ConnData {enableNtfs} = toConnData conn
sendNtfCmd cmd = mapM_ (\cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)) . L.nonEmpty
resumeDelivery :: [(ConnId, SomeConnSub)] -> AM ()
resumeDelivery conns' = do
deliverTo <- S.fromList <$> withStore' c getConnectionsForDelivery
let conns'' = filter ((`S.member` deliverTo) . fst) conns'
lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueues) conns''
sndQueues :: (ConnId, SomeConnSub) -> Maybe (ConnData, NonEmpty SndQueue)
lift $ mapM_ (mapM_ (resumeMsgDelivery c) . sndQueues) conns''
sndQueues :: (ConnId, SomeConnSub) -> [SndQueue]
sndQueues (_, SomeConn _ conn) = case conn of
DuplexConnection cData _ sqs -> Just (cData, sqs)
SndConnection cData sq -> Just (cData, [sq])
_ -> Nothing
DuplexConnection _ _ sqs -> L.toList sqs
SndConnection _ sq -> [sq]
_ -> []
notifyResultError :: Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> AM ()
notifyResultError rs = do
let actual = M.size rs
@@ -1348,6 +1357,54 @@ subscribeConnections_ c conns = do
when (actual /= expected) . atomically $
writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected)
subscribeAllConnections' :: AgentClient -> AM ()
subscribeAllConnections' c = do
userSrvs <- withStore' c getSubscriptionServers
maxPending <- asks $ maxPendingSubscriptions . config
currPending <- newTVarIO 0
unless (null userSrvs) $ do
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs
let (errs, oks) = partitionEithers rs
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
resumeAllDelivery
resumeAllCommands c
where
subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int)
subscribeUserServer maxPending currPending (userId, srv) = do
atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry
tryAllErrors' $ do
qs <- withStore' c $ \db -> do
qs <- getUserServerRcvQueueSubs db userId srv
atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
pure qs
let n = length qs
lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n)
pure n
where
subscribe qs = do
rs <- subscribeUserServerQueues c userId srv qs
-- TODO [certs rcv] storeClientServiceAssocs store associations of queues with client service ID
ns <- asks ntfSupervisor
whenM (liftIO $ hasInstantNotifications ns) $ sendNtfCreate ns rs
sendNtfCreate :: NtfSupervisor -> [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> AM' ()
sendNtfCreate ns rs = do
let (csCreate, csDelete) = foldl' groupConnIds (S.empty, S.empty) rs
sendNtfCmd NSCCreate csCreate
sendNtfCmd NSCSmpDelete csDelete
where
groupConnIds acc@(!csCreate, !csDelete) (RcvQueueSub {connId, enableNtfs}, r) = case r of
Left e
| not (temporaryAgentError e) -> acc
_
| enableNtfs -> (S.insert connId csCreate, csDelete)
| otherwise -> (csCreate, S.insert connId csDelete)
sendNtfCmd cmd = mapM_ (\cIds -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cIds)) . L.nonEmpty . S.toList
resumeAllDelivery :: AM ()
resumeAllDelivery = do
sqs <- withStore' c getAllSndQueuesForDelivery
lift $ mapM_ (resumeMsgDelivery c) sqs
resubscribeConnection' :: AgentClient -> ConnId -> AM (Maybe ClientServiceId)
resubscribeConnection' c connId = toConnResult connId =<< resubscribeConnections' c [connId]
{-# INLINE resubscribeConnection' #-}
@@ -1500,6 +1557,11 @@ resumeConnCmds c connIds = do
connSrvs <- withStore' c (`getPendingCommandServers` connIds)
lift $ mapM_ (\(connId, srvs) -> mapM_ (resumeSrvCmds c connId) srvs) connSrvs
resumeAllCommands :: AgentClient -> AM ()
resumeAllCommands c = do
connSrvs <- withStore' c getAllPendingCommandConns `catchAllErrors` (\e -> liftIO (print e) >> throwE e)
lift $ mapM_ (uncurry $ resumeSrvCmds c) connSrvs
getAsyncCmdWorker :: Bool -> AgentClient -> ConnId -> Maybe SMPServer -> AM' Worker
getAsyncCmdWorker hasWork c connId server =
getAgentWorker "async_cmd" hasWork c (connId, server) (asyncCmdWorkers c) (runCommandProcessing c connId server)
@@ -1596,7 +1658,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
_ -> internalErr "ICQSecure: no switching queue found"
_ -> internalErr "ICQSecure: queue address not found in connection"
ICQDelete rId -> do
withServer $ \srv -> tryWithLock "ICQDelete" . withDuplexConn $ \(DuplexConnection cData rqs sqs) -> do
withServer $ \srv -> tryWithLock "ICQDelete" . withDuplexConn $ \(DuplexConnection cData@ConnData {enableNtfs} rqs sqs) -> do
case removeQ (srv, rId) rqs of
Nothing -> internalErr "ICQDelete: queue address not found in connection"
Just (rq'@RcvQueue {primary}, rq'' : rqs')
@@ -1611,7 +1673,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
where
finalizeSwitch = do
withStore' c $ \db -> deleteConnRcvQueue db rq'
when (enableNtfs cData) $ do
when enableNtfs $ do
ns <- asks ntfSupervisor
liftIO $ sendNtfSubCommand ns (NSCCreate, [connId])
let conn' = DuplexConnection cData (rq'' :| rqs') sqs
@@ -1685,15 +1747,15 @@ enqueueMessage c cData sq msgFlags aMessage =
{-# INLINE enqueueMessage #-}
-- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries
enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe ([SndQueue], AgentMsgId))))
enqueueMessageB c reqs = do
cfg <- asks config
(_, reqMids) <- unsafeWithStore c $ \db -> do
mapAccumLM (\ids r -> storeSentMsg db cfg ids r `E.catchAny` \e -> (ids,) <$> handleInternal e) IM.empty reqs
forME reqMids $ \((csqs_, _, _, _), InternalId msgId, pqSecr) -> forM csqs_ $ \(cData, sq :| sqs) -> do
submitPendingMsg c cData sq
forME reqMids $ \((csqs_, _, _, _), InternalId msgId, pqSecr) -> forM csqs_ $ \(_, sq :| sqs) -> do
submitPendingMsg c sq
let sqs' = filter isActiveSndQ sqs
pure ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId))
pure ((msgId, pqSecr), if null sqs' then Nothing else Just (sqs', msgId))
where
storeSentMsg ::
DB.Connection ->
@@ -1741,7 +1803,7 @@ enqueueMessageB c reqs = do
-- msgBody is empty, because snd_messages record is linked to snd_message_bodies
msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgFlags, msgBody = "", pqEncryption = pqEnc, internalHash, prevMsgHash, sndMsgPrepData_ = Just SndMsgPrepData {encryptKey = mek, paddedLen, sndMsgBodyId}}
liftIO $ createSndMsg db connId msgData
liftIO $ createSndMsgDelivery db connId sq internalId
liftIO $ createSndMsgDelivery db sq internalId
pure (req, internalId, pqEnc)
handleInternal :: E.SomeException -> IO (Either AgentErrorType b)
handleInternal = pure . Left . INTERNAL . show
@@ -1753,41 +1815,44 @@ encodeAgentMsgStr aMessage internalSndId prevMsgHash = do
agentMsg = AgentMessage privHeader aMessage
in smpEncode agentMsg
enqueueSavedMessage :: AgentClient -> ConnData -> AgentMsgId -> SndQueue -> AM' ()
enqueueSavedMessage c cData msgId sq = enqueueSavedMessageB c $ Identity (cData, [sq], msgId)
enqueueSavedMessage :: AgentClient -> AgentMsgId -> SndQueue -> AM' ()
enqueueSavedMessage c msgId sq = enqueueSavedMessageB c $ Identity ([sq], msgId)
{-# INLINE enqueueSavedMessage #-}
enqueueSavedMessageB :: Foldable t => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' ()
enqueueSavedMessageB :: Foldable t => AgentClient -> t ([SndQueue], AgentMsgId) -> AM' ()
enqueueSavedMessageB c reqs = do
-- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved
void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs
forM_ reqs $ \(cData, sqs, _) ->
forM sqs $ submitPendingMsg c cData
-- TODO this needs to be optimized to insert them in one query
forM_ reqs $ \(sqs, _) -> forM sqs $ submitPendingMsg c
where
storeDeliveries :: DB.Connection -> (ConnData, [SndQueue], AgentMsgId) -> [IO ()]
storeDeliveries db (ConnData {connId}, sqs, msgId) = do
storeDeliveries :: DB.Connection -> ([SndQueue], AgentMsgId) -> [IO ()]
storeDeliveries db (sqs, msgId) = do
let mId = InternalId msgId
in map (\sq -> createSndMsgDelivery db connId sq mId) sqs
in map (\sq -> createSndMsgDelivery db sq mId) sqs
resumeMsgDelivery :: AgentClient -> ConnData -> SndQueue -> AM' ()
resumeMsgDelivery = void .:. getDeliveryWorker False
resumeMsgDelivery :: AgentClient -> SndQueue -> AM' ()
-- hasWork is passed as False to avoid unnecessary write to TMVar:
-- - new worker is always created by "some work to do".
-- - if the worker already exists, there is no need to "push" it again.
resumeMsgDelivery = void .: getDeliveryWorker False
{-# INLINE resumeMsgDelivery #-}
getDeliveryWorker :: Bool -> AgentClient -> ConnData -> SndQueue -> AM' (Worker, TMVar ())
getDeliveryWorker hasWork c cData sq =
getAgentWorker' fst mkLock "msg_delivery" hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c cData sq)
getDeliveryWorker :: Bool -> AgentClient -> SndQueue -> AM' (Worker, TMVar ())
getDeliveryWorker hasWork c sq =
getAgentWorker' fst mkLock "msg_delivery" hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c sq)
where
mkLock w = do
retryLock <- newEmptyTMVar
pure (w, retryLock)
submitPendingMsg :: AgentClient -> ConnData -> SndQueue -> AM' ()
submitPendingMsg c cData sq = do
submitPendingMsg :: AgentClient -> SndQueue -> AM' ()
submitPendingMsg c sq = do
atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1}
void $ getDeliveryWorker True c cData sq
void $ getDeliveryWorker True c sq
runSmpQueueMsgDelivery :: AgentClient -> ConnData -> SndQueue -> (Worker, TMVar ()) -> AM ()
runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userId, server, queueMode} (Worker {doWork}, qLock) = do
runSmpQueueMsgDelivery :: AgentClient -> SndQueue -> (Worker, TMVar ()) -> AM ()
runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server, queueMode} (Worker {doWork}, qLock) = do
AgentConfig {messageRetryInterval = ri, messageTimeout, helloTimeout, quotaExceededTimeout} <- asks config
forever $ do
atomically $ endAgentOperation c AOSndNetwork
@@ -2074,7 +2139,7 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni
AgentConfig {e2eEncryptVRange} <- asks config
g <- asks random
(pk1, pk2, pKem, e2eParams) <- liftIO $ CR.generateRcvE2EParams g (maxVersion e2eEncryptVRange) pqSupport'
enqueueRatchetKeyMsgs c cData' sqs e2eParams
enqueueRatchetKeyMsgs c sqs e2eParams
withStore' c $ \db -> do
setConnRatchetSync db connId RSStarted
setRatchetX3dhKeys db connId pk1 pk2 pKem
@@ -2431,8 +2496,8 @@ toggleConnectionNtfs' c connId enable = do
_ -> throwE $ CONN SIMPLEX "toggleConnectionNtfs"
where
toggle :: ConnData -> AM ()
toggle cData
| enableNtfs cData == enable = pure ()
toggle ConnData {enableNtfs}
| enableNtfs == enable = pure ()
| otherwise = do
withStore' c $ \db -> setConnectionNtfs db connId enable
ns <- asks ntfSupervisor
@@ -3188,7 +3253,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
sendReplyKey = do
g <- asks random
(pk1, pk2, pKem, e2eParams) <- liftIO $ CR.generateRcvE2EParams g e2eVersion pqSupport
enqueueRatchetKeyMsgs c cData' sqs e2eParams
enqueueRatchetKeyMsgs c sqs e2eParams
pure (pk1, pk2, pKem)
notifyRatchetSyncError = do
let cData'' = cData' {ratchetSyncState = RSRequired} :: ConnData
@@ -3264,7 +3329,7 @@ secureConfirmQueueAsync c cData rq_ sq srv connInfo e2eEncryption_ subMode = do
sqSecured <- agentSecureSndQueue c NRMBackground cData sq
(qInfo, service) <- mkAgentConfirmation c NRMBackground cData rq_ sq srv connInfo subMode
storeConfirmation c cData sq e2eEncryption_ qInfo
lift $ submitPendingMsg c cData sq
lift $ submitPendingMsg c sq
pure (sqSecured, service)
secureConfirmQueue :: AgentClient -> NetworkRequestMode -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM (SndQueueSecured, Maybe ClientServiceId)
@@ -3310,7 +3375,7 @@ mkAgentConfirmation c nm cData rq_ sq srv connInfo subMode = do
enqueueConfirmation :: AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> AM ()
enqueueConfirmation c cData sq connInfo e2eEncryption_ = do
storeConfirmation c cData sq e2eEncryption_ $ AgentConnInfo connInfo
lift $ submitPendingMsg c cData sq
lift $ submitPendingMsg c sq
storeConfirmation :: AgentClient -> ConnData -> SndQueue -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> AgentMessage -> AM ()
storeConfirmation c cData@ConnData {connId, pqSupport, connAgentVersion = v} sq e2eEncryption_ agentMsg = do
@@ -3326,18 +3391,18 @@ storeConfirmation c cData@ConnData {connId, pqSupport, connAgentVersion = v} sq
msgType = agentMessageType agentMsg
msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, pqEncryption, msgFlags = SMP.MsgFlags {notification = True}, internalHash, prevMsgHash, sndMsgPrepData_ = Nothing}
liftIO $ createSndMsg db connId msgData
liftIO $ createSndMsgDelivery db connId sq internalId
liftIO $ createSndMsgDelivery db sq internalId
enqueueRatchetKeyMsgs :: AgentClient -> ConnData -> NonEmpty SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM ()
enqueueRatchetKeyMsgs c cData (sq :| sqs) e2eEncryption = do
msgId <- enqueueRatchetKey c cData sq e2eEncryption
mapM_ (lift . enqueueSavedMessage c cData msgId) $ filter isActiveSndQ sqs
enqueueRatchetKeyMsgs :: AgentClient -> NonEmpty SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM ()
enqueueRatchetKeyMsgs c (sq :| sqs) e2eEncryption = do
msgId <- enqueueRatchetKey c sq e2eEncryption
mapM_ (lift . enqueueSavedMessage c msgId) $ filter isActiveSndQ sqs
enqueueRatchetKey :: AgentClient -> ConnData -> SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM AgentMsgId
enqueueRatchetKey c cData@ConnData {connId} sq e2eEncryption = do
enqueueRatchetKey :: AgentClient -> SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM AgentMsgId
enqueueRatchetKey c sq@SndQueue {connId} e2eEncryption = do
aVRange <- asks $ smpAgentVRange . config
msgId <- storeRatchetKey $ maxVersion aVRange
lift $ submitPendingMsg c cData sq
lift $ submitPendingMsg c sq
pure $ unId msgId
where
storeRatchetKey :: VersionSMPA -> AM InternalId
@@ -3352,7 +3417,7 @@ enqueueRatchetKey c cData@ConnData {connId} sq e2eEncryption = do
-- this message is e2e encrypted with queue key, not with double ratchet
msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, pqEncryption = PQEncOff, msgFlags = SMP.MsgFlags {notification = True}, internalHash, prevMsgHash, sndMsgPrepData_ = Nothing}
liftIO $ createSndMsg db connId msgData
liftIO $ createSndMsgDelivery db connId sq internalId
liftIO $ createSndMsgDelivery db sq internalId
pure internalId
-- encoded AgentMessage -> encoded EncAgentMessage
+48 -31
View File
@@ -48,6 +48,7 @@ module Simplex.Messaging.Agent.Client
newRcvQueue,
newRcvQueue_,
subscribeQueues,
subscribeUserServerQueues,
getQueueMessage,
decryptSMPMessage,
failSubscription,
@@ -427,7 +428,7 @@ getAgentWorker' toW fromW name hasWork c@AgentClient {agentEnv} key ws work = do
newWorker :: AgentClient -> STM Worker
newWorker c = do
workerId <- stateTVar (workerSeq c) $ \next -> (next, next + 1)
doWork <- newTMVar ()
doWork <- newTMVar () -- new worker is created with "some work to do" (indicated by () in TMVar)
action <- newTMVar Nothing
restarts <- newTVar $ RestartCount 0 0
pure Worker {workerId, doWork, action, restarts}
@@ -733,7 +734,7 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
mode <- getSessionModeIO c
let resubscribe
| (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess
| otherwise = void $ subscribeQueues c qs True
| otherwise = void $ subscribeQueues c True qs
runReaderT resubscribe env
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
@@ -1401,6 +1402,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
shortLink,
clientService = ClientService DBNewEntity <$> serviceId,
status = New,
enableNtfs,
dbQueueId = DBNewEntity,
primary = True,
dbReplaceQueueId = Nothing,
@@ -1509,30 +1511,51 @@ serverHostError = \case
_ -> False
-- | Batch by transport session and subscribe queues. The list of results can have a different order.
subscribeQueues :: AgentClient -> [RcvQueueSub] -> Bool -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
subscribeQueues c qs withEvents = do
subscribeQueues :: AgentClient -> Bool -> [RcvQueueSub] -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
subscribeQueues c withEvents qs = do
(errs, qs') <- checkQueues c qs
atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
qss <- batchQueues mkSMPTSession qs' <$> getSessionModeIO c
mapM_ addPendingSubs qss
rs <- mapConcurrently subscribeQueues_ qss
mapM_ (addPendingSubs c) qss
rs <- mapConcurrently (subscribeQueues_ c withEvents) qss
when withEvents $ forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
pure $ map (second Left) errs <> concatMap L.toList rs
addPendingSubs :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' ()
addPendingSubs c (tSess, qs') = atomically $ SS.batchAddPendingSubs tSess (L.toList qs') $ currentSubs c
subscribeQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId))
subscribeQueues_ c withEvents qs'@(tSess@(_, srv, _), _) = do
(rs, active) <- subscribeSessQueues_ c withEvents qs'
if active
then when (hasTempErrors rs) resubscribe $> rs
else do
logWarn "subcription batch result for replaced SMP client, resubscribing"
-- we use BROKER NETWORK error here instead of the original error, so it becomes temporary.
resubscribe $> L.map (second $ Left . toNESubscribeError) rs
where
addPendingSubs (tSess, qs') = atomically $ SS.batchAddPendingSubs tSess (L.toList qs') $ currentSubs c
subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do
(rs, active) <- subscribeSessQueues_ c qs' withEvents
if active
then when (hasTempErrors rs) resubscribe $> rs
else do
logWarn "subcription batch result for replaced SMP client, resubscribing"
-- we use BROKER NETWORK error here instead of the original error, so it becomes temporary.
resubscribe $> L.map (second $ Left . toNESubscribeError) rs
where
-- treating host errors as temporary here as well
hasTempErrors = any (either temporaryOrHostError (const False) . snd)
toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show
resubscribe = resubscribeSMPSession c tSess
-- treating host errors as temporary here as well
hasTempErrors = any (either temporaryOrHostError (const False) . snd)
toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show
resubscribe = resubscribeSMPSession c tSess
subscribeUserServerQueues :: AgentClient -> UserId -> SMPServer -> [RcvQueueSub] -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
subscribeUserServerQueues c userId srv qs = do
mode <- getSessionModeIO c
if mode == TSMEntity
then subscribeQueues c True qs
else do
let tSess = (userId, srv, Nothing)
(errs, qs_) <- checkQueues c qs
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
let errs' = map (second Left) errs
case L.nonEmpty qs_ of
Just qs' -> do
atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId $ L.toList qs'))
addPendingSubs c (tSess, qs')
rs <- subscribeQueues_ c True (tSess, qs')
pure $ errs' <> L.toList rs
Nothing -> pure errs'
-- only "checked" queues are subscribed
checkQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, AgentErrorType)], [RcvQueueSub])
@@ -1547,14 +1570,14 @@ checkQueues c = fmap partitionEithers . mapM checkQueue
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
resubscribeSessQueues c tSess qs = do
(errs, qs_) <- checkQueues c qs
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c True (tSess, qs')
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs
subscribeSessQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c NRMBackground qs
where
subscribeQueues_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool)
subscribeQueues_ smp qs' = do
subscribe_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool)
subscribe_ smp qs' = do
let (userId, srv, _) = tSess
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
@@ -1646,12 +1669,6 @@ getRemovedSubs AgentClient {removedSubs} k = TM.lookup k removedSubs >>= maybe n
TM.insert k s removedSubs
pure s
addPendingSubscription :: AgentClient -> RcvQueueSub -> STM ()
addPendingSubscription c rq = do
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
tSess <- mkSMPTransportSession c rq
SS.addPendingSub tSess rq $ currentSubs c
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' ()
addNewQueueSubscription c rq' tSess sessId = do
let rq = rcvQueueSub rq'
@@ -166,6 +166,7 @@ data AgentConfig = AgentConfig
ntfBatchSize :: Int,
ntfSubFirstCheckInterval :: NominalDiffTime,
ntfSubCheckInterval :: NominalDiffTime,
maxPendingSubscriptions :: Int,
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
certificateFile :: FilePath,
@@ -237,6 +238,7 @@ defaultAgentConfig =
ntfBatchSize = 150,
ntfSubFirstCheckInterval = nominalDay,
ntfSubCheckInterval = 3 * nominalDay,
maxPendingSubscriptions = 35000,
-- CA certificate private key is not needed for initialization
-- ! we do not generate these
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
+5 -2
View File
@@ -88,6 +88,8 @@ data StoredRcvQueue (q :: DBStored) = RcvQueue
clientService :: Maybe (StoredClientService q),
-- | queue status
status :: QueueStatus,
-- | to enable notifications for this queue - this field is duplicated from ConnData
enableNtfs :: Bool,
-- | database queue ID (within connection)
dbQueueId :: DBEntityId' q,
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
@@ -110,6 +112,7 @@ data RcvQueueSub = RcvQueueSub
rcvId :: SMP.RecipientId,
rcvPrivateKey :: RcvPrivateAuthKey,
status :: QueueStatus,
enableNtfs :: Bool,
dbQueueId :: Int64,
primary :: Bool,
dbReplaceQueueId :: Maybe Int64
@@ -117,8 +120,8 @@ data RcvQueueSub = RcvQueueSub
deriving (Show)
rcvQueueSub :: RcvQueue -> RcvQueueSub
rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} =
RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, dbQueueId, primary, dbReplaceQueueId}
rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} =
RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId}
data ShortLinkCreds = ShortLinkCreds
{ shortLinkId :: SMP.LinkId,
+64 -10
View File
@@ -39,6 +39,8 @@ module Simplex.Messaging.Agent.Store.AgentStore
updateNewConnRcv,
updateNewConnSnd,
createSndConn,
getSubscriptionServers,
getUserServerRcvQueueSubs,
getConn,
getDeletedConn,
getConns,
@@ -110,6 +112,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
updateSndMsgRcpt,
getPendingQueueMsg,
getConnectionsForDelivery,
getAllSndQueuesForDelivery,
updatePendingMsgRIState,
deletePendingMsgs,
getExpiredSndMessages,
@@ -137,6 +140,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
-- Async commands
createCommand,
getPendingCommandServers,
getAllPendingCommandConns,
getPendingServerCommand,
updateCommandServer,
deleteCommand,
@@ -884,8 +888,8 @@ createSndMsg db connId sndMsgData@SndMsgData {internalSndId, internalHash} = do
insertSndMsgDetails_ db connId sndMsgData
updateSndMsgHash db connId internalSndId internalHash
createSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> IO ()
createSndMsgDelivery db connId SndQueue {dbQueueId} msgId =
createSndMsgDelivery :: DB.Connection -> SndQueue -> InternalId -> IO ()
createSndMsgDelivery db SndQueue {connId, dbQueueId} msgId =
DB.execute db "INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?)" (connId, dbQueueId, msgId)
getSndMsgViaRcpt :: DB.Connection -> ConnId -> InternalSndId -> IO (Either StoreError SndMsg)
@@ -917,6 +921,15 @@ getConnectionsForDelivery :: DB.Connection -> IO [ConnId]
getConnectionsForDelivery db =
map fromOnly <$> DB.query_ db "SELECT DISTINCT conn_id FROM snd_message_deliveries WHERE failed = 0"
getAllSndQueuesForDelivery :: DB.Connection -> IO [SndQueue]
getAllSndQueuesForDelivery db = map toSndQueue <$> DB.query_ db (sndQueueQuery <> " " <> delivery)
where
delivery = [sql|
JOIN (SELECT DISTINCT conn_id, snd_queue_id FROM snd_message_deliveries WHERE failed = 0) d
ON d.conn_id = q.conn_id AND d.snd_queue_id = q.snd_queue_id
WHERE c.deleted = 0
|]
getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData)))
getPendingQueueMsg db connId SndQueue {dbQueueId} =
getWorkItem "message" getMsgId getMsgData markMsgFailed
@@ -1319,6 +1332,21 @@ getPendingCommandServers db connIds =
smpServer (host, port, keyHash) = SMPServer <$> host <*> port <*> keyHash
conns = S.fromList connIds
getAllPendingCommandConns :: DB.Connection -> IO [(ConnId, Maybe SMPServer)]
getAllPendingCommandConns db =
map toResult
<$> DB.query_
db
[sql|
SELECT DISTINCT c.conn_id, c.host, c.port, COALESCE(c.server_key_hash, s.key_hash)
FROM commands c
JOIN connections cs ON c.conn_id = cs.conn_id
LEFT JOIN servers s ON s.host = c.host AND s.port = c.port
WHERE cs.deleted = 0
|]
where
toResult (connId, host, port, keyHash) = (connId, SMPServer <$> host <*> port <*> keyHash)
getPendingServerCommand :: DB.Connection -> ConnId -> Maybe SMPServer -> IO (Either StoreError (Maybe PendingCommand))
getPendingServerCommand db connId srv_ = getWorkItem "command" getCmdId getCommand markCommandFailed
where
@@ -2023,6 +2051,30 @@ newQueueId_ :: [Only Int64] -> DBEntityId
newQueueId_ [] = DBEntityId 1
newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1)
-- * subscribe all connections
getSubscriptionServers :: DB.Connection -> IO [(UserId, SMPServer)]
getSubscriptionServers db =
map toUserServer
<$> DB.query_
db
[sql|
SELECT DISTINCT c.user_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash)
FROM rcv_queues q
JOIN servers s ON q.host = s.host AND q.port = s.port
JOIN connections c ON q.conn_id = c.conn_id
WHERE c.deleted = 0 AND q.deleted = 0
|]
where
toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer)
toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash)
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
getUserServerRcvQueueSubs db userId srv =
map toRcvQueueSub <$> DB.query db (rcvQueueSubQuery <> condition) (userId, host srv, port srv)
where
condition = " WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?"
-- * getConn helpers
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
@@ -2229,7 +2281,7 @@ rcvQueueQuery :: Query
rcvQueueQuery =
[sql|
SELECT c.user_id, COALESCE(q.server_key_hash, s.key_hash), q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status,
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status, c.enable_ntfs,
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.switch_status, q.smp_client_version, q.delete_errors,
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret,
q.link_id, q.link_key, q.link_priv_sig_key, q.link_enc_fixed_data
@@ -2240,13 +2292,13 @@ rcvQueueQuery =
toRcvQueue ::
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
:. (QueueStatus, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int)
:. (QueueStatus, Maybe BoolInt, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int)
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
RcvQueue
toRcvQueue
( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode)
:. (status, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors)
:. (status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors)
:. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)
:. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_)
) =
@@ -2258,8 +2310,9 @@ toRcvQueue
shortLink = case (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_) of
(Just shortLinkId, Just shortLinkKey, Just linkPrivSigKey, Just linkEncFixedData) -> Just ShortLinkCreds {shortLinkId, shortLinkKey, linkPrivSigKey, linkEncFixedData}
_ -> Nothing
enableNtfs = maybe True unBI enableNtfs_
-- TODO [certs rcv] read client service
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors}
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors}
-- | returns all connection queue credentials, the first queue is the primary one
getRcvQueueSubsByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueueSub))
@@ -2270,16 +2323,17 @@ getRcvQueueSubsByConnId_ db connId =
rcvQueueSubQuery :: Query
rcvQueueSubQuery =
[sql|
SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status,
SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status, c.enable_ntfs,
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id
FROM rcv_queues q
JOIN servers s ON q.host = s.host AND q.port = s.port
JOIN connections c ON q.conn_id = c.conn_id
|]
toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey, QueueStatus, Int64, BoolInt, Maybe Int64) -> RcvQueueSub
toRcvQueueSub (userId, connId, host, port, keyHash, rcvId, rcvPrivateKey, status, dbQueueId, BI primary, dbReplaceQueueId) =
RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, dbQueueId, primary, dbReplaceQueueId, rcvPrivateKey, status}
toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey, QueueStatus, Maybe BoolInt, Int64, BoolInt, Maybe Int64) -> RcvQueueSub
toRcvQueueSub (userId, connId, host, port, keyHash, rcvId, rcvPrivateKey, status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId) =
let enableNtfs = maybe True unBI enableNtfs_
in RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId}
getRcvQueueById :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue)
getRcvQueueById db connId dbRcvId =
+30 -22
View File
@@ -435,7 +435,7 @@ functionalAPITests ps = do
describe "Batching SMP commands" $ do
-- disable this and enable the following test to run tests with coverage
it "should subscribe to multiple (200) subscriptions with batching" $
testBatchedSubscriptions 200 10 ps
testBatchedSubscriptions 200 20 ps
skip "faster version of the previous test (200 subscriptions gets very slow with test coverage)" $
it "should subscribe to multiple (6) subscriptions with batching" $
testBatchedSubscriptions 6 3 ps
@@ -2391,8 +2391,8 @@ testSuspendingAgentTimeout ps = withAgentClients2 $ \a b -> do
pure ()
testBatchedSubscriptions :: Int -> Int -> (ASrvTransport, AStoreType) -> IO ()
testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) =
withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do
testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = do
(conns, conns') <- withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do
conns <- runServers $ do
conns <- replicateM nCreate $ makeConnection_ PQSupportOff True a b
forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId
@@ -2401,21 +2401,23 @@ testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) =
delete b aIds'
liftIO $ threadDelay 1000000
pure conns
("", "", DOWN {}) <- nGet a
("", "", DOWN {}) <- nGet a
("", "", DOWN {}) <- nGet b
("", "", DOWN {}) <- nGet b
let conns' = drop nDel conns
(aIds', bIds') = unzip conns'
down a bIds'
down b aIds'
runServers $ do
up a bIds'
up b aIds'
down a bIds'
down b aIds'
pure (conns, conns')
withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do
runServers $ do
("", "", UP {}) <- nGet a
("", "", UP {}) <- nGet a
("", "", UP {}) <- nGet b
("", "", UP {}) <- nGet b
liftIO $ threadDelay 1000000
let (aIds, bIds) = unzip conns
conns' = drop nDel conns
(aIds', bIds') = unzip conns'
subscribe a bIds
subscribe b aIds
subscribe a bIds'
subscribe b aIds'
forM_ conns' $ \(aId, bId) -> exchangeGreetingsMsgId_ PQEncOff 4 a bId b aId
void $ resubscribeConnections a bIds
void $ resubscribeConnections b aIds
@@ -2425,14 +2427,18 @@ testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) =
deleteFail a bIds'
deleteFail b aIds'
where
down c cs = do
("", "", DOWN _ cs1) <- nGet c
("", "", DOWN _ cs2) <- nGet c
liftIO $ S.fromList (cs1 ++ cs2) `shouldBe` S.fromList cs
up c cs = do
("", "", UP _ cs1) <- nGet c
("", "", UP _ cs2) <- nGet c
liftIO $ S.fromList (cs1 ++ cs2) `shouldBe` S.fromList cs
subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
subscribe c cs = do
r <- subscribeConnections c cs
liftIO $ do
let dc = S.fromList $ take nDel cs
all isRight (M.withoutKeys r dc) `shouldBe` True
all (== Left (CONN NOT_FOUND "")) (M.restrictKeys r dc) `shouldBe` True
M.keys r `shouldMatchList` cs
subscribeAllConnections c
liftIO $ up c cs
delete :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO ()
delete c cs = do
r <- deleteConnections c cs
@@ -2462,8 +2468,10 @@ testBatchedPendingMessages nCreate nMsgs =
runRight_ $ forM_ msgConns $ \(_, bId) -> sendMessage a bId SMP.noMsgFlags "hello"
replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False
withB $ \b -> runRight_ $ do
r <- subscribeConnections b $ map fst conns
liftIO $ all isRight r `shouldBe` True
let aIds = map fst conns
subscribeAllConnections b
("", "", UP _ aIds') <- nGet b
liftIO $ S.fromList aIds' `shouldBe` S.fromList aIds
replicateM_ nMsgs $ do
("", cId, Msg' msgId _ "hello") <- get b
liftIO $ isJust (find ((cId ==) . fst) msgConns) `shouldBe` True
+3 -1
View File
@@ -229,6 +229,7 @@ rcvQueue1 =
shortLink = Nothing,
clientService = Nothing,
status = New,
enableNtfs = True,
dbQueueId = DBNewEntity,
primary = True,
dbReplaceQueueId = Nothing,
@@ -441,6 +442,7 @@ testUpgradeSndConnToDuplex =
shortLink = Nothing,
clientService = Nothing,
status = New,
enableNtfs = True,
dbQueueId = DBNewEntity,
rcvSwchStatus = Nothing,
primary = True,
@@ -564,7 +566,7 @@ testCreateSndMsg_ db expectedPrevHash connId sq sndMsgData@SndMsgData {..} = do
`shouldReturn` Right (internalId, internalSndId, expectedPrevHash)
createSndMsg db connId sndMsgData
`shouldReturn` ()
createSndMsgDelivery db connId sq internalId
createSndMsgDelivery db sq internalId
`shouldReturn` ()
testCreateSndMsg :: SpecWith DBStore
+1
View File
@@ -125,6 +125,7 @@ dummyRQ userId server connId rcvId =
rcvId,
rcvPrivateKey = C.APrivateAuthKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe",
status = New,
enableNtfs = False,
dbQueueId = 0,
primary = True,
dbReplaceQueueId = Nothing
+8 -8
View File
@@ -103,8 +103,8 @@ main = do
testStoreDBOpts
"src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql"
around_ (postgressBracket testServerDBConnectInfo) $ do
xdescribe "SMP server via TLS, postgres+jornal message store" $
before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests
-- xdescribe "SMP server via TLS, postgres+jornal message store" $
-- before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests
describe "SMP server via TLS, postgres-only message store" $
before (pure (transport @TLS, ASType SQSPostgres SMSPostgres)) serverTests
#endif
@@ -128,19 +128,19 @@ main = do
describe "Notifications server (SMP server: jornal store)" $
ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal)
around_ (postgressBracket testServerDBConnectInfo) $ do
xdescribe "Notifications server (SMP server: postgres+jornal store)" $
ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal)
-- xdescribe "Notifications server (SMP server: postgres+jornal store)" $
-- ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal)
describe "Notifications server (SMP server: postgres-only store)" $
ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres)
around_ (postgressBracket testServerDBConnectInfo) $ do
xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
-- xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal)
describe "SMP client agent, server postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres)
xdescribe "SMP proxy, postgres+jornal message store" $
before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests
-- xdescribe "SMP proxy, postgres+jornal message store" $
-- before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests
describe "SMP proxy, postgres-only message store" $
before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests
#endif
xdescribe "SMP client agent, server jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
-- xdescribe "SMP client agent, server jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
describe "SMP client agent, server memory message store" $ agentTests (transport @TLS, ASType SQSMemory SMSMemory)
describe "SMP proxy, jornal message store" $
before (pure $ ASType SQSMemory SMSJournal) smpProxyTests