From c9c075fd49689649a63273d27de1317421663c79 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Wed, 23 Oct 2024 09:17:23 +0100 Subject: [PATCH] smp server: merge quota messages and set queue to "over quota" state after restoring, server tests with journal and memory store (#1380) * smp: run server tests with journal and memory store * merge quota messages, set queue to "over quota" state after restoring * fix test --- src/Simplex/Messaging/Server.hs | 61 +++-- .../Messaging/Server/MsgStore/Journal.hs | 33 +-- src/Simplex/Messaging/Server/MsgStore/STM.hs | 13 +- .../Messaging/Server/MsgStore/Types.hs | 14 +- tests/CoreTests/MsgStoreTests.hs | 13 +- tests/NtfServerTests.hs | 2 +- tests/SMPClient.hs | 64 ++--- tests/ServerTests.hs | 228 +++++++++--------- tests/Test.hs | 13 +- 9 files changed, 243 insertions(+), 198 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index efde4e667..0574b9387 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -155,7 +155,7 @@ smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M () smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do s <- asks server pa <- asks proxyAgent - msgStats <- restoreServerMessages + msgStats <- processServerMessages ntfStats <- restoreServerNtfs liftIO $ printMessageStats "messages" msgStats liftIO $ printMessageStats "notifications" ntfStats @@ -215,8 +215,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT saveServer :: Bool -> M () saveServer drainMsgs = do withLog closeStoreLog - AMS _ ms <- asks msgStore - liftIO $ closeMsgStore ms saveServerMessages drainMsgs saveServerNtfs saveServerStats @@ -383,7 +381,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT void $ withActiveMsgQueues ms (\_ -> expireQueueMsgs stats old) 0 where expireQueueMsgs stats old q acc = - runExceptT (deleteExpiredMsgs q old) >>= \case + runExceptT (deleteExpiredMsgs q True old) >>= \case Right deleted -> (acc + deleted) <$ atomicModifyIORef'_ (msgExpired stats) (+ deleted) Left _ -> pure 0 @@ -1476,7 +1474,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s expireMessages :: MsgStoreClass s => MsgQueue s -> Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO () expireMessages q msgExp stats = do - deleted <- maybe (pure 0) (deleteExpiredMsgs q <=< liftIO . expireBeforeEpoch) msgExp + deleted <- maybe (pure 0) (deleteExpiredMsgs q True <=< liftIO . expireBeforeEpoch) msgExp liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted) -- The condition for delivery of the message is: @@ -1733,7 +1731,9 @@ saveServerMessages drainMsgs = AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of Just f -> liftIO $ exportMessages ms f drainMsgs Nothing -> logInfo "undelivered messages are not saved" - AMS SMSJournal _ -> logInfo "closed journal message storage" + AMS SMSJournal ms -> do + liftIO $ closeMsgStore ms + logInfo "closed journal message storage" exportMessages :: MsgStoreClass s => s -> FilePath -> Bool -> IO () exportMessages ms f drainMsgs = do @@ -1744,8 +1744,8 @@ exportMessages ms f drainMsgs = do saveQueueMsgs h rId q acc = getQueueMessages drainMsgs q >>= \msgs -> (acc + length msgs) <$ BLD.hPutBuilder h (encodeMessages rId msgs) encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') -restoreServerMessages :: M MessageStats -restoreServerMessages = do +processServerMessages :: M MessageStats +processServerMessages = do old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch) asks msgStore >>= liftIO . processMessages old_ where @@ -1772,23 +1772,25 @@ restoreServerMessages = do exitFailure where expireQueue = do - expired'' <- deleteExpiredMsgs q old + expired'' <- deleteExpiredMsgs q False old stored'' <- liftIO $ getQueueSize q + liftIO $ logQueueState q liftIO $ closeMsgQueue q pure (stored'', expired'') processValidateQueue q (!stored, !qCount) = getQueueSize q >>= \stored' -> pure (stored + stored', qCount + 1) -importMessages :: MsgStoreClass s => s -> FilePath -> Maybe Int64 -> IO MessageStats +importMessages :: forall s. MsgStoreClass s => s -> FilePath -> Maybe Int64 -> IO MessageStats importMessages ms f old_ = do logInfo $ "restoring messages from file " <> T.pack f - LB.readFile f >>= runExceptT . foldM restoreMsg (0, 0, 0) . zip [0..] . LB.lines >>= \case + LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . zip [0..] . LB.lines >>= \case Left e -> do putStrLn "" logError . T.pack $ "error restoring messages: " <> e liftIO exitFailure - Right (lineCount, storedMsgsCount, expiredMsgsCount) -> do + Right (lineCount, _, (storedMsgsCount, expiredMsgsCount, overQuota)) -> do putStrLn $ "Processed " <> show lineCount <> " lines" renameFile f $ f <> ".bak" + mapM_ setOverQuota_ overQuota logQueueStates ms storedQueues <- M.size <$> readTVarIO (activeMsgQueues ms) pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} @@ -1796,24 +1798,37 @@ importMessages ms f old_ = do progress i = do liftIO $ putStr $ "Processed " <> show i <> " lines\r" hFlush stdout - restoreMsg :: (Int, Int, Int) -> (Int, LB.ByteString) -> ExceptT String IO (Int, Int, Int) - restoreMsg (!lineCount, !stored, !expired) (i, s') = do + restoreMsg :: (Int, Maybe (RecipientId, MsgQueue s), (Int, Int, M.Map RecipientId (MsgQueue s))) -> (Int, LB.ByteString) -> ExceptT String IO (Int, Maybe (RecipientId, MsgQueue s), (Int, Int, M.Map RecipientId (MsgQueue s))) + restoreMsg (!lineCount, q_, (!stored, !expired, !overQuota)) (i, s') = do when (i `mod` 1000 == 0) $ progress i MLRv3 rId msg <- liftEither . first (msgErr "parsing") $ strDecode s liftError show $ addToMsgQueue rId msg where s = LB.toStrict s' addToMsgQueue rId msg = do - q <- getMsgQueue ms rId - (isExpired, logFull) <- case msg of + q <- case q_ of + -- to avoid lookup when restoring the next message to the same queue + Just (rId', q') | rId' == rId -> pure q' + _ -> getMsgQueue ms rId + (lineCount + 1,Just (rId, q),) <$> case msg of Message {msgTs} - | maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg ms q False msg - | otherwise -> pure (True, False) - MessageQuota {} -> writeMsg ms q False msg $> (False, False) - when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg) - let !stored' = if logFull || isExpired then stored else stored + 1 - !expired' = if isExpired then expired + 1 else expired - pure (lineCount + 1, stored', expired') + | maybe True (systemSeconds msgTs >=) old_ -> do + writeMsg ms q False msg >>= \case + Just _ -> pure (stored + 1, expired, overQuota) + Nothing -> do + logError $ decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg) + pure (stored, expired, overQuota) + | otherwise -> pure (stored, expired + 1, overQuota) + MessageQuota {} -> + -- queue was over quota at some point, + -- it will be set as over quota once fully imported + mergeQuotaMsgs >> writeMsg ms q False msg $> (stored, expired, M.insert rId q overQuota) + where + -- if the first message in queue head is "quota", remove it. + mergeQuotaMsgs = isolateQueue q "mergeQuotaMsgs" $ + tryPeekMsg_ q >>= \case + Just MessageQuota {} -> tryDeleteMsg_ q False + _ -> pure () msgErr :: Show e => String -> e -> String msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 13df614a8..564a9a804 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -28,7 +28,7 @@ module Simplex.Messaging.Server.MsgStore.Journal readWriteQueueState, newMsgQueueState, newJournalId, - logQueueState, + appendState, queueLogFileName, logFileExt, ) @@ -255,11 +255,12 @@ instance MsgStoreClass JournalMsgStore where (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) logQueueStates :: JournalMsgStore -> IO () - logQueueStates st = withActiveMsgQueues st (\_ q _ -> logState q) () - where - logState q = - readTVarIO (handles q) - >>= maybe (pure ()) (\hs -> readTVarIO (state q) >>= logQueueState (stateHandle hs)) + logQueueStates ms = withActiveMsgQueues ms (\_ q _ -> logQueueState q) () + + logQueueState :: JournalMsgQueue -> IO () + logQueueState q = + readTVarIO (handles q) + >>= maybe (pure ()) (\hs -> readTVarIO (state q) >>= appendState (stateHandle hs)) getMsgQueue :: JournalMsgStore -> RecipientId -> ExceptT ErrorType IO JournalMsgQueue getMsgQueue ms@JournalMsgStore {queueLocks, msgQueues, random} rId = @@ -318,7 +319,7 @@ instance MsgStoreClass JournalMsgStore where else pure Nothing where JournalStoreConfig {quota, maxMsgCount} = config ms - msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg} + !msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg} writeToJournal st@MsgQueueState {writeState, readState = rs, size} canWrt' msg' = do let msgStr = strEncode msg' `B.snoc` '\n' msgLen = fromIntegral $ B.length msgStr @@ -349,6 +350,10 @@ instance MsgStoreClass JournalMsgStore where atomically $ writeTVar handles $ Just $ hs {writeHandle = Just wh} pure (newJournalState journalId, wh) + -- can ONLY be used while restoring messages, not while server running + setOverQuota_ :: JournalMsgQueue -> IO () + setOverQuota_ JournalMsgQueue {state} = atomically $ modifyTVar' state $ \st -> st {canWrite = False} + getQueueSize :: JournalMsgQueue -> IO Int getQueueSize JournalMsgQueue {state} = size <$> readTVarIO state @@ -363,13 +368,13 @@ instance MsgStoreClass JournalMsgStore where atomically $ writeTVar tipMsg $ Just (Just ml) pure $ Just msg - tryDeleteMsg_ :: JournalMsgQueue -> StoreIO () - tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} = StoreIO $ + tryDeleteMsg_ :: JournalMsgQueue -> Bool -> StoreIO () + tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ void $ readTVarIO tipMsg -- if there is no cached tipMsg, do nothing $>>= (pure . fmap snd) $>>= \len -> readTVarIO handles - $>>= \hs -> updateReadPos q True len hs $> Just () + $>>= \hs -> updateReadPos q logState len hs $> Just () isolateQueue :: JournalMsgQueue -> String -> StoreIO a -> ExceptT ErrorType IO a isolateQueue q op (StoreIO a) = tryStore op $ withLock' (queueLock $ queue q) op $ a @@ -416,11 +421,11 @@ chooseReadJournal q log' hs = do updateQueueState :: JournalMsgQueue -> Bool -> MsgQueueHandles -> MsgQueueState -> IO () updateQueueState q log' hs st = do unless (validQueueState st) $ E.throwIO $ userError $ "updating to invalid state: " <> show st - when log' $ logQueueState (stateHandle hs) st + when log' $ appendState (stateHandle hs) st atomically $ writeTVar (state q) st -logQueueState :: Handle -> MsgQueueState -> IO () -logQueueState h st = B.hPutStr h $ strEncode st `B.snoc` '\n' +appendState :: Handle -> MsgQueueState -> IO () +appendState h st = B.hPutStr h $ strEncode st `B.snoc` '\n' updateReadPos :: JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO () updateReadPos q log' len hs = do @@ -555,7 +560,7 @@ readWriteQueueState JournalMsgStore {random, config} statePath = pure r writeQueueState st = do sh <- openFile statePath AppendMode - logQueueState sh st + appendState sh st pure (st, sh) validQueueState :: MsgQueueState -> Bool diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 17591650c..b08e99abe 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -62,6 +62,8 @@ instance MsgStoreClass STMMsgStore where logQueueStates _ = pure () + logQueueState _ = pure () + -- The reason for double lookup is that majority of messaging queues exist, -- because multiple messages are sent to the same queue, -- so the first lookup without STM transaction will return the queue faster. @@ -104,10 +106,13 @@ instance MsgStoreClass STMMsgStore where modifyTVar' size (+ 1) if canWrt' then writeTQueue q msg $> Just (msg, empty) - else (writeTQueue q $! msgQuota) $> Nothing + else writeTQueue q msgQuota $> Nothing else pure Nothing where - msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg} + !msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg} + + setOverQuota_ :: STMMsgQueue -> IO () + setOverQuota_ q = atomically $ writeTVar (canWrite q) False getQueueSize :: STMMsgQueue -> IO Int getQueueSize STMMsgQueue {size} = readTVarIO size @@ -116,8 +121,8 @@ instance MsgStoreClass STMMsgStore where tryPeekMsg_ = tryPeekTQueue . msgQueue {-# INLINE tryPeekMsg_ #-} - tryDeleteMsg_ :: STMMsgQueue -> STM () - tryDeleteMsg_ STMMsgQueue {msgQueue = q, size} = + tryDeleteMsg_ :: STMMsgQueue -> Bool -> STM () + tryDeleteMsg_ STMMsgQueue {msgQueue = q, size} _logState = tryReadTQueue q >>= \case Just _ -> modifyTVar' size (subtract 1) _ -> pure () diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index dbaefba6f..cd480b0ae 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -26,14 +26,16 @@ class Monad (StoreMonad s) => MsgStoreClass s where activeMsgQueues :: s -> TMap RecipientId (MsgQueue s) withAllMsgQueues :: s -> (RecipientId -> MsgQueue s -> a -> IO a) -> a -> IO a logQueueStates :: s -> IO () + logQueueState :: MsgQueue s -> IO () getMsgQueue :: s -> RecipientId -> ExceptT ErrorType IO (MsgQueue s) delMsgQueue :: s -> RecipientId -> IO () delMsgQueueSize :: s -> RecipientId -> IO Int getQueueMessages :: Bool -> MsgQueue s -> IO [Message] writeMsg :: s -> MsgQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + setOverQuota_ :: MsgQueue s -> IO () -- can ONLY be used while restoring messages, not while server running getQueueSize :: MsgQueue s -> IO Int tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message) - tryDeleteMsg_ :: MsgQueue s -> StoreMonad s () + tryDeleteMsg_ :: MsgQueue s -> Bool -> StoreMonad s () isolateQueue :: MsgQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a data MSType = MSMemory | MSJournal @@ -57,7 +59,7 @@ tryDelMsg mq msgId' = tryPeekMsg_ mq >>= \case msg_@(Just msg) | msgId msg == msgId' -> - tryDeleteMsg_ mq >> pure msg_ + tryDeleteMsg_ mq True >> pure msg_ _ -> pure Nothing -- atomic delete (== read) last and peek next message if available @@ -66,16 +68,16 @@ tryDelPeekMsg mq msgId' = isolateQueue mq "tryDelPeekMsg" $ tryPeekMsg_ mq >>= \case msg_@(Just msg) - | msgId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq >> tryPeekMsg_ mq) + | msgId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq) | otherwise -> pure (Nothing, msg_) _ -> pure (Nothing, Nothing) -deleteExpiredMsgs :: MsgStoreClass s => MsgQueue s -> Int64 -> ExceptT ErrorType IO Int -deleteExpiredMsgs mq old = isolateQueue mq "deleteExpiredMsgs" $ loop 0 +deleteExpiredMsgs :: MsgStoreClass s => MsgQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int +deleteExpiredMsgs mq logState old = isolateQueue mq "deleteExpiredMsgs" $ loop 0 where loop dc = tryPeekMsg_ mq >>= \case Just Message {msgTs} | systemSeconds msgTs < old -> - tryDeleteMsg_ mq >> loop (dc + 1) + tryDeleteMsg_ mq logState >> loop (dc + 1) _ -> pure dc diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index c3344bfa6..eaa9cc25b 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -22,7 +22,7 @@ import Data.Time.Clock.System (getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (EntityId (..), Message (..), noMsgFlags) -import Simplex.Messaging.Server (MessageStats (..), exportMessages, importMessages) +import Simplex.Messaging.Server (MessageStats (..), exportMessages, importMessages, printMessageStats) import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth) import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.STM @@ -163,14 +163,15 @@ testExportImportStore ms = do exportMessages ms testStoreMsgsFile False let cfg = (testJournalStoreCfg :: JournalStoreConfig) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg - MessageStats {storedMsgsCount = 6, expiredMsgsCount = 0, storedQueues = 2} <- + stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages ms' testStoreMsgsFile Nothing + printMessageStats "Messages" stats length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 -- state file is backed up exportMessages ms' testStoreMsgsFile2 False (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig - MessageStats {storedMsgsCount = 6, expiredMsgsCount = 0, storedQueues = 2} <- + MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages stmStore testStoreMsgsFile2 Nothing exportMessages stmStore testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) @@ -183,7 +184,7 @@ testQueueState ms = do statePath = dir (queueLogFileName <> logFileExt) createDirectoryIfMissing True dir state <- newMsgQueueState <$> newJournalId (random ms) - withFile statePath WriteMode (`logQueueState` state) + withFile statePath WriteMode (`appendState` state) length . lines <$> readFile statePath `shouldReturn` 1 readQueueState statePath `shouldReturn` state length <$> listDirectory dir `shouldReturn` 1 -- no backup @@ -194,7 +195,7 @@ testQueueState ms = do readState = (readState state) {msgCount = 1, byteCount = 100}, writeState = (writeState state) {msgPos = 1, msgCount = 1, bytePos = 100, byteCount = 100} } - withFile statePath AppendMode (`logQueueState` state1) + withFile statePath AppendMode (`appendState` state1) length . lines <$> readFile statePath `shouldReturn` 2 readQueueState statePath `shouldReturn` state1 length <$> listDirectory dir `shouldReturn` 1 -- no backup @@ -205,7 +206,7 @@ testQueueState ms = do readState = (readState state) {msgCount = 2, byteCount = 200}, writeState = (writeState state) {msgPos = 2, msgCount = 2, bytePos = 200, byteCount = 200} } - withFile statePath AppendMode (`logQueueState` state2) + withFile statePath AppendMode (`appendState` state2) length . lines <$> readFile statePath `shouldReturn` 3 copyFile statePath (statePath <> ".2") readQueueState statePath `shouldReturn` state2 diff --git a/tests/NtfServerTests.hs b/tests/NtfServerTests.hs index b9560976d..aebbf0f72 100644 --- a/tests/NtfServerTests.hs +++ b/tests/NtfServerTests.hs @@ -105,7 +105,7 @@ testNotificationSubscription (ATransport t) = (dhPub, dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g let tkn = DeviceToken PPApnsTest "abcd" withAPNSMockServer $ \apns -> - smpTest2 t $ \rh sh -> + smpTest2' t $ \rh sh -> ntfTest t $ \nh -> do -- create queue (sId, rId, rKey, rcvDhSecret) <- createAndSecureQueue rh sPub diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 0055e8503..0efb64c02 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -1,6 +1,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} @@ -107,19 +108,24 @@ testSMPClient_ host port vr client = do | otherwise = Nothing cfg :: ServerConfig -cfg = +cfg = cfgMS (AMSType SMSJournal) + +cfgMS :: AMSType -> ServerConfig +cfgMS msType = ServerConfig { transports = [], smpHandshakeTimeout = 60000000, tbqSize = 1, - msgStoreType = AMSType SMSJournal, + msgStoreType = msType, msgQueueQuota = 4, maxJournalMsgCount = 5, maxJournalStateLines = 2, queueIdBytes = 24, msgIdBytes = 24, storeLogFile = Just testStoreLogFile, - storeMsgsFile = Just testStoreMsgsDir, + storeMsgsFile = Just $ case msType of + AMSType SMSJournal -> testStoreMsgsDir + AMSType SMSMemory -> testStoreMsgsFile, storeNtfsFile = Nothing, allowNewQueues = True, newQueueBasicAuth = Nothing, @@ -178,18 +184,17 @@ proxyVRangeV8 :: VersionRangeSMP proxyVRangeV8 = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion withSmpServerStoreMsgLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a -withSmpServerStoreMsgLogOn t = - withSmpServerConfigOn - t - cfg - { storeLogFile = Just testStoreLogFile, - storeMsgsFile = Just testStoreMsgsDir, - storeNtfsFile = Just testStoreNtfsFile, - serverStatsBackupFile = Just testServerStatsBackupFile - } +withSmpServerStoreMsgLogOn = (`withSmpServerStoreMsgLogOnMS` AMSType SMSJournal) + +withSmpServerStoreMsgLogOnMS :: HasCallStack => ATransport -> AMSType -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a +withSmpServerStoreMsgLogOnMS t msType = + withSmpServerConfigOn t (cfgMS msType) {storeNtfsFile = Just testStoreNtfsFile, serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerStoreLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a -withSmpServerStoreLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, serverStatsBackupFile = Just testServerStatsBackupFile} +withSmpServerStoreLogOn = (`withSmpServerStoreLogOnMS` AMSType SMSJournal) + +withSmpServerStoreLogOnMS :: HasCallStack => ATransport -> AMSType -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a +withSmpServerStoreLogOnMS t msType = withSmpServerConfigOn t (cfgMS msType) {serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerConfigOn :: HasCallStack => ATransport -> ServerConfig -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a withSmpServerConfigOn t cfg' port' = @@ -222,11 +227,11 @@ withSmpServer t = withSmpServerOn t testPort withSmpServerProxy :: HasCallStack => ATransport -> IO a -> IO a withSmpServerProxy t = withSmpServerConfigOn t proxyCfg testPort . const -runSmpTest :: forall c a. (HasCallStack, Transport c) => (HasCallStack => THandleSMP c 'TClient -> IO a) -> IO a -runSmpTest test = withSmpServer (transport @c) $ testSMPClient test +runSmpTest :: forall c a. (HasCallStack, Transport c) => AMSType -> (HasCallStack => THandleSMP c 'TClient -> IO a) -> IO a +runSmpTest msType test = withSmpServerConfigOn (transport @c) (cfgMS msType) testPort $ \_ -> testSMPClient test -runSmpTestN :: forall c a. (HasCallStack, Transport c) => Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO a) -> IO a -runSmpTestN = runSmpTestNCfg cfg supportedClientSMPRelayVRange +runSmpTestN :: forall c a. (HasCallStack, Transport c) => AMSType -> Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO a) -> IO a +runSmpTestN msType = runSmpTestNCfg (cfgMS msType) supportedClientSMPRelayVRange runSmpTestNCfg :: forall c a. (HasCallStack, Transport c) => ServerConfig -> VersionRangeSMP -> Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO a) -> IO a runSmpTestNCfg srvCfg clntVR nClients test = withSmpServerConfigOn (transport @c) srvCfg testPort $ \_ -> run nClients [] @@ -241,7 +246,7 @@ smpServerTest :: TProxy c -> (Maybe TransmissionAuth, ByteString, ByteString, smp) -> IO (Maybe TransmissionAuth, ByteString, ByteString, BrokerMsg) -smpServerTest _ t = runSmpTest $ \h -> tPut' h t >> tGet' h +smpServerTest _ t = runSmpTest (AMSType SMSJournal) $ \h -> tPut' h t >> tGet' h where tPut' :: THandleSMP c 'TClient -> (Maybe TransmissionAuth, ByteString, ByteString, smp) -> IO () tPut' h@THandle {params = THandleParams {sessionId, implySessId}} (sig, corrId, queueId, smp) = do @@ -252,14 +257,17 @@ smpServerTest _ t = runSmpTest $ \h -> tPut' h t >> tGet' h [(Nothing, _, (CorrId corrId, EntityId qId, Right cmd))] <- tGet h pure (Nothing, corrId, qId, cmd) -smpTest :: (HasCallStack, Transport c) => TProxy c -> (HasCallStack => THandleSMP c 'TClient -> IO ()) -> Expectation -smpTest _ test' = runSmpTest test' `shouldReturn` () +smpTest :: (HasCallStack, Transport c) => TProxy c -> AMSType -> (HasCallStack => THandleSMP c 'TClient -> IO ()) -> Expectation +smpTest _ msType test' = runSmpTest msType test' `shouldReturn` () -smpTestN :: (HasCallStack, Transport c) => Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO ()) -> Expectation -smpTestN n test' = runSmpTestN n test' `shouldReturn` () +smpTestN :: (HasCallStack, Transport c) => AMSType -> Int -> (HasCallStack => [THandleSMP c 'TClient] -> IO ()) -> Expectation +smpTestN msType n test' = runSmpTestN msType n test' `shouldReturn` () -smpTest2 :: forall c. (HasCallStack, Transport c) => TProxy c -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation -smpTest2 = smpTest2Cfg cfg supportedClientSMPRelayVRange +smpTest2' :: forall c. (HasCallStack, Transport c) => TProxy c -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation +smpTest2' = (`smpTest2` AMSType SMSJournal) + +smpTest2 :: forall c. (HasCallStack, Transport c) => TProxy c -> AMSType -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation +smpTest2 t msType = smpTest2Cfg (cfgMS msType) supportedClientSMPRelayVRange t smpTest2Cfg :: forall c. (HasCallStack, Transport c) => ServerConfig -> VersionRangeSMP -> TProxy c -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation smpTest2Cfg srvCfg clntVR _ test' = runSmpTestNCfg srvCfg clntVR 2 _test `shouldReturn` () @@ -268,15 +276,15 @@ smpTest2Cfg srvCfg clntVR _ test' = runSmpTestNCfg srvCfg clntVR 2 _test `should _test [h1, h2] = test' h1 h2 _test _ = error "expected 2 handles" -smpTest3 :: forall c. (HasCallStack, Transport c) => TProxy c -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation -smpTest3 _ test' = smpTestN 3 _test +smpTest3 :: forall c. (HasCallStack, Transport c) => TProxy c -> AMSType -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation +smpTest3 _ msType test' = smpTestN msType 3 _test where _test :: HasCallStack => [THandleSMP c 'TClient] -> IO () _test [h1, h2, h3] = test' h1 h2 h3 _test _ = error "expected 3 handles" -smpTest4 :: forall c. (HasCallStack, Transport c) => TProxy c -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation -smpTest4 _ test' = smpTestN 4 _test +smpTest4 :: forall c. (HasCallStack, Transport c) => TProxy c -> AMSType -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation +smpTest4 _ msType test' = smpTestN msType 4 _test where _test :: HasCallStack => [THandleSMP c 'TClient] -> IO () _test [h1, h2, h3, h4] = test' h1 h2 h3 h4 diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 965967878..c133d94fc 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -51,32 +51,31 @@ import System.Timeout import Test.HUnit import Test.Hspec -serverTests :: ATransport -> Spec -serverTests t@(ATransport t') = do - describe "SMP syntax" $ syntaxTests t +serverTests :: SpecWith (ATransport, AMSType) +serverTests = do describe "SMP queues" $ do - describe "NEW and KEY commands, SEND messages" $ testCreateSecure t + describe "NEW and KEY commands, SEND messages" testCreateSecure describe "NEW and SKEY commands" $ do - testCreateSndSecure t - testSndSecureProhibited t - describe "NEW, OFF and DEL commands, SEND messages" $ testCreateDelete t - describe "Stress test" $ stressTest t - describe "allowNewQueues setting" $ testAllowNewQueues t' + testCreateSndSecure + testSndSecureProhibited + describe "NEW, OFF and DEL commands, SEND messages" testCreateDelete + describe "Stress test" stressTest + describe "allowNewQueues setting" testAllowNewQueues describe "SMP messages" $ do - describe "duplex communication over 2 SMP connections" $ testDuplex t - describe "switch subscription to another TCP connection" $ testSwitchSub t - describe "GET command" $ testGetCommand t' - describe "GET & SUB commands" $ testGetSubCommands t' - describe "Exceeding queue quota" $ testExceedQueueQuota t' - describe "Store log" $ testWithStoreLog t - describe "Restore messages" $ testRestoreMessages t - describe "Restore messages (old / v2)" $ testRestoreExpireMessages t - describe "Timing of AUTH error" $ testTiming t - describe "Message notifications" $ testMessageNotifications t + describe "duplex communication over 2 SMP connections" testDuplex + describe "switch subscription to another TCP connection" testSwitchSub + describe "GET command" testGetCommand + describe "GET & SUB commands" testGetSubCommands + describe "Exceeding queue quota" testExceedQueueQuota + describe "Store log" testWithStoreLog + describe "Restore messages" testRestoreMessages + describe "Restore messages (old / v2)" testRestoreExpireMessages + describe "Timing of AUTH error" testTiming + describe "Message notifications" testMessageNotifications describe "Message expiration" $ do - testMsgExpireOnSend t' - testMsgExpireOnInterval t' - testMsgNOTExpireOnInterval t' + testMsgExpireOnSend + testMsgExpireOnInterval + testMsgNOTExpireOnInterval pattern Resp :: CorrId -> QueueId -> BrokerMsg -> SignedTransmission ErrorType BrokerMsg pattern Resp corrId queueId command <- (_, _, (corrId, queueId, Right command)) @@ -136,10 +135,10 @@ decryptMsgV3 dhShared nonce body = Right ClientRcvMsgQuota {} -> Left "ClientRcvMsgQuota" Left e -> Left e -testCreateSecure :: ATransport -> Spec -testCreateSecure (ATransport t) = - it "should create (NEW) and secure (KEY) queue" $ - smpTest2 t $ \r s -> do +testCreateSecure :: SpecWith (ATransport, AMSType) +testCreateSecure = + it "should create (NEW) and secure (KEY) queue" $ \(ATransport t, msType) -> + smpTest2 t msType $ \r s -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g (dhPub, dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g @@ -201,10 +200,10 @@ testCreateSecure (ATransport t) = Resp "bcda" _ (ERR LARGE_MSG) <- signSendRecv s sKey ("bcda", sId, _SEND biggerMessage) pure () -testCreateSndSecure :: ATransport -> Spec -testCreateSndSecure (ATransport t) = - it "should create (NEW) and secure (SKEY) queue by sender" $ - smpTest2 t $ \r s -> do +testCreateSndSecure :: SpecWith (ATransport, AMSType) +testCreateSndSecure = + it "should create (NEW) and secure (SKEY) queue by sender" $ \(ATransport t, msType) -> + smpTest2 t msType $ \r s -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g (dhPub, dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g @@ -248,10 +247,10 @@ testCreateSndSecure (ATransport t) = Resp "bcda" _ (ERR LARGE_MSG) <- signSendRecv s sKey ("bcda", sId, _SEND biggerMessage) pure () -testSndSecureProhibited :: ATransport -> Spec -testSndSecureProhibited (ATransport t) = - it "should create (NEW) without allowing sndSecure and fail to and secure queue by sender (SKEY)" $ - smpTest2 t $ \r s -> do +testSndSecureProhibited :: SpecWith (ATransport, AMSType) +testSndSecureProhibited = + it "should create (NEW) without allowing sndSecure and fail to and secure queue by sender (SKEY)" $ \(ATransport t, msType) -> + smpTest2 t msType $ \r s -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g (dhPub, _dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g @@ -263,10 +262,10 @@ testSndSecureProhibited (ATransport t) = (sId2, sId) #== "secures queue, same queue ID in response" (err, ERR AUTH) #== "rejects SKEY when not allowed in NEW command" -testCreateDelete :: ATransport -> Spec -testCreateDelete (ATransport t) = - it "should create (NEW), suspend (OFF) and delete (DEL) queue" $ - smpTest2 t $ \rh sh -> do +testCreateDelete :: SpecWith (ATransport, AMSType) +testCreateDelete = + it "should create (NEW), suspend (OFF) and delete (DEL) queue" $ \(ATransport t, msType) -> + smpTest2 t msType $ \rh sh -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g (dhPub, dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g @@ -334,10 +333,10 @@ testCreateDelete (ATransport t) = Resp "cdab" _ err10 <- signSendRecv rh rKey ("cdab", rId, SUB) (err10, ERR AUTH) #== "rejects SUB when deleted" -stressTest :: ATransport -> Spec -stressTest (ATransport t) = - it "should create many queues, disconnect and re-connect" $ - smpTest3 t $ \h1 h2 h3 -> do +stressTest :: SpecWith (ATransport, AMSType) +stressTest = + it "should create many queues, disconnect and re-connect" $ \(ATransport t, msType) -> + smpTest3 t msType $ \h1 h2 h3 -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g (dhPub, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g @@ -352,10 +351,10 @@ stressTest (ATransport t) = closeConnection $ connection h2 subscribeQueues h3 -testAllowNewQueues :: forall c. Transport c => TProxy c -> Spec -testAllowNewQueues t = - it "should prohibit creating new queues with allowNewQueues = False" $ do - withSmpServerConfigOn (ATransport t) cfg {allowNewQueues = False} testPort $ \_ -> +testAllowNewQueues :: SpecWith (ATransport, AMSType) +testAllowNewQueues = + it "should prohibit creating new queues with allowNewQueues = False" $ \(ATransport (t :: TProxy c), msType) -> + withSmpServerConfigOn (ATransport t) (cfgMS msType) {allowNewQueues = False} testPort $ \_ -> testSMPClient @c $ \h -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g @@ -363,10 +362,10 @@ testAllowNewQueues t = Resp "abcd" NoEntity (ERR AUTH) <- signSendRecv h rKey ("abcd", NoEntity, NEW rPub dhPub Nothing SMSubscribe False) pure () -testDuplex :: ATransport -> Spec -testDuplex (ATransport t) = - it "should create 2 simplex connections and exchange messages" $ - smpTest2 t $ \alice bob -> do +testDuplex :: SpecWith (ATransport, AMSType) +testDuplex = + it "should create 2 simplex connections and exchange messages" $ \(ATransport t, msType) -> + smpTest2 t msType $ \alice bob -> do g <- C.newRandom (arPub, arKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g (aDhPub, aDhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g @@ -418,10 +417,10 @@ testDuplex (ATransport t) = Resp "bcda" _ OK <- signSendRecv bob brKey ("bcda", bRcv, ACK mId5) (bDec mId5 msg5, Right "how are you bob") #== "message received from alice" -testSwitchSub :: ATransport -> Spec -testSwitchSub (ATransport t) = - it "should create simplex connections and switch subscription to another TCP connection" $ - smpTest3 t $ \rh1 rh2 sh -> do +testSwitchSub :: SpecWith (ATransport, AMSType) +testSwitchSub = + it "should create simplex connections and switch subscription to another TCP connection" $ \(ATransport t, msType) -> + smpTest3 t msType $ \rh1 rh2 sh -> do g <- C.newRandom (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g (dhPub, dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g @@ -463,12 +462,12 @@ testSwitchSub (ATransport t) = Nothing -> return () Just _ -> error "nothing else is delivered to the 1st TCP connection" -testGetCommand :: forall c. Transport c => TProxy c -> Spec -testGetCommand t = - it "should retrieve messages from the queue using GET command" $ do +testGetCommand :: SpecWith (ATransport, AMSType) +testGetCommand = + it "should retrieve messages from the queue using GET command" $ \(ATransport (t :: TProxy c), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - smpTest t $ \sh -> do + smpTest t msType $ \sh -> do queue <- newEmptyTMVarIO testSMPClient @c $ \rh -> atomically . putTMVar queue =<< createAndSecureQueue rh sPub @@ -482,12 +481,12 @@ testGetCommand t = Resp "4" _ OK <- signSendRecv rh rKey ("4", rId, GET) pure () -testGetSubCommands :: forall c. Transport c => TProxy c -> Spec -testGetSubCommands t = - it "should retrieve messages with GET and receive with SUB, only one ACK would work" $ do +testGetSubCommands :: SpecWith (ATransport, AMSType) +testGetSubCommands = + it "should retrieve messages with GET and receive with SUB, only one ACK would work" $ \(ATransport t, msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - smpTest3 t $ \rh1 rh2 sh -> do + smpTest3 t msType $ \rh1 rh2 sh -> do (sId, rId, rKey, dhShared) <- createAndSecureQueue rh1 sPub let dec = decryptMsgV3 dhShared Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello 1") @@ -532,10 +531,10 @@ testGetSubCommands t = Resp "12" _ OK <- signSendRecv rh2 rKey ("12", rId, GET) pure () -testExceedQueueQuota :: forall c. Transport c => TProxy c -> Spec -testExceedQueueQuota t = - it "should reply with ERR QUOTA to sender and send QUOTA message to the recipient" $ do - withSmpServerConfigOn (ATransport t) cfg {msgQueueQuota = 2} testPort $ \_ -> +testExceedQueueQuota :: SpecWith (ATransport, AMSType) +testExceedQueueQuota = + it "should reply with ERR QUOTA to sender and send QUOTA message to the recipient" $ \(ATransport (t :: TProxy c), msType) -> do + withSmpServerConfigOn (ATransport t) (cfgMS msType) {msgQueueQuota = 2} testPort $ \_ -> testSMPClient @c $ \sh -> testSMPClient @c $ \rh -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g @@ -559,9 +558,9 @@ testExceedQueueQuota t = Resp "10" _ OK <- signSendRecv rh rKey ("10", rId, ACK mId4) pure () -testWithStoreLog :: ATransport -> Spec -testWithStoreLog at@(ATransport t) = - it "should store simplex queues to log and restore them after server restart" $ do +testWithStoreLog :: SpecWith (ATransport, AMSType) +testWithStoreLog = + it "should store simplex queues to log and restore them after server restart" $ \(at@(ATransport t), msType) -> do g <- C.newRandom (sPub1, sKey1) <- atomically $ C.generateAuthKeyPair C.SEd25519 g (sPub2, sKey2) <- atomically $ C.generateAuthKeyPair C.SEd25519 g @@ -573,7 +572,7 @@ testWithStoreLog at@(ATransport t) = senderId2 <- newTVarIO NoEntity notifierId <- newTVarIO NoEntity - withSmpServerStoreLogOn at testPort . runTest t $ \h -> runClient t $ \h1 -> do + withSmpServerStoreLogOnMS at msType testPort . runTest t $ \h -> runClient t $ \h1 -> do (sId1, rId1, rKey1, dhShared) <- createAndSecureQueue h sPub1 (rcvNtfPubDhKey, _) <- atomically $ C.generateKeyPair g Resp "abcd" _ (NID nId _) <- signSendRecv h rKey1 ("abcd", rId1, NKEY nPub rcvNtfPubDhKey) @@ -606,14 +605,14 @@ testWithStoreLog at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 6 - let cfg' = cfg {msgStoreType = AMSType SMSMemory, storeLogFile = Nothing, storeMsgsFile = Nothing} + let cfg' = (cfgMS msType) {msgStoreType = AMSType SMSMemory, storeLogFile = Nothing, storeMsgsFile = Nothing} withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do sId1 <- readTVarIO senderId1 -- fails if store log is disabled Resp "bcda" _ (ERR AUTH) <- signSendRecv h sKey1 ("bcda", sId1, _SEND "hello") pure () - withSmpServerStoreLogOn at testPort . runTest t $ \h -> runClient t $ \h1 -> do + withSmpServerStoreLogOnMS at msType testPort . runTest t $ \h -> runClient t $ \h1 -> do -- this queue is restored rId1 <- readTVarIO recipientId1 Just rKey1 <- readTVarIO recipientKey1 @@ -647,9 +646,9 @@ logSize f = Right l -> pure l Left (_ :: SomeException) -> logSize f -testRestoreMessages :: ATransport -> Spec -testRestoreMessages at@(ATransport t) = - it "should store messages on exit and restore on start" $ do +testRestoreMessages :: SpecWith (ATransport, AMSType) +testRestoreMessages = + it "should store messages on exit and restore on start" $ \(at@(ATransport t), msType) -> do removeFileIfExists testStoreLogFile removeFileIfExists testStoreMsgsFile whenM (doesDirectoryExist testStoreMsgsDir) $ removeDirectoryRecursive testStoreMsgsDir @@ -662,7 +661,7 @@ testRestoreMessages at@(ATransport t) = dhShared <- newTVarIO Nothing senderId <- newTVarIO NoEntity - withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do + withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do runClient t $ \h1 -> do (sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub atomically $ do @@ -691,7 +690,7 @@ testRestoreMessages at@(ATransport t) = Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 - withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do + withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -709,7 +708,7 @@ testRestoreMessages at@(ATransport t) = Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 - withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do + withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -756,9 +755,9 @@ checkStats s qs sent received = do IS.toList _week `shouldBe` map (hash . unEntityId) qs IS.toList _month `shouldBe` map (hash . unEntityId) qs -testRestoreExpireMessages :: ATransport -> Spec -testRestoreExpireMessages at@(ATransport t) = - it "should store messages on exit and restore on start" $ do +testRestoreExpireMessages :: SpecWith (ATransport, AMSType) +testRestoreExpireMessages = + it "should store messages on exit and restore on start" $ \(at@(ATransport t), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g recipientId <- newTVarIO NoEntity @@ -766,7 +765,7 @@ testRestoreExpireMessages at@(ATransport t) = dhShared <- newTVarIO Nothing senderId <- newTVarIO NoEntity - withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do + withSmpServerStoreMsgLogOnMS at msType testPort . runTest t $ \h -> do runClient t $ \h1 -> do (sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub atomically $ do @@ -783,36 +782,39 @@ testRestoreExpireMessages at@(ATransport t) = pure () logSize testStoreLogFile `shouldReturn` 2 - exportStoreMessages + exportStoreMessages msType msgs <- B.readFile testStoreMsgsFile length (B.lines msgs) `shouldBe` 4 let expCfg1 = Just ExpirationConfig {ttl = 86400, checkInterval = 43200} - cfg1 = cfg {messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile} + cfg1 = (cfgMS msType) {messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure () logSize testStoreLogFile `shouldReturn` 1 - exportStoreMessages + exportStoreMessages msType msgs' <- B.readFile testStoreMsgsFile msgs' `shouldBe` msgs let expCfg2 = Just ExpirationConfig {ttl = 2, checkInterval = 43200} - cfg2 = cfg {messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile} + cfg2 = (cfgMS msType) {messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure () logSize testStoreLogFile `shouldReturn` 1 -- two messages expired - exportStoreMessages + exportStoreMessages msType msgs'' <- B.readFile testStoreMsgsFile length (B.lines msgs'') `shouldBe` 2 B.lines msgs'' `shouldBe` drop 2 (B.lines msgs) Right ServerStatsData {_msgExpired} <- strDecode <$> B.readFile testServerStatsBackupFile _msgExpired `shouldBe` 2 where - exportStoreMessages = do - ms <- newMsgStore testJournalStoreCfg {quota = 4} - removeFileIfExists testStoreMsgsFile - exportMessages ms testStoreMsgsFile False + exportStoreMessages :: AMSType -> IO () + exportStoreMessages = \case + AMSType SMSJournal -> do + ms <- newMsgStore testJournalStoreCfg {quota = 4} + removeFileIfExists testStoreMsgsFile + exportMessages ms testStoreMsgsFile False + AMSType SMSMemory -> pure () runTest :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do testSMPClient test' `shouldReturn` () @@ -832,12 +834,12 @@ createAndSecureQueue h sPub = do (rId', rId) #== "same queue ID" pure (sId, rId, rKey, dhShared) -testTiming :: ATransport -> Spec -testTiming (ATransport t) = +testTiming :: SpecWith (ATransport, AMSType) +testTiming = describe "should have similar time for auth error, whether queue exists or not, for all key types" $ forM_ timingTests $ \tst -> - it (testName tst) $ - smpTest2Cfg cfg (mkVersionRange batchCmdsSMPVersion authCmdsSMPVersion) t $ \rh sh -> + it (testName tst) $ \(ATransport t, msType) -> + smpTest2Cfg (cfgMS msType) (mkVersionRange batchCmdsSMPVersion authCmdsSMPVersion) t $ \rh sh -> testSameTiming rh sh tst where testName :: (C.AuthAlg, C.AuthAlg, Int) -> String @@ -902,13 +904,13 @@ testTiming (ATransport t) = ] ok `shouldBe` True -testMessageNotifications :: ATransport -> Spec -testMessageNotifications (ATransport t) = - it "should create simplex connection, subscribe notifier and deliver notifications" $ do +testMessageNotifications :: SpecWith (ATransport, AMSType) +testMessageNotifications = + it "should create simplex connection, subscribe notifier and deliver notifications" $ \(ATransport t, msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g (nPub, nKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - smpTest4 t $ \rh sh nh1 nh2 -> do + smpTest4 t msType $ \rh sh nh1 nh2 -> do (sId, rId, rKey, dhShared) <- createAndSecureQueue rh sPub let dec = decryptMsgV3 dhShared (rcvNtfPubDhKey, _) <- atomically $ C.generateKeyPair g @@ -942,12 +944,12 @@ testMessageNotifications (ATransport t) = Nothing -> pure () Just _ -> error "nothing else should be delivered to the 2nd notifier's TCP connection" -testMsgExpireOnSend :: forall c. Transport c => TProxy c -> Spec -testMsgExpireOnSend t = - it "should expire messages that are not received before messageTTL on SEND" $ do +testMsgExpireOnSend :: SpecWith (ATransport, AMSType) +testMsgExpireOnSend = + it "should expire messages that are not received before messageTTL on SEND" $ \(ATransport (t :: TProxy c), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - let cfg' = cfg {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 10000}} + let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 10000}} withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ -> testSMPClient @c $ \sh -> do (sId, rId, rKey, dhShared) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub @@ -962,13 +964,13 @@ testMsgExpireOnSend t = Nothing -> return () Just _ -> error "nothing else should be delivered" -testMsgExpireOnInterval :: forall c. Transport c => TProxy c -> Spec -testMsgExpireOnInterval t = +testMsgExpireOnInterval :: SpecWith (ATransport, AMSType) +testMsgExpireOnInterval = -- fails on ubuntu - xit' "should expire messages that are not received before messageTTL after expiry interval" $ do + xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - let cfg' = cfg {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}} + let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}} withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ -> testSMPClient @c $ \sh -> do (sId, rId, rKey, _) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub @@ -982,12 +984,12 @@ testMsgExpireOnInterval t = Nothing -> return () Just _ -> error "nothing should be delivered" -testMsgNOTExpireOnInterval :: forall c. Transport c => TProxy c -> Spec -testMsgNOTExpireOnInterval t = - it "should NOT expire messages that are not received before messageTTL if expiry interval is large" $ do +testMsgNOTExpireOnInterval :: SpecWith (ATransport, AMSType) +testMsgNOTExpireOnInterval = + it "should NOT expire messages that are not received before messageTTL if expiry interval is large" $ \(ATransport (t :: TProxy c), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - let cfg' = cfg {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 10000}} + let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 10000}} withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ -> testSMPClient @c $ \sh -> do (sId, rId, rKey, dhShared) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub @@ -1020,8 +1022,8 @@ instance Eq C.ASignature where Just Refl -> s == s' _ -> False -syntaxTests :: ATransport -> Spec -syntaxTests (ATransport t) = do +serverSyntaxTests :: ATransport -> Spec +serverSyntaxTests (ATransport t) = do it "unknown command" $ ("", "abcd", "1234", ('H', 'E', 'L', 'L', 'O')) >#> ("", "abcd", "1234", ERR $ CMD UNKNOWN) describe "NEW" $ do it "no parameters" $ (sampleSig, "bcda", "", NEW_) >#> ("", "bcda", "", ERR $ CMD SYNTAX) diff --git a/tests/Test.hs b/tests/Test.hs index 6ae4edb41..f8505b133 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -25,8 +25,9 @@ import NtfServerTests (ntfServerTests) import RemoteControl (remoteControlTests) import SMPProxyTests (smpProxyTests) import ServerTests +import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..)) import Simplex.Messaging.Transport (TLS, Transport (..)) -import Simplex.Messaging.Transport.WebSockets (WS) +-- import Simplex.Messaging.Transport.WebSockets (WS) import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) import System.Environment (setEnv) import Test.Hspec @@ -60,8 +61,14 @@ main = do describe "Store log tests" storeLogTests describe "TRcvQueues tests" tRcvQueuesTests describe "Util tests" utilTests - describe "SMP server via TLS" $ serverTests (transport @TLS) - xdescribe "SMP server via WebSockets" $ serverTests (transport @WS) + describe "SMP server via TLS, jornal message store" $ do + describe "SMP syntax" $ serverSyntaxTests (transport @TLS) + before (pure (transport @TLS, AMSType SMSJournal)) serverTests + describe "SMP server via TLS, memory message store" $ + before (pure (transport @TLS, AMSType SMSMemory)) serverTests + -- xdescribe "SMP server via WebSockets" $ do + -- describe "SMP syntax" $ serverSyntaxTests (transport @WS) + -- before (pure (transport @WS, AMSType SMSJournal)) serverTests describe "Notifications server" $ ntfServerTests (transport @TLS) describe "SMP client agent" $ agentTests (transport @TLS) describe "SMP proxy" smpProxyTests