{-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} {-# OPTIONS_GHC -Wno-orphans #-} module CoreTests.MsgStoreTests where import AgentTests.FunctionalAPITests (runRight, runRight_) import Control.Concurrent.STM import Control.Exception (bracket) import Control.Monad import Control.Monad.IO.Class import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Base64.URL as B64 import Data.Maybe (fromJust) import Data.Time.Clock.System (getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (EntityId (..), Message (..), RecipientId, SParty (..), noMsgFlags) import Simplex.Messaging.Server (MessageStats (..), exportMessages, importMessages, printMessageStats) import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth, readWriteQueueStore) import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath (()) import System.IO (IOMode (..), hClose, withFile) import Test.Hspec msgStoreTests :: Spec msgStoreTests = do around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests around (withMsgStore testJournalStoreCfg) $ describe "Journal message store" $ do someMsgStoreTests it "should export and import journal store" testExportImportStore describe "queue state" $ do it "should restore queue state from the last line" testQueueState it "should recover when message is written and state is not" testMessageState describe "missing files" $ do it "should create read file when missing" testReadFileMissing it "should switch to write file when read file missing" testReadFileMissingSwitch it "should create write file when missing" testWriteFileMissing it "should create read file when read and write files are missing" testReadAndWriteFilesMissing where someMsgStoreTests :: STMQueueStore s => SpecWith s someMsgStoreTests = do it "should get queue and store/read messages" testGetQueue it "should not fail on EOF when changing read journal" testChangeReadJournal withMsgStore :: STMQueueStore s => MsgStoreConfig s -> (s -> IO ()) -> IO () withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore testSMTStoreConfig :: STMStoreConfig testSMTStoreConfig = STMStoreConfig {storePath = Nothing, quota = 3} testJournalStoreCfg :: JournalStoreConfig testJournalStoreCfg = JournalStoreConfig { storePath = testStoreMsgsDir, pathParts = journalMsgStoreDepth, quota = 3, maxMsgCount = 4, maxStateLines = 2, stateTailSize = 256, idleInterval = 21600 } mkMessage :: MonadIO m => ByteString -> m Message mkMessage body = liftIO $ do g <- C.newRandom msgTs <- getSystemTime msgId <- atomically $ C.randomBytes 24 g pure Message {msgId, msgTs, msgFlags = noMsgFlags, msgBody = C.unsafeMaxLenBS body} pattern Msg :: ByteString -> Maybe Message pattern Msg s <- Just Message {msgBody = MaxLenBS s} deriving instance Eq MsgQueueState deriving instance Eq (JournalState t) deriving instance Eq (SJournalType t) testNewQueueRec :: TVar ChaChaDRG -> Bool -> IO (RecipientId, QueueRec) testNewQueueRec g sndSecure = do rId <- atomically $ EntityId <$> C.randomBytes 24 g senderId <- atomically $ EntityId <$> C.randomBytes 24 g (recipientKey, _) <- atomically $ C.generateAuthKeyPair C.SX25519 g (k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g let qr = QueueRec { recipientId = rId, recipientKey, rcvDhSecret = C.dh' k pk, senderId, senderKey = Nothing, sndSecure, notifier = Nothing, status = EntityActive, updatedAt = Nothing } pure (rId, qr) testGetQueue :: STMQueueStore s => s -> IO () testGetQueue ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True runRight_ $ do q <- ExceptT $ addQueue ms qr let write s = writeMsg ms rId q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" Just (Message {msgId = mId2}, False) <- write "message 2" Just (Message {msgId = mId3}, False) <- write "message 3" Msg "message 1" <- tryPeekMsg ms rId q Msg "message 1" <- tryPeekMsg ms rId q Nothing <- tryDelMsg ms rId q mId2 Msg "message 1" <- tryDelMsg ms rId q mId1 Nothing <- tryDelMsg ms rId q mId1 Msg "message 2" <- tryPeekMsg ms rId q Nothing <- tryDelMsg ms rId q mId1 (Nothing, Msg "message 2") <- tryDelPeekMsg ms rId q mId1 (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2 (Nothing, Msg "message 3") <- tryDelPeekMsg ms rId q mId2 Msg "message 3" <- tryPeekMsg ms rId q (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 Nothing <- tryDelMsg ms rId q mId2 Nothing <- tryDelMsg ms rId q mId3 Nothing <- tryPeekMsg ms rId q Just (Message {msgId = mId4}, True) <- write "message 4" Msg "message 4" <- tryPeekMsg ms rId q Just (Message {msgId = mId5}, False) <- write "message 5" (Nothing, Msg "message 4") <- tryDelPeekMsg ms rId q mId3 (Msg "message 4", Msg "message 5") <- tryDelPeekMsg ms rId q mId4 Just (Message {msgId = mId6}, False) <- write "message 6" Just (Message {msgId = mId7}, False) <- write "message 7" Nothing <- write "message 8" Msg "message 5" <- tryPeekMsg ms rId q (Nothing, Msg "message 5") <- tryDelPeekMsg ms rId q mId4 (Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms rId q mId5 (Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms rId q mId6 (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms rId q mId7 (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms rId q mId8 (Nothing, Nothing) <- tryDelPeekMsg ms rId q mId8 void $ ExceptT $ deleteQueue ms rId q testChangeReadJournal :: STMQueueStore s => s -> IO () testChangeReadJournal ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True runRight_ $ do q <- ExceptT $ addQueue ms qr let write s = writeMsg ms rId q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" (Msg "message 1", Nothing) <- tryDelPeekMsg ms rId q mId1 Just (Message {msgId = mId2}, True) <- write "message 2" (Msg "message 2", Nothing) <- tryDelPeekMsg ms rId q mId2 Just (Message {msgId = mId3}, True) <- write "message 3" (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 Just (Message {msgId = mId4}, True) <- write "message 4" (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q mId4 Just (Message {msgId = mId5}, True) <- write "message 5" (Msg "message 5", Nothing) <- tryDelPeekMsg ms rId q mId5 void $ ExceptT $ deleteQueue ms rId q testExportImportStore :: JournalMsgStore -> IO () testExportImportStore ms = do g <- C.newRandom (rId1, qr1) <- testNewQueueRec g True (rId2, qr2) <- testNewQueueRec g True sl <- readWriteQueueStore testStoreLogFile ms runRight_ $ do let write rId q s = writeMsg ms rId q True =<< mkMessage s q1 <- ExceptT $ addQueue ms qr1 liftIO $ logCreateQueue sl qr1 Just (Message {}, True) <- write rId1 q1 "message 1" Just (Message {}, False) <- write rId1 q1 "message 2" q2 <- ExceptT $ addQueue ms qr2 liftIO $ logCreateQueue sl qr2 Just (Message {msgId = mId3}, True) <- write rId2 q2 "message 3" Just (Message {msgId = mId4}, False) <- write rId2 q2 "message 4" (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId2 q2 mId3 (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId2 q2 mId4 Just (Message {}, True) <- write rId2 q2 "message 5" Just (Message {}, False) <- write rId2 q2 "message 6" Just (Message {}, False) <- write rId2 q2 "message 7" Nothing <- write rId2 q2 "message 8" pure () length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 exportMessages False ms testStoreMsgsFile False renameFile testStoreMsgsFile (testStoreMsgsFile <> ".copy") closeMsgStore ms closeStoreLog sl exportMessages False ms testStoreMsgsFile False (B.readFile testStoreMsgsFile `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".copy") let cfg = (testJournalStoreCfg :: JournalStoreConfig) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg readWriteQueueStore testStoreLogFile ms' >>= closeStoreLog stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False ms' testStoreMsgsFile Nothing printMessageStats "Messages" stats length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 4 -- state file is backed up, 2 message files exportMessages False ms' testStoreMsgsFile2 False (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig readWriteQueueStore testStoreLogFile stmStore >>= closeStoreLog MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False stmStore testStoreMsgsFile2 Nothing exportMessages False stmStore testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) testQueueState :: JournalMsgStore -> IO () testQueueState ms = do g <- C.newRandom rId <- EntityId <$> atomically (C.randomBytes 24 g) let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) createDirectoryIfMissing True dir state <- newMsgQueueState <$> newJournalId (random ms) withFile statePath WriteMode (`appendState` state) length . lines <$> readFile statePath `shouldReturn` 1 readQueueState statePath `shouldReturn` state length <$> listDirectory dir `shouldReturn` 1 -- no backup let state1 = state { size = 1, readState = (readState state) {msgCount = 1, byteCount = 100}, writeState = (writeState state) {msgPos = 1, msgCount = 1, bytePos = 100, byteCount = 100} } withFile statePath AppendMode (`appendState` state1) length . lines <$> readFile statePath `shouldReturn` 2 readQueueState statePath `shouldReturn` state1 length <$> listDirectory dir `shouldReturn` 1 -- no backup let state2 = state { size = 2, readState = (readState state) {msgCount = 2, byteCount = 200}, writeState = (writeState state) {msgPos = 2, msgCount = 2, bytePos = 200, byteCount = 200} } withFile statePath AppendMode (`appendState` state2) length . lines <$> readFile statePath `shouldReturn` 3 copyFile statePath (statePath <> ".2") readQueueState statePath `shouldReturn` state2 length <$> listDirectory dir `shouldReturn` 3 -- new state, copy + backup length . lines <$> readFile statePath `shouldReturn` 1 -- corrupt the only line corruptFile statePath newState <- readQueueState statePath newState `shouldBe` newMsgQueueState (journalId $ writeState newState) -- corrupt the last line renameFile (statePath <> ".2") statePath removeOtherFiles dir statePath length . lines <$> readFile statePath `shouldReturn` 3 corruptFile statePath readQueueState statePath `shouldReturn` state1 length <$> listDirectory dir `shouldReturn` 2 length . lines <$> readFile statePath `shouldReturn` 1 where readQueueState statePath = do (state, h) <- readWriteQueueState ms statePath hClose h pure state corruptFile f = do s <- readFile f removeFile f writeFile f $ take (length s - 4) s removeOtherFiles dir keep = do names <- listDirectory dir forM_ names $ \name -> let f = dir name in unless (f == keep) $ removeFile f testMessageState :: JournalMsgStore -> IO () testMessageState ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) write q s = writeMsg ms rId q True =<< mkMessage s mId1 <- runRight $ do q <- ExceptT $ addQueue ms qr Just (Message {msgId = mId1}, True) <- write q "message 1" Just (Message {}, False) <- write q "message 2" liftIO $ closeMsgQueue q pure mId1 ls <- B.lines <$> B.readFile statePath B.writeFile statePath $ B.unlines $ take (length ls - 1) ls runRight_ $ do q <- ExceptT $ getQueue ms SRecipient rId Just (Message {msgId = mId3}, False) <- write q "message 3" (Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms rId q mId1 (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 liftIO $ closeMsgQueue q testReadFileMissing :: JournalMsgStore -> IO () testReadFileMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True let write q s = writeMsg ms rId q True =<< mkMessage s q <- runRight $ do q <- ExceptT $ addQueue ms qr Just (Message {}, True) <- write q "message 1" Msg "message 1" <- tryPeekMsg ms rId q pure q mq <- fromJust <$> readTVarIO (msgQueue_' q) MsgQueueState {readState = rs} <- readTVarIO $ state mq closeMsgStore ms let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile path runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId Nothing <- tryPeekMsg ms rId q' Just (Message {}, True) <- write q' "message 2" Msg "message 2" <- tryPeekMsg ms rId q' pure () testReadFileMissingSwitch :: JournalMsgStore -> IO () testReadFileMissingSwitch ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True q <- writeMessages ms rId qr mq <- fromJust <$> readTVarIO (msgQueue_' q) MsgQueueState {readState = rs} <- readTVarIO $ state mq closeMsgStore ms let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile path runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId Just (Message {}, False) <- writeMsg ms rId q' True =<< mkMessage "message 6" Msg "message 5" <- tryPeekMsg ms rId q' pure () testWriteFileMissing :: JournalMsgStore -> IO () testWriteFileMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True q <- writeMessages ms rId qr mq <- fromJust <$> readTVarIO (msgQueue_' q) MsgQueueState {writeState = ws} <- readTVarIO $ state mq closeMsgStore ms let path = journalFilePath (queueDirectory $ queue mq) $ journalId ws print path removeFile path runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId Just Message {msgId = mId3} <- tryPeekMsg ms rId q' (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId q' mId3 Just Message {msgId = mId4} <- tryPeekMsg ms rId q' (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q' mId4 Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6" Msg "message 6" <- tryPeekMsg ms rId q' pure () testReadAndWriteFilesMissing :: JournalMsgStore -> IO () testReadAndWriteFilesMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True q <- writeMessages ms rId qr mq <- fromJust <$> readTVarIO (msgQueue_' q) MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq closeMsgStore ms removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId ws runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId Nothing <- tryPeekMsg ms rId q' Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6" Msg "message 6" <- tryPeekMsg ms rId q' pure () writeMessages :: JournalMsgStore -> RecipientId -> QueueRec -> IO JournalQueue writeMessages ms rId qr = runRight $ do q <- ExceptT $ addQueue ms qr let write s = writeMsg ms rId q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" Just (Message {msgId = mId2}, False) <- write "message 2" Just (Message {}, False) <- write "message 3" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms rId q mId1 (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2 Just (Message {}, False) <- write "message 4" Just (Message {}, False) <- write "message 5" pure q