diff --git a/simplexmq.cabal b/simplexmq.cabal index 3223328b2..8cc7726d9 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -109,6 +109,7 @@ library Simplex.Messaging.Agent.Store.Postgres.Options Simplex.Messaging.Agent.Store.Shared Simplex.Messaging.Agent.TRcvQueues + Simplex.Messaging.Agent.TSessionSubs Simplex.Messaging.Client Simplex.Messaging.Client.Agent Simplex.Messaging.Compression diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index bd9026771..1fdec5e45 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -555,13 +555,14 @@ testProtocolServer c nm userId srv = withAgentEnv' c $ case protocolTypeI @p of -- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network setNetworkConfig :: AgentClient -> NetworkConfig -> IO () setNetworkConfig c@AgentClient {useNetworkConfig, proxySessTs} cfg' = do - (spChanged, changed) <- atomically $ do + ts <- getCurrentTime + changed <- atomically $ do (_, cfg) <- readTVar useNetworkConfig let changed = cfg /= cfg' !cfgSlow = slowNetworkConfig cfg' when changed $ writeTVar useNetworkConfig (cfgSlow, cfg') - pure (socksProxy cfg /= socksProxy cfg', changed) - when spChanged $ getCurrentTime >>= atomically . writeTVar proxySessTs + when (socksProxy cfg /= socksProxy cfg') $ writeTVar proxySessTs ts + pure changed when changed $ reconnectAllServers c setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO () @@ -1270,7 +1271,7 @@ subscribeConnections_ c conns = do let (subRs, cs) = foldr partitionResultsConns ([], []) conns resumeDelivery cs resumeConnCmds c $ map fst cs - rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs) + rcvRs <- lift $ connResults <$> subscribeQueues c (concatMap rcvQueues cs) False rcvRs' <- storeClientServiceAssocs rcvRs ns <- asks ntfSupervisor lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c984250d2..1a5fe56da 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -201,6 +201,7 @@ import Data.Bifunctor (bimap, first, second) import qualified Data.ByteString.Base64 as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.Composition ((.:)) import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) @@ -217,6 +218,7 @@ import Data.Text.Encoding import Data.Time (UTCTime, addUTCTime, defaultTimeLocale, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) +import GHC.Conc (unsafeIOToSTM) import Network.Socket (HostName) import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X @@ -234,7 +236,9 @@ import Simplex.Messaging.Agent.Stats import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction) import qualified Simplex.Messaging.Agent.Store.DB as DB -import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues)) +import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs) +import qualified Simplex.Messaging.Agent.TSessionSubs as SS +import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import Simplex.Messaging.Client import qualified Simplex.Messaging.Crypto as C @@ -311,8 +315,6 @@ type NtfClientVar = ClientVar NtfResponse type XFTPClientVar = ClientVar FileResponse -type SMPTransportSession = TransportSession SMP.BrokerMsg - type NtfTransportSession = TransportSession NtfResponse type XFTPTransportSession = TransportSession FileResponse @@ -339,6 +341,7 @@ data AgentClient = AgentClient subscrConns :: TVar (Set ConnId), activeSubs :: TRcvQueues (SessionId, RcvQueueSub), pendingSubs :: TRcvQueues RcvQueueSub, + currentSubs :: TSessionSubs, removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError, workerSeq :: TVar Int, smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()), @@ -507,6 +510,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai subscrConns <- newTVarIO S.empty activeSubs <- RQ.empty pendingSubs <- RQ.empty + currentSubs <- SS.emptyIO removedSubs <- TM.emptyIO workerSeq <- newTVarIO 0 smpDeliveryWorkers <- TM.emptyIO @@ -546,6 +550,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai subscrConns, activeSubs, pendingSubs, + currentSubs, removedSubs, workerSeq, smpDeliveryWorkers, @@ -701,10 +706,11 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do ts <- readTVarIO proxySessTs smp <- ExceptT $ getProtocolClient g nm tSess cfg presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs + atomically $ SS.setSessionId (sessionId $ thParams smp) tSess $ currentSubs c pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs} smpClientDisconnected :: AgentClient -> SMPTransportSession -> Env -> SMPClientVar -> TMap SMPServer ProxiedRelayVar -> SMPClient -> IO () -smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, qId) env v prs client = do +smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, cId) env v prs client = do removeClientAndSubs >>= serverDown logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv where @@ -712,29 +718,38 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess -- because we can have a race condition when a new current client could have already -- made subscriptions active, and the old client would be processing diconnection later. removeClientAndSubs :: IO ([RcvQueueSub], [ConnId]) - removeClientAndSubs = atomically $ do - removeSessVar v tSess smpClients - ifM (readTVar active) removeSubs (pure ([], [])) + removeClientAndSubs = do + (qs, cs, subs) <- atomically $ do + removeSessVar v tSess smpClients + ifM (readTVar active) removeSubs (pure ([], [], M.empty)) + -- TODO [subs] remove logs + when (M.keysSet subs /= S.fromList (map queueId qs)) $ error $ "setSubsPending different queues: " <> show (S.size $ M.keysSet subs) <> " " <> show (S.size $ S.fromList $ map queueId qs) + when (S.fromList (map qConnId $ M.elems subs) /= S.fromList (map qConnId qs)) $ error $ "setSubsPending different connections: " <> show (S.size (S.fromList $ map qConnId $ M.elems subs)) <> " " <> show (S.size $ S.fromList $ map qConnId qs) + pure (qs, cs) where sessId = sessionId $ thParams client removeSubs = do (qs, cs) <- RQ.getDelSessQueues tSess sessId $ activeSubs c RQ.batchAddQueues qs $ pendingSubs c + -- TODO [subs] + mode <- getSessionMode c + subs <- SS.setSubsPending mode tSess sessId $ currentSubs c -- this removes proxied relays that this client created sessions to destSrvs <- M.keys <$> readTVar prs - forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, qId) smpProxiedRelays - pure (qs, cs) + forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays + pure (qs, cs, subs) serverDown :: ([RcvQueueSub], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do - notifySub "" $ hostEvent' DISCONNECT client - unless (null conns) $ notifySub "" $ DOWN srv conns + notifySub c "" $ hostEvent' DISCONNECT client + unless (null conns) $ notifySub c "" $ DOWN srv conns unless (null qs) $ do - atomically $ mapM_ (releaseGetLock c) qs - runReaderT (resubscribeSMPSession c tSess) env - - notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> IO () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) + releaseGetLocksIO c qs + mode <- getSessionModeIO c + let resubscribe + | (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess + | otherwise = void $ subscribeQueues c qs True + runReaderT resubscribe env resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do @@ -743,9 +758,16 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do where getWorkerVar ts = ifM - (not <$> RQ.hasSessQueues tSess (pendingSubs c)) + (not <$> hasQueues) (pure Nothing) -- prevent race with cleanup and adding pending queues in another call (Just <$> getSessVar workerSeq tSess smpSubWorkers ts) + where + hasQueues = do + -- TODO [subs] + yes <- RQ.hasSessQueues tSess (pendingSubs c) + yes' <- SS.hasPendingSubs tSess $ currentSubs c + when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSubs different result" + pure yes newSubWorker v = do a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v) atomically $ putTMVar (sessionVar v) a @@ -753,10 +775,13 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do ri <- asks $ reconnectInterval . config withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do pending <- liftIO $ RQ.getSessQueues tSess $ pendingSubs c - forM_ (L.nonEmpty pending) $ \qs -> do + -- TODO [subs] + subs <- atomically $ SS.getPendingSubs tSess $ currentSubs c + when (M.keysSet subs /= S.fromList (map queueId pending)) $ error "getPendingSubs different queues" + unless (null pending) $ do liftIO $ waitUntilForeground c liftIO $ waitForUserNetwork c - reconnectSMPClient c tSess qs + handleNotify $ resubscribeSessQueues c tSess pending loop isForeground = (ASForeground ==) <$> readTVar (agentState c) cleanup :: SessionVar (Async ()) -> STM () @@ -765,28 +790,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do -- Not waiting may result in terminated worker remaining in the map. whenM (isEmptyTMVar $ sessionVar v) retry removeSessVar v tSess smpSubWorkers - -reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueueSub -> AM' () -reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do - cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c - (rs, sessId_) <- subscribeQueues c $ L.toList qs - let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) rs - conns = filter (`S.notMember` cs) okConns - unless (null conns) $ notifySub "" $ UP srv conns - let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs - forM_ (listToMaybe tempErrs) $ \(connId, e) -> do - when (null okConns && S.null cs && null finalErrs) . liftIO $ - forM_ sessId_ $ \sessId -> do - -- We only close the client session that was used to subscribe. - v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) - mapM_ (closeClient_ c) v_ - notifySub connId $ ERR e - where handleNotify :: AM' () -> AM' () - handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show - notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> AM' () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) + handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show + +notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> AEvent e -> m () +notifySub c connId cmd = liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) getNtfServerClient :: AgentClient -> NetworkRequestMode -> NtfTransportSession -> AM NtfClient getNtfServerClient c@AgentClient {active, ntfClients, workerSeq, proxySessTs, presetDomains} nm tSess@(_, srv, _) = do @@ -929,8 +937,9 @@ closeAgentClient c = do atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst) clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker - atomically . RQ.clear $ activeSubs c - atomically . RQ.clear $ pendingSubs c + atomically $ RQ.clear $ activeSubs c + atomically $ RQ.clear $ pendingSubs c + atomically $ SS.clear $ currentSubs c clear subscrConns clear getMsgLocks where @@ -1071,7 +1080,7 @@ withLogClient c nm tSess entId cmdStr action = withLogClient_ c nm tSess entId c withSMPClient :: SMPQueueRec q => AgentClient -> NetworkRequestMode -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a withSMPClient c nm q cmdStr action = do - tSess <- mkSMPTransportSession c q + tSess <- mkSMPTransportSessionIO c q withLogClient c nm tSess (unEntityId $ queueId q) cmdStr $ action . connectedClient sendOrProxySMPMessage :: AgentClient -> NetworkRequestMode -> UserId -> SMPServer -> ConnId -> ByteString -> Maybe SMP.SndPrivateAuthKey -> SMP.SenderId -> MsgFlags -> SMP.MsgBody -> AM (Maybe SMPServer) @@ -1336,14 +1345,18 @@ getXFTPWorkPath = do maybe getTemporaryDirectory pure workDir mkTransportSession :: MonadIO m => AgentClient -> UserId -> ProtoServer msg -> ByteString -> m (TransportSession msg) -mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionMode c +mkTransportSession c userId srv sessEntId = mkTSession userId srv sessEntId <$> getSessionModeIO c {-# INLINE mkTransportSession #-} mkTSession :: UserId -> ProtoServer msg -> ByteString -> TransportSessionMode -> TransportSession msg mkTSession userId srv sessEntId mode = (userId, srv, if mode == TSMEntity then Just sessEntId else Nothing) {-# INLINE mkTSession #-} -mkSMPTransportSession :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession +mkSMPTransportSessionIO :: (SMPQueueRec q, MonadIO m) => AgentClient -> q -> m SMPTransportSession +mkSMPTransportSessionIO c q = mkSMPTSession q <$> getSessionModeIO c +{-# INLINE mkSMPTransportSessionIO #-} + +mkSMPTransportSession :: SMPQueueRec q => AgentClient -> q -> STM SMPTransportSession mkSMPTransportSession c q = mkSMPTSession q <$> getSessionMode c {-# INLINE mkSMPTransportSession #-} @@ -1351,8 +1364,12 @@ mkSMPTSession :: SMPQueueRec q => q -> TransportSessionMode -> SMPTransportSessi mkSMPTSession q = mkTSession (qUserId q) (qServer q) (qConnId q) {-# INLINE mkSMPTSession #-} -getSessionMode :: MonadIO m => AgentClient -> m TransportSessionMode -getSessionMode = fmap sessionMode . getNetworkConfig +getSessionModeIO :: MonadIO m => AgentClient -> m TransportSessionMode +getSessionModeIO = fmap (sessionMode . snd) . readTVarIO . useNetworkConfig +{-# INLINE getSessionModeIO #-} + +getSessionMode :: AgentClient -> STM TransportSessionMode +getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig {-# INLINE getSessionMode #-} newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId) @@ -1500,46 +1517,92 @@ serverHostError = \case SMP.TRANSPORT TEVersion -> True _ -> False --- | Subscribe to queues. The list of results can have a different order. -subscribeQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))], Maybe SessionId) -subscribeQueues c qs = do - (errs, qs') <- partitionEithers <$> mapM checkQueue qs - atomically $ do - modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs')) - RQ.batchAddQueues qs' $ pendingSubs c - env <- ask - -- only "checked" queues are subscribed - session <- newTVarIO Nothing - rs <- sendTSessionBatches "SUB" mkSMPTSession (subscribeQueues_ env session) c NRMBackground qs' - (errs <> rs,) <$> readTVarIO session +-- | Batch by transport session and subscribe queues. The list of results can have a different order. +subscribeQueues :: AgentClient -> [RcvQueueSub] -> Bool -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))] +subscribeQueues c qs withEvents = do + (errs, qs') <- checkQueues c qs + atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs')) + qss <- batchQueues mkSMPTSession c qs' + mapM_ addPendingSubs qss + rs <- mapConcurrently subscribeQueues_ qss + when (withEvents && not (null errs)) $ notifySub c "" $ ERRS $ map (first qConnId) errs + pure $ map (second Left) errs <> concatMap L.toList rs where - checkQueue rq = do - prohibited <- liftIO $ hasGetLock c rq - pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED "subscribeQueues") else Right rq - subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId)) - subscribeQueues_ env session smp qs' = do - let (userId, srv, _) = transportSession' smp - atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs' - rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs' - active <- - atomically $ - ifM - (activeClientSession c tSess sessId) - (writeTVar session (Just sessId) >> processSubResults rs $> True) - (incSMPServerStat' c userId srv connSubIgnored (length rs) $> False) + addPendingSubs (tSess, qs') = atomically $ do + let qs'' = L.toList qs' + RQ.batchAddQueues qs'' $ pendingSubs c + SS.batchAddPendingSubs qs'' tSess $ currentSubs c + subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do + (rs, active) <- subscribeSessQueues_ c qs' withEvents if active then when (hasTempErrors rs) resubscribe $> rs else do logWarn "subcription batch result for replaced SMP client, resubscribing" - -- TODO we probably use PCENetworkError here instead of the original error, so it becomes temporary. - resubscribe $> L.map (second $ Left . PCENetworkError . NESubscribeError . show) rs + -- we use BROKER NETWORK error here instead of the original error, so it becomes temporary. + resubscribe $> L.map (second $ Left . toNESubscribeError) rs + where + -- treating host errors as temporary here as well + hasTempErrors = any (either temporaryOrHostError (const False) . snd) + toNESubscribeError = BROKER (B.unpack $ strEncode srv) . NETWORK . NESubscribeError . show + resubscribe = resubscribeSMPSession c tSess + +-- only "checked" queues are subscribed +checkQueues :: AgentClient -> [RcvQueueSub] -> AM' ([(RcvQueueSub, AgentErrorType)], [RcvQueueSub]) +checkQueues c = fmap partitionEithers . mapM checkQueue + where + checkQueue rq = do + prohibited <- liftIO $ hasGetLock c rq + pure $ if prohibited then Left (rq, CMD PROHIBITED "checkQueues") else Right rq + +-- This function expects that all queues belong to one transport session, +-- and that they are already added to pending subscriptions. +resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' () +resubscribeSessQueues c tSess qs = do + (errs, qs_) <- checkQueues c qs + forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c (tSess, qs') True + unless (null errs) $ notifySub c "" $ ERRS $ map (first qConnId) errs + +subscribeSessQueues_ :: AgentClient -> (SMPTransportSession, NonEmpty RcvQueueSub) -> Bool -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool) +subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQueues_ c NRMBackground qs + where + subscribeQueues_ :: SMPClient -> NonEmpty RcvQueueSub -> IO (BatchResponses RcvQueueSub SMPClientError (Maybe ServiceId), Bool) + subscribeQueues_ smp qs' = do + let (userId, srv, _) = tSess + atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs' + rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs' + cs_ <- + if withEvents + then do + cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c + -- TODO [subs] + subs <- atomically $ SS.getActiveSubs tSess $ currentSubs c + when (S.fromList (map qConnId $ M.elems subs) /= cs) $ error "getActiveSubs different connections" + pure $ Just cs + else pure Nothing + active <- + atomically $ + ifM + (activeClientSession c tSess sessId) + (processSubResults rs $> True) + (incSMPServerStat' c userId srv connSubIgnored (length rs) $> False) + forM_ cs_ $ \cs -> do + let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) $ L.toList rs + conns = filter (`S.notMember` cs) okConns + unless (null conns) $ notifySub c "" $ UP srv conns + let (tempErrs, finalErrs) = partition (temporaryClientError . snd) errs + mapM_ (\(connId, e) -> notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e) finalErrs + forM_ (listToMaybe tempErrs) $ \(connId, e) -> do + when (null okConns && S.null cs && null finalErrs && active) $ liftIO $ do + -- We only close the client session that was used to subscribe. + v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) + mapM_ (closeClient_ c) v_ + notifySub c connId $ ERR $ protocolClientError SMP (clientServer smp) e + pure (rs, active) where tSess = transportSession' smp sessId = sessionId $ thParams smp - hasTempErrors = any (either temporaryClientError (const False) . snd) processSubResults :: NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM () processSubResults = mapM_ $ uncurry $ processSubResult c sessId - resubscribe = resubscribeSMPSession c tSess `runReaderT` env activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c) @@ -1554,25 +1617,30 @@ type BatchResponses q e r = NonEmpty (q, Either e r) -- it includes arguments in the results instead. sendTSessionBatches :: forall q r. ByteString -> (q -> TransportSessionMode -> SMPTransportSession) -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> [q] -> AM' [(q, Either AgentErrorType r)] sendTSessionBatches statCmd mkSession action c nm qs = - concatMap L.toList <$> (mapConcurrently sendClientBatch =<< batchQueues) + concatMap L.toList <$> (mapConcurrently (sendClientBatch statCmd action c nm) =<< batchQueues mkSession c qs) + +batchQueues :: (q -> TransportSessionMode -> SMPTransportSession) -> AgentClient -> [q] -> AM' [(SMPTransportSession, NonEmpty q)] +batchQueues mkSession c qs = do + mode <- getSessionModeIO c + pure . M.assocs $ foldr (batch mode) M.empty qs where - batchQueues :: AM' [(SMPTransportSession, NonEmpty q)] - batchQueues = do - mode <- getSessionMode c - pure . M.assocs $ foldr (batch mode) M.empty qs + batch mode q m = + let tSess = mkSession q mode + in M.alter (Just . maybe [q] (q <|)) tSess m + +sendClientBatch :: ByteString -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r) +sendClientBatch statCmd action c nm qs = fmap fst $ sendClientBatch_ statCmd () (fmap (,()) .: action) c nm qs +{-# INLINE sendClientBatch #-} + +sendClientBatch_ :: ByteString -> res -> (SMPClient -> NonEmpty q -> IO (BatchResponses q SMPClientError r, res)) -> AgentClient -> NetworkRequestMode -> (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r, res) +sendClientBatch_ statCmd errRes action c nm (tSess@(_, srv, _), qs') = + tryAllErrors' (getSMPServerClient c nm tSess) >>= \case + Left e -> pure (L.map (,Left e) qs', errRes) + Right (SMPConnectedClient smp _) -> liftIO $ do + logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd + first (L.map agentError) <$> action smp qs' where - batch mode q m = - let tSess = mkSession q mode - in M.alter (Just . maybe [q] (q <|)) tSess m - sendClientBatch :: (SMPTransportSession, NonEmpty q) -> AM' (BatchResponses q AgentErrorType r) - sendClientBatch (tSess@(_, srv, _), qs') = - tryAllErrors' (getSMPServerClient c nm tSess) >>= \case - Left e -> pure $ L.map (,Left e) qs' - Right (SMPConnectedClient smp _) -> liftIO $ do - logServer' "-->" c srv (bshow (length qs') <> " queues") statCmd - L.map agentError <$> action smp qs' - where - agentError = second . first $ protocolClientError SMP $ clientServer smp + agentError = second . first $ protocolClientError SMP $ clientServer smp sendBatch :: SomeRcvQueue q => (SMPClient -> NetworkRequestMode -> NonEmpty (SMP.RecipientId, SMP.RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError a))) -> SMPClient -> NetworkRequestMode -> NonEmpty q -> IO (BatchResponses q SMPClientError a) sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCreds qs) @@ -1580,20 +1648,29 @@ sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCred queueCreds q = (queueId q, rcvAuthKey q) addSubscription :: AgentClient -> SessionId -> RcvQueueSub -> STM () -addSubscription c sessId rq@RcvQueueSub {connId} = do - modifyTVar' (subscrConns c) $ S.insert connId +addSubscription c sessId rq = do + modifyTVar' (subscrConns c) $ S.insert $ qConnId rq RQ.addSessQueue (sessId, rq) $ activeSubs c RQ.deleteQueue rq $ pendingSubs c + -- TODO [subs] + tSess <- mkSMPTransportSession c rq + SS.addActiveSub sessId rq tSess $ currentSubs c failSubscription :: SomeRcvQueue q => AgentClient -> q -> SMPClientError -> STM () failSubscription c rq e = do RQ.deleteQueue rq (pendingSubs c) TM.insert (RQ.qKey rq) e (removedSubs c) + -- TODO [subs] + tSess <- mkSMPTransportSession c rq + SS.deletePendingSub (queueId rq) tSess $ currentSubs c addPendingSubscription :: AgentClient -> RcvQueueSub -> STM () -addPendingSubscription c rq@RcvQueueSub {connId} = do - modifyTVar' (subscrConns c) $ S.insert connId +addPendingSubscription c rq = do + modifyTVar' (subscrConns c) $ S.insert $ qConnId rq RQ.addQueue rq $ pendingSubs c + -- TODO [subs] + tSess <- mkSMPTransportSession c rq + SS.addPendingSub rq tSess $ currentSubs c addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' () addNewQueueSubscription c rq' tSess sessId = do @@ -1607,11 +1684,23 @@ addNewQueueSubscription c rq' tSess sessId = do unless same $ resubscribeSMPSession c tSess hasActiveSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool -hasActiveSubscription c rq = RQ.hasQueue rq $ activeSubs c +hasActiveSubscription c rq = do + yes <- RQ.hasQueue rq $ activeSubs c + -- TODO [subs] + tSess <- mkSMPTransportSession c rq + yes' <- SS.hasActiveSub (queueId rq) tSess $ currentSubs c + when (yes /= yes') $ unsafeIOToSTM $ error "hasActiveSub different result" + pure yes {-# INLINE hasActiveSubscription #-} hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool -hasPendingSubscription c rq = RQ.hasQueue rq $ pendingSubs c +hasPendingSubscription c rq = do + yes <- RQ.hasQueue rq $ pendingSubs c + -- TODO [subs] + tSess <- mkSMPTransportSession c rq + yes' <- SS.hasPendingSub (queueId rq) tSess $ currentSubs c + when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSub different result" + pure yes {-# INLINE hasPendingSubscription #-} removeSubscription :: SomeRcvQueue q => AgentClient -> ConnId -> q -> STM () @@ -1619,12 +1708,19 @@ removeSubscription c connId rq = do modifyTVar' (subscrConns c) $ S.delete connId RQ.deleteQueue rq $ activeSubs c RQ.deleteQueue rq $ pendingSubs c + -- TODO [subs] + tSess <- mkSMPTransportSession c rq + SS.deleteSub (queueId rq) tSess $ currentSubs c removeSubscriptions :: SomeRcvQueue q => AgentClient -> [ConnId] -> [q] -> STM () removeSubscriptions c connIds rqs = do unless (null connIds) $ modifyTVar' (subscrConns c) (`S.difference` (S.fromList connIds)) RQ.batchDeleteQueues rqs $ activeSubs c RQ.batchDeleteQueues rqs $ pendingSubs c + -- TODO [subs] batch + forM_ rqs $ \rq -> do + tSess <- mkSMPTransportSession c rq + SS.deleteSub (queueId rq) tSess $ currentSubs c getSubscriptions :: AgentClient -> IO (Set ConnId) getSubscriptions = readTVarIO . subscrConns @@ -1782,6 +1878,13 @@ releaseGetLock c rq = TM.lookup (qServer rq, queueId rq) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ()) {-# INLINE releaseGetLock #-} +releaseGetLocksIO :: SomeRcvQueue q => AgentClient -> [q] -> IO () +releaseGetLocksIO c rqs = do + locks <- readTVarIO $ getMsgLocks c + forM_ rqs $ \rq -> + forM_ (M.lookup ((qServer rq, queueId rq)) locks) $ \lock -> + atomically $ tryPutTMVar lock () + suspendQueue :: AgentClient -> NetworkRequestMode -> RcvQueue -> AM () suspendQueue c nm rq@RcvQueue {rcvId, rcvPrivateKey} = withSMPClient c nm rq "OFF" $ \smp -> @@ -2327,7 +2430,7 @@ getAgentSubsTotal c userIds = do pure (SMPServerSubs {ssActive, ssPending}, sess) where getSubsCount :: (AgentClient -> TRcvQueues q) -> IO Int - getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (getRcvQueues $ subs c) + getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (RQ.getRcvQueues $ subs c) addSub :: (UserId, SMPServer, SMP.RecipientId) -> q -> Int -> Int addSub (userId, _, _) _ cnt = if userId `elem` userIds then cnt + 1 else cnt hasSession :: [(SMPTransportSession, SMPClientVar)] -> IO Bool @@ -2367,8 +2470,8 @@ getAgentServersSummary c@AgentClient {smpServersStats, xftpServersStats, ntfServ } where getServerSubs = do - subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (getRcvQueues $ activeSubs c) - M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (getRcvQueues $ pendingSubs c) + subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (RQ.getRcvQueues $ activeSubs c) + M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (RQ.getRcvQueues $ pendingSubs c) where addSub f (userId, srv, _) _ = M.alter (Just . f . fromMaybe SMPServerSubs {ssActive = 0, ssPending = 0}) (userId, srv) incActive ss = ss {ssActive = ssActive ss + 1} @@ -2410,7 +2513,7 @@ getAgentSubscriptions c = do pure $ SubscriptionsInfo {activeSubscriptions, pendingSubscriptions, removedSubscriptions} where getSubs :: (AgentClient -> TRcvQueues q) -> IO [SubInfo] - getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (getRcvQueues $ sel c) + getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (RQ.getRcvQueues $ sel c) getRemovedSubs = map (uncurry subInfo . second Just) . M.assocs <$> readTVarIO (removedSubs c) subInfo :: (UserId, SMPServer, SMP.RecipientId) -> Maybe SMPClientError -> SubInfo subInfo (uId, srv, rId) err = SubInfo {userId = uId, server = enc srv, rcvId = enc rId, subError = show <$> err} diff --git a/src/Simplex/Messaging/Agent/TRcvQueues.hs b/src/Simplex/Messaging/Agent/TRcvQueues.hs index cb7d220d4..c51145210 100644 --- a/src/Simplex/Messaging/Agent/TRcvQueues.hs +++ b/src/Simplex/Messaging/Agent/TRcvQueues.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE LambdaCase #-} diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs new file mode 100644 index 000000000..7d3028dfa --- /dev/null +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -0,0 +1,164 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.Messaging.Agent.TSessionSubs + ( TSessionSubs (sessionSubs), + emptyIO, + clear, + hasActiveSub, + hasPendingSub, + addPendingSub, + setSessionId, + addActiveSub, + batchAddPendingSubs, + deletePendingSub, + deleteSub, + batchDeleteSubs, + hasPendingSubs, + getPendingSubs, + getActiveSubs, + setSubsPending, + ) +where + +import Control.Concurrent.STM +import Control.Monad +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as M +import Data.Maybe (isJust) +import qualified Data.Set as S +import Simplex.Messaging.Agent.Protocol (SMPQueue (..)) +import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SomeRcvQueue) +import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..)) +import Simplex.Messaging.Protocol (RecipientId) +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Transport +import Simplex.Messaging.Util (($>>=)) + +-- the fields in this record have the same data with swapped keys for lookup efficiency, +-- and all methods must maintain this invariant. +data TSessionSubs = TSessionSubs + { sessionSubs :: TMap SMPTransportSession SessSubs + } + +data SessSubs = SessSubs + { subsSessId :: TVar (Maybe SessionId), + activeSubs :: TMap RecipientId RcvQueueSub, + pendingSubs :: TMap RecipientId RcvQueueSub + } + +emptyIO :: IO TSessionSubs +emptyIO = TSessionSubs <$> TM.emptyIO +{-# INLINE emptyIO #-} + +clear :: TSessionSubs -> STM () +clear = TM.clear . sessionSubs +{-# INLINE clear #-} + +lookupSubs :: SMPTransportSession -> TSessionSubs -> STM (Maybe SessSubs) +lookupSubs tSess = TM.lookup tSess . sessionSubs +{-# INLINE lookupSubs #-} + +getSessSubs :: SMPTransportSession -> TSessionSubs -> STM SessSubs +getSessSubs tSess ss = lookupSubs tSess ss >>= maybe new pure + where + new = do + s <- SessSubs <$> newTVar Nothing <*> newTVar M.empty <*> newTVar M.empty + TM.insert tSess s $ sessionSubs ss + pure s + +hasActiveSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool +hasActiveSub = hasQueue_ activeSubs +{-# INLINE hasActiveSub #-} + +hasPendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool +hasPendingSub = hasQueue_ pendingSubs +{-# INLINE hasPendingSub #-} + +hasQueue_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool +hasQueue_ subs rId tSess ss = isJust <$> (lookupSubs tSess ss $>>= TM.lookup rId . subs) +{-# INLINE hasQueue_ #-} + +addPendingSub :: RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM () +addPendingSub rq tSess ss = getSessSubs tSess ss >>= TM.insert (rcvId rq) rq . pendingSubs + +setSessionId :: SessionId -> SMPTransportSession -> TSessionSubs -> STM () +setSessionId sessId tSess ss = do + s <- getSessSubs tSess ss + readTVar (subsSessId s) >>= \case + Nothing -> writeTVar (subsSessId s) (Just sessId) + Just sessId' -> unless (sessId == sessId') $ void $ setSubsPending_ s $ Just sessId + +addActiveSub :: SessionId -> RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM () +addActiveSub sessId rq tSess ss = do + s <- getSessSubs tSess ss + sessId' <- readTVar $ subsSessId s + let rId = rcvId rq + if Just sessId == sessId' + then do + TM.insert rId rq $ activeSubs s + TM.delete rId $ pendingSubs s + else TM.insert rId rq $ pendingSubs s + +batchAddPendingSubs :: [RcvQueueSub] -> SMPTransportSession -> TSessionSubs -> STM () +batchAddPendingSubs rqs tSess ss = do + s <- getSessSubs tSess ss + modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, rq)) rqs + +deletePendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM () +deletePendingSub rId tSess = lookupSubs tSess >=> mapM_ (TM.delete rId . pendingSubs) + +deleteSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM () +deleteSub rId tSess = lookupSubs tSess >=> mapM_ (\s -> TM.delete rId (activeSubs s) >> TM.delete rId (pendingSubs s)) + +batchDeleteSubs :: SomeRcvQueue q => [q] -> SMPTransportSession -> TSessionSubs -> STM () +batchDeleteSubs rqs tSess = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs s) >> delete (pendingSubs s)) + where + rIds = S.fromList $ map queueId rqs + delete = (`modifyTVar'` (`M.withoutKeys` rIds)) + +hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool +hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (fmap (not . null) . readTVar . pendingSubs) + +getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getPendingSubs = getSubs_ pendingSubs +{-# INLINE getPendingSubs #-} + +getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getActiveSubs = getSubs_ activeSubs +{-# INLINE getActiveSubs #-} + +getSubs_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getSubs_ subs tSess = lookupSubs tSess >=> maybe (pure M.empty) (readTVar . subs) + +setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss) + | entitySession == isJust connId_ = + TM.lookup tSess ss >>= withSessSubs (`setSubsPending_` Nothing) + | otherwise = + TM.lookupDelete tSess ss >>= withSessSubs setPendingChangeMode + where + entitySession = mode == TSMEntity + sessEntId = if entitySession then Just else const Nothing + withSessSubs run = \case + Nothing -> pure M.empty + Just s -> do + sessId' <- readTVar $ subsSessId s + if Just sessId == sessId' then run s else pure M.empty + setPendingChangeMode s = do + subs <- M.union <$> readTVar (activeSubs s) <*> readTVar (pendingSubs s) + unless (null subs) $ + forM_ subs $ \rq -> addPendingSub rq (uId, srv, sessEntId (connId rq)) tss + pure subs + +setSubsPending_ :: SessSubs -> Maybe SessionId -> STM (Map RecipientId RcvQueueSub) +setSubsPending_ s sessId_ = do + writeTVar (subsSessId s) sessId_ + let as = activeSubs s + subs <- readTVar as + unless (null subs) $ do + writeTVar as M.empty + modifyTVar' (pendingSubs s) $ M.union subs + pure subs diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index a2b884c9e..f78eb8e05 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -29,6 +29,7 @@ module Simplex.Messaging.Client ( -- * Connect (disconnect) client to (from) SMP server TransportSession, + SMPTransportSession, ProtocolClient (thParams, sessionTs), SMPClient, ProxiedRelay (..), @@ -549,6 +550,8 @@ type UserId = Int64 -- Please note that for SMP connection ID is used as entity ID, not queue ID. type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString) +type SMPTransportSession = TransportSession BrokerMsg + -- | Connects to 'ProtocolServer' using passed client configuration -- and queue for messages and notifications. -- diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index c00d78aba..8a4d2bf58 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -3569,6 +3569,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 2 exchangeGreetingsMsgId 4 a bId1 b aId1 @@ -3595,6 +3596,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do ("", "", DOWN _ _) <- nGet a ("", "", UP _ _) <- nGet a ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 4 exchangeGreetingsMsgId 6 a bId1 b aId1 exchangeGreetingsMsgId 6 a bId1' b aId1' diff --git a/tests/Test.hs b/tests/Test.hs index 364080e0c..1339904a5 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -103,7 +103,7 @@ main = do testStoreDBOpts "src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql" around_ (postgressBracket testServerDBConnectInfo) $ do - describe "SMP server via TLS, postgres+jornal message store" $ + xdescribe "SMP server via TLS, postgres+jornal message store" $ before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests describe "SMP server via TLS, postgres-only message store" $ before (pure (transport @TLS, ASType SQSPostgres SMSPostgres)) serverTests @@ -128,19 +128,19 @@ main = do describe "Notifications server (SMP server: jornal store)" $ ntfServerTests (transport @TLS, ASType SQSMemory SMSJournal) around_ (postgressBracket testServerDBConnectInfo) $ do - describe "Notifications server (SMP server: postgres+jornal store)" $ + xdescribe "Notifications server (SMP server: postgres+jornal store)" $ ntfServerTests (transport @TLS, ASType SQSPostgres SMSJournal) describe "Notifications server (SMP server: postgres-only store)" $ ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres) around_ (postgressBracket testServerDBConnectInfo) $ do - describe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) - describe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres) - describe "SMP proxy, postgres+jornal message store" $ + xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) + fdescribe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres) + xdescribe "SMP proxy, postgres+jornal message store" $ before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests describe "SMP proxy, postgres-only message store" $ before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests #endif - describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) + xdescribe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) describe "SMP proxy, jornal message store" $ before (pure $ ASType SQSMemory SMSJournal) smpProxyTests describe "XFTP" $ do diff --git a/tests/Util.hs b/tests/Util.hs index 9a4049d68..6e5a3f48a 100644 --- a/tests/Util.hs +++ b/tests/Util.hs @@ -48,7 +48,7 @@ newtype TestWrapper a = TestWrapper a -- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs" testLogLevel :: LogLevel -testLogLevel = LogError +testLogLevel = LogWarn instance Example a => Example (TestWrapper a) where type Arg (TestWrapper a) = Arg a @@ -56,7 +56,7 @@ instance Example a => Example (TestWrapper a) where ci <- envCI runTest `E.catches` [E.Handler (onTestFailure ci), E.Handler (onTestException ci)] where - tt = 120 + tt = 30 runTest = timeout (tt * 1000000) (evaluateExample action params hooks state) `finally` callCommand "sync" >>= \case Just r -> pure r