diff --git a/simplexmq.cabal b/simplexmq.cabal index 3223328b2..cfe61a802 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -108,7 +108,7 @@ library Simplex.Messaging.Agent.Store.Migrations.App Simplex.Messaging.Agent.Store.Postgres.Options Simplex.Messaging.Agent.Store.Shared - Simplex.Messaging.Agent.TRcvQueues + Simplex.Messaging.Agent.TSessionSubs Simplex.Messaging.Client Simplex.Messaging.Client.Agent Simplex.Messaging.Compression @@ -474,7 +474,7 @@ test-suite simplexmq-test CoreTests.RetryIntervalTests CoreTests.SOCKSSettings CoreTests.StoreLogTests - CoreTests.TRcvQueuesTests + CoreTests.TSessionSubs CoreTests.UtilTests CoreTests.VersionRangeTests FileDescriptionTests diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index bd9026771..1fdec5e45 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -555,13 +555,14 @@ testProtocolServer c nm userId srv = withAgentEnv' c $ case protocolTypeI @p of -- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network setNetworkConfig :: AgentClient -> NetworkConfig -> IO () setNetworkConfig c@AgentClient {useNetworkConfig, proxySessTs} cfg' = do - (spChanged, changed) <- atomically $ do + ts <- getCurrentTime + changed <- atomically $ do (_, cfg) <- readTVar useNetworkConfig let changed = cfg /= cfg' !cfgSlow = slowNetworkConfig cfg' when changed $ writeTVar useNetworkConfig (cfgSlow, cfg') - pure (socksProxy cfg /= socksProxy cfg', changed) - when spChanged $ getCurrentTime >>= atomically . writeTVar proxySessTs + when (socksProxy cfg /= socksProxy cfg') $ writeTVar proxySessTs ts + pure changed when changed $ reconnectAllServers c setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO () @@ -1270,7 +1271,7 @@ subscribeConnections_ c conns = do let (subRs, cs) = foldr partitionResultsConns ([], []) conns resumeDelivery cs resumeConnCmds c $ map fst cs - rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs) + rcvRs <- lift $ connResults <$> subscribeQueues c (concatMap rcvQueues cs) False rcvRs' <- storeClientServiceAssocs rcvRs ns <- asks ntfSupervisor lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c984250d2..f5f303b83 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -201,6 +201,8 @@ import Data.Bifunctor (bimap, first, second) import qualified Data.ByteString.Base64 as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.Composition ((.:), (.:.)) +import Data.Containers.ListUtils (nubOrd) import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) @@ -234,8 +236,8 @@ import Simplex.Messaging.Agent.Stats import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction) import qualified Simplex.Messaging.Agent.Store.DB as DB -import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues)) -import qualified Simplex.Messaging.Agent.TRcvQueues as RQ +import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs) +import qualified Simplex.Messaging.Agent.TSessionSubs as SS import Simplex.Messaging.Client import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding @@ -311,8 +313,6 @@ type NtfClientVar = ClientVar NtfResponse type XFTPClientVar = ClientVar FileResponse -type SMPTransportSession = TransportSession SMP.BrokerMsg - type NtfTransportSession = TransportSession NtfResponse type XFTPTransportSession = TransportSession FileResponse @@ -337,8 +337,7 @@ data AgentClient = AgentClient userNetworkInfo :: TVar UserNetworkInfo, userNetworkUpdated :: TVar (Maybe UTCTime), subscrConns :: TVar (Set ConnId), - activeSubs :: TRcvQueues (SessionId, RcvQueueSub), - pendingSubs :: TRcvQueues RcvQueueSub, + currentSubs :: TSessionSubs, removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError, workerSeq :: TVar Int, smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()), @@ -505,8 +504,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai userNetworkInfo <- newTVarIO $ UserNetworkInfo UNOther True userNetworkUpdated <- newTVarIO Nothing subscrConns <- newTVarIO S.empty - activeSubs <- RQ.empty - pendingSubs <- RQ.empty + currentSubs <- SS.emptyIO removedSubs <- TM.emptyIO workerSeq <- newTVarIO 0 smpDeliveryWorkers <- TM.emptyIO @@ -544,8 +542,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai userNetworkInfo, userNetworkUpdated, subscrConns, - activeSubs, - pendingSubs, + currentSubs, removedSubs, workerSeq, smpDeliveryWorkers, @@ -701,10 +698,11 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do ts <- readTVarIO proxySessTs smp <- ExceptT $ getProtocolClient g nm tSess cfg presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs + atomically $ SS.setSessionId (sessionId $ thParams smp) tSess $ currentSubs c pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs} smpClientDisconnected :: AgentClient -> SMPTransportSession -> Env -> SMPClientVar -> TMap SMPServer ProxiedRelayVar -> SMPClient -> IO () -smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, qId) env v prs client = do +smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, cId) env v prs client = do removeClientAndSubs >>= serverDown logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv where @@ -718,23 +716,26 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess where sessId = sessionId $ thParams client removeSubs = do - (qs, cs) <- RQ.getDelSessQueues tSess sessId $ activeSubs c - RQ.batchAddQueues qs $ pendingSubs c + mode <- getSessionMode c + subs <- SS.setSubsPending mode tSess sessId $ currentSubs c + let qs = M.elems subs + cs = nubOrd $ map qConnId qs -- this removes proxied relays that this client created sessions to destSrvs <- M.keys <$> readTVar prs - forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, qId) smpProxiedRelays + forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays pure (qs, cs) serverDown :: ([RcvQueueSub], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do - notifySub "" $ hostEvent' DISCONNECT client - unless (null conns) $ notifySub "" $ DOWN srv conns + notifySub c "" $ hostEvent' DISCONNECT client + unless (null conns) $ notifySub c "" $ DOWN srv conns unless (null qs) $ do - atomically $ mapM_ (releaseGetLock c) qs - runReaderT (resubscribeSMPSession c tSess) env - - notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> IO () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) + releaseGetLocksIO c qs + mode <- getSessionModeIO c + let resubscribe + | (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess + | otherwise = void $ subscribeQueues c qs True + runReaderT resubscribe env resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do @@ -743,7 +744,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do where getWorkerVar ts = ifM - (not <$> RQ.hasSessQueues tSess (pendingSubs c)) + (not <$> SS.hasPendingSubs tSess (currentSubs c)) (pure Nothing) -- prevent race with cleanup and adding pending queues in another call (Just <$> getSessVar workerSeq tSess smpSubWorkers ts) newSubWorker v = do @@ -752,11 +753,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do runSubWorker = do ri <- asks $ reconnectInterval . config withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do - pending <- liftIO $ RQ.getSessQueues tSess $ pendingSubs c - forM_ (L.nonEmpty pending) $ \qs -> do + pending <- atomically $ SS.getPendingSubs tSess $ currentSubs c + unless (M.null pending) $ do liftIO $ waitUntilForeground c liftIO $ waitForUserNetwork c - reconnectSMPClient c tSess qs + handleNotify $ resubscribeSessQueues c tSess $ M.elems pending loop isForeground = (ASForeground ==) <$> readTVar (agentState c) cleanup :: SessionVar (Async ()) -> STM () @@ -765,28 +766,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do -- Not waiting may result in terminated worker remaining in the map. whenM (isEmptyTMVar $ sessionVar v) retry removeSessVar v tSess smpSubWorkers - -reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueueSub -> AM' () -reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do - cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c - (rs, sessId_) <- subscribeQueues c $ L.toList qs - let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) rs - conns = filter (`S.notMember` cs) okConns - unless (null conns) $ notifySub "" $ UP srv conns - let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs - forM_ (listToMaybe tempErrs) $ \(connId, e) -> do - when (null okConns && S.null cs && null finalErrs) . liftIO $ - forM_ sessId_ $ \sessId -> do - -- We only close the client session that was used to subscribe. - v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) - mapM_ (closeClient_ c) v_ - notifySub connId $ ERR e - where handleNotify :: AM' () -> AM' () - handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show - notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> AM' () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) + handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show + +notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> AEvent e -> m () +notifySub c connId cmd = liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) getNtfServerClient :: AgentClient -> NetworkRequestMode -> NtfTransportSession -> AM NtfClient getNtfServerClient c@AgentClient {active, ntfClients, workerSeq, proxySessTs, presetDomains} nm tSess@(_, srv, _) = do @@ -929,8 +913,7 @@ closeAgentClient c = do atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst) clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker - atomically . RQ.clear $ activeSubs c - atomically . RQ.clear $ pendingSubs c + atomically $ SS.clear $ currentSubs c clear subscrConns clear getMsgLocks where @@ -1071,7 +1054,7 @@ withLogClient c nm tSess entId cmdStr action = withLogClient_ c nm tSess entId c withSMPClient :: SMPQueueRec q => AgentClient -> NetworkRequestMode -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a withSMPClient c nm q cmdStr action = do - tSess <- mkSMPTransportSession c q + tSess <- mkSMPTransportSessionIO c q withLogClient c nm tSess (unEntityId $ queueId q) cmdStr $ action . connectedClient sendOrProxySMPMessage :: AgentClient -> NetworkRequestMode -> UserId -> SMPServer -> ConnId -> ByteString -> Maybe SMP.SndPrivateAuthKey -> SMP.SenderId -> MsgFlags -> SMP.MsgBody -> AM (Maybe SMPServer) @@ -1336,14 +1319,18 @@ getXFTPWorkPath = do maybe getTemporaryDirectory pure workDir mkTransportSession :: MonadIO m => AgentClient -> UserId -> ProtoServer msg -> ByteString -> m (TransportSession msg) -mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionMode c +mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionModeIO c {-# INLINE mkTransportSession #-} mkTSession :: UserId -> ProtoServer msg -> ByteString -> TransportSessionMode -> TransportSession msg mkTSession userId srv sessEntId mode = (userId, srv, if mode == TSMEntity then Just sessEntId else Nothing) {-# INLINE mkTSession #-} -mkSMPTransportSession :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession +mkSMPTransportSessionIO :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession +mkSMPTransportSessionIO c q = mkSMPTSession q <$> getSessionModeIO c +{-# INLINE mkSMPTransportSessionIO #-} + +mkSMPTransportSession :: SMPQueueRec q => AgentClient -> q -> STM SMPTransportSession mkSMPTransportSession c q = mkSMPTSession q <$> getSessionMode c {-# INLINE mkSMPTransportSession #-} @@ -1351,8 +1338,12 @@ mkSMPTSession :: SMPQueueRec q => q -> TransportSessionMode -> SMPTransportSessi mkSMPTSession q = mkTSession (qUserId q) (qServer q) (qConnId q) {-# INLINE mkSMPTSession #-} -getSessionMode :: MonadIO m => AgentClient -> m TransportSessionMode -getSessionMode = fmap sessionMode . getNetworkConfig +getSessionModeIO :: MonadIO m => AgentClient -> m TransportSessionMode +getSessionModeIO = fmap (sessionMode . snd) . readTVarIO . useNetworkConfig +{-# INLINE getSessionModeIO #-} + +getSessionMode :: AgentClient -> STM TransportSessionMode +getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig {-# INLINE getSessionMode #-} newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId) @@ -1500,46 +1491,84 @@ serverHostError = \case SMP.TRANSPORT TEVersion -> True _ -> False --- | Subscribe to queues. The list of results can have a different order. -subscribeQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))], Maybe SessionId) -subscribeQueues c qs = do - (errs, qs') <- partitionEithers <$> mapM checkQueue qs - atomically $ do - modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs')) - RQ.batchAddQueues qs' $ pendingSubs c - env <- ask - -- only "checked" queues are subscribed - session <- newTVarIO Nothing - rs <- sendTSessionBatches "SUB" mkSMPTSession (subscribeQueues_ env session) c NRMBackground qs' - (errs <> rs,) <$> readTVarIO session +-- | Batch by transport session and subscribe queues. The list of results can have a different order. +subscribeQueues :: AgentClient -> [RcvQueueSub] -> Bool -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))] +subscribeQueues c qs withEvents = do + (errs, qs') <- checkQueues c qs + atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs')) + qss <- batchQueues mkSMPTSession c qs' <$> getSessionModeIO c + mapM_ addPendingSubs qss + rs <- mapConcurrently subscribeQueues_ qss + when (withEvents && not (null errs)) $ notifySub c "" $ ERRS $ map (first qConnId) errs + pure $ map (second Left) errs <> concatMap L.toList rs where - checkQueue rq = do - prohibited <- liftIO $ hasGetLock c rq - pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED "subscribeQueues") else Right rq - subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId)) - subscribeQueues_ env session smp qs' = do - let (userId, srv, _) = transportSession' smp - atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs' - rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs' - active <- - atomically $ - ifM - (activeClientSession c tSess sessId) - (writeTVar session (Just sessId) >> processSubResults rs $> True) - (incSMPServerStat' c userId srv connSubIgnored (length rs) $> False) + addPendingSubs (tSess, qs') = atomically $ SS.batchAddPendingSubs (L.toList qs') tSess $ currentSubs c + subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do + (rs, active) <- subscribeSessQueues_ c qs' withEvents if active then when (hasTempErrors rs) resubscribe $> rs else do logWarn "subcription batch result for replaced SMP client, resubscribing" - -- TODO we probably use PCENetworkError here instead of the original error, so it becomes temporary. - resubscribe $> L.map (second $ Left . PCENetworkError . NESubscribeError . show) rs + -- we use BROKER NETWORK error here instead of the original error, so it becomes temporary. + resubscribe $> L.map (second $ Left . toNESubscribeError) rs + where + -- treating host errors as temporary here as well + hasTempErrors = any (either temporaryOrHostError (const False) . snd) + toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show + resubscribe = resubscribeSMPSession c tSess + +-- only "checked" queues are subscribed +checkQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, AgentErrorType)], [RcvQueueSub]) +checkQueues c = fmap partitionEithers . mapM checkQueue + where + checkQueue rq = do + prohibited <- liftIO $ hasGetLock c rq + pure $ if prohibited then Left (rq, CMD PROHIBITED "checkQueues") else Right rq + +-- This function expects that all queues belong to one transport session, +-- and that they are already added to pending subscriptions. +resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' () +resubscribeSessQueues c tSess qs = do + (errs, qs_) <- checkQueues c qs + forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True + unless (null errs) $ notifySub c "" $ ERRS $ map (first qConnId) errs + +subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool) +subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs + where + subscribeQueues_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool) + subscribeQueues_ smp qs' = do + let (userId, srv, _) = tSess + atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs' + rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs' + cs_ <- + if withEvents + then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c) + else pure Nothing + active <- + atomically $ + ifM + (activeClientSession c tSess sessId) + (processSubResults rs $> True) + (incSMPServerStat' c userId srv connSubIgnored (length rs) $> False) + forM_ cs_ $ \cs -> do + let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) $ L.toList rs + conns = filter (`S.notMember` cs) okConns + unless (null conns) $ notifySub c "" $ UP srv conns + let (tempErrs, finalErrs) = partition (temporaryClientError . snd) errs + mapM_ (\(connId, e) -> notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e) finalErrs + forM_ (listToMaybe tempErrs) $ \(connId, e) -> do + when (null okConns && S.null cs && null finalErrs && active) $ liftIO $ do + -- We only close the client session that was used to subscribe. + v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) + mapM_ (closeClient_ c) v_ + notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e + pure (rs, active) where tSess = transportSession' smp sessId = sessionId $ thParams smp - hasTempErrors = any (either temporaryClientError (const False) . snd) processSubResults :: NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM () processSubResults = mapM_ $ uncurry $ processSubResult c sessId - resubscribe = resubscribeSMPSession c tSess `runReaderT` env activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c) @@ -1553,26 +1582,30 @@ type BatchResponses q e r = NonEmpty (q, Either e r) -- Please note: this function does not preserve order of results to be the same as the order of arguments, -- it includes arguments in the results instead. sendTSessionBatches :: forall q r. ByteString -> (q -> TransportSessionMode -> SMPTransportSession) -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> [q] -> AM' [(q, Either AgentErrorType r)] -sendTSessionBatches statCmd mkSession action c nm qs = - concatMap L.toList <$> (mapConcurrently sendClientBatch =<< batchQueues) +sendTSessionBatches statCmd mkSession action c nm qs = do + qs' <- batchQueues mkSession c qs <$> getSessionModeIO c + concatMap L.toList <$> mapConcurrently (sendClientBatch statCmd action c nm) qs' + +batchQueues :: (q -> TransportSessionMode -> SMPTransportSession) -> AgentClient -> [q] -> TransportSessionMode -> [(SMPTransportSession, NonEmpty q)] +batchQueues mkSession c qs mode = M.assocs $ foldr batch M.empty qs where - batchQueues :: AM' [(SMPTransportSession, NonEmpty q)] - batchQueues = do - mode <- getSessionMode c - pure . M.assocs $ foldr (batch mode) M.empty qs + batch q m = + let tSess = mkSession q mode + in M.alter (Just . maybe [q] (q <|)) tSess m + +sendClientBatch :: ByteString -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r) +sendClientBatch statCmd action = fmap fst .:. sendClientBatch_ statCmd () (fmap (,()) .: action) +{-# INLINE sendClientBatch #-} + +sendClientBatch_ :: ByteString -> res -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r, res)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r, res) +sendClientBatch_ statCmd errRes action c nm (tSess@(_, srv, _), qs') = + tryAllErrors' (getSMPServerClient c nm tSess) >>= \case + Left e -> pure (L.map (,Left e) qs', errRes) + Right (SMPConnectedClient smp _) -> liftIO $ do + logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd + first (L.map agentError) <$> action smp qs' where - batch mode q m = - let tSess = mkSession q mode - in M.alter (Just . maybe [q] (q <|)) tSess m - sendClientBatch :: (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r) - sendClientBatch (tSess@(_, srv, _), qs') = - tryAllErrors' (getSMPServerClient c nm tSess) >>= \case - Left e -> pure $ L.map (,Left e) qs' - Right (SMPConnectedClient smp _) -> liftIO $ do - logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd - L.map agentError <$> action smp qs' - where - agentError = second . first $ protocolClientError SMP $ clientServer smp + agentError = second . first $ protocolClientError SMP $ clientServer smp sendBatch :: SomeRcvQueue q => (SMPClient -> NetworkRequestMode -> NonEmpty (SMP.RecipientId, SMP.RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError a))) -> SMPClient -> NetworkRequestMode -> NonEmpty q -> IO (BatchResponses q SMPClientError a) sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCreds qs) @@ -1580,20 +1613,22 @@ sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCred queueCreds q = (queueId q, rcvAuthKey q) addSubscription :: AgentClient -> SessionId -> RcvQueueSub -> STM () -addSubscription c sessId rq@RcvQueueSub {connId} = do - modifyTVar' (subscrConns c) $ S.insert connId - RQ.addSessQueue (sessId, rq) $ activeSubs c - RQ.deleteQueue rq $ pendingSubs c +addSubscription c sessId rq = do + modifyTVar' (subscrConns c) $ S.insert $ qConnId rq + tSess <- mkSMPTransportSession c rq + SS.addActiveSub sessId rq tSess $ currentSubs c failSubscription :: SomeRcvQueue q => AgentClient -> q -> SMPClientError -> STM () failSubscription c rq e = do - RQ.deleteQueue rq (pendingSubs c) - TM.insert (RQ.qKey rq) e (removedSubs c) + TM.insert (qUserId rq, qServer rq, queueId rq) e (removedSubs c) + tSess <- mkSMPTransportSession c rq + SS.deletePendingSub (queueId rq) tSess $ currentSubs c addPendingSubscription :: AgentClient -> RcvQueueSub -> STM () -addPendingSubscription c rq@RcvQueueSub {connId} = do - modifyTVar' (subscrConns c) $ S.insert connId - RQ.addQueue rq $ pendingSubs c +addPendingSubscription c rq = do + modifyTVar' (subscrConns c) $ S.insert $ qConnId rq + tSess <- mkSMPTransportSession c rq + SS.addPendingSub rq tSess $ currentSubs c addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' () addNewQueueSubscription c rq' tSess sessId = do @@ -1607,24 +1642,28 @@ addNewQueueSubscription c rq' tSess sessId = do unless same $ resubscribeSMPSession c tSess hasActiveSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool -hasActiveSubscription c rq = RQ.hasQueue rq $ activeSubs c +hasActiveSubscription c rq = do + tSess <- mkSMPTransportSession c rq + SS.hasActiveSub (queueId rq) tSess $ currentSubs c {-# INLINE hasActiveSubscription #-} hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool -hasPendingSubscription c rq = RQ.hasQueue rq $ pendingSubs c +hasPendingSubscription c rq = do + tSess <- mkSMPTransportSession c rq + SS.hasPendingSub (queueId rq) tSess $ currentSubs c {-# INLINE hasPendingSubscription #-} removeSubscription :: SomeRcvQueue q => AgentClient -> ConnId -> q -> STM () removeSubscription c connId rq = do modifyTVar' (subscrConns c) $ S.delete connId - RQ.deleteQueue rq $ activeSubs c - RQ.deleteQueue rq $ pendingSubs c + tSess <- mkSMPTransportSession c rq + SS.deleteSub (queueId rq) tSess $ currentSubs c removeSubscriptions :: SomeRcvQueue q => AgentClient -> [ConnId] -> [q] -> STM () -removeSubscriptions c connIds rqs = do +removeSubscriptions c connIds qs = do unless (null connIds) $ modifyTVar' (subscrConns c) (`S.difference` (S.fromList connIds)) - RQ.batchDeleteQueues rqs $ activeSubs c - RQ.batchDeleteQueues rqs $ pendingSubs c + qss <- batchQueues mkSMPTSession c qs <$> getSessionMode c + forM_ qss $ \(tSess, qs') -> SS.batchDeleteSubs (L.toList qs') tSess $ currentSubs c getSubscriptions :: AgentClient -> IO (Set ConnId) getSubscriptions = readTVarIO . subscrConns @@ -1782,6 +1821,13 @@ releaseGetLock c rq = TM.lookup (qServer rq, queueId rq) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ()) {-# INLINE releaseGetLock #-} +releaseGetLocksIO :: SomeRcvQueue q => AgentClient -> [q] -> IO () +releaseGetLocksIO c rqs = do + locks <- readTVarIO $ getMsgLocks c + forM_ rqs $ \rq -> + forM_ (M.lookup ((qServer rq, queueId rq)) locks) $ \lock -> + atomically $ tryPutTMVar lock () + suspendQueue :: AgentClient -> NetworkRequestMode -> RcvQueue -> AM () suspendQueue c nm rq@RcvQueue {rcvId, rcvPrivateKey} = withSMPClient c nm rq "OFF" $ \smp -> @@ -2321,15 +2367,16 @@ data ServerSessions = ServerSessions getAgentSubsTotal :: AgentClient -> [UserId] -> IO (SMPServerSubs, Bool) getAgentSubsTotal c userIds = do - ssActive <- getSubsCount activeSubs - ssPending <- getSubsCount pendingSubs + (ssActive, ssPending) <- SS.foldSessionSubs addSub (0, 0) $ currentSubs c sess <- hasSession . M.toList =<< readTVarIO (smpClients c) pure (SMPServerSubs {ssActive, ssPending}, sess) where - getSubsCount :: (AgentClient -> TRcvQueues q) -> IO Int - getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (getRcvQueues $ subs c) - addSub :: (UserId, SMPServer, SMP.RecipientId) -> q -> Int -> Int - addSub (userId, _, _) _ cnt = if userId `elem` userIds then cnt + 1 else cnt + addSub :: (Int, Int) -> (SMPTransportSession, SS.SessSubs) -> IO (Int, Int) + addSub acc@(!ssActive, !ssPending) ((userId, _, _), s) + | userId `elem` userIds = do + (active, pending) <- SS.mapSubs M.size s + pure (ssActive + active, ssPending + pending) + | otherwise = pure acc hasSession :: [(SMPTransportSession, SMPClientVar)] -> IO Bool hasSession = \case [] -> pure False @@ -2366,13 +2413,12 @@ getAgentServersSummary c@AgentClient {smpServersStats, xftpServersStats, ntfServ ntfServersSessions } where - getServerSubs = do - subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (getRcvQueues $ activeSubs c) - M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (getRcvQueues $ pendingSubs c) + getServerSubs = SS.foldSessionSubs addSub M.empty $ currentSubs c where - addSub f (userId, srv, _) _ = M.alter (Just . f . fromMaybe SMPServerSubs {ssActive = 0, ssPending = 0}) (userId, srv) - incActive ss = ss {ssActive = ssActive ss + 1} - incPending ss = ss {ssPending = ssPending ss + 1} + addSub subs ((userId, srv, _), s) = do + (active, pending) <- SS.mapSubs M.size s + let add ss = ss {ssActive = ssActive ss + active, ssPending = ssPending ss + pending} + pure $ M.alter (Just . add . fromMaybe (SMPServerSubs 0 0)) (userId, srv) subs Env {xftpAgent = XFTPAgent {xftpRcvWorkers, xftpSndWorkers, xftpDelWorkers}} = agentEnv getXFTPWorkerSrvs workers = foldM addSrv [] . M.toList =<< readTVarIO workers where @@ -2404,13 +2450,14 @@ data SubscriptionsInfo = SubscriptionsInfo getAgentSubscriptions :: AgentClient -> IO SubscriptionsInfo getAgentSubscriptions c = do - activeSubscriptions <- getSubs activeSubs - pendingSubscriptions <- getSubs pendingSubs + (activeSubscriptions, pendingSubscriptions) <- SS.foldSessionSubs addSubs ([], []) $ currentSubs c removedSubscriptions <- getRemovedSubs pure $ SubscriptionsInfo {activeSubscriptions, pendingSubscriptions, removedSubscriptions} where - getSubs :: (AgentClient -> TRcvQueues q) -> IO [SubInfo] - getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (getRcvQueues $ sel c) + addSubs :: ([SubInfo], [SubInfo]) -> (SMPTransportSession, SS.SessSubs) -> IO ([SubInfo], [SubInfo]) + addSubs (active, pending) ((userId, srv, _), s) = do + (active', pending') <- SS.mapSubs (map (\rId -> subInfo (userId, srv, rId) Nothing) . M.keys) s + pure (active' ++ active, pending' ++ pending) getRemovedSubs = map (uncurry subInfo . second Just) . M.assocs <$> readTVarIO (removedSubs c) subInfo :: (UserId, SMPServer, SMP.RecipientId) -> Maybe SMPClientError -> SubInfo subInfo (uId, srv, rId) err = SubInfo {userId = uId, server = enc srv, rcvId = enc rId, subError = show <$> err} diff --git a/src/Simplex/Messaging/Agent/TRcvQueues.hs b/src/Simplex/Messaging/Agent/TRcvQueues.hs deleted file mode 100644 index cb7d220d4..000000000 --- a/src/Simplex/Messaging/Agent/TRcvQueues.hs +++ /dev/null @@ -1,104 +0,0 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} - -module Simplex.Messaging.Agent.TRcvQueues - ( TRcvQueues (getRcvQueues), - empty, - clear, - hasQueue, - addQueue, - addSessQueue, - batchAddQueues, - deleteQueue, - batchDeleteQueues, - hasSessQueues, - getSessQueues, - getSessConns, - getDelSessQueues, - qKey, - ) -where - -import Control.Concurrent.STM -import Data.Foldable (foldl') -import qualified Data.Map.Strict as M -import qualified Data.Set as S -import Simplex.Messaging.Agent.Protocol (ConnId, SMPQueue (..), UserId) -import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SMPQueueRec (..), SomeRcvQueue) -import Simplex.Messaging.Protocol (QueueId, RecipientId, SMPServer) -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport - --- the fields in this record have the same data with swapped keys for lookup efficiency, --- and all methods must maintain this invariant. -data TRcvQueues q = TRcvQueues - { getRcvQueues :: TMap (UserId, SMPServer, RecipientId) q - } - -empty :: IO (TRcvQueues q) -empty = TRcvQueues <$> TM.emptyIO - -clear :: TRcvQueues q -> STM () -clear (TRcvQueues qs) = TM.clear qs - -hasQueue :: SomeRcvQueue q => q -> TRcvQueues q' -> STM Bool -hasQueue rq (TRcvQueues qs) = TM.member (qKey rq) qs - -addQueue :: RcvQueueSub -> TRcvQueues RcvQueueSub -> STM () -addQueue rq = addQueue_ rq rq -{-# INLINE addQueue #-} - -addSessQueue :: (SessionId, RcvQueueSub) -> TRcvQueues (SessionId, RcvQueueSub) -> STM () -addSessQueue q@(_, rq) = addQueue_ rq q -{-# INLINE addSessQueue #-} - -addQueue_ :: RcvQueueSub -> q -> TRcvQueues q -> STM () -addQueue_ rq q (TRcvQueues qs) = TM.insert (qKey rq) q qs -{-# INLINE addQueue_ #-} - --- Save time by aggregating modifyTVar' -batchAddQueues :: [RcvQueueSub] -> TRcvQueues RcvQueueSub -> STM () -batchAddQueues rqs (TRcvQueues qs) = - modifyTVar' qs $ \m -> foldl' (\rqs' rq -> M.insert (qKey rq) rq rqs') m rqs - -deleteQueue :: SomeRcvQueue q => q -> TRcvQueues q' -> STM () -deleteQueue rq (TRcvQueues qs) = TM.delete (qKey rq) qs -{-# INLINE deleteQueue #-} - -batchDeleteQueues :: SomeRcvQueue q => [q] -> TRcvQueues q' -> STM () -batchDeleteQueues rqs (TRcvQueues qs) = - modifyTVar' qs $ \m -> foldl' (\rqs' rq -> M.delete (qKey rq) rqs') m rqs - -hasSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues RcvQueueSub -> STM Bool -hasSessQueues tSess (TRcvQueues qs) = any (`isSession` tSess) <$> readTVar qs - -getSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues RcvQueueSub -> IO [RcvQueueSub] -getSessQueues tSess (TRcvQueues qs) = M.foldl' addQ [] <$> readTVarIO qs - where - addQ qs' rq = if rq `isSession` tSess then rq : qs' else qs' - -getSessConns :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues (SessionId, RcvQueueSub) -> IO (S.Set ConnId) -getSessConns tSess (TRcvQueues qs) = M.foldl' addConn S.empty <$> readTVarIO qs - where - addConn cIds (_, rq) = if rq `isSession` tSess then S.insert (connId rq) cIds else cIds - -getDelSessQueues :: (UserId, SMPServer, Maybe ConnId) -> SessionId -> TRcvQueues (SessionId, RcvQueueSub) -> STM ([RcvQueueSub], [ConnId]) -getDelSessQueues tSess sessId' (TRcvQueues qs) = do - (removedQs, removedConns, qs'') <- (\qs' -> M.foldl' delQ ([], S.empty, qs') qs') <$> readTVar qs - writeTVar qs $! qs'' - let removedConns' = S.toList $ removedConns `S.difference` queueConns qs'' - pure (removedQs, removedConns') - where - delQ acc@(removed, cIds, qs') (sessId, rq) - | rq `isSession` tSess && sessId == sessId' = (rq : removed, S.insert (connId rq) cIds, M.delete (qKey rq) qs') - | otherwise = acc - queueConns = M.foldl' (\cIds (_, rq) -> S.insert (connId rq) cIds) S.empty - -isSession :: RcvQueueSub -> (UserId, SMPServer, Maybe ConnId) -> Bool -isSession rq (uId, srv, connId_) = - userId rq == uId && server rq == srv && maybe True (connId rq ==) connId_ - -qKey :: SomeRcvQueue q => q -> (UserId, SMPServer, QueueId) -qKey rq = (qUserId rq, qServer rq, queueId rq) -{-# INLINE qKey #-} diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs new file mode 100644 index 000000000..f1ebf7f5e --- /dev/null +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -0,0 +1,174 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.Messaging.Agent.TSessionSubs + ( TSessionSubs (sessionSubs), + SessSubs (..), + emptyIO, + clear, + hasActiveSub, + hasPendingSub, + addPendingSub, + setSessionId, + addActiveSub, + batchAddPendingSubs, + deletePendingSub, + deleteSub, + batchDeleteSubs, + hasPendingSubs, + getPendingSubs, + getActiveSubs, + setSubsPending, + foldSessionSubs, + mapSubs, + ) +where + +import Control.Concurrent.STM +import Control.Monad +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as M +import Data.Maybe (isJust) +import qualified Data.Set as S +import Simplex.Messaging.Agent.Protocol (SMPQueue (..)) +import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SomeRcvQueue) +import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..)) +import Simplex.Messaging.Protocol (RecipientId) +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Transport +import Simplex.Messaging.Util (($>>=)) + +data TSessionSubs = TSessionSubs + { sessionSubs :: TMap SMPTransportSession SessSubs + } + +data SessSubs = SessSubs + { subsSessId :: TVar (Maybe SessionId), + activeSubs :: TMap RecipientId RcvQueueSub, + pendingSubs :: TMap RecipientId RcvQueueSub + } + +emptyIO :: IO TSessionSubs +emptyIO = TSessionSubs <$> TM.emptyIO +{-# INLINE emptyIO #-} + +clear :: TSessionSubs -> STM () +clear = TM.clear . sessionSubs +{-# INLINE clear #-} + +lookupSubs :: SMPTransportSession -> TSessionSubs -> STM (Maybe SessSubs) +lookupSubs tSess = TM.lookup tSess . sessionSubs +{-# INLINE lookupSubs #-} + +getSessSubs :: SMPTransportSession -> TSessionSubs -> STM SessSubs +getSessSubs tSess ss = lookupSubs tSess ss >>= maybe new pure + where + new = do + s <- SessSubs <$> newTVar Nothing <*> newTVar M.empty <*> newTVar M.empty + TM.insert tSess s $ sessionSubs ss + pure s + +hasActiveSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool +hasActiveSub = hasQueue_ activeSubs +{-# INLINE hasActiveSub #-} + +hasPendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool +hasPendingSub = hasQueue_ pendingSubs +{-# INLINE hasPendingSub #-} + +hasQueue_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool +hasQueue_ subs rId tSess ss = isJust <$> (lookupSubs tSess ss $>>= TM.lookup rId . subs) +{-# INLINE hasQueue_ #-} + +addPendingSub :: RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM () +addPendingSub rq tSess ss = getSessSubs tSess ss >>= TM.insert (rcvId rq) rq . pendingSubs + +setSessionId :: SessionId -> SMPTransportSession -> TSessionSubs -> STM () +setSessionId sessId tSess ss = do + s <- getSessSubs tSess ss + readTVar (subsSessId s) >>= \case + Nothing -> writeTVar (subsSessId s) (Just sessId) + Just sessId' -> unless (sessId == sessId') $ void $ setSubsPending_ s $ Just sessId + +addActiveSub :: SessionId -> RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM () +addActiveSub sessId rq tSess ss = do + s <- getSessSubs tSess ss + sessId' <- readTVar $ subsSessId s + let rId = rcvId rq + if Just sessId == sessId' + then do + TM.insert rId rq $ activeSubs s + TM.delete rId $ pendingSubs s + else TM.insert rId rq $ pendingSubs s + +batchAddPendingSubs :: [RcvQueueSub] -> SMPTransportSession -> TSessionSubs -> STM () +batchAddPendingSubs rqs tSess ss = do + s <- getSessSubs tSess ss + modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, rq)) rqs + +deletePendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM () +deletePendingSub rId tSess = lookupSubs tSess >=> mapM_ (TM.delete rId . pendingSubs) + +deleteSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM () +deleteSub rId tSess = lookupSubs tSess >=> mapM_ (\s -> TM.delete rId (activeSubs s) >> TM.delete rId (pendingSubs s)) + +batchDeleteSubs :: SomeRcvQueue q => [q] -> SMPTransportSession -> TSessionSubs -> STM () +batchDeleteSubs rqs tSess = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs s) >> delete (pendingSubs s)) + where + rIds = S.fromList $ map queueId rqs + delete = (`modifyTVar'` (`M.withoutKeys` rIds)) + +hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool +hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (fmap (not . null) . readTVar . pendingSubs) + +getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getPendingSubs = getSubs_ pendingSubs +{-# INLINE getPendingSubs #-} + +getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getActiveSubs = getSubs_ activeSubs +{-# INLINE getActiveSubs #-} + +getSubs_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getSubs_ subs tSess = lookupSubs tSess >=> maybe (pure M.empty) (readTVar . subs) + +setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss) + | entitySession == isJust connId_ = + TM.lookup tSess ss >>= withSessSubs (`setSubsPending_` Nothing) + | otherwise = + TM.lookupDelete tSess ss >>= withSessSubs setPendingChangeMode + where + entitySession = mode == TSMEntity + sessEntId = if entitySession then Just else const Nothing + withSessSubs run = \case + Nothing -> pure M.empty + Just s -> do + sessId' <- readTVar $ subsSessId s + if Just sessId == sessId' then run s else pure M.empty + setPendingChangeMode s = do + subs <- M.union <$> readTVar (activeSubs s) <*> readTVar (pendingSubs s) + unless (null subs) $ + forM_ subs $ \rq -> addPendingSub rq (uId, srv, sessEntId (connId rq)) tss + pure subs + +setSubsPending_ :: SessSubs -> Maybe SessionId -> STM (Map RecipientId RcvQueueSub) +setSubsPending_ s sessId_ = do + writeTVar (subsSessId s) sessId_ + let as = activeSubs s + subs <- readTVar as + unless (null subs) $ do + writeTVar as M.empty + modifyTVar' (pendingSubs s) $ M.union subs + pure subs + +foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a +foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs + +mapSubs :: (Map RecipientId RcvQueueSub -> a) -> SessSubs -> IO (a, a) +mapSubs f s = do + active <- readTVarIO $ activeSubs s + pending <- readTVarIO $ pendingSubs s + pure (f active, f pending) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index a2b884c9e..f78eb8e05 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -29,6 +29,7 @@ module Simplex.Messaging.Client ( -- * Connect (disconnect) client to (from) SMP server TransportSession, + SMPTransportSession, ProtocolClient (thParams, sessionTs), SMPClient, ProxiedRelay (..), @@ -549,6 +550,8 @@ type UserId = Int64 -- Please note that for SMP connection ID is used as entity ID, not queue ID. type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString) +type SMPTransportSession = TransportSession BrokerMsg + -- | Connects to 'ProtocolServer' using passed client configuration -- and queue for messages and notifications. -- diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index c00d78aba..8a4d2bf58 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -3569,6 +3569,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 2 exchangeGreetingsMsgId 4 a bId1 b aId1 @@ -3595,6 +3596,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do ("", "", DOWN _ _) <- nGet a ("", "", UP _ _) <- nGet a ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 4 exchangeGreetingsMsgId 6 a bId1 b aId1 exchangeGreetingsMsgId 6 a bId1' b aId1' diff --git a/tests/CoreTests/TRcvQueuesTests.hs b/tests/CoreTests/TRcvQueuesTests.hs deleted file mode 100644 index bc350e788..000000000 --- a/tests/CoreTests/TRcvQueuesTests.hs +++ /dev/null @@ -1,212 +0,0 @@ -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE PatternSynonyms #-} -{-# LANGUAGE TupleSections #-} -{-# LANGUAGE TypeApplications #-} -{-# OPTIONS_GHC -Wno-orphans #-} - -module CoreTests.TRcvQueuesTests where - -import AgentTests.EqInstances () -import qualified Data.ByteString.Char8 as B -import qualified Data.Map as M -import qualified Data.Set as S -import Data.String (IsString (..)) -import Simplex.Messaging.Agent.Protocol (ConnId, QueueStatus (..), UserId) -import Simplex.Messaging.Agent.Store (RcvQueueSub (..)) -import qualified Simplex.Messaging.Agent.TRcvQueues as RQ -import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Protocol (EntityId (..), RecipientId, SMPServer) -import Simplex.Messaging.Transport (SessionId) -import Test.Hspec hiding (fit, it) -import UnliftIO -import Util - -tRcvQueuesTests :: Spec -tRcvQueuesTests = do - describe "connection API" $ do - it "hasConn" hasConnTest - it "hasConn, batch add" hasConnTestBatch - it "hasConn, batch idempotent" batchIdempotentTest - it "deleteQueue" deleteQueueTest - describe "session API" $ do - it "getSessQueues" getSessQueuesTest - it "getDelSessQueues" getDelSessQueuesTest - describe "queue transfer" $ do - it "getDelSessQueues-batchAddQueues preserves total length" removeSubsTest - -instance IsString EntityId where fromString = EntityId . B.pack - -checkDataInvariant' :: RQ.TRcvQueues (SessionId, RcvQueueSub) -> IO Bool -checkDataInvariant' = checkDataInvariant_ snd - -checkDataInvariant :: RQ.TRcvQueues RcvQueueSub -> IO Bool -checkDataInvariant = checkDataInvariant_ id - -checkDataInvariant_ :: (q -> RcvQueueSub) -> RQ.TRcvQueues q -> IO Bool -checkDataInvariant_ toRQ trq = atomically $ do - qs <- readTVar $ RQ.getRcvQueues trq - let inv3 = all (\(k, q) -> RQ.qKey (toRQ q) == k) (M.assocs qs) - pure inv3 - -hasConnTest :: IO () -hasConnTest = do - trq <- RQ.empty - let q1 = dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1" - q2 = dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2" - q3 = dummyRQ 0 "smp://1234-w==@beta" "c3" "r3" - atomically $ RQ.addQueue q1 trq - checkDataInvariant trq `shouldReturn` True - atomically $ RQ.addQueue q2 trq - checkDataInvariant trq `shouldReturn` True - atomically $ RQ.addQueue q3 trq - checkDataInvariant trq `shouldReturn` True - atomically (RQ.hasQueue q1 trq) `shouldReturn` True - atomically (RQ.hasQueue q2 trq) `shouldReturn` True - atomically (RQ.hasQueue q3 trq) `shouldReturn` True - atomically (RQ.hasQueue (dummyRQ 0 "smp://1234-w==@alpha" "c4" "nope") trq) `shouldReturn` False - -hasConnTestBatch :: IO () -hasConnTestBatch = do - trq <- RQ.empty - let q1 = dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1" - q2 = dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2" - q3 = dummyRQ 0 "smp://1234-w==@beta" "c3" "r3" - let qs = [q1, q2, q3] - atomically $ RQ.batchAddQueues qs trq - checkDataInvariant trq `shouldReturn` True - atomically (RQ.hasQueue q1 trq) `shouldReturn` True - atomically (RQ.hasQueue q2 trq) `shouldReturn` True - atomically (RQ.hasQueue q3 trq) `shouldReturn` True - atomically (RQ.hasQueue (dummyRQ 0 "smp://1234-w==@alpha" "c4" "nope") trq) `shouldReturn` False - -batchIdempotentTest :: IO () -batchIdempotentTest = do - trq <- RQ.empty - let qs = [dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1", dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2", dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"] - atomically $ RQ.batchAddQueues qs trq - checkDataInvariant trq `shouldReturn` True - qs' <- readTVarIO $ RQ.getRcvQueues trq - atomically $ RQ.batchAddQueues qs trq - checkDataInvariant trq `shouldReturn` True - readTVarIO (RQ.getRcvQueues trq) `shouldReturn` qs' - -deleteQueueTest :: IO () -deleteQueueTest = do - trq <- RQ.empty - let q1 = dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1" - atomically $ do - RQ.addQueue q1 trq - RQ.addQueue (dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2") trq - RQ.addQueue (dummyRQ 0 "smp://1234-w==@beta" "c3" "r3") trq - checkDataInvariant trq `shouldReturn` True - atomically $ RQ.deleteQueue q1 trq - checkDataInvariant trq `shouldReturn` True - atomically $ RQ.deleteQueue (dummyRQ 0 "smp://1234-w==@alpha" "c4" "nope") trq - checkDataInvariant trq `shouldReturn` True - -getSessQueuesTest :: IO () -getSessQueuesTest = do - trq <- RQ.empty - atomically $ RQ.addQueue (dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1") trq - checkDataInvariant trq `shouldReturn` True - atomically $ RQ.addQueue (dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2") trq - checkDataInvariant trq `shouldReturn` True - atomically $ RQ.addQueue (dummyRQ 0 "smp://1234-w==@beta" "c3" "r3") trq - checkDataInvariant trq `shouldReturn` True - atomically $ RQ.addQueue (dummyRQ 1 "smp://1234-w==@beta" "c4" "r4") trq - checkDataInvariant trq `shouldReturn` True - let tSess1 = (0, "smp://1234-w==@alpha", Just "c1") - RQ.getSessQueues tSess1 trq `shouldReturn` [dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"] - atomically (RQ.hasSessQueues tSess1 trq) `shouldReturn` True - let tSess2 = (1, "smp://1234-w==@alpha", Just "c1") - RQ.getSessQueues tSess2 trq `shouldReturn` [] - atomically (RQ.hasSessQueues tSess2 trq) `shouldReturn` False - let tSess3 = (0, "smp://1234-w==@alpha", Just "nope") - RQ.getSessQueues tSess3 trq `shouldReturn` [] - atomically (RQ.hasSessQueues tSess3 trq) `shouldReturn` False - let tSess4 = (0, "smp://1234-w==@alpha", Nothing) - RQ.getSessQueues tSess4 trq `shouldReturn` [dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2", dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"] - atomically (RQ.hasSessQueues tSess4 trq) `shouldReturn` True - -getDelSessQueuesTest :: IO () -getDelSessQueuesTest = do - trq <- RQ.empty - let q1 = dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1" - q2 = dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2" - q3 = dummyRQ 0 "smp://1234-w==@beta" "c3" "r3" - q4 = dummyRQ 1 "smp://1234-w==@beta" "c4" "r4" - qs = - [ ("1", q1), - ("1", q2), - ("1", q3), - ("1", q4) - ] - mapM_ (\q -> atomically $ RQ.addSessQueue q trq) qs - checkDataInvariant' trq `shouldReturn` True - -- no user - atomically (RQ.getDelSessQueues (2, "smp://1234-w==@alpha", Nothing) "1" trq) `shouldReturn` ([], []) - checkDataInvariant' trq `shouldReturn` True - -- wrong user - atomically (RQ.getDelSessQueues (1, "smp://1234-w==@alpha", Nothing) "1" trq) `shouldReturn` ([], []) - checkDataInvariant' trq `shouldReturn` True - -- connections intact - atomically (RQ.hasQueue q1 trq) `shouldReturn` True - atomically (RQ.hasQueue q2 trq) `shouldReturn` True - atomically (RQ.getDelSessQueues (0, "smp://1234-w==@alpha", Nothing) "1" trq) `shouldReturn` ([dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2", dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"], ["c1", "c2"]) - checkDataInvariant' trq `shouldReturn` True - -- connections gone - atomically (RQ.hasQueue q1 trq) `shouldReturn` False - atomically (RQ.hasQueue q2 trq) `shouldReturn` False - -- non-matched connections intact - atomically (RQ.hasQueue q3 trq) `shouldReturn` True - atomically (RQ.hasQueue q4 trq) `shouldReturn` True - RQ.getSessConns (0, "smp://1234-w==@alpha", Nothing) trq `shouldReturn` S.fromList [] - RQ.getSessConns (0, "smp://1234-w==@beta", Nothing) trq `shouldReturn` S.fromList ["c3"] - RQ.getSessConns (1, "smp://1234-w==@beta", Nothing) trq `shouldReturn` S.fromList ["c4"] - -removeSubsTest :: IO () -removeSubsTest = do - aq <- RQ.empty - let qs = - [ ("1", dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"), - ("1", dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2"), - ("1", dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"), - ("1", dummyRQ 1 "smp://1234-w==@beta" "c4" "r4") - ] - mapM_ (\q -> atomically $ RQ.addSessQueue q aq) qs - - pq <- RQ.empty - atomically (totalSize aq pq) `shouldReturn` 4 - - atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@alpha", Nothing) "1" aq >>= (`RQ.batchAddQueues` pq) . fst - atomically (totalSize aq pq) `shouldReturn` 4 - - atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@beta", Just "non-existent") "1" aq >>= (`RQ.batchAddQueues` pq) . fst - atomically (totalSize aq pq) `shouldReturn` 4 - - atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@localhost", Nothing) "1" aq >>= (`RQ.batchAddQueues` pq) . fst - atomically (totalSize aq pq) `shouldReturn` 4 - - atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@beta", Just "c3") "1" aq >>= (`RQ.batchAddQueues` pq) . fst - atomically (totalSize aq pq) `shouldReturn` 4 - -totalSize :: RQ.TRcvQueues q -> RQ.TRcvQueues q' -> STM Int -totalSize a b = do - qsizeA <- M.size <$> readTVar (RQ.getRcvQueues a) - qsizeB <- M.size <$> readTVar (RQ.getRcvQueues b) - pure $ qsizeA + qsizeB - -dummyRQ :: UserId -> SMPServer -> ConnId -> RecipientId -> RcvQueueSub -dummyRQ userId server connId rcvId = - RcvQueueSub - { userId, - connId, - server, - rcvId, - rcvPrivateKey = C.APrivateAuthKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe", - status = New, - dbQueueId = 0, - primary = True, - dbReplaceQueueId = Nothing - } diff --git a/tests/CoreTests/TSessionSubs.hs b/tests/CoreTests/TSessionSubs.hs new file mode 100644 index 000000000..7e0025e09 --- /dev/null +++ b/tests/CoreTests/TSessionSubs.hs @@ -0,0 +1,131 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module CoreTests.TSessionSubs where + +import AgentTests.EqInstances () +import Control.Monad +import qualified Data.ByteString.Char8 as B +import Data.List (foldl') +import qualified Data.Map as M +import Data.String (IsString (..)) +import Simplex.Messaging.Agent.Protocol (ConnId, QueueStatus (..), UserId) +import Simplex.Messaging.Agent.Store (RcvQueueSub (..)) +import qualified Simplex.Messaging.Agent.TSessionSubs as SS +import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..)) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Protocol (EntityId (..), RecipientId, SMPServer) +import Simplex.Messaging.Transport (SessionId) +import Test.Hspec hiding (fit, it) +import UnliftIO +import Util + +tSessionSubsTests :: Spec +tSessionSubsTests = it "subscription lifecycle" $ testSessionSubs + +instance IsString EntityId where fromString = EntityId . B.pack + +dumpSessionSubs :: SS.TSessionSubs -> IO (M.Map SMPTransportSession (Maybe SessionId, (M.Map RecipientId RcvQueueSub, M.Map RecipientId RcvQueueSub))) +dumpSessionSubs = + readTVarIO . SS.sessionSubs + >=> mapM (\s -> (,) <$> readTVarIO (SS.subsSessId s) <*> SS.mapSubs id s) + +srv1 :: SMPServer +srv1 = "smp://1234-w==@alpha" + +srv2 :: SMPServer +srv2 = "smp://1234-w==@beta" + +testSessionSubs :: IO () +testSessionSubs = do + ss <- SS.emptyIO + ss' <- SS.emptyIO + let q1 = dummyRQ 1 srv1 "c1" "r1" + q2 = dummyRQ 1 srv1 "c2" "r2" + q3 = dummyRQ 1 srv2 "c3" "r3" + q4 = dummyRQ 1 srv2 "c4" "r4" + tSess1 = (1, srv1, Nothing) + tSess2 = (1, srv2, Nothing) + atomically (SS.addPendingSub q1 tSess1 ss) + atomically (SS.addPendingSub q2 tSess1 ss) + atomically (SS.hasPendingSubs tSess1 ss) `shouldReturn` True + atomically (SS.hasPendingSubs tSess2 ss) `shouldReturn` False + atomically (SS.addPendingSub q3 tSess2 ss) + atomically (SS.hasPendingSubs tSess2 ss) `shouldReturn` True + atomically (SS.batchAddPendingSubs [q1, q2] tSess1 ss') + atomically (SS.batchAddPendingSubs [q3] tSess2 ss') + atomically (SS.getPendingSubs tSess1 ss) `shouldReturn` M.fromList [("r1", q1), ("r2", q2)] + atomically (SS.getActiveSubs tSess1 ss) `shouldReturn` M.fromList [] + atomically (SS.getPendingSubs tSess2 ss) `shouldReturn` M.fromList [("r3", q3)] + st <- dumpSessionSubs ss + dumpSessionSubs ss' `shouldReturn` st + countSubs ss `shouldReturn` (0, 3) + atomically (SS.hasPendingSub (rcvId q1) tSess1 ss) `shouldReturn` True + atomically (SS.hasActiveSub (rcvId q1) tSess1 ss) `shouldReturn` False + atomically (SS.hasPendingSub (rcvId q4) tSess1 ss) `shouldReturn` False + atomically (SS.hasActiveSub (rcvId q4) tSess1 ss) `shouldReturn` False + -- setting active queue without setting session ID would keep it as pending + atomically $ SS.addActiveSub "123" q1 tSess1 ss + atomically (SS.hasPendingSub (rcvId q1) tSess1 ss) `shouldReturn` True + atomically (SS.hasActiveSub (rcvId q1) tSess1 ss) `shouldReturn` False + dumpSessionSubs ss `shouldReturn` st + countSubs ss `shouldReturn` (0, 3) + -- setting active queues + atomically $ SS.setSessionId "123" tSess1 ss + atomically $ SS.addActiveSub "123" q1 tSess1 ss + atomically (SS.hasPendingSub (rcvId q1) tSess1 ss) `shouldReturn` False + atomically (SS.hasActiveSub (rcvId q1) tSess1 ss) `shouldReturn` True + atomically (SS.getActiveSubs tSess1 ss) `shouldReturn` M.fromList [("r1", q1)] + atomically (SS.getPendingSubs tSess1 ss) `shouldReturn` M.fromList [("r2", q2)] + countSubs ss `shouldReturn` (1, 2) + atomically $ SS.setSessionId "456" tSess2 ss + atomically $ SS.addActiveSub "456" q4 tSess2 ss + atomically (SS.hasPendingSub (rcvId q4) tSess2 ss) `shouldReturn` False + atomically (SS.hasActiveSub (rcvId q4) tSess2 ss) `shouldReturn` True + atomically (SS.hasActiveSub (rcvId q4) tSess1 ss) `shouldReturn` False -- wrong transport session + atomically (SS.getActiveSubs tSess2 ss) `shouldReturn` M.fromList [("r4", q4)] + atomically (SS.getPendingSubs tSess2 ss) `shouldReturn` M.fromList [("r3", q3)] + countSubs ss `shouldReturn` (2, 2) + -- setting pending queues + st' <- dumpSessionSubs ss + atomically (SS.setSubsPending TSMUser tSess1 "abc" ss) `shouldReturn` M.empty -- wrong session + dumpSessionSubs ss `shouldReturn` st' + atomically (SS.setSubsPending TSMUser tSess1 "123" ss) `shouldReturn` M.fromList [("r1", q1)] + atomically (SS.getActiveSubs tSess1 ss) `shouldReturn` M.fromList [] + atomically (SS.getPendingSubs tSess1 ss) `shouldReturn` M.fromList [("r1", q1), ("r2", q2)] + countSubs ss `shouldReturn` (1, 3) + -- delete subs + atomically $ SS.deletePendingSub (rcvId q1) tSess1 ss + atomically (SS.getPendingSubs tSess1 ss) `shouldReturn` M.fromList [("r2", q2)] + countSubs ss `shouldReturn` (1, 2) + atomically $ SS.deleteSub (rcvId q2) tSess1 ss + atomically (SS.getPendingSubs tSess1 ss) `shouldReturn` M.fromList [] + countSubs ss `shouldReturn` (1, 1) + atomically (SS.getActiveSubs tSess2 ss) `shouldReturn` M.fromList [("r4", q4)] + atomically $ SS.deleteSub (rcvId q4) tSess2 ss + atomically (SS.getActiveSubs tSess2 ss) `shouldReturn` M.fromList [] + countSubs ss `shouldReturn` (0, 1) + countSubs ss' `shouldReturn` (0, 3) + atomically $ SS.batchDeleteSubs [q1, q2] tSess1 ss' + countSubs ss' `shouldReturn` (0, 1) + +countSubs :: SS.TSessionSubs -> IO (Int, Int) +countSubs = fmap (foldl' (\(n1, n2) (_, (m1, m2)) -> (n1 + M.size m1, n2 + M.size m2)) (0, 0)) . dumpSessionSubs + +dummyRQ :: UserId -> SMPServer -> ConnId -> RecipientId -> RcvQueueSub +dummyRQ userId server connId rcvId = + RcvQueueSub + { userId, + connId, + server, + rcvId, + rcvPrivateKey = C.APrivateAuthKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe", + status = New, + dbQueueId = 0, + primary = True, + dbReplaceQueueId = Nothing + } diff --git a/tests/Test.hs b/tests/Test.hs index 364080e0c..0d20431c2 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -15,7 +15,7 @@ import CoreTests.MsgStoreTests import CoreTests.RetryIntervalTests import CoreTests.SOCKSSettings import CoreTests.StoreLogTests -import CoreTests.TRcvQueuesTests +import CoreTests.TSessionSubs import CoreTests.UtilTests import CoreTests.VersionRangeTests import FileDescriptionTests (fileDescriptionTests) @@ -90,7 +90,7 @@ main = do #else describe "Store log tests" storeLogTests #endif - describe "TRcvQueues tests" tRcvQueuesTests + describe "TSessionSubs tests" tSessionSubsTests describe "Util tests" utilTests describe "Agent core tests" agentCoreTests #if defined(dbServerPostgres) @@ -103,7 +103,7 @@ main = do testStoreDBOpts "src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql" around_ (postgressBracket testServerDBConnectInfo) $ do - describe "SMP server via TLS, postgres+jornal message store" $ + xdescribe "SMP server via TLS, postgres+jornal message store" $ before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests describe "SMP server via TLS, postgres-only message store" $ before (pure (transport @TLS, ASType SQSPostgres SMSPostgres)) serverTests @@ -128,14 +128,14 @@ main = do describe "Notifications server (SMP server: jornal store)" $ ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal) around_ (postgressBracket testServerDBConnectInfo) $ do - describe "Notifications server (SMP server: postgres+jornal store)" $ + xdescribe "Notifications server (SMP server: postgres+jornal store)" $ ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal) describe "Notifications server (SMP server: postgres-only store)" $ ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres) around_ (postgressBracket testServerDBConnectInfo) $ do - describe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) + xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) describe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres) - describe "SMP proxy, postgres+jornal message store" $ + xdescribe "SMP proxy, postgres+jornal message store" $ before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests describe "SMP proxy, postgres-only message store" $ before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests