diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 43c30ecd8..7cdc35353 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -69,6 +69,7 @@ module Simplex.Messaging.Agent rejectContact, subscribeConnection, subscribeConnections, + subscribeAllConnections, getConnectionMessages, getNotificationConns, resubscribeConnection, @@ -132,6 +133,7 @@ module Simplex.Messaging.Agent ) where +import Control.Concurrent.STM (retry) import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -227,6 +229,7 @@ import Simplex.RemoteControl.Client import Simplex.RemoteControl.Invitation import Simplex.RemoteControl.Types import System.Mem.Weak (deRefWeak) +import UnliftIO.Async (mapConcurrently) import UnliftIO.Concurrent (forkFinally, forkIO, killThread, mkWeakThreadId, threadDelay) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -441,6 +444,10 @@ subscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either AgentE subscribeConnections c = withAgentEnv c . subscribeConnections' c {-# INLINE subscribeConnections #-} +-- | Subscribe to all connections +subscribeAllConnections :: AgentClient -> AE () +subscribeAllConnections c = withAgentEnv c $ subscribeAllConnections' c + -- | Get messages for connections (GET commands) getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta))) getConnectionMessages c = withAgentEnv' c . getConnectionMessages' c @@ -1272,7 +1279,7 @@ subscribeConnections_ c conns = do let (subRs, cs) = foldr partitionResultsConns ([], []) conns resumeDelivery cs resumeConnCmds c $ map fst cs - rcvRs <- lift $ connResults <$> subscribeQueues c (concatMap rcvQueues cs) False + rcvRs <- lift $ connResults <$> subscribeQueues c False (concatMap rcvQueues cs) rcvRs' <- storeClientServiceAssocs rcvRs ns <- asks ntfSupervisor lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs @@ -1328,19 +1335,21 @@ subscribeConnections_ c conns = do where groupConnIds oks (connId, SomeConn _ conn) acc@(csCreate, csDelete) | connId `S.notMember` oks = acc - | enableNtfs (toConnData conn) = (connId : csCreate, csDelete) + | enableNtfs = (connId : csCreate, csDelete) | otherwise = (csCreate, connId : csDelete) + where + ConnData {enableNtfs} = toConnData conn sendNtfCmd cmd = mapM_ (\cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)) . L.nonEmpty resumeDelivery :: [(ConnId, SomeConnSub)] -> AM () resumeDelivery conns' = do deliverTo <- S.fromList <$> withStore' c getConnectionsForDelivery let conns'' = filter ((`S.member` deliverTo) . fst) conns' - lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueues) conns'' - sndQueues :: (ConnId, SomeConnSub) -> Maybe (ConnData, NonEmpty SndQueue) + lift $ mapM_ (mapM_ (resumeMsgDelivery c) . sndQueues) conns'' + sndQueues :: (ConnId, SomeConnSub) -> [SndQueue] sndQueues (_, SomeConn _ conn) = case conn of - DuplexConnection cData _ sqs -> Just (cData, sqs) - SndConnection cData sq -> Just (cData, [sq]) - _ -> Nothing + DuplexConnection _ _ sqs -> L.toList sqs + SndConnection _ sq -> [sq] + _ -> [] notifyResultError :: Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> AM () notifyResultError rs = do let actual = M.size rs @@ -1348,6 +1357,54 @@ subscribeConnections_ c conns = do when (actual /= expected) . atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected) +subscribeAllConnections' :: AgentClient -> AM () +subscribeAllConnections' c = do + userSrvs <- withStore' c getSubscriptionServers + maxPending <- asks $ maxPendingSubscriptions . config + currPending <- newTVarIO 0 + unless (null userSrvs) $ do + rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs + let (errs, oks) = partitionEithers rs + logInfo $ "subscribed " <> tshow (sum oks) <> " queues" + forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",) + resumeAllDelivery + resumeAllCommands c + where + subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int) + subscribeUserServer maxPending currPending (userId, srv) = do + atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry + tryAllErrors' $ do + qs <- withStore' c $ \db -> do + qs <- getUserServerRcvQueueSubs db userId srv + atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction + pure qs + let n = length qs + lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n) + pure n + where + subscribe qs = do + rs <- subscribeUserServerQueues c userId srv qs + -- TODO [certs rcv] storeClientServiceAssocs store associations of queues with client service ID + ns <- asks ntfSupervisor + whenM (liftIO $ hasInstantNotifications ns) $ sendNtfCreate ns rs + sendNtfCreate :: NtfSupervisor -> [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> AM' () + sendNtfCreate ns rs = do + let (csCreate, csDelete) = foldl' groupConnIds (S.empty, S.empty) rs + sendNtfCmd NSCCreate csCreate + sendNtfCmd NSCSmpDelete csDelete + where + groupConnIds acc@(!csCreate, !csDelete) (RcvQueueSub {connId, enableNtfs}, r) = case r of + Left e + | not (temporaryAgentError e) -> acc + _ + | enableNtfs -> (S.insert connId csCreate, csDelete) + | otherwise -> (csCreate, S.insert connId csDelete) + sendNtfCmd cmd = mapM_ (\cIds -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cIds)) . L.nonEmpty . S.toList + resumeAllDelivery :: AM () + resumeAllDelivery = do + sqs <- withStore' c getAllSndQueuesForDelivery + lift $ mapM_ (resumeMsgDelivery c) sqs + resubscribeConnection' :: AgentClient -> ConnId -> AM (Maybe ClientServiceId) resubscribeConnection' c connId = toConnResult connId =<< resubscribeConnections' c [connId] {-# INLINE resubscribeConnection' #-} @@ -1500,6 +1557,11 @@ resumeConnCmds c connIds = do connSrvs <- withStore' c (`getPendingCommandServers` connIds) lift $ mapM_ (\(connId, srvs) -> mapM_ (resumeSrvCmds c connId) srvs) connSrvs +resumeAllCommands :: AgentClient -> AM () +resumeAllCommands c = do + connSrvs <- withStore' c getAllPendingCommandConns `catchAllErrors` (\e -> liftIO (print e) >> throwE e) + lift $ mapM_ (uncurry $ resumeSrvCmds c) connSrvs + getAsyncCmdWorker :: Bool -> AgentClient -> ConnId -> Maybe SMPServer -> AM' Worker getAsyncCmdWorker hasWork c connId server = getAgentWorker "async_cmd" hasWork c (connId, server) (asyncCmdWorkers c) (runCommandProcessing c connId server) @@ -1596,7 +1658,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do _ -> internalErr "ICQSecure: no switching queue found" _ -> internalErr "ICQSecure: queue address not found in connection" ICQDelete rId -> do - withServer $ \srv -> tryWithLock "ICQDelete" . withDuplexConn $ \(DuplexConnection cData rqs sqs) -> do + withServer $ \srv -> tryWithLock "ICQDelete" . withDuplexConn $ \(DuplexConnection cData@ConnData {enableNtfs} rqs sqs) -> do case removeQ (srv, rId) rqs of Nothing -> internalErr "ICQDelete: queue address not found in connection" Just (rq'@RcvQueue {primary}, rq'' : rqs') @@ -1611,7 +1673,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do where finalizeSwitch = do withStore' c $ \db -> deleteConnRcvQueue db rq' - when (enableNtfs cData) $ do + when enableNtfs $ do ns <- asks ntfSupervisor liftIO $ sendNtfSubCommand ns (NSCCreate, [connId]) let conn' = DuplexConnection cData (rq'' :| rqs') sqs @@ -1685,15 +1747,15 @@ enqueueMessage c cData sq msgFlags aMessage = {-# INLINE enqueueMessage #-} -- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries -enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId)))) +enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe ([SndQueue], AgentMsgId)))) enqueueMessageB c reqs = do cfg <- asks config (_, reqMids) <- unsafeWithStore c $ \db -> do mapAccumLM (\ids r -> storeSentMsg db cfg ids r `E.catchAny` \e -> (ids,) <$> handleInternal e) IM.empty reqs - forME reqMids $ \((csqs_, _, _, _), InternalId msgId, pqSecr) -> forM csqs_ $ \(cData, sq :| sqs) -> do - submitPendingMsg c cData sq + forME reqMids $ \((csqs_, _, _, _), InternalId msgId, pqSecr) -> forM csqs_ $ \(_, sq :| sqs) -> do + submitPendingMsg c sq let sqs' = filter isActiveSndQ sqs - pure ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId)) + pure ((msgId, pqSecr), if null sqs' then Nothing else Just (sqs', msgId)) where storeSentMsg :: DB.Connection -> @@ -1741,7 +1803,7 @@ enqueueMessageB c reqs = do -- msgBody is empty, because snd_messages record is linked to snd_message_bodies msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgFlags, msgBody = "", pqEncryption = pqEnc, internalHash, prevMsgHash, sndMsgPrepData_ = Just SndMsgPrepData {encryptKey = mek, paddedLen, sndMsgBodyId}} liftIO $ createSndMsg db connId msgData - liftIO $ createSndMsgDelivery db connId sq internalId + liftIO $ createSndMsgDelivery db sq internalId pure (req, internalId, pqEnc) handleInternal :: E.SomeException -> IO (Either AgentErrorType b) handleInternal = pure . Left . INTERNAL . show @@ -1753,41 +1815,44 @@ encodeAgentMsgStr aMessage internalSndId prevMsgHash = do agentMsg = AgentMessage privHeader aMessage in smpEncode agentMsg -enqueueSavedMessage :: AgentClient -> ConnData -> AgentMsgId -> SndQueue -> AM' () -enqueueSavedMessage c cData msgId sq = enqueueSavedMessageB c $ Identity (cData, [sq], msgId) +enqueueSavedMessage :: AgentClient -> AgentMsgId -> SndQueue -> AM' () +enqueueSavedMessage c msgId sq = enqueueSavedMessageB c $ Identity ([sq], msgId) {-# INLINE enqueueSavedMessage #-} -enqueueSavedMessageB :: Foldable t => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' () +enqueueSavedMessageB :: Foldable t => AgentClient -> t ([SndQueue], AgentMsgId) -> AM' () enqueueSavedMessageB c reqs = do -- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs - forM_ reqs $ \(cData, sqs, _) -> - forM sqs $ submitPendingMsg c cData + -- TODO this needs to be optimized to insert them in one query + forM_ reqs $ \(sqs, _) -> forM sqs $ submitPendingMsg c where - storeDeliveries :: DB.Connection -> (ConnData, [SndQueue], AgentMsgId) -> [IO ()] - storeDeliveries db (ConnData {connId}, sqs, msgId) = do + storeDeliveries :: DB.Connection -> ([SndQueue], AgentMsgId) -> [IO ()] + storeDeliveries db (sqs, msgId) = do let mId = InternalId msgId - in map (\sq -> createSndMsgDelivery db connId sq mId) sqs + in map (\sq -> createSndMsgDelivery db sq mId) sqs -resumeMsgDelivery :: AgentClient -> ConnData -> SndQueue -> AM' () -resumeMsgDelivery = void .:. getDeliveryWorker False +resumeMsgDelivery :: AgentClient -> SndQueue -> AM' () +-- hasWork is passed as False to avoid unnecessary write to TMVar: +-- - new worker is always created by "some work to do". +-- - if the worker already exists, there is no need to "push" it again. +resumeMsgDelivery = void .: getDeliveryWorker False {-# INLINE resumeMsgDelivery #-} -getDeliveryWorker :: Bool -> AgentClient -> ConnData -> SndQueue -> AM' (Worker, TMVar ()) -getDeliveryWorker hasWork c cData sq = - getAgentWorker' fst mkLock "msg_delivery" hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c cData sq) +getDeliveryWorker :: Bool -> AgentClient -> SndQueue -> AM' (Worker, TMVar ()) +getDeliveryWorker hasWork c sq = + getAgentWorker' fst mkLock "msg_delivery" hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c sq) where mkLock w = do retryLock <- newEmptyTMVar pure (w, retryLock) -submitPendingMsg :: AgentClient -> ConnData -> SndQueue -> AM' () -submitPendingMsg c cData sq = do +submitPendingMsg :: AgentClient -> SndQueue -> AM' () +submitPendingMsg c sq = do atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1} - void $ getDeliveryWorker True c cData sq + void $ getDeliveryWorker True c sq -runSmpQueueMsgDelivery :: AgentClient -> ConnData -> SndQueue -> (Worker, TMVar ()) -> AM () -runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userId, server, queueMode} (Worker {doWork}, qLock) = do +runSmpQueueMsgDelivery :: AgentClient -> SndQueue -> (Worker, TMVar ()) -> AM () +runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server, queueMode} (Worker {doWork}, qLock) = do AgentConfig {messageRetryInterval = ri, messageTimeout, helloTimeout, quotaExceededTimeout} <- asks config forever $ do atomically $ endAgentOperation c AOSndNetwork @@ -2074,7 +2139,7 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni AgentConfig {e2eEncryptVRange} <- asks config g <- asks random (pk1, pk2, pKem, e2eParams) <- liftIO $ CR.generateRcvE2EParams g (maxVersion e2eEncryptVRange) pqSupport' - enqueueRatchetKeyMsgs c cData' sqs e2eParams + enqueueRatchetKeyMsgs c sqs e2eParams withStore' c $ \db -> do setConnRatchetSync db connId RSStarted setRatchetX3dhKeys db connId pk1 pk2 pKem @@ -2431,8 +2496,8 @@ toggleConnectionNtfs' c connId enable = do _ -> throwE $ CONN SIMPLEX "toggleConnectionNtfs" where toggle :: ConnData -> AM () - toggle cData - | enableNtfs cData == enable = pure () + toggle ConnData {enableNtfs} + | enableNtfs == enable = pure () | otherwise = do withStore' c $ \db -> setConnectionNtfs db connId enable ns <- asks ntfSupervisor @@ -3188,7 +3253,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId sendReplyKey = do g <- asks random (pk1, pk2, pKem, e2eParams) <- liftIO $ CR.generateRcvE2EParams g e2eVersion pqSupport - enqueueRatchetKeyMsgs c cData' sqs e2eParams + enqueueRatchetKeyMsgs c sqs e2eParams pure (pk1, pk2, pKem) notifyRatchetSyncError = do let cData'' = cData' {ratchetSyncState = RSRequired} :: ConnData @@ -3264,7 +3329,7 @@ secureConfirmQueueAsync c cData rq_ sq srv connInfo e2eEncryption_ subMode = do sqSecured <- agentSecureSndQueue c NRMBackground cData sq (qInfo, service) <- mkAgentConfirmation c NRMBackground cData rq_ sq srv connInfo subMode storeConfirmation c cData sq e2eEncryption_ qInfo - lift $ submitPendingMsg c cData sq + lift $ submitPendingMsg c sq pure (sqSecured, service) secureConfirmQueue :: AgentClient -> NetworkRequestMode -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM (SndQueueSecured, Maybe ClientServiceId) @@ -3310,7 +3375,7 @@ mkAgentConfirmation c nm cData rq_ sq srv connInfo subMode = do enqueueConfirmation :: AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> AM () enqueueConfirmation c cData sq connInfo e2eEncryption_ = do storeConfirmation c cData sq e2eEncryption_ $ AgentConnInfo connInfo - lift $ submitPendingMsg c cData sq + lift $ submitPendingMsg c sq storeConfirmation :: AgentClient -> ConnData -> SndQueue -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> AgentMessage -> AM () storeConfirmation c cData@ConnData {connId, pqSupport, connAgentVersion = v} sq e2eEncryption_ agentMsg = do @@ -3326,18 +3391,18 @@ storeConfirmation c cData@ConnData {connId, pqSupport, connAgentVersion = v} sq msgType = agentMessageType agentMsg msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, pqEncryption, msgFlags = SMP.MsgFlags {notification = True}, internalHash, prevMsgHash, sndMsgPrepData_ = Nothing} liftIO $ createSndMsg db connId msgData - liftIO $ createSndMsgDelivery db connId sq internalId + liftIO $ createSndMsgDelivery db sq internalId -enqueueRatchetKeyMsgs :: AgentClient -> ConnData -> NonEmpty SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM () -enqueueRatchetKeyMsgs c cData (sq :| sqs) e2eEncryption = do - msgId <- enqueueRatchetKey c cData sq e2eEncryption - mapM_ (lift . enqueueSavedMessage c cData msgId) $ filter isActiveSndQ sqs +enqueueRatchetKeyMsgs :: AgentClient -> NonEmpty SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM () +enqueueRatchetKeyMsgs c (sq :| sqs) e2eEncryption = do + msgId <- enqueueRatchetKey c sq e2eEncryption + mapM_ (lift . enqueueSavedMessage c msgId) $ filter isActiveSndQ sqs -enqueueRatchetKey :: AgentClient -> ConnData -> SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM AgentMsgId -enqueueRatchetKey c cData@ConnData {connId} sq e2eEncryption = do +enqueueRatchetKey :: AgentClient -> SndQueue -> CR.RcvE2ERatchetParams 'C.X448 -> AM AgentMsgId +enqueueRatchetKey c sq@SndQueue {connId} e2eEncryption = do aVRange <- asks $ smpAgentVRange . config msgId <- storeRatchetKey $ maxVersion aVRange - lift $ submitPendingMsg c cData sq + lift $ submitPendingMsg c sq pure $ unId msgId where storeRatchetKey :: VersionSMPA -> AM InternalId @@ -3352,7 +3417,7 @@ enqueueRatchetKey c cData@ConnData {connId} sq e2eEncryption = do -- this message is e2e encrypted with queue key, not with double ratchet msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, pqEncryption = PQEncOff, msgFlags = SMP.MsgFlags {notification = True}, internalHash, prevMsgHash, sndMsgPrepData_ = Nothing} liftIO $ createSndMsg db connId msgData - liftIO $ createSndMsgDelivery db connId sq internalId + liftIO $ createSndMsgDelivery db sq internalId pure internalId -- encoded AgentMessage -> encoded EncAgentMessage diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index db30bdff5..88d5a5bce 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -48,6 +48,7 @@ module Simplex.Messaging.Agent.Client newRcvQueue, newRcvQueue_, subscribeQueues, + subscribeUserServerQueues, getQueueMessage, decryptSMPMessage, failSubscription, @@ -427,7 +428,7 @@ getAgentWorker' toW fromW name hasWork c@AgentClient {agentEnv} key ws work = do newWorker :: AgentClient -> STM Worker newWorker c = do workerId <- stateTVar (workerSeq c) $ \next -> (next, next + 1) - doWork <- newTMVar () + doWork <- newTMVar () -- new worker is created with "some work to do" (indicated by () in TMVar) action <- newTMVar Nothing restarts <- newTVar $ RestartCount 0 0 pure Worker {workerId, doWork, action, restarts} @@ -733,7 +734,7 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess mode <- getSessionModeIO c let resubscribe | (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess - | otherwise = void $ subscribeQueues c qs True + | otherwise = void $ subscribeQueues c True qs runReaderT resubscribe env resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () @@ -1401,6 +1402,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl shortLink, clientService = ClientService DBNewEntity <$> serviceId, status = New, + enableNtfs, dbQueueId = DBNewEntity, primary = True, dbReplaceQueueId = Nothing, @@ -1509,30 +1511,51 @@ serverHostError = \case _ -> False -- | Batch by transport session and subscribe queues. The list of results can have a different order. -subscribeQueues :: AgentClient -> [RcvQueueSub] -> Bool -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))] -subscribeQueues c qs withEvents = do +subscribeQueues :: AgentClient -> Bool -> [RcvQueueSub] -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))] +subscribeQueues c withEvents qs = do (errs, qs') <- checkQueues c qs atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs')) qss <- batchQueues mkSMPTSession qs' <$> getSessionModeIO c - mapM_ addPendingSubs qss - rs <- mapConcurrently subscribeQueues_ qss + mapM_ (addPendingSubs c) qss + rs <- mapConcurrently (subscribeQueues_ c withEvents) qss when withEvents $ forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId) pure $ map (second Left) errs <> concatMap L.toList rs + +addPendingSubs :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' () +addPendingSubs c (tSess, qs') = atomically $ SS.batchAddPendingSubs tSess (L.toList qs') $ currentSubs c + +subscribeQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId)) +subscribeQueues_ c withEvents qs'@(tSess@(_, srv, _), _) = do + (rs, active) <- subscribeSessQueues_ c withEvents qs' + if active + then when (hasTempErrors rs) resubscribe $> rs + else do + logWarn "subcription batch result for replaced SMP client, resubscribing" + -- we use BROKER NETWORK error here instead of the original error, so it becomes temporary. + resubscribe $> L.map (second $ Left . toNESubscribeError) rs where - addPendingSubs (tSess, qs') = atomically $ SS.batchAddPendingSubs tSess (L.toList qs') $ currentSubs c - subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do - (rs, active) <- subscribeSessQueues_ c qs' withEvents - if active - then when (hasTempErrors rs) resubscribe $> rs - else do - logWarn "subcription batch result for replaced SMP client, resubscribing" - -- we use BROKER NETWORK error here instead of the original error, so it becomes temporary. - resubscribe $> L.map (second $ Left . toNESubscribeError) rs - where - -- treating host errors as temporary here as well - hasTempErrors = any (either temporaryOrHostError (const False) . snd) - toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show - resubscribe = resubscribeSMPSession c tSess + -- treating host errors as temporary here as well + hasTempErrors = any (either temporaryOrHostError (const False) . snd) + toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show + resubscribe = resubscribeSMPSession c tSess + +subscribeUserServerQueues :: AgentClient -> UserId -> SMPServer -> [RcvQueueSub] -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))] +subscribeUserServerQueues c userId srv qs = do + mode <- getSessionModeIO c + if mode == TSMEntity + then subscribeQueues c True qs + else do + let tSess = (userId, srv, Nothing) + (errs, qs_) <- checkQueues c qs + forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId) + let errs' = map (second Left) errs + case L.nonEmpty qs_ of + Just qs' -> do + atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId $ L.toList qs')) + addPendingSubs c (tSess, qs') + rs <- subscribeQueues_ c True (tSess, qs') + pure $ errs' <> L.toList rs + Nothing -> pure errs' -- only "checked" queues are subscribed checkQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, AgentErrorType)], [RcvQueueSub]) @@ -1547,14 +1570,14 @@ checkQueues c = fmap partitionEithers . mapM checkQueue resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' () resubscribeSessQueues c tSess qs = do (errs, qs_) <- checkQueues c qs - forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True + forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c True (tSess, qs') forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId) -subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool) -subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs +subscribeSessQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool) +subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c NRMBackground qs where - subscribeQueues_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool) - subscribeQueues_ smp qs' = do + subscribe_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool) + subscribe_ smp qs' = do let (userId, srv, _) = tSess atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs' rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs' @@ -1646,12 +1669,6 @@ getRemovedSubs AgentClient {removedSubs} k = TM.lookup k removedSubs >>= maybe n TM.insert k s removedSubs pure s -addPendingSubscription :: AgentClient -> RcvQueueSub -> STM () -addPendingSubscription c rq = do - modifyTVar' (subscrConns c) $ S.insert $ qConnId rq - tSess <- mkSMPTransportSession c rq - SS.addPendingSub tSess rq $ currentSubs c - addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' () addNewQueueSubscription c rq' tSess sessId = do let rq = rcvQueueSub rq' diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index e15ffa48c..a6cd86d11 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -166,6 +166,7 @@ data AgentConfig = AgentConfig ntfBatchSize :: Int, ntfSubFirstCheckInterval :: NominalDiffTime, ntfSubCheckInterval :: NominalDiffTime, + maxPendingSubscriptions :: Int, caCertificateFile :: FilePath, privateKeyFile :: FilePath, certificateFile :: FilePath, @@ -237,6 +238,7 @@ defaultAgentConfig = ntfBatchSize = 150, ntfSubFirstCheckInterval = nominalDay, ntfSubCheckInterval = 3 * nominalDay, + maxPendingSubscriptions = 35000, -- CA certificate private key is not needed for initialization -- ! we do not generate these caCertificateFile = "/etc/opt/simplex-agent/ca.crt", diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 543974660..801ac6727 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -88,6 +88,8 @@ data StoredRcvQueue (q :: DBStored) = RcvQueue clientService :: Maybe (StoredClientService q), -- | queue status status :: QueueStatus, + -- | to enable notifications for this queue - this field is duplicated from ConnData + enableNtfs :: Bool, -- | database queue ID (within connection) dbQueueId :: DBEntityId' q, -- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set) @@ -110,6 +112,7 @@ data RcvQueueSub = RcvQueueSub rcvId :: SMP.RecipientId, rcvPrivateKey :: RcvPrivateAuthKey, status :: QueueStatus, + enableNtfs :: Bool, dbQueueId :: Int64, primary :: Bool, dbReplaceQueueId :: Maybe Int64 @@ -117,8 +120,8 @@ data RcvQueueSub = RcvQueueSub deriving (Show) rcvQueueSub :: RcvQueue -> RcvQueueSub -rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} = - RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, dbQueueId, primary, dbReplaceQueueId} +rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} = + RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId} data ShortLinkCreds = ShortLinkCreds { shortLinkId :: SMP.LinkId, diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index b24ceaf74..212d65584 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -39,6 +39,8 @@ module Simplex.Messaging.Agent.Store.AgentStore updateNewConnRcv, updateNewConnSnd, createSndConn, + getSubscriptionServers, + getUserServerRcvQueueSubs, getConn, getDeletedConn, getConns, @@ -110,6 +112,7 @@ module Simplex.Messaging.Agent.Store.AgentStore updateSndMsgRcpt, getPendingQueueMsg, getConnectionsForDelivery, + getAllSndQueuesForDelivery, updatePendingMsgRIState, deletePendingMsgs, getExpiredSndMessages, @@ -137,6 +140,7 @@ module Simplex.Messaging.Agent.Store.AgentStore -- Async commands createCommand, getPendingCommandServers, + getAllPendingCommandConns, getPendingServerCommand, updateCommandServer, deleteCommand, @@ -884,8 +888,8 @@ createSndMsg db connId sndMsgData@SndMsgData {internalSndId, internalHash} = do insertSndMsgDetails_ db connId sndMsgData updateSndMsgHash db connId internalSndId internalHash -createSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> IO () -createSndMsgDelivery db connId SndQueue {dbQueueId} msgId = +createSndMsgDelivery :: DB.Connection -> SndQueue -> InternalId -> IO () +createSndMsgDelivery db SndQueue {connId, dbQueueId} msgId = DB.execute db "INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?)" (connId, dbQueueId, msgId) getSndMsgViaRcpt :: DB.Connection -> ConnId -> InternalSndId -> IO (Either StoreError SndMsg) @@ -917,6 +921,15 @@ getConnectionsForDelivery :: DB.Connection -> IO [ConnId] getConnectionsForDelivery db = map fromOnly <$> DB.query_ db "SELECT DISTINCT conn_id FROM snd_message_deliveries WHERE failed = 0" +getAllSndQueuesForDelivery :: DB.Connection -> IO [SndQueue] +getAllSndQueuesForDelivery db = map toSndQueue <$> DB.query_ db (sndQueueQuery <> " " <> delivery) + where + delivery = [sql| + JOIN (SELECT DISTINCT conn_id, snd_queue_id FROM snd_message_deliveries WHERE failed = 0) d + ON d.conn_id = q.conn_id AND d.snd_queue_id = q.snd_queue_id + WHERE c.deleted = 0 + |] + getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData))) getPendingQueueMsg db connId SndQueue {dbQueueId} = getWorkItem "message" getMsgId getMsgData markMsgFailed @@ -1319,6 +1332,21 @@ getPendingCommandServers db connIds = smpServer (host, port, keyHash) = SMPServer <$> host <*> port <*> keyHash conns = S.fromList connIds +getAllPendingCommandConns :: DB.Connection -> IO [(ConnId, Maybe SMPServer)] +getAllPendingCommandConns db = + map toResult + <$> DB.query_ + db + [sql| + SELECT DISTINCT c.conn_id, c.host, c.port, COALESCE(c.server_key_hash, s.key_hash) + FROM commands c + JOIN connections cs ON c.conn_id = cs.conn_id + LEFT JOIN servers s ON s.host = c.host AND s.port = c.port + WHERE cs.deleted = 0 + |] + where + toResult (connId, host, port, keyHash) = (connId, SMPServer <$> host <*> port <*> keyHash) + getPendingServerCommand :: DB.Connection -> ConnId -> Maybe SMPServer -> IO (Either StoreError (Maybe PendingCommand)) getPendingServerCommand db connId srv_ = getWorkItem "command" getCmdId getCommand markCommandFailed where @@ -2023,6 +2051,30 @@ newQueueId_ :: [Only Int64] -> DBEntityId newQueueId_ [] = DBEntityId 1 newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1) +-- * subscribe all connections + +getSubscriptionServers :: DB.Connection -> IO [(UserId, SMPServer)] +getSubscriptionServers db = + map toUserServer + <$> DB.query_ + db + [sql| + SELECT DISTINCT c.user_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash) + FROM rcv_queues q + JOIN servers s ON q.host = s.host AND q.port = s.port + JOIN connections c ON q.conn_id = c.conn_id + WHERE c.deleted = 0 AND q.deleted = 0 + |] + where + toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer) + toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash) + +getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub] +getUserServerRcvQueueSubs db userId srv = + map toRcvQueueSub <$> DB.query db (rcvQueueSubQuery <> condition) (userId, host srv, port srv) + where + condition = " WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?" + -- * getConn helpers getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn) @@ -2229,7 +2281,7 @@ rcvQueueQuery :: Query rcvQueueQuery = [sql| SELECT c.user_id, COALESCE(q.server_key_hash, s.key_hash), q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, - q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status, + q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status, c.enable_ntfs, q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.switch_status, q.smp_client_version, q.delete_errors, q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret, q.link_id, q.link_key, q.link_priv_sig_key, q.link_enc_fixed_data @@ -2240,13 +2292,13 @@ rcvQueueQuery = toRcvQueue :: (UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode) - :. (QueueStatus, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int) + :. (QueueStatus, Maybe BoolInt, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int) :. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret) :. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) -> RcvQueue toRcvQueue ( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode) - :. (status, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors) + :. (status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) :. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_) ) = @@ -2258,8 +2310,9 @@ toRcvQueue shortLink = case (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_) of (Just shortLinkId, Just shortLinkKey, Just linkPrivSigKey, Just linkEncFixedData) -> Just ShortLinkCreds {shortLinkId, shortLinkKey, linkPrivSigKey, linkEncFixedData} _ -> Nothing + enableNtfs = maybe True unBI enableNtfs_ -- TODO [certs rcv] read client service - in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors} + in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors} -- | returns all connection queue credentials, the first queue is the primary one getRcvQueueSubsByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueueSub)) @@ -2270,16 +2323,17 @@ getRcvQueueSubsByConnId_ db connId = rcvQueueSubQuery :: Query rcvQueueSubQuery = [sql| - SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status, + SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status, c.enable_ntfs, q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id FROM rcv_queues q JOIN servers s ON q.host = s.host AND q.port = s.port JOIN connections c ON q.conn_id = c.conn_id |] -toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey, QueueStatus, Int64, BoolInt, Maybe Int64) -> RcvQueueSub -toRcvQueueSub (userId, connId, host, port, keyHash, rcvId, rcvPrivateKey, status, dbQueueId, BI primary, dbReplaceQueueId) = - RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, dbQueueId, primary, dbReplaceQueueId, rcvPrivateKey, status} +toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey, QueueStatus, Maybe BoolInt, Int64, BoolInt, Maybe Int64) -> RcvQueueSub +toRcvQueueSub (userId, connId, host, port, keyHash, rcvId, rcvPrivateKey, status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId) = + let enableNtfs = maybe True unBI enableNtfs_ + in RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId} getRcvQueueById :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue) getRcvQueueById db connId dbRcvId = diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 8a4d2bf58..39af85977 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -435,7 +435,7 @@ functionalAPITests ps = do describe "Batching SMP commands" $ do -- disable this and enable the following test to run tests with coverage it "should subscribe to multiple (200) subscriptions with batching" $ - testBatchedSubscriptions 200 10 ps + testBatchedSubscriptions 200 20 ps skip "faster version of the previous test (200 subscriptions gets very slow with test coverage)" $ it "should subscribe to multiple (6) subscriptions with batching" $ testBatchedSubscriptions 6 3 ps @@ -2391,8 +2391,8 @@ testSuspendingAgentTimeout ps = withAgentClients2 $ \a b -> do pure () testBatchedSubscriptions :: Int -> Int -> (ASrvTransport, AStoreType) -> IO () -testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = - withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do +testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = do + (conns, conns') <- withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do conns <- runServers $ do conns <- replicateM nCreate $ makeConnection_ PQSupportOff True a b forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId @@ -2401,21 +2401,23 @@ testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = delete b aIds' liftIO $ threadDelay 1000000 pure conns - ("", "", DOWN {}) <- nGet a - ("", "", DOWN {}) <- nGet a - ("", "", DOWN {}) <- nGet b - ("", "", DOWN {}) <- nGet b + let conns' = drop nDel conns + (aIds', bIds') = unzip conns' + down a bIds' + down b aIds' + runServers $ do + up a bIds' + up b aIds' + down a bIds' + down b aIds' + pure (conns, conns') + withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do runServers $ do - ("", "", UP {}) <- nGet a - ("", "", UP {}) <- nGet a - ("", "", UP {}) <- nGet b - ("", "", UP {}) <- nGet b liftIO $ threadDelay 1000000 let (aIds, bIds) = unzip conns - conns' = drop nDel conns (aIds', bIds') = unzip conns' - subscribe a bIds - subscribe b aIds + subscribe a bIds' + subscribe b aIds' forM_ conns' $ \(aId, bId) -> exchangeGreetingsMsgId_ PQEncOff 4 a bId b aId void $ resubscribeConnections a bIds void $ resubscribeConnections b aIds @@ -2425,14 +2427,18 @@ testBatchedSubscriptions nCreate nDel ps@(t, ASType qsType _) = deleteFail a bIds' deleteFail b aIds' where + down c cs = do + ("", "", DOWN _ cs1) <- nGet c + ("", "", DOWN _ cs2) <- nGet c + liftIO $ S.fromList (cs1 ++ cs2) `shouldBe` S.fromList cs + up c cs = do + ("", "", UP _ cs1) <- nGet c + ("", "", UP _ cs2) <- nGet c + liftIO $ S.fromList (cs1 ++ cs2) `shouldBe` S.fromList cs subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () subscribe c cs = do - r <- subscribeConnections c cs - liftIO $ do - let dc = S.fromList $ take nDel cs - all isRight (M.withoutKeys r dc) `shouldBe` True - all (== Left (CONN NOT_FOUND "")) (M.restrictKeys r dc) `shouldBe` True - M.keys r `shouldMatchList` cs + subscribeAllConnections c + liftIO $ up c cs delete :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () delete c cs = do r <- deleteConnections c cs @@ -2462,8 +2468,10 @@ testBatchedPendingMessages nCreate nMsgs = runRight_ $ forM_ msgConns $ \(_, bId) -> sendMessage a bId SMP.noMsgFlags "hello" replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False withB $ \b -> runRight_ $ do - r <- subscribeConnections b $ map fst conns - liftIO $ all isRight r `shouldBe` True + let aIds = map fst conns + subscribeAllConnections b + ("", "", UP _ aIds') <- nGet b + liftIO $ S.fromList aIds' `shouldBe` S.fromList aIds replicateM_ nMsgs $ do ("", cId, Msg' msgId _ "hello") <- get b liftIO $ isJust (find ((cId ==) . fst) msgConns) `shouldBe` True diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 49e35ab82..b3681965f 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -229,6 +229,7 @@ rcvQueue1 = shortLink = Nothing, clientService = Nothing, status = New, + enableNtfs = True, dbQueueId = DBNewEntity, primary = True, dbReplaceQueueId = Nothing, @@ -441,6 +442,7 @@ testUpgradeSndConnToDuplex = shortLink = Nothing, clientService = Nothing, status = New, + enableNtfs = True, dbQueueId = DBNewEntity, rcvSwchStatus = Nothing, primary = True, @@ -564,7 +566,7 @@ testCreateSndMsg_ db expectedPrevHash connId sq sndMsgData@SndMsgData {..} = do `shouldReturn` Right (internalId, internalSndId, expectedPrevHash) createSndMsg db connId sndMsgData `shouldReturn` () - createSndMsgDelivery db connId sq internalId + createSndMsgDelivery db sq internalId `shouldReturn` () testCreateSndMsg :: SpecWith DBStore diff --git a/tests/CoreTests/TSessionSubs.hs b/tests/CoreTests/TSessionSubs.hs index b89b397c7..d31aa323f 100644 --- a/tests/CoreTests/TSessionSubs.hs +++ b/tests/CoreTests/TSessionSubs.hs @@ -125,6 +125,7 @@ dummyRQ userId server connId rcvId = rcvId, rcvPrivateKey = C.APrivateAuthKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe", status = New, + enableNtfs = False, dbQueueId = 0, primary = True, dbReplaceQueueId = Nothing diff --git a/tests/Test.hs b/tests/Test.hs index 153f176f6..3e36e192d 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -103,8 +103,8 @@ main = do testStoreDBOpts "src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql" around_ (postgressBracket testServerDBConnectInfo) $ do - xdescribe "SMP server via TLS, postgres+jornal message store" $ - before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests + -- xdescribe "SMP server via TLS, postgres+jornal message store" $ + -- before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests describe "SMP server via TLS, postgres-only message store" $ before (pure (transport @TLS, ASType SQSPostgres SMSPostgres)) serverTests #endif @@ -128,19 +128,19 @@ main = do describe "Notifications server (SMP server: jornal store)" $ ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal) around_ (postgressBracket testServerDBConnectInfo) $ do - xdescribe "Notifications server (SMP server: postgres+jornal store)" $ - ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal) + -- xdescribe "Notifications server (SMP server: postgres+jornal store)" $ + -- ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal) describe "Notifications server (SMP server: postgres-only store)" $ ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres) around_ (postgressBracket testServerDBConnectInfo) $ do - xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) + -- xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) describe "SMP client agent, server postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres) - xdescribe "SMP proxy, postgres+jornal message store" $ - before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests + -- xdescribe "SMP proxy, postgres+jornal message store" $ + -- before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests describe "SMP proxy, postgres-only message store" $ before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests #endif - xdescribe "SMP client agent, server jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) + -- xdescribe "SMP client agent, server jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) describe "SMP client agent, server memory message store" $ agentTests (transport @TLS, ASType SQSMemory SMSMemory) describe "SMP proxy, jornal message store" $ before (pure $ ASType SQSMemory SMSJournal) smpProxyTests