diff --git a/apps/simplex-stress-test/Main.hs b/apps/simplex-stress-test/Main.hs new file mode 100644 index 0000000000..7d45262a64 --- /dev/null +++ b/apps/simplex-stress-test/Main.hs @@ -0,0 +1,11 @@ +module Main where + +import Simplex.StressTest +import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) +import Test.Hspec + +main :: IO () +main = do + createDirectoryIfMissing False "tests/tmp" + hspec $ describe "SimpleX chat client" chatTests + removeDirectoryRecursive "tests/tmp" diff --git a/package.yaml b/package.yaml index 746adde477..7292c85ce7 100644 --- a/package.yaml +++ b/package.yaml @@ -25,7 +25,9 @@ dependencies: - directory == 1.3.* - exceptions == 0.10.* - filepath == 1.4.* + - hspec == 2.7.* - mtl == 2.2.* + - network == 3.1.* - optparse-applicative >= 0.15 && < 0.17 - process == 1.6.* - simple-logger == 0.1.* @@ -50,6 +52,12 @@ executables: ghc-options: - -threaded + simplex-stress-test: + source-dirs: apps/simplex-stress-test + main: Main.hs + dependencies: + - simplex-chat + tests: simplex-chat-test: source-dirs: tests diff --git a/simplex-chat.cabal b/simplex-chat.cabal index 7f665785e3..735b51af96 100644 --- a/simplex-chat.cabal +++ b/simplex-chat.cabal @@ -40,6 +40,8 @@ library Simplex.Chat.Types Simplex.Chat.Util Simplex.Chat.View + Simplex.StressTest + Simplex.StressTest.ChatClient other-modules: Paths_simplex_chat hs-source-dirs: @@ -59,7 +61,9 @@ library , directory ==1.3.* , exceptions ==0.10.* , filepath ==1.4.* + , hspec ==2.7.* , mtl ==2.2.* + , network ==3.1.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* , simple-logger ==0.1.* @@ -94,7 +98,47 @@ executable simplex-chat , directory ==1.3.* , exceptions ==0.10.* , filepath ==1.4.* + , hspec ==2.7.* , mtl ==2.2.* + , network ==3.1.* + , optparse-applicative >=0.15 && <0.17 + , process ==1.6.* + , simple-logger ==0.1.* + , simplex-chat + , simplexmq ==1.0.* + , sqlite-simple ==0.4.* + , stm ==2.5.* + , terminal ==0.2.* + , text ==1.2.* + , time ==1.9.* + , unliftio ==0.2.* + , unliftio-core ==0.2.* + default-language: Haskell2010 + +executable simplex-stress-test + main-is: Main.hs + other-modules: + Paths_simplex_chat + hs-source-dirs: + apps/simplex-stress-test + ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns + build-depends: + aeson ==2.0.* + , ansi-terminal >=0.10 && <0.12 + , async ==2.2.* + , attoparsec ==0.14.* + , base >=4.7 && <5 + , base64-bytestring >=1.0 && <1.3 + , bytestring ==0.10.* + , composition ==1.0.* + , containers ==0.6.* + , cryptonite >=0.27 && <0.30 + , directory ==1.3.* + , exceptions ==0.10.* + , filepath ==1.4.* + , hspec ==2.7.* + , mtl ==2.2.* + , network ==3.1.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* , simple-logger ==0.1.* diff --git a/src/Simplex/StressTest.hs b/src/Simplex/StressTest.hs new file mode 100644 index 0000000000..415199b16a --- /dev/null +++ b/src/Simplex/StressTest.hs @@ -0,0 +1,239 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PostfixOperators #-} + +module Simplex.StressTest where + +import Control.Concurrent (threadDelay) +import Control.Concurrent.Async (concurrently_) +import Control.Concurrent.STM +import Control.Monad (forever, when) +import qualified Data.ByteString as B +import Data.Char (isDigit) +import Data.Maybe (fromJust) +import qualified Data.Text as T +import Simplex.Chat.Controller (ChatController (..)) +import Simplex.Chat.Types (Profile (..), User (..)) +import Simplex.StressTest.ChatClient +import Test.Hspec + +aliceProfile :: Profile +aliceProfile = Profile {displayName = "alice", fullName = "Alice"} + +bobProfile :: Profile +bobProfile = Profile {displayName = "bob", fullName = "Bob"} + +cathProfile :: Profile +cathProfile = Profile {displayName = "cath", fullName = "Catherine"} + +danProfile :: Profile +danProfile = Profile {displayName = "dan", fullName = "Daniel"} + +chatTests :: Spec +chatTests = + describe "server stress test" $ + fit "should stress server with many chats and messages" testStressServer + +testStressServer :: IO () +testStressServer = + withTmpFiles $ do + sentTVar <- newTVarIO (0 :: Int) + concurrentlyN_ $ + forever + ( do + threadDelay 5000000 + sent <- readTVarIO sentTVar + print $ show sent + ) : + map + ( \i -> + testChat2' (i * 2 -1, aliceProfile) (i * 2, bobProfile) $ + \alice bob -> do + connectUsers alice bob + loop i alice bob sentTVar 1 + ) + (take 100 ([1 ..] :: [Int])) + where + loop :: Int -> TestCC -> TestCC -> TVar Int -> Int -> IO () + loop i alice bob sentTVar k = do + alice `send` "@bob hi" + bob `send` "@alice hi" + when (k `mod` 100 == 0) $ do + print $ show i <> " - +200" + atomically $ modifyTVar sentTVar (+ 200) + -- threadDelay 500 + loop i alice bob sentTVar $ k + 1 + +startFileTransfer :: TestCC -> TestCC -> IO () +startFileTransfer alice bob = do + alice #> "/f @bob ./tests/fixtures/test.jpg" + alice <## "use /fc 1 to cancel sending" + bob <# "alice> sends file test.jpg (136.5 KiB / 139737 bytes)" + bob <## "use /fr 1 [/ | ] to receive it" + bob ##> "/fr 1 ./tests/tmp" + bob <## "saving file 1 from alice to ./tests/tmp/test.jpg" + concurrently_ + (bob <## "started receiving file 1 (test.jpg) from alice") + (alice <## "started sending file 1 (test.jpg) to bob") + +checkPartialTransfer :: IO () +checkPartialTransfer = do + src <- B.readFile "./tests/fixtures/test.jpg" + dest <- B.readFile "./tests/tmp/test.jpg" + B.unpack src `shouldStartWith` B.unpack dest + B.length src > B.length dest `shouldBe` True + +connectUsers :: TestCC -> TestCC -> IO () +connectUsers cc1 cc2 = do + name1 <- showName cc1 + name2 <- showName cc2 + cc1 ##> "/c" + inv <- getInvitation cc1 + cc2 ##> ("/c " <> inv) + cc2 <## "confirmation sent!" + concurrently_ + (cc2 <## (name1 <> ": contact is connected")) + (cc1 <## (name2 <> ": contact is connected")) + +showName :: TestCC -> IO String +showName (TestCC ChatController {currentUser} _ _ _ _) = do + Just User {localDisplayName, profile = Profile {fullName}} <- readTVarIO currentUser + pure . T.unpack $ localDisplayName <> " (" <> fullName <> ")" + +createGroup2 :: String -> TestCC -> TestCC -> IO () +createGroup2 gName cc1 cc2 = do + connectUsers cc1 cc2 + name2 <- userName cc2 + cc1 ##> ("/g " <> gName) + cc1 <## ("group #" <> gName <> " is created") + cc1 <## ("use /a " <> gName <> " to add members") + addMember gName cc1 cc2 + cc2 ##> ("/j " <> gName) + concurrently_ + (cc1 <## ("#" <> gName <> ": " <> name2 <> " joined the group")) + (cc2 <## ("#" <> gName <> ": you joined the group")) + +createGroup3 :: String -> TestCC -> TestCC -> TestCC -> IO () +createGroup3 gName cc1 cc2 cc3 = do + createGroup2 gName cc1 cc2 + connectUsers cc1 cc3 + name3 <- userName cc3 + sName2 <- showName cc2 + sName3 <- showName cc3 + addMember gName cc1 cc3 + cc3 ##> ("/j " <> gName) + concurrentlyN_ + [ cc1 <## ("#" <> gName <> ": " <> name3 <> " joined the group"), + do + cc3 <## ("#" <> gName <> ": you joined the group") + cc3 <## ("#" <> gName <> ": member " <> sName2 <> " is connected"), + do + cc2 <## ("#" <> gName <> ": alice added " <> sName3 <> " to the group (connecting...)") + cc2 <## ("#" <> gName <> ": new member " <> name3 <> " is connected") + ] + +addMember :: String -> TestCC -> TestCC -> IO () +addMember gName inviting invitee = do + name1 <- userName inviting + memName <- userName invitee + inviting ##> ("/a " <> gName <> " " <> memName) + concurrentlyN_ + [ inviting <## ("invitation to join the group #" <> gName <> " sent to " <> memName), + do + invitee <## ("#" <> gName <> ": " <> name1 <> " invites you to join the group as admin") + invitee <## ("use /j " <> gName <> " to accept") + ] + +-- | test sending direct messages +(<##>) :: TestCC -> TestCC -> IO () +cc1 <##> cc2 = do + name1 <- userName cc1 + name2 <- userName cc2 + cc1 #> ("@" <> name2 <> " hi") + cc2 <# (name1 <> "> hi") + cc2 #> ("@" <> name1 <> " hey") + cc1 <# (name2 <> "> hey") + +userName :: TestCC -> IO [Char] +userName (TestCC ChatController {currentUser} _ _ _ _) = T.unpack . localDisplayName . fromJust <$> readTVarIO currentUser + +(##>) :: TestCC -> String -> IO () +cc ##> cmd = do + cc `send` cmd + cc <## cmd + +(#>) :: TestCC -> String -> IO () +cc #> cmd = do + cc `send` cmd + cc <# cmd + +(#$>) :: (Eq a, Show a) => TestCC -> (String, String -> a, a) -> Expectation +cc #$> (cmd, f, res) = do + cc ##> cmd + (f <$> getTermLine cc) `shouldReturn` res + +chat :: String -> [(Int, String)] +chat = read + +(#$$>) :: TestCC -> (String, [(String, String)]) -> Expectation +cc #$$> (cmd, res) = do + cc ##> cmd + line <- getTermLine cc + let chats = read line + chats `shouldMatchList` res + +send :: TestCC -> String -> IO () +send TestCC {chatController = cc} cmd = atomically $ writeTBQueue (inputQ cc) cmd + +(<##) :: TestCC -> String -> Expectation +cc <## line = getTermLine cc `shouldReturn` line + +(<###) :: TestCC -> [String] -> Expectation +_ <### [] = pure () +cc <### ls = do + line <- getTermLine cc + if line `elem` ls + then cc <### filter (/= line) ls + else error $ "unexpected output: " <> line + +(<#) :: TestCC -> String -> Expectation +cc <# line = (dropTime <$> getTermLine cc) `shouldReturn` line + +( Expectation +( TestCC -> Expectation +cc1 <#? cc2 = do + name <- userName cc2 + sName <- showName cc2 + cc2 <## "connection request sent!" + cc1 <## (sName <> " wants to connect to you!") + cc1 <## ("to accept: /ac " <> name) + cc1 <## ("to reject: /rc " <> name <> " (the sender will NOT be notified)") + +dropTime :: String -> String +dropTime msg = case splitAt 6 msg of + ([m, m', ':', s, s', ' '], text) -> + if all isDigit [m, m', s, s'] then text else error "invalid time" + _ -> error "invalid time" + +getInvitation :: TestCC -> IO String +getInvitation cc = do + cc <## "pass this invitation link to your contact (via another channel):" + cc <## "" + inv <- getTermLine cc + cc <## "" + cc <## "and ask them to connect: /c " + pure inv + +getContactLink :: TestCC -> Bool -> IO String +getContactLink cc created = do + cc <## if created then "Your new chat address is created!" else "Your chat address:" + cc <## "" + link <- getTermLine cc + cc <## "" + cc <## "Anybody can send you contact requests with: /c " + cc <## "to show it again: /sa" + cc <## "to delete it: /da (accepted contacts will remain connected)" + pure link diff --git a/src/Simplex/StressTest/ChatClient.hs b/src/Simplex/StressTest/ChatClient.hs new file mode 100644 index 0000000000..700887b52a --- /dev/null +++ b/src/Simplex/StressTest/ChatClient.hs @@ -0,0 +1,198 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedLists #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} + +module Simplex.StressTest.ChatClient where + +import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread) +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Exception (bracket, bracket_) +import Control.Monad.Except +import Data.List (dropWhileEnd) +import Network.Socket +import Simplex.Chat +import Simplex.Chat.Controller (ChatConfig (..), ChatController (..)) +import Simplex.Chat.Options +import Simplex.Chat.Store +import Simplex.Chat.Terminal +import Simplex.Chat.Terminal.Output (newChatTerminal) +import Simplex.Chat.Types (Profile) +import Simplex.Messaging.Agent.Env.SQLite +import Simplex.Messaging.Agent.RetryInterval +import Simplex.Messaging.Client (SMPClientConfig (..), smpDefaultConfig) +import Simplex.Messaging.Server (runSMPServerBlocking) +import Simplex.Messaging.Server.Env.STM +import Simplex.Messaging.Transport +import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) +import qualified System.Terminal as C +import System.Terminal.Internal (VirtualTerminal (..), VirtualTerminalSettings (..), withVirtualTerminal) +import System.Timeout (timeout) +import Test.Hspec (Expectation, shouldReturn) + +testDBPrefix :: FilePath +testDBPrefix = "tests/tmp/test" + +serverPort :: ServiceName +serverPort = "5001" + +opts :: ChatOpts +opts = + ChatOpts + { dbFilePrefix = undefined, + smpServers = ["smp://Ufcpyx7utrV45fUopHVvKh4NECi5Z3Fa1TyL4L7tGgc=@smp7.simplex.im"], + -- smpServers = ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001"], + logging = False + } + +termSettings :: VirtualTerminalSettings +termSettings = + VirtualTerminalSettings + { virtualType = "xterm", + virtualWindowSize = pure C.Size {height = 24, width = 1000}, + virtualEvent = retry, + virtualInterrupt = retry + } + +data TestCC = TestCC + { chatController :: ChatController, + virtualTerminal :: VirtualTerminal, + chatAsync :: Async (), + termAsync :: Async (), + termQ :: TQueue String + } + +aCfg :: AgentConfig +aCfg = agentConfig defaultChatConfig + +cfg :: ChatConfig +cfg = + defaultChatConfig + { agentConfig = + aCfg + { reconnectInterval = (reconnectInterval aCfg) {initialInterval = 50000}, + smpCfg = smpDefaultConfig {tcpTimeout = 10000000} + }, + testView = True + } + +virtualSimplexChat :: FilePath -> Profile -> IO TestCC +virtualSimplexChat dbFilePrefix profile = do + st <- createStore (dbFilePrefix <> "_chat.db") 1 False + Right user <- runExceptT $ createUser st profile True + t <- withVirtualTerminal termSettings pure + ct <- newChatTerminal t + cc <- newChatController st (Just user) cfg opts {dbFilePrefix} (const $ pure ()) -- no notifications + chatAsync <- async $ runSimplexChat user ct cc + termQ <- newTQueueIO + termAsync <- async $ readTerminalOutput t termQ + pure TestCC {chatController = cc, virtualTerminal = t, chatAsync, termAsync, termQ} + +readTerminalOutput :: VirtualTerminal -> TQueue String -> IO () +readTerminalOutput t termQ = do + let w = virtualWindow t + winVar <- atomically $ newTVar . init =<< readTVar w + forever . atomically $ do + win <- readTVar winVar + win' <- init <$> readTVar w + if win' == win + then retry + else do + let diff = getDiff win' win + forM_ diff $ writeTQueue termQ + writeTVar winVar win' + where + getDiff :: [String] -> [String] -> [String] + getDiff win win' = getDiff_ 1 (length win) win win' + getDiff_ :: Int -> Int -> [String] -> [String] -> [String] + getDiff_ n len win' win = + let diff = drop (len - n) win' + in if drop n win <> diff == win' + then map (dropWhileEnd (== ' ')) diff + else getDiff_ (n + 1) len win' win + +withTmpFiles :: IO () -> IO () +withTmpFiles = + bracket_ + (createDirectoryIfMissing False "tests/tmp") + (removeDirectoryRecursive "tests/tmp") + +testChat2' :: (Int, Profile) -> (Int, Profile) -> (TestCC -> TestCC -> IO ()) -> IO () +testChat2' (i1, p1) (i2, p2) test = do + cc1 <- virtualSimplexChat (testDBPrefix <> show i1) p1 + cc2 <- virtualSimplexChat (testDBPrefix <> show i2) p2 + test cc1 cc2 + +testChatN :: [Profile] -> ([TestCC] -> IO ()) -> IO () +testChatN ps test = withTmpFiles $ do + let envs = zip ps $ map ((testDBPrefix <>) . show) [(1 :: Int) ..] + tcs <- getTestCCs envs [] + test tcs + concurrentlyN_ $ map ( virtualSimplexChat db p <*> getTestCCs envs' tcs + +( Int -> Expectation +( IO String +getTermLine = atomically . readTQueue . termQ + +testChat2 :: Profile -> Profile -> (TestCC -> TestCC -> IO ()) -> IO () +testChat2 p1 p2 test = testChatN [p1, p2] test_ + where + test_ :: [TestCC] -> IO () + test_ [tc1, tc2] = test tc1 tc2 + test_ _ = error "expected 2 chat clients" + +testChat3 :: Profile -> Profile -> Profile -> (TestCC -> TestCC -> TestCC -> IO ()) -> IO () +testChat3 p1 p2 p3 test = testChatN [p1, p2, p3] test_ + where + test_ :: [TestCC] -> IO () + test_ [tc1, tc2, tc3] = test tc1 tc2 tc3 + test_ _ = error "expected 3 chat clients" + +testChat4 :: Profile -> Profile -> Profile -> Profile -> (TestCC -> TestCC -> TestCC -> TestCC -> IO ()) -> IO () +testChat4 p1 p2 p3 p4 test = testChatN [p1, p2, p3, p4] test_ + where + test_ :: [TestCC] -> IO () + test_ [tc1, tc2, tc3, tc4] = test tc1 tc2 tc3 tc4 + test_ _ = error "expected 4 chat clients" + +concurrentlyN_ :: [IO a] -> IO () +concurrentlyN_ = mapConcurrently_ id + +serverCfg :: ServerConfig +serverCfg = + ServerConfig + { transports = [(serverPort, transport @TLS)], + tbqSize = 1, + serverTbqSize = 1, + msgQueueQuota = 4, + queueIdBytes = 12, + msgIdBytes = 6, + storeLog = Nothing, + caCertificateFile = "tests/fixtures/tls/ca.crt", + privateKeyFile = "tests/fixtures/tls/server.key", + certificateFile = "tests/fixtures/tls/server.crt" + } + +withSmpServer :: IO a -> IO a +withSmpServer = serverBracket (`runSMPServerBlocking` serverCfg) (pure ()) . const + +serverBracket :: (TMVar Bool -> IO ()) -> IO () -> (ThreadId -> IO a) -> IO a +serverBracket process afterProcess f = do + started <- newEmptyTMVarIO + bracket + (forkIOWithUnmask ($ process started)) + (\t -> killThread t >> afterProcess >> waitFor started "stop") + (\t -> waitFor started "start" >> f t) + where + waitFor started s = + 5000000 `timeout` atomically (takeTMVar started) >>= \case + Nothing -> error $ "server did not " <> s + _ -> pure () diff --git a/tests/ChatTests.hs b/tests/ChatTests.hs index 102845627f..f10bba0d82 100644 --- a/tests/ChatTests.hs +++ b/tests/ChatTests.hs @@ -6,10 +6,8 @@ module ChatTests where import ChatClient -import Control.Concurrent (threadDelay) import Control.Concurrent.Async (concurrently_) import Control.Concurrent.STM -import Control.Monad (forever, when) import qualified Data.ByteString as B import Data.Char (isDigit) import Data.Maybe (fromJust) @@ -59,36 +57,6 @@ chatTests = do it "should deduplicate contact requests with profile change" testDeduplicateContactRequestsProfileChange it "should reject contact and delete contact link" testRejectContactAndDeleteUserContact it "should delete connection requests when contact link deleted" testDeleteConnectionRequests - describe "server stress test" $ - fit "should stress server with many chats and messages" testStressServer - -testStressServer :: IO () -testStressServer = - withTmpFiles $ do - sentTVar <- newTVarIO (0 :: Int) - concurrentlyN_ $ - -- forever - -- ( do - -- threadDelay 5000000 - -- sent <- readTVarIO sentTVar - -- print $ show sent - -- ) : - map - ( \i -> - testChat2' (i * 2 -1, aliceProfile) (i * 2, bobProfile) $ - \alice bob -> do - connectUsers alice bob - loop alice bob sentTVar 0 - ) - (take 100 ([1 ..] :: [Int])) - where - loop :: TestCC -> TestCC -> TVar Int -> Int -> IO () - loop alice bob sentTVar k = do - alice `send` "@bob hi" - bob `send` "@alice hi" - -- when (k `mod` 1000 == 0) $ atomically $ modifyTVar sentTVar (+ 2000) - threadDelay 500 - loop alice bob sentTVar $ k + 1 testAddContact :: IO () testAddContact =