mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-06-09 10:43:02 +00:00
stress test app
This commit is contained in:
@@ -0,0 +1,239 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PostfixOperators #-}
|
||||
|
||||
module Simplex.StressTest where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.Async (concurrently_)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad (forever, when)
|
||||
import qualified Data.ByteString as B
|
||||
import Data.Char (isDigit)
|
||||
import Data.Maybe (fromJust)
|
||||
import qualified Data.Text as T
|
||||
import Simplex.Chat.Controller (ChatController (..))
|
||||
import Simplex.Chat.Types (Profile (..), User (..))
|
||||
import Simplex.StressTest.ChatClient
|
||||
import Test.Hspec
|
||||
|
||||
aliceProfile :: Profile
|
||||
aliceProfile = Profile {displayName = "alice", fullName = "Alice"}
|
||||
|
||||
bobProfile :: Profile
|
||||
bobProfile = Profile {displayName = "bob", fullName = "Bob"}
|
||||
|
||||
cathProfile :: Profile
|
||||
cathProfile = Profile {displayName = "cath", fullName = "Catherine"}
|
||||
|
||||
danProfile :: Profile
|
||||
danProfile = Profile {displayName = "dan", fullName = "Daniel"}
|
||||
|
||||
chatTests :: Spec
|
||||
chatTests =
|
||||
describe "server stress test" $
|
||||
fit "should stress server with many chats and messages" testStressServer
|
||||
|
||||
testStressServer :: IO ()
|
||||
testStressServer =
|
||||
withTmpFiles $ do
|
||||
sentTVar <- newTVarIO (0 :: Int)
|
||||
concurrentlyN_ $
|
||||
forever
|
||||
( do
|
||||
threadDelay 5000000
|
||||
sent <- readTVarIO sentTVar
|
||||
print $ show sent
|
||||
) :
|
||||
map
|
||||
( \i ->
|
||||
testChat2' (i * 2 -1, aliceProfile) (i * 2, bobProfile) $
|
||||
\alice bob -> do
|
||||
connectUsers alice bob
|
||||
loop i alice bob sentTVar 1
|
||||
)
|
||||
(take 100 ([1 ..] :: [Int]))
|
||||
where
|
||||
loop :: Int -> TestCC -> TestCC -> TVar Int -> Int -> IO ()
|
||||
loop i alice bob sentTVar k = do
|
||||
alice `send` "@bob hi"
|
||||
bob `send` "@alice hi"
|
||||
when (k `mod` 100 == 0) $ do
|
||||
print $ show i <> " - +200"
|
||||
atomically $ modifyTVar sentTVar (+ 200)
|
||||
-- threadDelay 500
|
||||
loop i alice bob sentTVar $ k + 1
|
||||
|
||||
startFileTransfer :: TestCC -> TestCC -> IO ()
|
||||
startFileTransfer alice bob = do
|
||||
alice #> "/f @bob ./tests/fixtures/test.jpg"
|
||||
alice <## "use /fc 1 to cancel sending"
|
||||
bob <# "alice> sends file test.jpg (136.5 KiB / 139737 bytes)"
|
||||
bob <## "use /fr 1 [<dir>/ | <path>] to receive it"
|
||||
bob ##> "/fr 1 ./tests/tmp"
|
||||
bob <## "saving file 1 from alice to ./tests/tmp/test.jpg"
|
||||
concurrently_
|
||||
(bob <## "started receiving file 1 (test.jpg) from alice")
|
||||
(alice <## "started sending file 1 (test.jpg) to bob")
|
||||
|
||||
checkPartialTransfer :: IO ()
|
||||
checkPartialTransfer = do
|
||||
src <- B.readFile "./tests/fixtures/test.jpg"
|
||||
dest <- B.readFile "./tests/tmp/test.jpg"
|
||||
B.unpack src `shouldStartWith` B.unpack dest
|
||||
B.length src > B.length dest `shouldBe` True
|
||||
|
||||
connectUsers :: TestCC -> TestCC -> IO ()
|
||||
connectUsers cc1 cc2 = do
|
||||
name1 <- showName cc1
|
||||
name2 <- showName cc2
|
||||
cc1 ##> "/c"
|
||||
inv <- getInvitation cc1
|
||||
cc2 ##> ("/c " <> inv)
|
||||
cc2 <## "confirmation sent!"
|
||||
concurrently_
|
||||
(cc2 <## (name1 <> ": contact is connected"))
|
||||
(cc1 <## (name2 <> ": contact is connected"))
|
||||
|
||||
showName :: TestCC -> IO String
|
||||
showName (TestCC ChatController {currentUser} _ _ _ _) = do
|
||||
Just User {localDisplayName, profile = Profile {fullName}} <- readTVarIO currentUser
|
||||
pure . T.unpack $ localDisplayName <> " (" <> fullName <> ")"
|
||||
|
||||
createGroup2 :: String -> TestCC -> TestCC -> IO ()
|
||||
createGroup2 gName cc1 cc2 = do
|
||||
connectUsers cc1 cc2
|
||||
name2 <- userName cc2
|
||||
cc1 ##> ("/g " <> gName)
|
||||
cc1 <## ("group #" <> gName <> " is created")
|
||||
cc1 <## ("use /a " <> gName <> " <name> to add members")
|
||||
addMember gName cc1 cc2
|
||||
cc2 ##> ("/j " <> gName)
|
||||
concurrently_
|
||||
(cc1 <## ("#" <> gName <> ": " <> name2 <> " joined the group"))
|
||||
(cc2 <## ("#" <> gName <> ": you joined the group"))
|
||||
|
||||
createGroup3 :: String -> TestCC -> TestCC -> TestCC -> IO ()
|
||||
createGroup3 gName cc1 cc2 cc3 = do
|
||||
createGroup2 gName cc1 cc2
|
||||
connectUsers cc1 cc3
|
||||
name3 <- userName cc3
|
||||
sName2 <- showName cc2
|
||||
sName3 <- showName cc3
|
||||
addMember gName cc1 cc3
|
||||
cc3 ##> ("/j " <> gName)
|
||||
concurrentlyN_
|
||||
[ cc1 <## ("#" <> gName <> ": " <> name3 <> " joined the group"),
|
||||
do
|
||||
cc3 <## ("#" <> gName <> ": you joined the group")
|
||||
cc3 <## ("#" <> gName <> ": member " <> sName2 <> " is connected"),
|
||||
do
|
||||
cc2 <## ("#" <> gName <> ": alice added " <> sName3 <> " to the group (connecting...)")
|
||||
cc2 <## ("#" <> gName <> ": new member " <> name3 <> " is connected")
|
||||
]
|
||||
|
||||
addMember :: String -> TestCC -> TestCC -> IO ()
|
||||
addMember gName inviting invitee = do
|
||||
name1 <- userName inviting
|
||||
memName <- userName invitee
|
||||
inviting ##> ("/a " <> gName <> " " <> memName)
|
||||
concurrentlyN_
|
||||
[ inviting <## ("invitation to join the group #" <> gName <> " sent to " <> memName),
|
||||
do
|
||||
invitee <## ("#" <> gName <> ": " <> name1 <> " invites you to join the group as admin")
|
||||
invitee <## ("use /j " <> gName <> " to accept")
|
||||
]
|
||||
|
||||
-- | test sending direct messages
|
||||
(<##>) :: TestCC -> TestCC -> IO ()
|
||||
cc1 <##> cc2 = do
|
||||
name1 <- userName cc1
|
||||
name2 <- userName cc2
|
||||
cc1 #> ("@" <> name2 <> " hi")
|
||||
cc2 <# (name1 <> "> hi")
|
||||
cc2 #> ("@" <> name1 <> " hey")
|
||||
cc1 <# (name2 <> "> hey")
|
||||
|
||||
userName :: TestCC -> IO [Char]
|
||||
userName (TestCC ChatController {currentUser} _ _ _ _) = T.unpack . localDisplayName . fromJust <$> readTVarIO currentUser
|
||||
|
||||
(##>) :: TestCC -> String -> IO ()
|
||||
cc ##> cmd = do
|
||||
cc `send` cmd
|
||||
cc <## cmd
|
||||
|
||||
(#>) :: TestCC -> String -> IO ()
|
||||
cc #> cmd = do
|
||||
cc `send` cmd
|
||||
cc <# cmd
|
||||
|
||||
(#$>) :: (Eq a, Show a) => TestCC -> (String, String -> a, a) -> Expectation
|
||||
cc #$> (cmd, f, res) = do
|
||||
cc ##> cmd
|
||||
(f <$> getTermLine cc) `shouldReturn` res
|
||||
|
||||
chat :: String -> [(Int, String)]
|
||||
chat = read
|
||||
|
||||
(#$$>) :: TestCC -> (String, [(String, String)]) -> Expectation
|
||||
cc #$$> (cmd, res) = do
|
||||
cc ##> cmd
|
||||
line <- getTermLine cc
|
||||
let chats = read line
|
||||
chats `shouldMatchList` res
|
||||
|
||||
send :: TestCC -> String -> IO ()
|
||||
send TestCC {chatController = cc} cmd = atomically $ writeTBQueue (inputQ cc) cmd
|
||||
|
||||
(<##) :: TestCC -> String -> Expectation
|
||||
cc <## line = getTermLine cc `shouldReturn` line
|
||||
|
||||
(<###) :: TestCC -> [String] -> Expectation
|
||||
_ <### [] = pure ()
|
||||
cc <### ls = do
|
||||
line <- getTermLine cc
|
||||
if line `elem` ls
|
||||
then cc <### filter (/= line) ls
|
||||
else error $ "unexpected output: " <> line
|
||||
|
||||
(<#) :: TestCC -> String -> Expectation
|
||||
cc <# line = (dropTime <$> getTermLine cc) `shouldReturn` line
|
||||
|
||||
(</) :: TestCC -> Expectation
|
||||
(</) = (<// 500000)
|
||||
|
||||
(<#?) :: TestCC -> TestCC -> Expectation
|
||||
cc1 <#? cc2 = do
|
||||
name <- userName cc2
|
||||
sName <- showName cc2
|
||||
cc2 <## "connection request sent!"
|
||||
cc1 <## (sName <> " wants to connect to you!")
|
||||
cc1 <## ("to accept: /ac " <> name)
|
||||
cc1 <## ("to reject: /rc " <> name <> " (the sender will NOT be notified)")
|
||||
|
||||
dropTime :: String -> String
|
||||
dropTime msg = case splitAt 6 msg of
|
||||
([m, m', ':', s, s', ' '], text) ->
|
||||
if all isDigit [m, m', s, s'] then text else error "invalid time"
|
||||
_ -> error "invalid time"
|
||||
|
||||
getInvitation :: TestCC -> IO String
|
||||
getInvitation cc = do
|
||||
cc <## "pass this invitation link to your contact (via another channel):"
|
||||
cc <## ""
|
||||
inv <- getTermLine cc
|
||||
cc <## ""
|
||||
cc <## "and ask them to connect: /c <invitation_link_above>"
|
||||
pure inv
|
||||
|
||||
getContactLink :: TestCC -> Bool -> IO String
|
||||
getContactLink cc created = do
|
||||
cc <## if created then "Your new chat address is created!" else "Your chat address:"
|
||||
cc <## ""
|
||||
link <- getTermLine cc
|
||||
cc <## ""
|
||||
cc <## "Anybody can send you contact requests with: /c <contact_link_above>"
|
||||
cc <## "to show it again: /sa"
|
||||
cc <## "to delete it: /da (accepted contacts will remain connected)"
|
||||
pure link
|
||||
@@ -0,0 +1,198 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.StressTest.ChatClient where
|
||||
|
||||
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread)
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception (bracket, bracket_)
|
||||
import Control.Monad.Except
|
||||
import Data.List (dropWhileEnd)
|
||||
import Network.Socket
|
||||
import Simplex.Chat
|
||||
import Simplex.Chat.Controller (ChatConfig (..), ChatController (..))
|
||||
import Simplex.Chat.Options
|
||||
import Simplex.Chat.Store
|
||||
import Simplex.Chat.Terminal
|
||||
import Simplex.Chat.Terminal.Output (newChatTerminal)
|
||||
import Simplex.Chat.Types (Profile)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Client (SMPClientConfig (..), smpDefaultConfig)
|
||||
import Simplex.Messaging.Server (runSMPServerBlocking)
|
||||
import Simplex.Messaging.Server.Env.STM
|
||||
import Simplex.Messaging.Transport
|
||||
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
|
||||
import qualified System.Terminal as C
|
||||
import System.Terminal.Internal (VirtualTerminal (..), VirtualTerminalSettings (..), withVirtualTerminal)
|
||||
import System.Timeout (timeout)
|
||||
import Test.Hspec (Expectation, shouldReturn)
|
||||
|
||||
testDBPrefix :: FilePath
|
||||
testDBPrefix = "tests/tmp/test"
|
||||
|
||||
serverPort :: ServiceName
|
||||
serverPort = "5001"
|
||||
|
||||
opts :: ChatOpts
|
||||
opts =
|
||||
ChatOpts
|
||||
{ dbFilePrefix = undefined,
|
||||
smpServers = ["smp://Ufcpyx7utrV45fUopHVvKh4NECi5Z3Fa1TyL4L7tGgc=@smp7.simplex.im"],
|
||||
-- smpServers = ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001"],
|
||||
logging = False
|
||||
}
|
||||
|
||||
termSettings :: VirtualTerminalSettings
|
||||
termSettings =
|
||||
VirtualTerminalSettings
|
||||
{ virtualType = "xterm",
|
||||
virtualWindowSize = pure C.Size {height = 24, width = 1000},
|
||||
virtualEvent = retry,
|
||||
virtualInterrupt = retry
|
||||
}
|
||||
|
||||
data TestCC = TestCC
|
||||
{ chatController :: ChatController,
|
||||
virtualTerminal :: VirtualTerminal,
|
||||
chatAsync :: Async (),
|
||||
termAsync :: Async (),
|
||||
termQ :: TQueue String
|
||||
}
|
||||
|
||||
aCfg :: AgentConfig
|
||||
aCfg = agentConfig defaultChatConfig
|
||||
|
||||
cfg :: ChatConfig
|
||||
cfg =
|
||||
defaultChatConfig
|
||||
{ agentConfig =
|
||||
aCfg
|
||||
{ reconnectInterval = (reconnectInterval aCfg) {initialInterval = 50000},
|
||||
smpCfg = smpDefaultConfig {tcpTimeout = 10000000}
|
||||
},
|
||||
testView = True
|
||||
}
|
||||
|
||||
virtualSimplexChat :: FilePath -> Profile -> IO TestCC
|
||||
virtualSimplexChat dbFilePrefix profile = do
|
||||
st <- createStore (dbFilePrefix <> "_chat.db") 1 False
|
||||
Right user <- runExceptT $ createUser st profile True
|
||||
t <- withVirtualTerminal termSettings pure
|
||||
ct <- newChatTerminal t
|
||||
cc <- newChatController st (Just user) cfg opts {dbFilePrefix} (const $ pure ()) -- no notifications
|
||||
chatAsync <- async $ runSimplexChat user ct cc
|
||||
termQ <- newTQueueIO
|
||||
termAsync <- async $ readTerminalOutput t termQ
|
||||
pure TestCC {chatController = cc, virtualTerminal = t, chatAsync, termAsync, termQ}
|
||||
|
||||
readTerminalOutput :: VirtualTerminal -> TQueue String -> IO ()
|
||||
readTerminalOutput t termQ = do
|
||||
let w = virtualWindow t
|
||||
winVar <- atomically $ newTVar . init =<< readTVar w
|
||||
forever . atomically $ do
|
||||
win <- readTVar winVar
|
||||
win' <- init <$> readTVar w
|
||||
if win' == win
|
||||
then retry
|
||||
else do
|
||||
let diff = getDiff win' win
|
||||
forM_ diff $ writeTQueue termQ
|
||||
writeTVar winVar win'
|
||||
where
|
||||
getDiff :: [String] -> [String] -> [String]
|
||||
getDiff win win' = getDiff_ 1 (length win) win win'
|
||||
getDiff_ :: Int -> Int -> [String] -> [String] -> [String]
|
||||
getDiff_ n len win' win =
|
||||
let diff = drop (len - n) win'
|
||||
in if drop n win <> diff == win'
|
||||
then map (dropWhileEnd (== ' ')) diff
|
||||
else getDiff_ (n + 1) len win' win
|
||||
|
||||
withTmpFiles :: IO () -> IO ()
|
||||
withTmpFiles =
|
||||
bracket_
|
||||
(createDirectoryIfMissing False "tests/tmp")
|
||||
(removeDirectoryRecursive "tests/tmp")
|
||||
|
||||
testChat2' :: (Int, Profile) -> (Int, Profile) -> (TestCC -> TestCC -> IO ()) -> IO ()
|
||||
testChat2' (i1, p1) (i2, p2) test = do
|
||||
cc1 <- virtualSimplexChat (testDBPrefix <> show i1) p1
|
||||
cc2 <- virtualSimplexChat (testDBPrefix <> show i2) p2
|
||||
test cc1 cc2
|
||||
|
||||
testChatN :: [Profile] -> ([TestCC] -> IO ()) -> IO ()
|
||||
testChatN ps test = withTmpFiles $ do
|
||||
let envs = zip ps $ map ((testDBPrefix <>) . show) [(1 :: Int) ..]
|
||||
tcs <- getTestCCs envs []
|
||||
test tcs
|
||||
concurrentlyN_ $ map (<// 100000) tcs
|
||||
where
|
||||
getTestCCs [] tcs = pure tcs
|
||||
getTestCCs ((p, db) : envs') tcs = (:) <$> virtualSimplexChat db p <*> getTestCCs envs' tcs
|
||||
|
||||
(<//) :: TestCC -> Int -> Expectation
|
||||
(<//) cc t = timeout t (getTermLine cc) `shouldReturn` Nothing
|
||||
|
||||
getTermLine :: TestCC -> IO String
|
||||
getTermLine = atomically . readTQueue . termQ
|
||||
|
||||
testChat2 :: Profile -> Profile -> (TestCC -> TestCC -> IO ()) -> IO ()
|
||||
testChat2 p1 p2 test = testChatN [p1, p2] test_
|
||||
where
|
||||
test_ :: [TestCC] -> IO ()
|
||||
test_ [tc1, tc2] = test tc1 tc2
|
||||
test_ _ = error "expected 2 chat clients"
|
||||
|
||||
testChat3 :: Profile -> Profile -> Profile -> (TestCC -> TestCC -> TestCC -> IO ()) -> IO ()
|
||||
testChat3 p1 p2 p3 test = testChatN [p1, p2, p3] test_
|
||||
where
|
||||
test_ :: [TestCC] -> IO ()
|
||||
test_ [tc1, tc2, tc3] = test tc1 tc2 tc3
|
||||
test_ _ = error "expected 3 chat clients"
|
||||
|
||||
testChat4 :: Profile -> Profile -> Profile -> Profile -> (TestCC -> TestCC -> TestCC -> TestCC -> IO ()) -> IO ()
|
||||
testChat4 p1 p2 p3 p4 test = testChatN [p1, p2, p3, p4] test_
|
||||
where
|
||||
test_ :: [TestCC] -> IO ()
|
||||
test_ [tc1, tc2, tc3, tc4] = test tc1 tc2 tc3 tc4
|
||||
test_ _ = error "expected 4 chat clients"
|
||||
|
||||
concurrentlyN_ :: [IO a] -> IO ()
|
||||
concurrentlyN_ = mapConcurrently_ id
|
||||
|
||||
serverCfg :: ServerConfig
|
||||
serverCfg =
|
||||
ServerConfig
|
||||
{ transports = [(serverPort, transport @TLS)],
|
||||
tbqSize = 1,
|
||||
serverTbqSize = 1,
|
||||
msgQueueQuota = 4,
|
||||
queueIdBytes = 12,
|
||||
msgIdBytes = 6,
|
||||
storeLog = Nothing,
|
||||
caCertificateFile = "tests/fixtures/tls/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/tls/server.key",
|
||||
certificateFile = "tests/fixtures/tls/server.crt"
|
||||
}
|
||||
|
||||
withSmpServer :: IO a -> IO a
|
||||
withSmpServer = serverBracket (`runSMPServerBlocking` serverCfg) (pure ()) . const
|
||||
|
||||
serverBracket :: (TMVar Bool -> IO ()) -> IO () -> (ThreadId -> IO a) -> IO a
|
||||
serverBracket process afterProcess f = do
|
||||
started <- newEmptyTMVarIO
|
||||
bracket
|
||||
(forkIOWithUnmask ($ process started))
|
||||
(\t -> killThread t >> afterProcess >> waitFor started "stop")
|
||||
(\t -> waitFor started "start" >> f t)
|
||||
where
|
||||
waitFor started s =
|
||||
5000000 `timeout` atomically (takeTMVar started) >>= \case
|
||||
Nothing -> error $ "server did not " <> s
|
||||
_ -> pure ()
|
||||
Reference in New Issue
Block a user