diff --git a/simplexmq.cabal b/simplexmq.cabal index e49a72a1f..9f7b23bc4 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -519,6 +519,7 @@ test-suite simplexmq-test , generic-random ==1.5.* , hashable , hspec ==2.11.* + , hspec-core ==2.11.* , http-client , http-types , http2 diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 42632a7a7..a287a065b 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -53,7 +53,6 @@ data NtfServerConfig = NtfServerConfig subIdBytes :: Int, regCodeBytes :: Int, clientQSize :: Natural, - subQSize :: Natural, pushQSize :: Natural, smpAgentCfg :: SMPClientAgentConfig, apnsConfig :: APNSPushClientConfig, @@ -94,11 +93,11 @@ data NtfEnv = NtfEnv } newNtfServerEnv :: NtfServerConfig -> IO NtfEnv -newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do +newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do when (compactLog startOptions) $ compactDbStoreLog $ dbStoreLogPath dbStoreConfig random <- C.newRandom store <- newNtfDbStore dbStoreConfig - subscriber <- newNtfSubscriber subQSize smpAgentCfg random + subscriber <- newNtfSubscriber smpAgentCfg random pushServer <- newNtfPushServer pushQSize apnsConfig tlsServerCreds <- loadServerCredential ntfCredentials Fingerprint fp <- loadFingerprint ntfCredentials @@ -121,8 +120,8 @@ data NtfSubscriber = NtfSubscriber type SMPSubscriberVar = SessionVar SMPSubscriber -newNtfSubscriber :: Natural -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber -newNtfSubscriber qSize smpAgentCfg random = do +newNtfSubscriber :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber +newNtfSubscriber smpAgentCfg random = do smpSubscribers <- TM.emptyIO subscriberSeq <- newTVarIO 0 smpAgent <- newSMPClientAgent smpAgentCfg random diff --git a/src/Simplex/Messaging/Notifications/Server/Main.hs b/src/Simplex/Messaging/Notifications/Server/Main.hs index a073eee18..23954506a 100644 --- a/src/Simplex/Messaging/Notifications/Server/Main.hs +++ b/src/Simplex/Messaging/Notifications/Server/Main.hs @@ -233,7 +233,6 @@ ntfServerCLI cfgPath logPath = subIdBytes = 24, regCodeBytes = 32, clientQSize = 64, - subQSize = 2048, pushQSize = 32768, smpAgentCfg = defaultSMPClientAgentConfig diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index db041c4e7..544459484 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -87,7 +87,7 @@ import Network.Socket (ServiceName, Socket, socketToHandle) import qualified Network.TLS as TLS import Numeric.Natural (Natural) import Simplex.Messaging.Agent.Lock -import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, forwardSMPTransmission, smpProxyError, temporaryClientError) +import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, forwardSMPTransmission, nonBlockingWriteTBQueue, smpProxyError, temporaryClientError) import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient, getConnectedSMPServerClient) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding @@ -162,8 +162,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt stopServer s liftIO $ exitSuccess raceAny_ - ( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub - : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ()) + ( serverThread "server subscribers" s subscribers subscriptions cancelSub + : serverThread "server ntfSubscribers" s ntfSubscribers ntfSubscriptions (\_ -> pure ()) : deliverNtfsThread s : sendPendingEvtsThread s : receiveFromProxyAgent pa @@ -229,66 +229,63 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt serverThread :: forall s. - Server -> String -> - (Server -> TQueue (QueueId, ClientId, Subscribed)) -> - (Server -> TMap QueueId (TVar AClient)) -> - (Server -> TVar (IM.IntMap AClient)) -> - (Server -> TVar (IM.IntMap (NonEmpty (QueueId, Subscribed)))) -> + Server -> + (Server -> ServerSubscribers) -> (forall st. Client st -> TMap QueueId s) -> (s -> IO ()) -> M () - serverThread s label subQ subs subClnts pendingEvts clientSubs unsub = do + serverThread label srv srvSubscribers clientSubs unsub = do labelMyThread label - cls <- asks clients - liftIO . forever $ - (atomically (readTQueue $ subQ s) >>= atomically . updateSubscribers cls) + liftIO . forever $ do + -- Reading clients outside of `updateSubscribers` transaction to avoid transaction re-evaluation on each new connected client. + -- In case client disconnects during the transaction (its `connected` property is read), + -- the transaction will still be re-evaluated, and the client won't be stored as subscribed. + sub@(_, clntId, _) <- atomically $ readTQueue subQ + c_ <- getServerClient clntId srv + atomically (updateSubscribers c_ sub) $>>= endPreviousSubscriptions >>= mapM_ unsub where - updateSubscribers :: TVar (IM.IntMap (Maybe AClient)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), AClient)) - updateSubscribers cls (qId, clntId, subscribed) = - -- Client lookup by ID is in the same STM transaction. - -- In case client disconnects during the transaction, - -- it will be re-evaluated, and the client won't be stored as subscribed. - (readTVar cls >>= updateSub . IM.lookup clntId) - $>>= clientToBeNotified + ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents} = srvSubscribers srv + updateSubscribers :: Maybe AClient -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, BrokerMsg), AClient)) + updateSubscribers c_ (qId, clntId, subscribed) = updateSub $>>= clientToBeNotified where - ss = subs s - updateSub = \case - Just (Just clnt) - | subscribed -> do - modifyTVar' (subClnts s) $ IM.insert clntId clnt -- add client to server's subscribed cients - TM.lookup qId ss >>= -- insert subscribed and current client - maybe - (newTVar clnt >>= \cv -> TM.insert qId cv ss $> Nothing) - (\cv -> Just <$> swapTVar cv clnt) - | otherwise -> do - removeWhenNoSubs clnt - TM.lookupDelete qId ss >>= mapM readTVar - -- This case catches Just Nothing - it cannot happen here. - -- Nothing is there only before client thread is started. - _ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client - clientToBeNotified ac@(AClient _ _ c') - | clntId == clientId c' = pure Nothing - | otherwise = (\yes -> if yes then Just ((qId, subscribed), ac) else Nothing) <$> readTVar (connected c') - endPreviousSubscriptions :: ((QueueId, Subscribed), AClient) -> IO (Maybe s) - endPreviousSubscriptions (qEvt@(qId, _), ac@(AClient _ _ c)) = do - atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qEvt] (qEvt <|)) (clientId c) + updateSub = case c_ of + Just c@(AClient _ _ Client {connected}) -> ifM (readTVar connected) (updateSubConnected c) updateSubDisconnected + Nothing -> updateSubDisconnected + updateSubConnected c + | subscribed = do + modifyTVar' subClients $ IS.insert clntId -- add client to server's subscribed cients + upsertSubscribedClient qId c queueSubscribers + | otherwise = do + removeWhenNoSubs c + lookupDeleteSubscribedClient qId queueSubscribers + -- do not insert client if it is already disconnected, but send END to any other client + updateSubDisconnected = lookupDeleteSubscribedClient qId queueSubscribers + clientToBeNotified ac@(AClient _ _ Client {clientId, connected}) + | clntId == clientId = pure Nothing + | otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar connected + where + subEvt = if subscribed then END else DELD + endPreviousSubscriptions :: ((QueueId, BrokerMsg), AClient) -> IO (Maybe s) + endPreviousSubscriptions (evt@(qId, _), ac@(AClient _ _ c)) = do + atomically $ modifyTVar' pendingEvents $ IM.alter (Just . maybe [evt] (evt <|)) (clientId c) atomically $ do sub <- TM.lookupDelete qId (clientSubs c) removeWhenNoSubs ac $> sub -- remove client from server's subscribed cients - removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c) + removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' subClients $ IS.delete (clientId c) deliverNtfsThread :: Server -> M () - deliverNtfsThread Server {ntfSubClients} = do + deliverNtfsThread srv@Server {ntfSubscribers} = do ntfInt <- asks $ ntfDeliveryInterval . config NtfStore ns <- asks ntfStore stats <- asks serverStats liftIO $ forever $ do threadDelay ntfInt - readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats) + cIds <- IS.toList <$> readTVarIO (subClients ntfSubscribers) + forM_ cIds $ \cId -> getServerClient cId srv >>= mapM_ (deliverNtfs ns stats) where deliverNtfs ns stats (AClient _ _ Client {clientId, ntfSubscriptions, sndQ, connected}) = whenM (currentClient readTVarIO) $ do @@ -308,7 +305,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt writeTBQueue sndQ ts pure $ length ts_ currentClient :: Monad m => (forall a. TVar a -> m a) -> m Bool - currentClient rd = (&&) <$> rd connected <*> (IM.member clientId <$> rd ntfSubClients) + currentClient rd = (&&) <$> rd connected <*> (IS.member clientId <$> rd (subClients ntfSubscribers)) addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg] addNtfs acc (nId, v) = readTVar v >>= \case @@ -324,37 +321,30 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch sendPendingEvtsThread :: Server -> M () - sendPendingEvtsThread s = do + sendPendingEvtsThread srv@Server {subscribers, ntfSubscribers} = do endInt <- asks $ pendingENDInterval . config - cls <- asks clients - forever $ do + stats <- asks serverStats + liftIO $ forever $ do threadDelay endInt - sendPending cls $ pendingSubEvents s - sendPending cls $ pendingNtfSubEvents s + sendPending subscribers stats + sendPending ntfSubscribers stats where - sendPending cls ref = do - ends <- atomically $ swapTVar ref IM.empty - unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qEvts) -> - mapM_ (queueEvts qEvts) . join . IM.lookup cId =<< readTVarIO cls - queueEvts qEvts (AClient _ _ c@Client {connected, sndQ = q}) = - whenM (readTVarIO connected) $ do - sent <- atomically $ tryWriteTBQueue q ts - if sent - then updateEndStats - else -- if queue is full it can block - forkClient c ("sendPendingEvtsThread.queueEvts") $ - atomically (writeTBQueue q ts) >> updateEndStats + sendPending ServerSubscribers {pendingEvents} stats = do + pending <- atomically $ swapTVar pendingEvents IM.empty + unless (null pending) $ forM_ (IM.assocs pending) $ \(cId, evts) -> + getServerClient cId srv >>= mapM_ (enqueueEvts evts) where - ts = L.map (\(qId, subscribed) -> (CorrId "", qId, evt subscribed)) qEvts - evt True = END - evt False = DELD - -- this accounts for both END and DELD events - updateEndStats = do - stats <- asks serverStats - let len = L.length qEvts - when (len > 0) $ liftIO $ do - atomicModifyIORef'_ (qSubEnd stats) (+ len) - atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch + enqueueEvts evts (AClient _ _ Client {connected, sndQ}) = + whenM (readTVarIO connected) $ + nonBlockingWriteTBQueue sndQ ts >> updateEndStats + where + ts = L.map (\(qId, evt) -> (CorrId "", qId, evt)) evts + -- this accounts for both END and DELD events + updateEndStats = do + let len = L.length evts + when (len > 0) $ do + atomicModifyIORef'_ (qSubEnd stats) (+ len) + atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch receiveFromProxyAgent :: ProxyAgent -> M () receiveFromProxyAgent ProxyAgent {smpAgent = SMPClientAgent {agentQ}} = @@ -581,20 +571,23 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions} getRealTimeMetrics :: Env -> IO RealTimeMetrics - getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do + getRealTimeMetrics Env {sockets, msgStore = AMS _ _ ms, server = srv@Server {subscribers, ntfSubscribers}} = do socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets #if MIN_VERSION_base(4,18,0) threadsCount <- length <$> listThreads #else let threadsCount = 0 #endif - clientsCount <- IM.size <$> readTVarIO clients - smpSubsCount <- M.size <$> readTVarIO subscribers - smpSubClientsCount <- IM.size <$> readTVarIO subClients - ntfSubsCount <- M.size <$> readTVarIO notifiers - ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients + clientsCount <- IM.size <$> getServerClients srv + smpSubs <- getSubscribersMetrics subscribers + ntfSubs <- getSubscribersMetrics ntfSubscribers loadedCounts <- loadedQueueCounts ms - pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts} + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubs, ntfSubs, loadedCounts} + where + getSubscribersMetrics ServerSubscribers {queueSubscribers, subClients} = do + subsCount <- M.size <$> getSubscribedClients queueSubscribers + subClientsCount <- IS.size <$> readTVarIO subClients + pure RTSubscriberMetrics {subsCount, subClientsCount} runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do @@ -653,9 +646,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt CPSuspend -> withAdminRole $ hPutStrLn h "suspend not implemented" CPResume -> withAdminRole $ hPutStrLn h "resume not implemented" CPClients -> withAdminRole $ do - active <- unliftIO u (asks clients) >>= readTVarIO + cls <- getServerClients srv hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions" - forM_ (IM.toList active) $ \(cid, cl) -> forM_ cl $ \(AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do + forM_ (IM.toList cls) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do connected' <- bshow <$> readTVarIO connected rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt @@ -767,8 +760,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt #else hPutStrLn h "Threads: not available on GHC 8.10" #endif - Env {clients, server = Server {subscribers, notifiers, subClients, ntfSubClients}} <- unliftIO u ask - activeClients <- readTVarIO clients + let Server {subscribers, ntfSubscribers} = srv + activeClients <- getServerClients srv hPutStrLn h $ "Clients: " <> show (IM.size activeClients) when (r == CPRAdmin) $ do clQs <- clientTBQueueLengths' activeClients @@ -782,30 +775,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs - putActiveClientsInfo "SMP" subscribers False - putActiveClientsInfo "Ntf" notifiers True - putSubscribedClients "SMP" subClients False - putSubscribedClients "Ntf" ntfSubClients True + putSubscribersInfo "SMP" subscribers False + putSubscribersInfo "Ntf" ntfSubscribers True where - putActiveClientsInfo :: String -> TMap QueueId (TVar AClient) -> Bool -> IO () - putActiveClientsInfo protoName clients showIds = do - activeSubs <- readTVarIO clients + putSubscribersInfo :: String -> ServerSubscribers -> Bool -> IO () + putSubscribersInfo protoName ServerSubscribers {queueSubscribers, subClients} showIds = do + activeSubs <- getSubscribedClients queueSubscribers hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs) clnts <- countSubClients activeSubs hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "") + clnts' <- readTVarIO subClients + hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IS.size clnts') <> (if showIds then " " <> show clnts' else "") where - countSubClients :: M.Map QueueId (TVar AClient) -> IO IS.IntSet - countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId' <$> readTVarIO c) IS.empty - putSubscribedClients :: String -> TVar (IM.IntMap AClient) -> Bool -> IO () - putSubscribedClients protoName subClnts showIds = do - clnts <- readTVarIO subClnts - hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "") - countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe AClient) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) + countSubClients :: M.Map QueueId (TVar (Maybe AClient)) -> IO IS.IntSet + countSubClients = foldM (\ !s c -> maybe s ((`IS.insert` s) . clientId') <$> readTVarIO c) IS.empty + countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0)) where - addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Maybe AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) - addSubs acc Nothing = pure acc - addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) (Just acl@(AClient _ _ cl)) = do + addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) + addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) acl@(AClient _ _ cl) = do subs <- readTVarIO $ subSel cl cnts' <- case countSubs_ of Nothing -> pure cnts @@ -816,8 +804,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt clCnt' = if cnt == 0 then clCnt else clCnt + 1 qs' <- if cnt == 0 then pure qs else addQueueLengths qs acl pure (subCnt + cnt, cnts', clCnt', qs') - clientTBQueueLengths' :: Foldable t => t (Maybe AClient) -> IO (Natural, Natural, Natural) - clientTBQueueLengths' = foldM (\acc -> maybe (pure acc) (addQueueLengths acc)) (0, 0, 0) + clientTBQueueLengths' :: Foldable t => t AClient -> IO (Natural, Natural, Natural) + clientTBQueueLengths' = foldM addQueueLengths (0, 0, 0) addQueueLengths (!rl, !sl, !ml) (AClient _ _ cl) = do (rl', sl', ml') <- queueLengths cl pure (rl + rl', sl + sl', ml + ml') @@ -896,30 +884,28 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M () runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime - active <- asks clients nextClientId <- asks clientSeq clientId <- atomically $ stateTVar nextClientId $ \next -> (next, next + 1) - atomically $ modifyTVar' active $ IM.insert clientId Nothing AMS qt mt ms <- asks msgStore c <- liftIO $ newClient qt mt clientId q thVersion sessionId ts - runClientThreads qt mt ms active c clientId `finally` clientDisconnected c + runClientThreads qt mt ms c `finally` clientDisconnected c where - runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore qs ms) -> IS.Key -> M () - runClientThreads qt mt ms active c clientId = do - atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient qt mt c) + runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> M () + runClientThreads qt mt ms c = do s <- asks server - expCfg <- asks $ inactiveClientExpiration . config - th <- newMVar h -- put TH under a fair lock to interleave messages and command responses - labelMyThread . B.unpack $ "client $" <> encode sessionId - raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg + whenM (liftIO $ insertServerClient (AClient qt mt c) s) $ do + expCfg <- asks $ inactiveClientExpiration . config + th <- newMVar h -- put TH under a fair lock to interleave messages and command responses + labelMyThread . B.unpack $ "client $" <> encode sessionId + raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg disconnectThread_ :: Client s -> Server -> Maybe ExpirationConfig -> [M ()] disconnectThread_ c s (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c s)] disconnectThread_ _ _ _ = [] - noSubscriptions Client {clientId} s = do - hasSubs <- IM.member clientId <$> readTVarIO (subClients s) + noSubscriptions Client {clientId} Server {subscribers, ntfSubscribers} = do + hasSubs <- IS.member clientId <$> readTVarIO (subClients subscribers) if hasSubs then pure False - else not . IM.member clientId <$> readTVarIO (ntfSubClients s) + else not . IS.member clientId <$> readTVarIO (subClients ntfSubscribers) clientDisconnected :: Client s -> M () clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do @@ -931,26 +917,17 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty liftIO $ mapM_ cancelSub subs whenM (asks serverActive >>= readTVarIO) $ do - Server {subscribers, notifiers, subClients, ntfSubClients} <- asks server + srv@Server {subscribers, ntfSubscribers} <- asks server liftIO $ updateSubscribers subs subscribers - liftIO $ updateSubscribers ntfSubs notifiers - asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) - atomically $ modifyTVar' subClients $ IM.delete clientId - atomically $ modifyTVar' ntfSubClients $ IM.delete clientId + liftIO $ updateSubscribers ntfSubs ntfSubscribers + liftIO $ deleteServerClient clientId srv tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where - updateSubscribers :: M.Map QueueId a -> TMap QueueId (TVar AClient) -> IO () - updateSubscribers subs srvSubs = - forM_ (M.keys subs) $ \qId -> - -- lookup of the subscribed client TVar can be in separate transaction, - -- as long as the client is read in the same transaction - - -- it prevents removing the next subscribed client. - TM.lookupIO qId srvSubs >>= - mapM_ (\c' -> atomically $ whenM (sameClientId c <$> readTVar c') $ TM.delete qId srvSubs) - -sameClientId :: Client s -> AClient -> Bool -sameClientId Client {clientId} ac = clientId == clientId' ac + updateSubscribers :: M.Map QueueId a -> ServerSubscribers -> IO () + updateSubscribers subs ServerSubscribers {queueSubscribers, subClients} = do + mapM_ (\qId -> deleteSubcribedClient qId c queueSubscribers) (M.keys subs) + atomically $ modifyTVar' subClients $ IS.delete clientId cancelSub :: Sub -> IO () cancelSub s = case subThread s of @@ -1151,7 +1128,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do client :: forall s. MsgStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M () client thParams' - Server {subscribedQ, ntfSubscribedQ, subscribers} + Server {subscribers, ntfSubscribers} ms clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" @@ -1372,7 +1349,7 @@ client Left e -> pure $ ERR e Right nId_ -> do incStat . ntfCreated =<< asks serverStats - forM_ nId_ $ \nId -> atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) + forM_ nId_ $ \nId -> atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False) pure $ NID notifierId rcvPublicDhKey deleteQueueNotifier_ :: StoreQueue s -> M (Transmission BrokerMsg) @@ -1383,7 +1360,7 @@ client stats <- asks serverStats deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId) when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted) - atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) + atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False) incStat $ ntfDeleted stats pure ok Right Nothing -> pure ok @@ -1394,7 +1371,7 @@ client subscribeQueue :: StoreQueue s -> QueueRec -> M (Transmission BrokerMsg) subscribeQueue q qr = - atomically (TM.lookup rId subscriptions) >>= \case + liftIO (TM.lookupIO rId subscriptions) >>= \case Nothing -> newSub >>= deliver True Just s@Sub {subThread} -> do stats <- asks serverStats @@ -1410,7 +1387,7 @@ client rId = recipientId q newSub :: M Sub newSub = time "SUB newSub" . atomically $ do - writeTQueue subscribedQ (rId, clientId, True) + writeTQueue (subQ subscribers) (rId, clientId, True) sub <- newSubscription NoSub TM.insert rId sub subscriptions pure sub @@ -1486,7 +1463,7 @@ client pure ok where newSub = do - writeTQueue ntfSubscribedQ (entId, clientId, True) + writeTQueue (subQ ntfSubscribers) (entId, clientId, True) TM.insert entId () ntfSubscriptions acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M (Transmission BrokerMsg) @@ -1522,7 +1499,7 @@ client incStat $ msgRecv stats if isGet then incStat $ msgRecvGet stats - else pure () -- TODO skip notification delivery for delivered message + else pure () -- TODO skip notification delivery for delivered message -- skipping delivery fails tests, it should be counted in msgNtfSkipped -- forM_ (notifierId <$> notifier qr) $ \nId -> do -- ns <- asks ntfStore @@ -1595,18 +1572,19 @@ client -- - nothing was delivered to this subscription (to avoid race conditions with the recipient). tryDeliverMessage :: Message -> IO () tryDeliverMessage msg = - -- the subscription is checked outside of STM to avoid transaction cost + -- the subscribed client var is read outside of STM to avoid transaction cost -- in case no client is subscribed. - whenM (TM.memberIO rId subscribers) $ - atomically deliverToSub >>= mapM_ forkDeliver + getSubscribedClient rId (queueSubscribers subscribers) + $>>= atomically . deliverToSub + >>= mapM_ forkDeliver where rId = recipientId q - deliverToSub = - -- lookup has ot be in the same transaction, + deliverToSub rcv = + -- reading client TVar in the same transaction, -- so that if subscription ends, it re-evalutates -- and delivery is cancelled - -- the new client will receive message in response to SUB. - (TM.lookup rId subscribers >>= mapM readTVar) + readTVar rcv $>>= \rc@(AClient _ _ Client {subscriptions = subs, sndQ = sndQ'}) -> TM.lookup rId subs $>>= \s@Sub {subThread, delivered} -> case subThread of ProhibitSub -> pure Nothing @@ -1635,9 +1613,9 @@ client labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND" -- lookup can be outside of STM transaction, -- as long as the check that it is the same client is inside. - TM.lookupIO rId subscribers >>= mapM_ deliverIfSame - deliverIfSame rc' = time "deliver" . atomically $ - whenM (sameClientId rc <$> readTVar rc') $ + getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame + deliverIfSame rcv = time "deliver" . atomically $ + whenM (sameClient rc rcv) $ tryTakeTMVar delivered >>= \case Just _ -> pure () -- if a message was already delivered, should not deliver more Nothing -> do @@ -1750,7 +1728,7 @@ client Right qr -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it atomically $ do - writeTQueue subscribedQ (entId, clientId, False) + writeTQueue (subQ subscribers) (entId, clientId, False) -- queue is usually deleted by the same client that is currently subscribed, -- we delete subscription here, so the client with no subscriptions can be disconnected. TM.delete entId subscriptions @@ -1760,7 +1738,7 @@ client stats <- asks serverStats deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId) when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted) - atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) + atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False) updateDeletedStats qr pure ok Left e -> pure $ err e @@ -1985,7 +1963,7 @@ restoreServerNtfs = renameFile f $ f <> ".bak" let NtfStore ns' = ns storedQueues <- M.size <$> readTVarIO ns' - logNote $ "notifications restored, " <> tshow lineCount <> " lines processed" + logNote $ "notifications restored, " <> tshow lineCount <> " lines processed" pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} where restoreNtf :: NtfStore -> Int64 -> (Int, Int, Int) -> LB.ByteString -> ExceptT String IO (Int, Int, Int) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 8895ba8ed..12d03c8f8 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -18,7 +18,59 @@ #endif {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} -module Simplex.Messaging.Server.Env.STM where +module Simplex.Messaging.Server.Env.STM + ( ServerConfig (..), + ServerStoreCfg (..), + AServerStoreCfg (..), + StorePaths (..), + StartOptions (..), + Env (..), + Server (..), + ServerSubscribers (..), + SubscribedClients, + ProxyAgent (..), + Client (..), + AClient (..), + ClientId, + Subscribed, + Sub (..), + ServerSub (..), + SubscriptionThread (..), + MsgStore, + AMsgStore (..), + AStoreType (..), + newEnv, + mkJournalStoreConfig, + newClient, + getServerClients, + getServerClient, + insertServerClient, + deleteServerClient, + getSubscribedClients, + getSubscribedClient, + upsertSubscribedClient, + lookupDeleteSubscribedClient, + deleteSubcribedClient, + sameClientId, + sameClient, + clientId', + newSubscription, + newProhibitedSub, + defaultMsgQueueQuota, + defMsgExpirationDays, + defNtfExpirationHours, + defaultMessageExpiration, + defaultNtfExpiration, + defaultInactiveClientExpiration, + defaultProxyClientConcurrency, + defaultMaxJournalMsgCount, + defaultMaxJournalStateLines, + defaultIdleQueueInterval, + journalMsgStoreDepth, + readWriteQueueStore, + noPostgresExit, + ) +where import Control.Concurrent (ThreadId) import Control.Logger.Simple @@ -29,9 +81,12 @@ import Data.ByteString.Char8 (ByteString) import Data.Int (Int64) import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IM +import Data.IntSet (IntSet) +import qualified Data.IntSet as IS import Data.Kind (Constraint) import Data.List (intercalate) import Data.List.NonEmpty (NonEmpty) +import Data.Map.Strict (Map) import Data.Maybe (isJust) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime, nominalDay) @@ -66,6 +121,7 @@ import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP) import Simplex.Messaging.Transport.Server +import Simplex.Messaging.Util (ifM, whenM, ($>>=)) import System.Directory (doesFileExist) import System.Exit (exitFailure) import System.IO (IOMode (..)) @@ -203,7 +259,6 @@ data Env = Env serverStats :: ServerStats, sockets :: TVar [(ServiceName, SocketState)], clientSeq :: TVar ClientId, - clients :: TVar (IntMap (Maybe AClient)), proxyAgent :: ProxyAgent -- senders served on this proxy } @@ -236,17 +291,72 @@ data AMsgStore = type Subscribed = Bool data Server = Server - { subscribedQ :: TQueue (RecipientId, ClientId, Subscribed), - subscribers :: TMap RecipientId (TVar AClient), - ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed), - notifiers :: TMap NotifierId (TVar AClient), - subClients :: TVar (IntMap AClient), -- clients with SMP subscriptions - ntfSubClients :: TVar (IntMap AClient), -- clients with Ntf subscriptions - pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))), - pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))), + { clients :: ServerClients, + subscribers :: ServerSubscribers, + ntfSubscribers :: ServerSubscribers, savingLock :: Lock } +-- not exported, to prevent concurrent IntMap lookups inside STM transactions. +newtype ServerClients = ServerClients {serverClients :: TVar (IntMap AClient)} + +data ServerSubscribers = ServerSubscribers + { subQ :: TQueue (QueueId, ClientId, Subscribed), + queueSubscribers :: SubscribedClients, + subClients :: TVar IntSet, + pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg))) + } + +-- not exported, to prevent accidental concurrent Map lookups inside STM transactions. +-- Map stores TVars with pointers to the clients rather than client ID to allow reading the same TVar +-- inside transactions to ensure that transaction is re-evaluated in case subscriber changes. +-- Storing Maybe allows to have continuity of subscription when the same user client disconnects and re-connects - +-- any STM transaction that reads subscribed client will re-evaluate in this case. +-- The subscriptions that were made at any point are not removed - +-- this is a better trade-off with intermittently connected mobile clients. +data SubscribedClients = SubscribedClients (TMap EntityId (TVar (Maybe AClient))) + +getSubscribedClients :: SubscribedClients -> IO (Map EntityId (TVar (Maybe AClient))) +getSubscribedClients (SubscribedClients cs) = readTVarIO cs + +getSubscribedClient :: EntityId -> SubscribedClients -> IO (Maybe (TVar (Maybe AClient))) +getSubscribedClient entId (SubscribedClients cs) = TM.lookupIO entId cs +{-# INLINE getSubscribedClient #-} + +-- insert subscribed and current client, return previously subscribed client if it is different +upsertSubscribedClient :: EntityId -> AClient -> SubscribedClients -> STM (Maybe AClient) +upsertSubscribedClient entId ac@(AClient _ _ c) (SubscribedClients cs) = + TM.lookup entId cs >>= \case + Nothing -> Nothing <$ TM.insertM entId (newTVar (Just ac)) cs + Just cv -> + readTVar cv >>= \case + Just c' | sameClientId c c' -> pure Nothing + c_ -> c_ <$ writeTVar cv (Just ac) + +-- lookup and delete currently subscribed client +lookupDeleteSubscribedClient :: EntityId -> SubscribedClients -> STM (Maybe AClient) +lookupDeleteSubscribedClient entId (SubscribedClients cs) = + TM.lookupDelete entId cs $>>= (`swapTVar` Nothing) + +deleteSubcribedClient :: EntityId -> Client s -> SubscribedClients -> IO () +deleteSubcribedClient entId c (SubscribedClients cs) = + -- lookup of the subscribed client TVar can be in separate transaction, + -- as long as the client is read in the same transaction - + -- it prevents removing the next subscribed client and also avoids STM contention for the Map. + TM.lookupIO entId cs >>= mapM_ (\cv -> atomically $ whenM (sameClient c cv) $ delete cv) + where + delete cv = do + writeTVar cv Nothing + TM.delete entId cs + +sameClientId :: Client s -> AClient -> Bool +sameClientId Client {clientId} ac = clientId == clientId' ac +{-# INLINE sameClientId #-} + +sameClient :: Client s -> TVar (Maybe AClient) -> STM Bool +sameClient c cv = maybe False (sameClientId c) <$> readTVar cv +{-# INLINE sameClient #-} + newtype ProxyAgent = ProxyAgent { smpAgent :: SMPClientAgent } @@ -288,16 +398,40 @@ data Sub = Sub newServer :: IO Server newServer = do - subscribedQ <- newTQueueIO - subscribers <- TM.emptyIO - ntfSubscribedQ <- newTQueueIO - notifiers <- TM.emptyIO - subClients <- newTVarIO IM.empty - ntfSubClients <- newTVarIO IM.empty - pendingSubEvents <- newTVarIO IM.empty - pendingNtfSubEvents <- newTVarIO IM.empty + clients <- ServerClients <$> newTVarIO mempty + subscribers <- newServerSubscribers + ntfSubscribers <- newServerSubscribers savingLock <- createLockIO - return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock} + return Server {clients, subscribers, ntfSubscribers, savingLock} + +getServerClients :: Server -> IO (IntMap AClient) +getServerClients = readTVarIO . serverClients . clients +{-# INLINE getServerClients #-} + +getServerClient :: ClientId -> Server -> IO (Maybe AClient) +getServerClient cId s = IM.lookup cId <$> getServerClients s +{-# INLINE getServerClient #-} + +insertServerClient :: AClient -> Server -> IO Bool +insertServerClient ac@(AClient _ _ Client {clientId, connected}) Server {clients} = + atomically $ + ifM + (readTVar connected) + (True <$ modifyTVar' (serverClients clients) (IM.insert clientId ac)) + (pure False) +{-# INLINE insertServerClient #-} + +deleteServerClient :: ClientId -> Server -> IO () +deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClients clients) $ IM.delete cId +{-# INLINE deleteServerClient #-} + +newServerSubscribers :: IO ServerSubscribers +newServerSubscribers = do + subQ <- newTQueueIO + queueSubscribers <- SubscribedClients <$> TM.emptyIO + subClients <- newTVarIO IS.empty + pendingEvents <- newTVarIO IM.empty + pure ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents} newClient :: SQSType qs -> SMSType ms -> ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO (Client (MsgStore qs ms)) newClient _ _ clientId qSize thVersion sessionId createdAt = do @@ -312,7 +446,24 @@ newClient _ _ clientId qSize thVersion sessionId createdAt = do connected <- newTVarIO True rcvActiveAt <- newTVarIO createdAt sndActiveAt <- newTVarIO createdAt - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} + return + Client + { clientId, + subscriptions, + ntfSubscriptions, + rcvQ, + sndQ, + msgQ, + procThreads, + endThreads, + endThreadSeq, + thVersion, + sessionId, + connected, + createdAt, + rcvActiveAt, + sndActiveAt + } newSubscription :: SubscriptionThread -> STM Sub newSubscription st = do @@ -362,9 +513,24 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp serverStats <- newServerStats =<< getCurrentTime sockets <- newTVarIO [] clientSeq <- newTVarIO 0 - clients <- newTVarIO mempty proxyAgent <- newSMPProxyAgent smpAgentCfg random - pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent} + pure + Env + { serverActive, + config, + serverInfo, + server, + serverIdentity, + msgStore, + ntfStore, + random, + tlsServerCreds, + httpServerCreds, + serverStats, + sockets, + clientSeq, + proxyAgent + } where loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO () loadStoreLog mkQ f st = do diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 39dbc854f..2aea7ac6a 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -33,13 +33,16 @@ data RealTimeMetrics = RealTimeMetrics { socketStats :: [(ServiceName, SocketStats)], threadsCount :: Int, clientsCount :: Int, - smpSubsCount :: Int, - smpSubClientsCount :: Int, - ntfSubsCount :: Int, - ntfSubClientsCount :: Int, + smpSubs :: RTSubscriberMetrics, + ntfSubs :: RTSubscriberMetrics, loadedCounts :: LoadedQueueCounts } +data RTSubscriberMetrics = RTSubscriberMetrics + { subsCount :: Int, + subClientsCount :: Int + } + {-# FOURMOLU_DISABLE\n#-} prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text prometheusMetrics sm rtm ts = @@ -50,10 +53,8 @@ prometheusMetrics sm rtm ts = { socketStats, threadsCount, clientsCount, - smpSubsCount, - smpSubClientsCount, - ntfSubsCount, - ntfSubClientsCount, + smpSubs, + ntfSubs, loadedCounts } = rtm ServerStatsData @@ -367,21 +368,21 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_clients_total gauge\n\ \simplex_smp_clients_total " <> mshow clientsCount <> "\n\ \\n\ - \# HELP simplex_smp_subscribtion_total Total subscriptions\n\ + \# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\ \# TYPE simplex_smp_subscribtion_total gauge\n\ - \simplex_smp_subscribtion_total " <> mshow smpSubsCount <> "\n# smpSubs\n\ + \simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\ \\n\ - \# HELP simplex_smp_subscribtion_clients_total Subscribed clients, first counting method\n\ + \# HELP simplex_smp_subscribtion_clients_total Subscribed clients\n\ \# TYPE simplex_smp_subscribtion_clients_total gauge\n\ - \simplex_smp_subscribtion_clients_total " <> mshow smpSubClientsCount <> "\n# smpSubClients\n\ + \simplex_smp_subscribtion_clients_total " <> mshow (subClientsCount smpSubs) <> "\n# smp.subClientsCount\n\ \\n\ \# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\ \# TYPE simplex_smp_subscription_ntf_total gauge\n\ - \simplex_smp_subscription_ntf_total " <> mshow ntfSubsCount <> "\n# ntfSubs\n\ + \simplex_smp_subscription_ntf_total " <> mshow (subsCount ntfSubs) <> "\n# ntf.subsCount\n\ \\n\ - \# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\ + \# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers\n\ \# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\ - \simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n\ + \simplex_smp_subscription_ntf_clients_total " <> mshow (subClientsCount ntfSubs) <> "\n# ntf.subClientsCount\n\ \\n\ \# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\ \# TYPE simplex_smp_loaded_queues_queue_count gauge\n\ diff --git a/src/Simplex/Messaging/TMap.hs b/src/Simplex/Messaging/TMap.hs index 02d20b695..b743ce7bc 100644 --- a/src/Simplex/Messaging/TMap.hs +++ b/src/Simplex/Messaging/TMap.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE TupleSections #-} + module Simplex.Messaging.TMap ( TMap, emptyIO, @@ -11,7 +13,6 @@ module Simplex.Messaging.TMap insert, insertM, delete, - lookupInsert, lookupDelete, adjust, update, @@ -71,12 +72,8 @@ delete :: Ord k => k -> TMap k a -> STM () delete k m = modifyTVar' m $ M.delete k {-# INLINE delete #-} -lookupInsert :: Ord k => k -> a -> TMap k a -> STM (Maybe a) -lookupInsert k v m = stateTVar m $ \mv -> (M.lookup k mv, M.insert k v mv) -{-# INLINE lookupInsert #-} - lookupDelete :: Ord k => k -> TMap k a -> STM (Maybe a) -lookupDelete k m = stateTVar m $ \mv -> (M.lookup k mv, M.delete k mv) +lookupDelete k m = stateTVar m $ M.alterF (,Nothing) k {-# INLINE lookupDelete #-} adjust :: Ord k => (a -> a) -> k -> TMap k a -> STM () diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 07f806b56..eed1580ae 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -16,7 +16,7 @@ import AgentTests.ServerChoice (serverChoiceTests) import AgentTests.ShortLinkTests (shortLinkTests) import Simplex.Messaging.Server.Env.STM (AStoreType (..)) import Simplex.Messaging.Transport (ATransport (..)) -import Test.Hspec +import Test.Hspec hiding (fit, it) #if defined(dbPostgres) import Fixtures @@ -47,7 +47,7 @@ agentTests ps = do #endif describe "Functional API" $ functionalAPITests ps describe "Chosen servers" serverChoiceTests -#if defined(dbServerPostgres) +#if defined(dbServerPostgres) around_ (postgressBracket ntfTestServerDBConnectInfo) $ describe "Notification tests" $ notificationTests ps #endif diff --git a/tests/AgentTests/ConnectionRequestTests.hs b/tests/AgentTests/ConnectionRequestTests.hs index 1782d3ccd..13ee3e156 100644 --- a/tests/AgentTests/ConnectionRequestTests.hs +++ b/tests/AgentTests/ConnectionRequestTests.hs @@ -28,7 +28,8 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (EntityId (..), ProtocolServer (..), QueueMode (..), currentSMPClientVersion, supportedSMPClientVRange, pattern VersionSMPC) import Simplex.Messaging.ServiceScheme (ServiceScheme (..)) import Simplex.Messaging.Version -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util srv :: SMPServer srv = SMPServer "smp.simplex.im,jjbyvoemxysm7qxap7m5d5m35jzv5qq6gnlv7s4rsn7tdwwmuqciwpid.onion" "5223" (C.KeyHash "\215m\248\251") @@ -288,7 +289,7 @@ connectionRequestTests = smpEncodingTest queueV1NoPort smpEncodingTest connectionRequest -- smpEncodingTest connectionRequestNoQM -- this fails, because of queue mode patch - smpEncodingTest connectionRequestContact -- this passes because of queue mode patch in ConnReqUriData encoding + smpEncodingTest connectionRequestContact -- this passes because of queue mode patch in ConnReqUriData encoding smpEncodingTest connectionRequest1 smpEncodingTest connectionRequest2queues smpEncodingTest connectionRequestNew @@ -334,12 +335,12 @@ connectionRequestTests = restoreShortLink [srv] (contact srv2 (LinkKey "0123456789abcdef0123456789abcdef")) `shouldBe` contact srv2 (LinkKey "0123456789abcdef0123456789abcdef") Right (lnk :: ConnShortLink 'CMContact) <- pure $ strDecode "https://localhost/a#4AkRDmhf64tdRlN406g8lJRg5OCmhD6ynIhi6glOcCM?p=7001&c=LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI" - Right (lnk' :: ConnShortLink 'CMContact) <- pure $ strDecode "https://localhost/a#4AkRDmhf64tdRlN406g8lJRg5OCmhD6ynIhi6glOcCM" + Right (lnk' :: ConnShortLink 'CMContact) <- pure $ strDecode "https://localhost/a#4AkRDmhf64tdRlN406g8lJRg5OCmhD6ynIhi6glOcCM" let presetSrv :: SMPServer = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7001" shortenShortLink [presetSrv] lnk `shouldBe` lnk' restoreShortLink [presetSrv] lnk' `shouldBe` lnk Right (inv :: ConnShortLink 'CMInvitation) <- pure $ strDecode "https://localhost/i#tnUaHYp8saREmyEHR93SBpl8ySHBchOt/LJ1ZQUzxH9Udb0jw5wmJACv5o6oe8e7BsX_hUCUMTSY?p=7001&c=LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI" - Right (inv' :: ConnShortLink 'CMInvitation) <- pure $ strDecode "https://localhost/i#tnUaHYp8saREmyEHR93SBpl8ySHBchOt/LJ1ZQUzxH9Udb0jw5wmJACv5o6oe8e7BsX_hUCUMTSY" + Right (inv' :: ConnShortLink 'CMInvitation) <- pure $ strDecode "https://localhost/i#tnUaHYp8saREmyEHR93SBpl8ySHBchOt/LJ1ZQUzxH9Udb0jw5wmJACv5o6oe8e7BsX_hUCUMTSY" shortenShortLink [presetSrv] inv `shouldBe` inv' restoreShortLink [presetSrv] inv' `shouldBe` inv where diff --git a/tests/AgentTests/DoubleRatchetTests.hs b/tests/AgentTests/DoubleRatchetTests.hs index ac42c73ad..eef5be27f 100644 --- a/tests/AgentTests/DoubleRatchetTests.hs +++ b/tests/AgentTests/DoubleRatchetTests.hs @@ -26,13 +26,14 @@ import qualified Data.Map.Strict as M import Data.Type.Equality import Simplex.Messaging.Crypto (Algorithm (..), AlgorithmI, CryptoError, DhAlgorithm) import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Crypto.SNTRUP761.Bindings import Simplex.Messaging.Crypto.Ratchet +import Simplex.Messaging.Crypto.SNTRUP761.Bindings import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Util ((<$$>)) import Simplex.Messaging.Version -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util doubleRatchetTests :: Spec doubleRatchetTests = do @@ -82,7 +83,6 @@ runMessageTests initRatchets_ agreeRatchetKEMs = do withRatchets_ @X25519 initRatchets_ test withRatchets_ @X448 initRatchets_ test - testAlgs :: (forall a. (AlgorithmI a, DhAlgorithm a) => C.SAlgorithm a -> IO ()) -> IO () testAlgs test = test C.SX25519 >> test C.SX448 diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index a3a8d7056..0f2774862 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -108,7 +108,7 @@ import Simplex.Messaging.Version (VersionRange (..)) import qualified Simplex.Messaging.Version as V import Simplex.Messaging.Version.Internal (Version (..)) import System.Directory (copyFile, renameFile) -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO import Util import XFTPClient (testXFTPServer) @@ -3577,7 +3577,7 @@ exchangeGreetingsMsgId_ :: HasCallStack => PQEncryption -> Int64 -> AgentClient exchangeGreetingsMsgId_ = exchangeGreetingsViaProxyMsgId_ False exchangeGreetingsViaProxy :: HasCallStack => Bool -> AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO () -exchangeGreetingsViaProxy viaProxy = exchangeGreetingsViaProxyMsgId_ viaProxy PQEncOn 2 +exchangeGreetingsViaProxy viaProxy = exchangeGreetingsViaProxyMsgId_ viaProxy PQEncOn 2 exchangeGreetingsViaProxyMsgId_ :: HasCallStack => Bool -> PQEncryption -> Int64 -> AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO () exchangeGreetingsViaProxyMsgId_ viaProxy pqEnc msgId alice bobId bob aliceId = do diff --git a/tests/AgentTests/MigrationTests.hs b/tests/AgentTests/MigrationTests.hs index ae90944f4..56bf4e128 100644 --- a/tests/AgentTests/MigrationTests.hs +++ b/tests/AgentTests/MigrationTests.hs @@ -11,7 +11,8 @@ import Simplex.Messaging.Agent.Store.Interface import Simplex.Messaging.Agent.Store.Migrations (migrationsToRun) import Simplex.Messaging.Agent.Store.Shared import System.Random (randomIO) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util #if defined(dbPostgres) import qualified Data.ByteString.Char8 as B import Database.PostgreSQL.Simple (fromOnly) diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 2c3ba40d4..7ad997aea 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -61,7 +61,7 @@ import Data.Time.Clock.System (systemToUTCTime) import qualified Database.PostgreSQL.Simple as PSQL import NtfClient import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer, testNtfServer2) -import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'') +import SMPClient (cfgJ2QS, cfgMS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn) import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage) import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), withStore') import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers) @@ -83,8 +83,9 @@ import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..)) import Simplex.Messaging.Transport (ATransport) import System.Process (callCommand) -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO +import Util #if defined(dbPostgres) import Database.PostgreSQL.Simple.SqlQQ (sql) #else @@ -156,10 +157,10 @@ notificationTests ps@(t, _) = do it "should resume subscriptions after SMP server is restarted" $ withAPNSMockServer $ \apns -> withNtfServer t $ testNotificationsSMPRestart ps apns - describe "Notifications after SMP server restart" $ + describe "Notifications after SMP server restart (batched)" $ it "should resume batched subscriptions after SMP server is restarted" $ withAPNSMockServer $ \apns -> - withNtfServer t $ testNotificationsSMPRestartBatch 100 ps apns + withNtfServer t $ testNotificationsSMPRestartBatch 50 ps apns describe "should switch notifications to the new queue" $ testServerMatrix2 ps $ \servers -> withAPNSMockServer $ \apns -> @@ -227,8 +228,6 @@ v .-> key = do testNtfTokenRepeatRegistration :: APNSMockServer -> IO () testNtfTokenRepeatRegistration apns = do - -- setLogLevel LogError -- LogDebug - -- withGlobalLogging logCfg $ do withAgent 1 agentCfg initAgentServers testDB $ \a -> runRight_ $ do let tkn = DeviceToken PPApnsTest "abcd" NTRegistered <- registerNtfToken a tkn NMPeriodic @@ -248,8 +247,6 @@ testNtfTokenRepeatRegistration apns = do testNtfTokenSecondRegistration :: APNSMockServer -> IO () testNtfTokenSecondRegistration apns = - -- setLogLevel LogError -- LogDebug - -- withGlobalLogging logCfg $ do withAgentClients2 $ \a a' -> runRight_ $ do let tkn = DeviceToken PPApnsTest "abcd" NTRegistered <- registerNtfToken a tkn NMPeriodic @@ -559,7 +556,6 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag verifyNtfToken alice tkn vNonce verification NTActive <- checkNtfToken alice tkn -- send message - liftIO $ threadDelay 250000 1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello" get bob ##> ("", aliceId, SENT $ baseId + 1) -- notification @@ -571,11 +567,10 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag -- alice client already has subscription for the connection, [Left (CMD PROHIBITED _)] <- getConnectionMessages alice [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs] - threadDelay 500000 + threadDelay 1000000 suspendAgent alice 0 closeDBStore store threadDelay 1000000 >> callCommand "sync" >> threadDelay 1000000 - putStrLn "before opening the database from another agent" -- aliceNtf client doesn't have subscription and is allowed to get notification message withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> do @@ -583,7 +578,6 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag pure () threadDelay 1000000 >> callCommand "sync" >> threadDelay 1000000 - putStrLn "after closing the database in another agent" reopenDBStore store foregroundAgent alice threadDelay 500000 diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index fb6c72996..b390d6bd6 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -52,11 +52,12 @@ import Simplex.Messaging.Crypto.File (CryptoFile (..)) import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), pattern PQSupportOn) import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding.String (StrEncoding (..)) -import Simplex.Messaging.Protocol (EntityId (..), SubscriptionMode (..), QueueMode (..), pattern VersionSMPC) +import Simplex.Messaging.Protocol (EntityId (..), QueueMode (..), SubscriptionMode (..), pattern VersionSMPC) import qualified Simplex.Messaging.Protocol as SMP import System.Random -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO.Directory (removeFile) +import Util testDB :: String testDB = "tests/tmp/smp-agent.test.db" diff --git a/tests/AgentTests/SchemaDump.hs b/tests/AgentTests/SchemaDump.hs index 736863364..b64e2ec81 100644 --- a/tests/AgentTests/SchemaDump.hs +++ b/tests/AgentTests/SchemaDump.hs @@ -18,7 +18,8 @@ import Simplex.Messaging.Agent.Store.Shared (Migration (..), MigrationConfirmati import Simplex.Messaging.Util (ifM) import System.Directory (doesFileExist, removeFile) import System.Process (readCreateProcess, shell) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util testDB :: FilePath testDB = "tests/tmp/test_agent_schema.db" diff --git a/tests/AgentTests/ServerChoice.hs b/tests/AgentTests/ServerChoice.hs index 12e690888..2513f5f6d 100644 --- a/tests/AgentTests/ServerChoice.hs +++ b/tests/AgentTests/ServerChoice.hs @@ -14,8 +14,9 @@ import Simplex.Messaging.Agent.Client hiding (userServers) import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Client (defaultNetworkConfig) import Simplex.Messaging.Protocol -import Test.Hspec +import Test.Hspec hiding (fit, it) import Test.QuickCheck +import Util import XFTPClient (testXFTPServer) serverChoiceTests :: Spec diff --git a/tests/AgentTests/ShortLinkTests.hs b/tests/AgentTests/ShortLinkTests.hs index e91472f99..d3aeca0cc 100644 --- a/tests/AgentTests/ShortLinkTests.hs +++ b/tests/AgentTests/ShortLinkTests.hs @@ -11,7 +11,8 @@ import Control.Monad.Except import Simplex.Messaging.Agent.Protocol (AgentErrorType (..), ConnectionMode (..), LinkKey (..), SMPAgentError (..), linkUserData, supportedSMPAgentVRange) import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.ShortLink as SL -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util shortLinkTests :: Spec shortLinkTests = do @@ -20,7 +21,7 @@ shortLinkTests = do it "should fail to decrypt invitation data with bad hash" testInvShortLinkBadDataHash describe "contact short link" $ do it "should encrypt and decrypt data" testContactShortLink - it "should encrypt updated user data" testUpdateContactShortLink + it "should encrypt updated user data" testUpdateContactShortLink it "should fail to decrypt contact data with bad hash" testContactShortLinkBadDataHash it "should fail to decrypt contact data with bad signature" testContactShortLinkBadSignature diff --git a/tests/CLITests.hs b/tests/CLITests.hs index 51d5d6c68..ceb4d9e6e 100644 --- a/tests/CLITests.hs +++ b/tests/CLITests.hs @@ -35,12 +35,13 @@ import System.Environment (withArgs) import System.FilePath (()) import System.IO.Silently (capture_) import System.Timeout (timeout) -import Test.Hspec +import Test.Hspec hiding (fit, it) import Test.Main (withStdin) import UnliftIO (catchAny) import UnliftIO.Async (async, cancel) import UnliftIO.Concurrent (threadDelay) import UnliftIO.Exception (bracket) +import Util #if defined(dbServerPostgres) import qualified Database.PostgreSQL.Simple as PSQL diff --git a/tests/CoreTests/BatchingTests.hs b/tests/CoreTests/BatchingTests.hs index 3e6a3fa40..cdecffabb 100644 --- a/tests/CoreTests/BatchingTests.hs +++ b/tests/CoreTests/BatchingTests.hs @@ -24,7 +24,8 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Protocol import Simplex.Messaging.Transport -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util batchingTests :: Spec batchingTests = do diff --git a/tests/CoreTests/CryptoFileTests.hs b/tests/CoreTests/CryptoFileTests.hs index a1af00c08..ce9fa7c35 100644 --- a/tests/CoreTests/CryptoFileTests.hs +++ b/tests/CoreTests/CryptoFileTests.hs @@ -13,7 +13,8 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile (..), FTCryptoError (..)) import qualified Simplex.Messaging.Crypto.File as CF import System.Directory (getFileSize) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util cryptoFileTests :: Spec cryptoFileTests = do diff --git a/tests/CoreTests/CryptoTests.hs b/tests/CoreTests/CryptoTests.hs index 0a6ca90db..8e4d9a258 100644 --- a/tests/CoreTests/CryptoTests.hs +++ b/tests/CoreTests/CryptoTests.hs @@ -24,9 +24,10 @@ import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Crypto.SNTRUP761.Bindings import Simplex.Messaging.Transport.Client -import Test.Hspec +import Test.Hspec hiding (fit, it) import Test.Hspec.QuickCheck (modifyMaxSuccess) import Test.QuickCheck +import Util cryptoTests :: Spec cryptoTests = do diff --git a/tests/CoreTests/EncodingTests.hs b/tests/CoreTests/EncodingTests.hs index a89499777..dc453c4c0 100644 --- a/tests/CoreTests/EncodingTests.hs +++ b/tests/CoreTests/EncodingTests.hs @@ -16,9 +16,10 @@ import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Transport.Client (TransportHost (..)) -import Test.Hspec +import Test.Hspec hiding (fit, it) import Test.Hspec.QuickCheck (modifyMaxSuccess) import Test.QuickCheck +import Util int64 :: Int64 int64 = 1234567890123456789 diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 5c0e7f95b..2d121e8ef 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -10,8 +10,8 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeApplications #-} -{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} {-# OPTIONS_GHC -Wno-orphans #-} +{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} module CoreTests.MsgStoreTests where @@ -23,13 +23,14 @@ import Control.Monad import Control.Monad.IO.Class import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) +import qualified Data.ByteString.Base64.URL as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Base64.URL as B64 import Data.List (isPrefixOf, isSuffixOf) import Data.Maybe (fromJust) import Data.Time.Clock (addUTCTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) +import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (EntityId (..), LinkId, Message (..), QueueLinkData, RecipientId, SParty (..), noMsgFlags) @@ -43,11 +44,11 @@ import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) -import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath (()) import System.IO (IOMode (..), withFile) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util msgStoreTests :: Spec msgStoreTests = do @@ -256,7 +257,6 @@ testQueueState ms = do length . lines <$> readFile statePath `shouldReturn` 1 readQueueState ms statePath `shouldReturn` (Just state, False) length <$> listDirectory dir `shouldReturn` 1 -- no backup - let state1 = state { size = 1, @@ -267,7 +267,6 @@ testQueueState ms = do length . lines <$> readFile statePath `shouldReturn` 2 readQueueState ms statePath `shouldReturn` (Just state1, False) length <$> listDirectory dir `shouldReturn` 1 -- no backup - let state2 = state { size = 2, @@ -343,7 +342,7 @@ testRemoveJournals ms = do runRight $ do q <- ExceptT $ addQueue ms rId qr Just (Message {msgId = mId1}, True) <- write q "message 1" - Just (Message {msgId = mId2}, False) <- write q "message 2" + Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 liftIO $ closeMsgQueue ms q diff --git a/tests/CoreTests/RetryIntervalTests.hs b/tests/CoreTests/RetryIntervalTests.hs index d4eee9ed6..36a50f30d 100644 --- a/tests/CoreTests/RetryIntervalTests.hs +++ b/tests/CoreTests/RetryIntervalTests.hs @@ -8,14 +8,15 @@ import Control.Concurrent.STM import Control.Monad (when) import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) import Simplex.Messaging.Agent.RetryInterval -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util retryIntervalTests :: Spec retryIntervalTests = do describe "Retry interval with 2 modes and lock" $ do testRetryIntervalSameMode testRetryIntervalSwitchMode - describe "Foreground retry interval" $ do + describe "Foreground retry interval" $ do testRetryForeground testRetryToBackground testRetrySkipWhenForeground @@ -103,7 +104,7 @@ testRetryForeground = when (length ints < 8) $ loop (reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 4, 4] (reverse <$> readTVarIO reportedIntervals) - `shouldReturn` [ 10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000] + `shouldReturn` [10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000] testRetryToBackground :: Spec testRetryToBackground = @@ -124,7 +125,7 @@ testRetryToBackground = ) (reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 4, 4] (reverse <$> readTVarIO reportedIntervals) - `shouldReturn` [ 10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000] + `shouldReturn` [10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000] testRetrySkipWhenForeground :: Spec testRetrySkipWhenForeground = @@ -149,7 +150,7 @@ testRetrySkipWhenForeground = ) (reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 0, 1, 1, 1, 2, 3, 1] (reverse <$> readTVarIO reportedIntervals) - `shouldReturn` [ 10000, 10000, 15000, 22500, 33750, 10000, 10000, 15000, 22500, 33750, 40000, 10000] + `shouldReturn` [10000, 10000, 15000, 22500, 33750, 10000, 10000, 15000, 22500, 33750, 40000, 10000] addInterval :: TVar [Int] -> TVar UTCTime -> IO [Int] addInterval intervals ts = do diff --git a/tests/CoreTests/SOCKSSettings.hs b/tests/CoreTests/SOCKSSettings.hs index 1c510e3c4..931315cc7 100644 --- a/tests/CoreTests/SOCKSSettings.hs +++ b/tests/CoreTests/SOCKSSettings.hs @@ -12,7 +12,8 @@ import Simplex.Messaging.Client import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ErrorType) import Simplex.Messaging.Transport.Client -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util socksSettingsTests :: Spec socksSettingsTests = do diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index b1ab3cb9d..cb5861d7a 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -29,7 +29,8 @@ import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..)) import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util testPublicAuthKey :: C.APublicAuthKey testPublicAuthKey = C.APublicAuthKey C.SEd25519 (C.publicKey "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe") @@ -81,19 +82,19 @@ storeLogTests = saved = [CreateQueue rId' qr'], compacted = [CreateQueue rId' qr'], state = M.fromList [(rId', qr')] - }, + }, SLTC { name = "create new queue, add link data", saved = [CreateQueue rId' qr' {queueData = Nothing}, CreateLink rId' lnkId qd], compacted = [CreateQueue rId' qr'], state = M.fromList [(rId', qr')] - }, + }, SLTC { name = "create new queue with link data, delete data", saved = [CreateQueue rId' qr', DeleteLink rId'], compacted = [CreateQueue rId' qr' {queueData = Nothing}], state = M.fromList [(rId', qr' {queueData = Nothing})] - }, + }, SLTC { name = "secure queue", saved = [CreateQueue rId qr, SecureQueue rId testPublicAuthKey], diff --git a/tests/CoreTests/TRcvQueuesTests.hs b/tests/CoreTests/TRcvQueuesTests.hs index 4098fd0f4..c1b5b1c2b 100644 --- a/tests/CoreTests/TRcvQueuesTests.hs +++ b/tests/CoreTests/TRcvQueuesTests.hs @@ -17,9 +17,10 @@ import Simplex.Messaging.Agent.Protocol (ConnId, QueueStatus (..), UserId) import Simplex.Messaging.Agent.Store (DBQueueId (..), RcvQueue, StoredRcvQueue (..)) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Protocol (EntityId (..), RecipientId, SMPServer, QueueMode (..), pattern NoEntity, pattern VersionSMPC) -import Test.Hspec +import Simplex.Messaging.Protocol (EntityId (..), QueueMode (..), RecipientId, SMPServer, pattern NoEntity, pattern VersionSMPC) +import Test.Hspec hiding (fit, it) import UnliftIO +import Util tRcvQueuesTests :: Spec tRcvQueuesTests = do @@ -120,7 +121,7 @@ getSessQueuesTest = do atomically (RQ.hasSessQueues tSess3 trq) `shouldReturn` False let tSess4 = (0, "smp://1234-w==@alpha", Nothing) RQ.getSessQueues tSess4 trq `shouldReturn` [dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2", dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"] - atomically (RQ.hasSessQueues tSess4 trq) `shouldReturn`True + atomically (RQ.hasSessQueues tSess4 trq) `shouldReturn` True getDelSessQueuesTest :: IO () getDelSessQueuesTest = do diff --git a/tests/CoreTests/UtilTests.hs b/tests/CoreTests/UtilTests.hs index 2254ecafd..4159f25e1 100644 --- a/tests/CoreTests/UtilTests.hs +++ b/tests/CoreTests/UtilTests.hs @@ -8,8 +8,9 @@ import Control.Monad.Except import Control.Monad.IO.Class import Data.IORef import Simplex.Messaging.Util -import Test.Hspec +import Test.Hspec hiding (fit, it) import qualified UnliftIO.Exception as UE +import Util utilTests :: Spec utilTests = do diff --git a/tests/CoreTests/VersionRangeTests.hs b/tests/CoreTests/VersionRangeTests.hs index ff53cc6ca..44f31a5dd 100644 --- a/tests/CoreTests/VersionRangeTests.hs +++ b/tests/CoreTests/VersionRangeTests.hs @@ -11,9 +11,10 @@ import GHC.Generics (Generic) import Generic.Random (genericArbitraryU) import Simplex.Messaging.Version import Simplex.Messaging.Version.Internal -import Test.Hspec +import Test.Hspec hiding (fit, it) import Test.Hspec.QuickCheck (modifyMaxSuccess) import Test.QuickCheck +import Util data V = V1 | V2 | V3 | V4 | V5 deriving (Eq, Enum, Ord, Generic, Show) diff --git a/tests/FileDescriptionTests.hs b/tests/FileDescriptionTests.hs index 10c719888..b4b8ffd28 100644 --- a/tests/FileDescriptionTests.hs +++ b/tests/FileDescriptionTests.hs @@ -16,7 +16,8 @@ import Simplex.Messaging.Encoding.String (StrEncoding (..)) import Simplex.Messaging.Protocol (EntityId (..)) import Simplex.Messaging.ServiceScheme (ServiceScheme (..)) import System.Directory (removeFile) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util fileDescriptionTests :: Spec fileDescriptionTests = do diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index f20264cb8..5c624ee9d 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -55,7 +55,7 @@ import Simplex.Messaging.Transport.Client import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..), http2TLSParams) import Simplex.Messaging.Transport.HTTP2.Server import Simplex.Messaging.Transport.Server -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO.Async import UnliftIO.Concurrent import qualified UnliftIO.Exception as E @@ -83,7 +83,7 @@ ntfTestPrometheusMetricsFile :: FilePath ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt" ntfTestStoreDBOpts :: DBOpts -ntfTestStoreDBOpts = +ntfTestStoreDBOpts = DBOpts { connstr = ntfTestServerDBConnstr, schema = "ntf_server", @@ -99,10 +99,10 @@ ntfTestServerDBConnstr = "postgresql://ntf_test_server_user@/ntf_test_server_db" ntfTestServerDBConnectInfo :: ConnectInfo ntfTestServerDBConnectInfo = - defaultConnectInfo { - connectUser = "ntf_test_server_user", - connectDatabase = "ntf_test_server_db" - } + defaultConnectInfo + { connectUser = "ntf_test_server_user", + connectDatabase = "ntf_test_server_db" + } ntfTestDBCfg :: PostgresStoreCfg ntfTestDBCfg = @@ -134,7 +134,6 @@ ntfServerCfg = subIdBytes = 24, regCodeBytes = 32, clientQSize = 2, - subQSize = 2, pushQSize = 2, smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 0}, apnsConfig = diff --git a/tests/NtfServerTests.hs b/tests/NtfServerTests.hs index 3803e08fa..987301ff4 100644 --- a/tests/NtfServerTests.hs +++ b/tests/NtfServerTests.hs @@ -43,8 +43,9 @@ import Simplex.Messaging.Notifications.Transport (THandleNTF) import Simplex.Messaging.Parsers (parse, parseAll) import Simplex.Messaging.Protocol hiding (notification) import Simplex.Messaging.Transport -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO.STM +import Util ntfServerTests :: ATransport -> Spec ntfServerTests t = do @@ -243,7 +244,7 @@ registerToken nh apns token = do let Right verification = nd .-> "verification" Right nonce = C.cbNonce <$> nd .-> "nonce" Right pt = C.cbDecrypt dhSecret nonce verification - in NtfRegCode pt + in NtfRegCode pt let code = decryptCode ntfData pure (tknKey, dhSecret, tId, code) diff --git a/tests/PostgresSchemaDump.hs b/tests/PostgresSchemaDump.hs index de96e76ac..dbacce3f3 100644 --- a/tests/PostgresSchemaDump.hs +++ b/tests/PostgresSchemaDump.hs @@ -17,7 +17,8 @@ import Simplex.Messaging.Util (ifM, whenM) import System.Directory (doesFileExist, removeFile) import System.Environment (lookupEnv) import System.Process (readCreateProcess, shell) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util testSchemaPath :: FilePath testSchemaPath = "tests/tmp/test_schema.sql" diff --git a/tests/RemoteControl.hs b/tests/RemoteControl.hs index 4b6db594b..d4987b33b 100644 --- a/tests/RemoteControl.hs +++ b/tests/RemoteControl.hs @@ -19,9 +19,10 @@ import qualified Simplex.RemoteControl.Client as RC import Simplex.RemoteControl.Discovery (mkLastLocalHost, preferAddress) import Simplex.RemoteControl.Invitation (RCSignedInvitation, verifySignedInvitation) import Simplex.RemoteControl.Types -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO import UnliftIO.Concurrent +import Util remoteControlTests :: Spec remoteControlTests = do diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 693049cca..7eb49fc38 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -14,7 +14,6 @@ module SMPClient where -import Control.Logger.Simple (LogLevel (..)) import Control.Monad.Except (runExceptT) import Data.ByteString.Char8 (ByteString) import Data.List.NonEmpty (NonEmpty) @@ -38,7 +37,8 @@ import Simplex.Messaging.Version import Simplex.Messaging.Version.Internal import System.Environment (lookupEnv) import System.Info (os) -import Test.Hspec +import System.Process (callCommand) +import Test.Hspec hiding (fit, it) import UnliftIO.Concurrent import qualified UnliftIO.Exception as E import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar) @@ -82,7 +82,7 @@ testStoreLogFile2 :: FilePath testStoreLogFile2 = "tests/tmp/smp-server-store.log.2" testStoreDBOpts :: DBOpts -testStoreDBOpts = +testStoreDBOpts = DBOpts { connstr = testServerDBConnstr, schema = "smp_server", @@ -176,7 +176,7 @@ journalCfg :: ServerConfig -> FilePath -> FilePath -> ServerConfig journalCfg cfg' storeLogFile storeMsgsPath = cfg' {serverStoreCfg = ASSCfg SQSMemory SMSJournal SSCMemoryJournal {storeLogFile, storeMsgsPath}} journalCfgDB :: ServerConfig -> DBOpts -> FilePath -> ServerConfig -journalCfgDB cfg' dbOpts storeMsgsPath' = +journalCfgDB cfg' dbOpts storeMsgsPath' = let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCYesUp, deletedTTL = 86400} in cfg' {serverStoreCfg = ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath'}} @@ -228,7 +228,7 @@ cfgMS msType = } defaultStartOptions :: StartOptions -defaultStartOptions = StartOptions {maintenance = False, compactLog = False, logLevel = LogError, skipWarnings = False, confirmMigrations = MCYesUp} +defaultStartOptions = StartOptions {maintenance = False, compactLog = False, logLevel = testLogLevel, skipWarnings = False, confirmMigrations = MCYesUp} serverStoreConfig :: AStoreType -> AServerStoreCfg serverStoreConfig = serverStoreConfig_ False @@ -303,7 +303,7 @@ serverBracket process afterProcess f = do started <- newEmptyTMVarIO E.bracket (forkIOWithUnmask (\unmask -> unmask (process started) `E.catchAny` handleStartError started)) - (\t -> killThread t >> afterProcess >> waitFor started "stop") + (\t -> killThread t >> afterProcess >> waitFor started "stop" >> callCommand "sync") (\t -> waitFor started "start" >> f t >>= \r -> r <$ threadDelay 100000) where -- it putTMVar is called twise to unlock both parts of the bracket in case of start failure diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index c26e97902..b4af2d6ef 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -45,7 +45,7 @@ import Simplex.Messaging.Util (bshow, tshow) import Simplex.Messaging.Version (mkVersionRange) import System.FilePath (splitExtensions) import System.Random (randomRIO) -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO import Util #if defined(dbPostgres) diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index df633b570..14b73c1a4 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -17,7 +17,7 @@ module ServerTests where import Control.Concurrent (ThreadId, killThread, threadDelay) import Control.Concurrent.STM -import Control.Exception (SomeException, try, throwIO) +import Control.Exception (SomeException, throwIO, try) import Control.Monad import Control.Monad.IO.Class import CoreTests.MsgStoreTests (testJournalStoreCfg) @@ -39,7 +39,7 @@ import Simplex.Messaging.Server (exportMessages) import Simplex.Messaging.Server.Env.STM (AServerStoreCfg (..), AStoreType (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..), QStoreCfg (..)) -import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SQSType (..), SMSType (..), newMsgStore) +import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..), newMsgStore) import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..)) import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..), closeStoreLog) import Simplex.Messaging.Transport @@ -50,8 +50,8 @@ import System.IO (IOMode (..), withFile) import System.TimeIt (timeItT) import System.Timeout import Test.HUnit -import Test.Hspec -import Util (removeFileIfExists) +import Test.Hspec hiding (fit, it) +import Util serverTests :: SpecWith (ATransport, AStoreType) serverTests = do @@ -687,7 +687,7 @@ testWithStoreLog = runClient _ test' = testSMPClient test' `shouldReturn` () serverStoreLogCfg :: AStoreType -> (ServerConfig, Bool) -serverStoreLogCfg msType = +serverStoreLogCfg msType = let serverStoreCfg = serverStoreConfig_ True msType cfg' = (cfgMS msType) {serverStoreCfg, storeNtfsFile = Just testStoreNtfsFile, serverStatsBackupFile = Just testServerStatsBackupFile} compacting = case msType of @@ -918,7 +918,9 @@ testTiming = (C.AuthAlg C.SX25519, C.AuthAlg C.SX25519, 200) -- correct key type ] timeRepeat n = fmap fst . timeItT . forM_ (replicate n ()) . const - similarTime t1 t2 = abs (t2 / t1 - 1) < 0.30 -- normally the difference between "no queue" and "wrong key" is less than 5% + similarTime t1 t2 + | t1 <= t2 = abs (1 - t1 / t2) < 0.35 -- normally the difference between "no queue" and "wrong key" is less than 5% + | otherwise = similarTime t2 t1 testSameTiming :: forall c. Transport c => THandleSMP c 'TClient -> THandleSMP c 'TClient -> (C.AuthAlg, C.AuthAlg, Int) -> Expectation testSameTiming rh sh (C.AuthAlg goodKeyAlg, C.AuthAlg badKeyAlg, n) = do g <- C.newRandom @@ -1091,8 +1093,8 @@ testBlockMessageQueue = pure a testInvQueueLinkData :: SpecWith (ATransport, AStoreType) -testInvQueueLinkData = - it "create and access queue short link data for 1-time invitation" $ \(ATransport t, msType) -> +testInvQueueLinkData = + it "create and access queue short link data for 1-time invitation" $ \(ATransport t, msType) -> smpTest2 t msType $ \r s -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g @@ -1144,8 +1146,8 @@ testInvQueueLinkData = rId2 `shouldBe` rId testContactQueueLinkData :: SpecWith (ATransport, AStoreType) -testContactQueueLinkData = - it "create and access queue short link data for contact address" $ \(ATransport t, msType) -> +testContactQueueLinkData = + it "create and access queue short link data for contact address" $ \(ATransport t, msType) -> smpTest2 t msType $ \r s -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g diff --git a/tests/Test.hs b/tests/Test.hs index f0827f5fe..06c627514 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -30,7 +30,8 @@ import Simplex.Messaging.Transport (TLS, Transport (..)) -- import Simplex.Messaging.Transport.WebSockets (WS) import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) import System.Environment (setEnv) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util import XFTPAgent import XFTPCLI import XFTPServerTests (xftpServerTests) @@ -59,8 +60,7 @@ logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} main :: IO () main = do - -- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs" - setLogLevel LogError -- LogInfo -- also change in SMPClient.hs in defaultStartOptions + setLogLevel testLogLevel withGlobalLogging logCfg $ do setEnv "APNS_KEY_ID" "H82WD9K9AQ" setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8" diff --git a/tests/Util.hs b/tests/Util.hs index 0ad371b69..75d596642 100644 --- a/tests/Util.hs +++ b/tests/Util.hs @@ -1,12 +1,21 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeFamilies #-} + module Util where +import Control.Concurrent.Async +import Control.Exception as E +import Control.Logger.Simple import Control.Monad (replicateM, when) import Data.Either (partitionEithers) import Data.List (tails) import GHC.Conc (getNumCapabilities, getNumProcessors, setNumCapabilities) import System.Directory (doesFileExist, removeFile) -import Test.Hspec -import UnliftIO +import System.Timeout (timeout) +import Test.Hspec hiding (fit, it) +import qualified Test.Hspec as Hspec +import Test.Hspec.Core.Spec (Example (..), Result (..), ResultStatus (..)) skip :: String -> SpecWith a -> SpecWith a skip = before_ . pendingWith @@ -32,3 +41,37 @@ removeFileIfExists :: FilePath -> IO () removeFileIfExists filePath = do fileExists <- doesFileExist filePath when fileExists $ removeFile filePath + +newtype TestWrapper a = TestWrapper a + +-- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs" +testLogLevel :: LogLevel +testLogLevel = LogError + +instance Example a => Example (TestWrapper a) where + type Arg (TestWrapper a) = Arg a + evaluateExample (TestWrapper action) params hooks state = do + let tt = 120 + runTest = + timeout (tt * 1000000) (evaluateExample action params hooks state) >>= \case + Just r -> pure r + Nothing -> throwIO $ userError $ "test timed out after " <> show tt <> " seconds" + retryTest = do + putStrLn "Retrying with more logs..." + setLogLevel LogNote + runTest `finally` setLogLevel testLogLevel -- change this to match log level in Test.hs + E.try runTest >>= \case + Right r -> case resultStatus r of + Failure loc_ reason -> do + putStrLn $ "Test failed: location " ++ show loc_ ++ ", reason: " ++ show reason + retryTest + _ -> pure r + Left (e :: E.SomeException) -> do + putStrLn $ "Test exception: " ++ show e + retryTest + +it :: (HasCallStack, Example a) => String -> a -> SpecWith (Arg a) +it label action = Hspec.it label (TestWrapper action) + +fit :: (HasCallStack, Example a) => String -> a -> SpecWith (Arg a) +fit = fmap focus . it diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index bfb601465..ab8b9d6f6 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -42,9 +42,10 @@ import Simplex.Messaging.Transport (ALPN) import Simplex.Messaging.Util (tshow) import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory, removeFile) import System.FilePath (()) -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO import UnliftIO.Concurrent +import Util import XFTPCLI import XFTPClient #if defined(dbPostgres) diff --git a/tests/XFTPCLI.hs b/tests/XFTPCLI.hs index 567db6f9b..0f308b61a 100644 --- a/tests/XFTPCLI.hs +++ b/tests/XFTPCLI.hs @@ -9,7 +9,8 @@ import System.Directory (createDirectoryIfMissing, getFileSize, listDirectory, r import System.Environment (withArgs) import System.FilePath (()) import System.IO.Silently (capture_) -import Test.Hspec +import Test.Hspec hiding (fit, it) +import Util import XFTPClient (testXFTPServerStr, testXFTPServerStr2, withXFTPServer, withXFTPServer2, xftpServerFiles, xftpServerFiles2) xftpCLITests :: Spec diff --git a/tests/XFTPClient.hs b/tests/XFTPClient.hs index f99283a79..be6558125 100644 --- a/tests/XFTPClient.hs +++ b/tests/XFTPClient.hs @@ -19,7 +19,7 @@ import Simplex.FileTransfer.Transport (supportedFileServerVRange, supportedXFTPh import Simplex.Messaging.Protocol (XFTPServer) import Simplex.Messaging.Transport (ALPN) import Simplex.Messaging.Transport.Server -import Test.Hspec +import Test.Hspec hiding (fit, it) xftpTest :: HasCallStack => (HasCallStack => XFTPClient -> IO ()) -> Expectation xftpTest test = runXFTPTest test `shouldReturn` () diff --git a/tests/XFTPServerTests.hs b/tests/XFTPServerTests.hs index 5193e56cf..fe271f401 100644 --- a/tests/XFTPServerTests.hs +++ b/tests/XFTPServerTests.hs @@ -32,8 +32,9 @@ import Simplex.Messaging.Protocol (BasicAuth, EntityId (..), pattern NoEntity) import Simplex.Messaging.Server.Expiration (ExpirationConfig (..)) import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive, removeFile) import System.FilePath (()) -import Test.Hspec +import Test.Hspec hiding (fit, it) import UnliftIO.STM +import Util import XFTPClient xftpServerTests :: Spec