mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
test concurrent send and receipts (#866)
This commit is contained in:
@@ -43,6 +43,7 @@ import Data.Int (Int64)
|
||||
import qualified Data.Map as M
|
||||
import Data.Maybe (isNothing)
|
||||
import qualified Data.Set as S
|
||||
import Data.Time.Clock (diffUTCTime, getCurrentTime)
|
||||
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
import Data.Type.Equality
|
||||
import SMPAgentClient
|
||||
@@ -310,6 +311,7 @@ functionalAPITests t = do
|
||||
describe "Delivery receipts" $ do
|
||||
it "should send and receive delivery receipt" $ withSmpServer t testDeliveryReceipts
|
||||
it "should send delivery receipt only in connection v3+" $ testDeliveryReceiptsVersion t
|
||||
it "send delivery receipts concurrently with messages" $ testDeliveryReceiptsConcurrent t
|
||||
|
||||
testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int
|
||||
testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do
|
||||
@@ -1928,6 +1930,57 @@ testDeliveryReceiptsVersion t = do
|
||||
disconnectAgentClient a'
|
||||
disconnectAgentClient b'
|
||||
|
||||
testDeliveryReceiptsConcurrent :: (HasCallStack) => ATransport -> IO ()
|
||||
testDeliveryReceiptsConcurrent t =
|
||||
withSmpServerConfigOn t cfg {msgQueueQuota = 128} testPort $ \_ -> do
|
||||
withAgentClients2 $ \a b -> do
|
||||
(aId, bId) <- runRight $ makeConnection a b
|
||||
t1 <- liftIO getCurrentTime
|
||||
concurrently_ (runClient "a" a bId) (runClient "b" b aId)
|
||||
t2 <- liftIO getCurrentTime
|
||||
diffUTCTime t2 t1 `shouldSatisfy` (< 15)
|
||||
liftIO $ noMessages a "nothing else should be delivered to alice"
|
||||
liftIO $ noMessages b "nothing else should be delivered to bob"
|
||||
where
|
||||
runClient :: String -> AgentClient -> ConnId -> IO ()
|
||||
runClient _cName client connId = do
|
||||
concurrently_ send receive
|
||||
where
|
||||
numMsgs = 100
|
||||
send = runRight_ $
|
||||
replicateM_ numMsgs $ do
|
||||
-- liftIO $ print $ cName <> ": sendMessage"
|
||||
void $ sendMessage client connId SMP.noMsgFlags "hello"
|
||||
receive =
|
||||
runRight_ $
|
||||
-- for each sent message: 1 SENT, 1 RCVD, 1 OK for acknowledging RCVD
|
||||
-- for each received message: 1 MSG, 1 OK for acknowledging MSG
|
||||
receiveLoop (numMsgs * 5)
|
||||
receiveLoop :: Int -> ExceptT AgentErrorType IO ()
|
||||
receiveLoop 0 = pure ()
|
||||
receiveLoop n = do
|
||||
r <- getWithTimeout
|
||||
case r of
|
||||
(_, _, SENT _) -> do
|
||||
-- liftIO $ print $ cName <> ": SENT"
|
||||
pure ()
|
||||
(_, _, MSG MsgMeta {recipient = (msgId, _), integrity = MsgOk} _ _) -> do
|
||||
-- liftIO $ print $ cName <> ": MSG " <> show msgId
|
||||
ackMessageAsync client (B.pack . show $ n) connId msgId (Just "")
|
||||
(_, _, RCVD MsgMeta {recipient = (msgId, _), integrity = MsgOk} _) -> do
|
||||
-- liftIO $ print $ cName <> ": RCVD " <> show msgId
|
||||
ackMessageAsync client (B.pack . show $ n) connId msgId Nothing
|
||||
(_, _, OK) -> do
|
||||
-- liftIO $ print $ cName <> ": OK"
|
||||
pure ()
|
||||
r' -> error $ "unexpected event: " <> show r'
|
||||
receiveLoop (n - 1)
|
||||
getWithTimeout :: ExceptT AgentErrorType IO (AEntityTransmission 'AEConn)
|
||||
getWithTimeout = do
|
||||
1000000 `timeout` get client >>= \case
|
||||
Just r -> pure r
|
||||
_ -> error "timeout"
|
||||
|
||||
testTwoUsers :: HasCallStack => IO ()
|
||||
testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
let nc = netCfg initAgentServers
|
||||
|
||||
Reference in New Issue
Block a user